ロバストPythonで出てきたライブラリの1つを使ってみた。
pypubsubとは
PyPubSubは、単一プロセスアプリケーションでObserverパターンを使用するためのインフラストラクチャを提供するライブラリ。 メッセージを送信側/受信側がお互いを意識する必要がないため、モジュール同士を疎結合することが出来る。 また、疎結合にすることで、テストも容易になる。
使ってみた
親トピックと子トピックの関係
親子関係があるトピックの子トピックに配信した場合、親トピックと子トピックにメッセージが配信される。
下記のように、rootTopic
にlistener1関数を登録し、rootTopic.child
にlistener2関数を登録する。
そして、rootTopic.child
にメッセージを送信した場合、 listener1
関数とlistener2
関数が実行される。
pub.subscribe(listener1, "rootTopic") pub.subscribe(listener2, "rootTopic.child") pub.sendMessage("rootTopic.child", arg1 = 123, arg2 = anObj)
subscribeの解除
unsubscribe関数を使用することで、パラメータのトピックからパラメータのリスナーインスタンスを削除できる。 戻り値は削除したリスナーインスタンス。 下記の場合、listner2関数しか実行されない
pub.subscribe(listener1, "topic1") pub.subscribe(listener2, "topic1") response = pub.unsubscribe(listener1, "topic1") pub.sendMessage("topic1", arg1="hoge1", arg2="hoge2")
subscribeされているか?
トピックにリスナーインスタンスが登録されているかの判定をisSubscribed関数で確認できる。
pub.subscribe(listener1, "topic1") pub.subscribe(listener2, "topic2") print(pub.isSubscribed(listener1, "topic2")) pub.sendMessage("topic1", arg1="hoge1", arg2="hoge2")
メッセージがサブスクライブ出来るか?
リスナーが指定のトピックのメッセージをサブスクラブ出来るかを確認するには、isValid関数を使用する。 サブスクライブ出来る場合、Trueが返ってくる。
pub.subscribe(listener1, "topic1", a=2, b=3) pub.isValid(listener1, "topic1", curriedArgNames=None)) # False pub.isValid(listener1, "topic1", curriedArgNames=["a", "b"]) # True
エラーハンドラー
sendMessageでリスナー側で例外が発生した際に、実行するハンドラーをsetListnerExcHandler関数で設定できる。 ハンドラーを設定していない場合、listener1関数とlistener2関数がサブスクライブされている際に、listener1で例外が発生するとlistener2関数は実行されず、 sendMessageが例外をスローする。
from pubsub import pub class MyHandler(pub.IListenerExcHandler): def __call__(self, listerID, topicObj): print("listerID = " + str(listerID)) print("topicObj = " + str(topicObj)) def listener1(arg1, arg2=None): print("Call listener1") raise Exception("aa") def listener2(arg1, arg2=None): print("Call listener2") raise Exception("aa") def start_pubsub(): #pub.setListenerExcHandler(MyHandler()) print("subscribe") pub.subscribe(listener1, "topic1") pub.subscribe(listener2, "topic1") print("sendMessage!!") try: pub.sendMessage("topic1", arg1="hoge1", arg2="hoge2") except Exception as e: print("Occur Exception") start_pubsub()
ハンドラーを設定している場合、例外が発生するとハンドラーに登録している関数が実行され、トピックのリスナーはすべて実行される。 以下の場合、listener1もlistener2も実行される。 トピック単位でハンドラーの設定は出来ない。 トピック単位で例外発生時の処理は変わるので、ハンドラーの設定はトピック単位で出来たほうがいいと思う。
from pubsub import pub class MyHandler(pub.IListenerExcHandler): def __call__(self, listerID, topicObj): print("listerID = " + str(listerID)) print("topicObj = " + str(topicObj)) def listener1(arg1, arg2=None): print("Call listener1") raise Exception("aa") def listener2(arg1, arg2=None): print("Call listener2") raise Exception("aa") def start_pubsub(): pub.setListenerExcHandler(MyHandler()) print("subscribe") pub.subscribe(listener1, "topic1") pub.subscribe(listener2, "topic1") print("sendMessage!!") try: pub.sendMessage("topic1", arg1="hoge1", arg2="hoge2") except Exception as e: print("Occur Exception") start_pubsub()
pub/subのトレース
何か問題が発生した際に、処理をトレースする必要がある。 その際に、PubSubをトレースするために役立つ仕組みも取り入れられている。
INotificationHandlerを継承したクラスを用意し、addNotificationHandler関数で継承したクラスを登録する。 setNotificationFlags関数でトレースするアクティビティにフラグを設定する。 以下のソースは、subscribeとsendMessageをトレースする例になっている。
from pubsub import pub from pubsub.core import INotificationHandler from pubsub.core.listener import Listener from pubsub.core.topicobj import Topic class MyNotifHandler(INotificationHandler): def notifySubscribe(self, *args, **kwargs): print("notifySubscribe#MyNotifHandler called") def notifySend(self, stage: str, topicObj: Topic, pubListener: Listener = None): print("notifySend#MyNotifHandler called") def listener1(arg1, arg2=None): print("Call listener1") pub.addNotificationHandler(MyNotifHandler()) pub.setNotificationFlags(subscribe=True, sendMessage=True) print("Call subscribe function") pub.subscribe(listener1, "topic1") print("Call sendMessage") pub.sendMessage("topic1", arg1="sendmessage")
実行すると、notifySendが3回呼ばれている。3回呼ばれている理由は不明。 時間があれば、実装を見てみる。
Call subscribe function notifySubscribe#MyNotifHandler called Call sendMessage notifySend#MyNotifHandler called notifySend#MyNotifHandler called Call listener1 notifySend#MyNotifHandler called
INotificationHandlerがない?
ドキュメントでは、INotificationHandlerは pub
モジュールにあるようだけど、実際はcore
の
方にあり、ドキュメントが間違ってた。
最後に
今回、ロバストPythonにこのpub/subのライブラリが載っていたので、使ってみた。 簡単にpub/subの仕組みが取り入れられて、簡単に実装することが出来て、勉強になった。 ドキュメントの誤り?も見つけられたので、少しだけ貢献できたかな・・・。 プルリクエストしても、最近メンテされてないようなので、時間があったら出してみよう。