前々回にRabbitMQを触っていた続き。 少しだけQueueのところに触れておく。
メッセージの処理中の考慮
タスクの処理中にワーカー側が終了した場合、タスクを別のワーカーに処理してもらいたい。 そのため、RabbitMQではメッセージの応答確認をサポートしている。 ackは、メッセージを受信し、処理後にRabbitMQが削除できることをRabbitMQに伝えるため、コンシューマーから通知する。 ackを送信せずに、コンシューマーが終了した場合、RabbitMQはメッセージが処理されなかったと認識して、メッセージを再度キューに入れる。 配信確認には、タイムアウト(デフォルト30分)となっている。
RabbitMQにメッセージを削除してよい通知をする場合は、コールバック関数の中で、channel.basic_ack
を実行する。
試しに、basic_ackをコメントアウトして実行したところ、ワーカーはキューからメッセージを取得したが、キューにはメッセージが残った状態となった。
def callback(channel, method, properties, message): print(f" [x] Received {message.decode()}") time.sleep(message.count(b".")) print(" [x] Done") channel.basic_ack(delivery_tag=method.delivery_tag) # ここをコメントアウト
メッセージの永続化
キューにメッセージがある状態で停止すると、キューのメッセージが喪失する。
そのような状況を防ぐためには、2つのことが必要。
1つ目は、quere_declare
関数のパラメータにdurable=True
を設定する。
channel.queue_declare(queue="hello", durable=True)
2つ目は、channel.basic_publish
のpropertiesに永続化するプロパティを指定すること。
channel.basic_publish( exchange="", routing_key="hello", body=message, properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), )
注意が必要なのは、コンテナの再起動ではキューやメッセージは永続化されているが、コンテナを停止すると キューとメッセージは消えてしまう。
メッセージのディスパッチ
ビジーではないワーカーにメッセージを割り振るには、channel.basic_qos(prefetch_count=1)
を設定する。
試しにメッセージを受け取って、短いスリープをするワーカーと長いスリープをするワーカーに分けて実行してみた。
`channel.basic_qos(prefetch_count=1)
を含めてない場合、均等にメッセージが割り振られているのがわかる。
# sleep 1 [x] Received Message No.0 [x] Done [x] Received Message No.2 [x] Done [x] Received Message No.4 [x] Done [x] Received Message No.6 [x] Done [x] Received Message No.8 [x] Done [x] Received Message No.10 [x] Done [x] Received Message No.12 [x] Done # sleep 5 [x] Received Message No.3 [x] Done [x] Received Message No.5 [x] Done [x] Received Message No.7 [x] Done [x] Received Message No.9 [x] Done [x] Received Message No.11 [x] Done [x] Received Message No.13 [x] Done [x] Received Message No.15 [x] Done
次にchannel.basic_qos(prefetch_count=1)
を設定したケース。
スリープが短いワーカーの処理が終わり次第、次のメッセージを処理していることがわかる。
# sleep 1 [x] Received Message No.1 [x] Done [x] Received Message No.3 [x] Done [x] Received Message No.4 [x] Done [x] Received Message No.5 [x] Done [x] Received Message No.6 [x] Done [x] Received Message No.7 [x] Done [x] Received Message No.9 [x] Done [x] Received Message No.10 [x] Done [x] Received Message No.11 [x] Done [x] Received Message No.12 [x] Done [x] Received Message No.13 [x] Done [x] Received Message No.15 [x] Done [x] Received Message No.16 [x] Done [x] Received Message No.17 [x] Done [x] Received Message No.18 [x] Done [x] Received Message No.20 [x] Done [x] Received Message No.21 [x] Done # sleep 5 [x] Received Message No.0 [x] Done [x] Received Message No.2 [x] Done [x] Received Message No.8 [x] Done [x] Received Message No.14 [x] Done [x] Received Message No.19 [x] Done
最後に
メッセージの処理中にワーカー側が停止した場合やRabbitMQ側が停止した場合などのメッセージの消失に関して考慮されていた。 ただ、コンテナで起動している場合はメッセージが消失していたので、別のやり方があるのか、自分の確認が悪かったのかはどこかで確認してみる。