ヒトリ歩き

愚痴とかいろいろ書きます

RabbitMQでのメッセージ消失の対策

前々回にRabbitMQを触っていた続き。 少しだけQueueのところに触れておく。

kotapontan.hatenablog.com

www.rabbitmq.com

メッセージの処理中の考慮

タスクの処理中にワーカー側が終了した場合、タスクを別のワーカーに処理してもらいたい。 そのため、RabbitMQではメッセージの応答確認をサポートしている。 ackは、メッセージを受信し、処理後にRabbitMQが削除できることをRabbitMQに伝えるため、コンシューマーから通知する。 ackを送信せずに、コンシューマーが終了した場合、RabbitMQはメッセージが処理されなかったと認識して、メッセージを再度キューに入れる。 配信確認には、タイムアウト(デフォルト30分)となっている。

RabbitMQにメッセージを削除してよい通知をする場合は、コールバック関数の中で、channel.basic_ackを実行する。

試しに、basic_ackをコメントアウトして実行したところ、ワーカーはキューからメッセージを取得したが、キューにはメッセージが残った状態となった。

def callback(channel, method, properties, message):
    print(f"  [x] Received {message.decode()}")
    time.sleep(message.count(b"."))
    print("   [x] Done")
    channel.basic_ack(delivery_tag=method.delivery_tag) # ここをコメントアウト

メッセージの永続化

キューにメッセージがある状態で停止すると、キューのメッセージが喪失する。 そのような状況を防ぐためには、2つのことが必要。 1つ目は、quere_declare関数のパラメータにdurable=Trueを設定する。

channel.queue_declare(queue="hello", durable=True)

2つ目は、channel.basic_publishのpropertiesに永続化するプロパティを指定すること。

    channel.basic_publish(
        exchange="",
        routing_key="hello",
        body=message,
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
    )

注意が必要なのは、コンテナの再起動ではキューやメッセージは永続化されているが、コンテナを停止すると キューとメッセージは消えてしまう。

メッセージのディスパッチ

ジーではないワーカーにメッセージを割り振るには、channel.basic_qos(prefetch_count=1) を設定する。

試しにメッセージを受け取って、短いスリープをするワーカーと長いスリープをするワーカーに分けて実行してみた。 `channel.basic_qos(prefetch_count=1) を含めてない場合、均等にメッセージが割り振られているのがわかる。

# sleep 1
  [x] Received Message No.0
   [x] Done
  [x] Received Message No.2
   [x] Done
  [x] Received Message No.4
   [x] Done
  [x] Received Message No.6
   [x] Done
  [x] Received Message No.8
   [x] Done
  [x] Received Message No.10
   [x] Done
  [x] Received Message No.12
   [x] Done

# sleep 5
  [x] Received Message No.3
   [x] Done
  [x] Received Message No.5
   [x] Done
  [x] Received Message No.7
   [x] Done
  [x] Received Message No.9
   [x] Done
  [x] Received Message No.11
   [x] Done
  [x] Received Message No.13
   [x] Done
  [x] Received Message No.15
   [x] Done

次にchannel.basic_qos(prefetch_count=1)を設定したケース。 スリープが短いワーカーの処理が終わり次第、次のメッセージを処理していることがわかる。

# sleep 1
  [x] Received Message No.1
   [x] Done
  [x] Received Message No.3
   [x] Done
  [x] Received Message No.4
   [x] Done
  [x] Received Message No.5
   [x] Done
  [x] Received Message No.6
   [x] Done
  [x] Received Message No.7
   [x] Done
  [x] Received Message No.9
   [x] Done
  [x] Received Message No.10
   [x] Done
  [x] Received Message No.11
   [x] Done
  [x] Received Message No.12
   [x] Done
  [x] Received Message No.13
   [x] Done
  [x] Received Message No.15
   [x] Done
  [x] Received Message No.16
   [x] Done
  [x] Received Message No.17
   [x] Done
  [x] Received Message No.18
   [x] Done
  [x] Received Message No.20
   [x] Done
  [x] Received Message No.21
   [x] Done

# sleep 5
  [x] Received Message No.0
   [x] Done
  [x] Received Message No.2
   [x] Done
  [x] Received Message No.8
   [x] Done
  [x] Received Message No.14
   [x] Done
  [x] Received Message No.19
   [x] Done

最後に

メッセージの処理中にワーカー側が停止した場合やRabbitMQ側が停止した場合などのメッセージの消失に関して考慮されていた。 ただ、コンテナで起動している場合はメッセージが消失していたので、別のやり方があるのか、自分の確認が悪かったのかはどこかで確認してみる。