ヒトリ歩き

愚痴とかいろいろ書きます

Pub/Subパターンを実現するPythonライブラリ:PyPubSub

ロバストPythonで出てきたライブラリの1つを使ってみた。

pypubsubとは

PyPubSubは、単一プロセスアプリケーションでObserverパターンを使用するためのインフラストラクチャを提供するライブラリ。 メッセージを送信側/受信側がお互いを意識する必要がないため、モジュール同士を疎結合することが出来る。 また、疎結合にすることで、テストも容易になる。

使ってみた

親トピックと子トピックの関係

親子関係があるトピックの子トピックに配信した場合、親トピックと子トピックにメッセージが配信される。 下記のように、rootTopicにlistener1関数を登録し、rootTopic.childにlistener2関数を登録する。 そして、rootTopic.childにメッセージを送信した場合、 listener1関数とlistener2関数が実行される。

pub.subscribe(listener1, "rootTopic")
pub.subscribe(listener2, "rootTopic.child")

pub.sendMessage("rootTopic.child", arg1 = 123, arg2 = anObj)

subscribeの解除

unsubscribe関数を使用することで、パラメータのトピックからパラメータのリスナーインスタンスを削除できる。 戻り値は削除したリスナーインスタンス。 下記の場合、listner2関数しか実行されない

pub.subscribe(listener1, "topic1")
pub.subscribe(listener2, "topic1")

response = pub.unsubscribe(listener1, "topic1")
pub.sendMessage("topic1", arg1="hoge1", arg2="hoge2")

subscribeされているか?

トピックにリスナーインスタンスが登録されているかの判定をisSubscribed関数で確認できる。

pub.subscribe(listener1, "topic1")
pub.subscribe(listener2, "topic2")

print(pub.isSubscribed(listener1, "topic2"))
pub.sendMessage("topic1", arg1="hoge1", arg2="hoge2")

メッセージがサブスクライブ出来るか?

リスナーが指定のトピックのメッセージをサブスクラブ出来るかを確認するには、isValid関数を使用する。 サブスクライブ出来る場合、Trueが返ってくる。

pub.subscribe(listener1, "topic1", a=2, b=3)

pub.isValid(listener1, "topic1", curriedArgNames=None)) # False
pub.isValid(listener1, "topic1", curriedArgNames=["a", "b"]) # True

エラーハンドラー

sendMessageでリスナー側で例外が発生した際に、実行するハンドラーをsetListnerExcHandler関数で設定できる。 ハンドラーを設定していない場合、listener1関数とlistener2関数がサブスクライブされている際に、listener1で例外が発生するとlistener2関数は実行されず、 sendMessageが例外をスローする。

from pubsub import pub

class MyHandler(pub.IListenerExcHandler):
    def __call__(self, listerID, topicObj):
        print("listerID = " + str(listerID))
        print("topicObj = " + str(topicObj))

def listener1(arg1, arg2=None):
    print("Call listener1")
    raise Exception("aa")

def listener2(arg1, arg2=None):
    print("Call listener2")
    raise Exception("aa")

def start_pubsub():

    #pub.setListenerExcHandler(MyHandler())

    print("subscribe")
    pub.subscribe(listener1, "topic1")
    pub.subscribe(listener2, "topic1")

    print("sendMessage!!")
    try:
        pub.sendMessage("topic1", arg1="hoge1", arg2="hoge2")
    except Exception as e:
        print("Occur Exception")


start_pubsub()

ハンドラーを設定している場合、例外が発生するとハンドラーに登録している関数が実行され、トピックのリスナーはすべて実行される。 以下の場合、listener1もlistener2も実行される。 トピック単位でハンドラーの設定は出来ない。 トピック単位で例外発生時の処理は変わるので、ハンドラーの設定はトピック単位で出来たほうがいいと思う。

from pubsub import pub

class MyHandler(pub.IListenerExcHandler):
    def __call__(self, listerID, topicObj):
        print("listerID = " + str(listerID))
        print("topicObj = " + str(topicObj))

def listener1(arg1, arg2=None):
    print("Call listener1")
    raise Exception("aa")

def listener2(arg1, arg2=None):
    print("Call listener2")
    raise Exception("aa")

def start_pubsub():

    pub.setListenerExcHandler(MyHandler())

    print("subscribe")
    pub.subscribe(listener1, "topic1")
    pub.subscribe(listener2, "topic1")

    print("sendMessage!!")
    try:
        pub.sendMessage("topic1", arg1="hoge1", arg2="hoge2")
    except Exception as e:
        print("Occur Exception")


start_pubsub()

pub/subのトレース

何か問題が発生した際に、処理をトレースする必要がある。 その際に、PubSubをトレースするために役立つ仕組みも取り入れられている。

pypubsub.readthedocs.io

INotificationHandlerを継承したクラスを用意し、addNotificationHandler関数で継承したクラスを登録する。 setNotificationFlags関数でトレースするアクティビティにフラグを設定する。 以下のソースは、subscribeとsendMessageをトレースする例になっている。

from pubsub import pub
from pubsub.core import INotificationHandler
from pubsub.core.listener import Listener
from pubsub.core.topicobj import Topic

class MyNotifHandler(INotificationHandler):
    def notifySubscribe(self, *args, **kwargs):
        print("notifySubscribe#MyNotifHandler called")

    def notifySend(self, stage: str, topicObj: Topic, pubListener: Listener = None):
        print("notifySend#MyNotifHandler called")


def listener1(arg1, arg2=None):
    print("Call listener1")


pub.addNotificationHandler(MyNotifHandler())
pub.setNotificationFlags(subscribe=True, sendMessage=True)

print("Call subscribe function")
pub.subscribe(listener1, "topic1")
print("Call sendMessage")
pub.sendMessage("topic1", arg1="sendmessage")

実行すると、notifySendが3回呼ばれている。3回呼ばれている理由は不明。 時間があれば、実装を見てみる。

Call subscribe function
notifySubscribe#MyNotifHandler called
Call sendMessage
notifySend#MyNotifHandler called
notifySend#MyNotifHandler called
Call listener1
notifySend#MyNotifHandler called

INotificationHandlerがない?

ドキュメントでは、INotificationHandlerは pubモジュールにあるようだけど、実際はcoreの 方にあり、ドキュメントが間違ってた。

pypubsub.readthedocs.io

最後に

今回、ロバストPythonにこのpub/subのライブラリが載っていたので、使ってみた。 簡単にpub/subの仕組みが取り入れられて、簡単に実装することが出来て、勉強になった。 ドキュメントの誤り?も見つけられたので、少しだけ貢献できたかな・・・。 プルリクエストしても、最近メンテされてないようなので、時間があったら出してみよう。

issue出した

github.com

publishとブロードキャストの違いも載ってる

pypubsub.readthedocs.io