RabbitMQを使って、メッセージキューの理解を深めていく。
RabbitMQとは
オープンソースのメッセージ指向ミドルウェアで、メッセージキューを利用してアプリケーション間の非同期通信を実現するためのソフトウェア。 AMQAと呼ばれるプロトコルを使用する。
Kafkaとの違い
やりたいこと
RabbitMQを介して、クライアントAからメッセージを送信して、クライアントBにメッセージを渡す。
RabbitMQのセットアップ
RabbitMQはコンテナで起動する。 RabbitMQのコンテナは、管理プラグインが有効となっているものとなっていないものでコンテナが異なる。 今回は、管理画面でキューを確認したいので、管理プラグインが有効になっているものを使用する。
https://hub.docker.com/_/rabbitmq
version: "3" services: rabbitmq: image: rabbitmq:3-management environment: - 8080:15672 - 5672:5672
http://localhost:8080
にアクセスし、UsernameとPasswordにguestを入力し、ログインすると管理画面が表示できる。
もし、画面が表示できない場合は少し待って再度アクセスすると画面が表示できる。
少し立ち上がるのが遅いみたい。
ユーザ名やパスワードを変更したい場合は、RABBITMQ_DEFAULT_USER
とRABBITMQ_DEFAULT_PASS
で変更できる。
クライアントの準備
RabbitMQにメッセージを送信するために、Pika
をインストールする。
pip install pika
RabbitMQのHallo World
PythonでのRabbitMQとの連携のチュートリアルが公式ページにあるので、実施してみる。 まずは送信側をコードを作成し、実行してみたところメッセージも出力されたので、送信できている。
import pika # RabbitMQに接続 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() # キューを作成 channel.queue_declare(queue="hello") message = "Hello World" channel.basic_publish(exchange="", routing_key="hello", body=message) print(" [x] Send " + message) # コネクションのクローズ connection.close()
RabbitMQの管理画面を確認すると、Queueにメッセージ登録されていることがわかる。 (8回実行したので、グラフ上は8メッセージ溜まっている状態)
次に受信側を実行してみる。
import sys import os import pika def callback(channel, method, properties, message): print(" [x] Received " + str(message)) print(" method: " + str(method)) print(" properties: " + str(properties)) def main(): # RabbitMQに接続 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() print(" [*] Waiting for message. To exit press CTRL+C") channel.basic_consume(queue="hello", auto_ack=True, on_message_callback=callback) channel.start_consuming() if __name__ == "__main__": try: main() except KeyboardInterrupt: print("Interrupted") try: sys.exit(0) except SystemExit: os._exit(0)
蓄積していたメッセージを全て受信し、出力できた。 RabbitMQの管理画面からもQueueにあったメッセージが全て配信されていることがわかる。
[*] Waiting for message. To exit press CTRL+C [x] Received b'Hello World' method: <Basic.Deliver(['consumer_tag=ctag1.be2617d6173e46de8dd43a444597d439', 'delivery_tag=1', 'exchange=', 'redelivered=True', 'routing_key=hello'])> properties: <BasicProperties> [x] Received b'Hello World' method: <Basic.Deliver(['consumer_tag=ctag1.be2617d6173e46de8dd43a444597d439', 'delivery_tag=2', 'exchange=', 'redelivered=False', 'routing_key=hello'])> properties: <BasicProperties> [x] Received b'Hello World' method: <Basic.Deliver(['consumer_tag=ctag1.be2617d6173e46de8dd43a444597d439', 'delivery_tag=3', 'exchange=', 'redelivered=False', 'routing_key=hello'])> properties: <BasicProperties> [x] Received b'Hello World' method: <Basic.Deliver(['consumer_tag=ctag1.be2617d6173e46de8dd43a444597d439', 'delivery_tag=4', 'exchange=', 'redelivered=False', 'routing_key=hello'])> properties: <BasicProperties> [x] Received b'Hello World' method: <Basic.Deliver(['consumer_tag=ctag1.be2617d6173e46de8dd43a444597d439', 'delivery_tag=5', 'exchange=', 'redelivered=False', 'routing_key=hello'])> properties: <BasicProperties> [x] Received b'Hello World' method: <Basic.Deliver(['consumer_tag=ctag1.be2617d6173e46de8dd43a444597d439', 'delivery_tag=6', 'exchange=', 'redelivered=False', 'routing_key=hello'])> properties: <BasicProperties> [x] Received b'Hello World' method: <Basic.Deliver(['consumer_tag=ctag1.be2617d6173e46de8dd43a444597d439', 'delivery_tag=7', 'exchange=', 'redelivered=False', 'routing_key=hello'])> properties: <BasicProperties> [x] Received b'Hello World' method: <Basic.Deliver(['consumer_tag=ctag1.be2617d6173e46de8dd43a444597d439', 'delivery_tag=8', 'exchange=', 'redelivered=False', 'routing_key=hello'])> properties: <BasicProperties>
最後に
RabbitMQのチュートリアルのHelloWorldをやってみた。 管理画面でキューにメッセージが蓄積されていることも確認できたし、どのように実装するのかもなんとなくイメージが掴めた。 次回は、チュートリアルの続きを進めて、RabbitMQの使い方の理解を深める。