Redis の Pub/Sub を Python から使ってみたのでまとめてみる。
Contents
Pub/Sub メッセージングモデルとは?
"Pub" は Publish, "Sub" は Subscribe の略で、
日本語だと出版と購読などと訳されます。
予め仲介者に subscribe したい "チャンネル" を登録しておくと、
誰かがその "チャンネル" に向かって publish を行うと、
全ての subscriber に向かってその内容が配信されます。
pub-sub が1対1である必要はなく、1-多数 でも 複数-複数 でも問題ありません。
pub/sub の双方は、相手がどこの誰であるかを知っている必要もありません。
今回はこの pub/sub を redis を介して行います。
 
Redis における Pub/Sub
まずは redis-cli からコマンドベースで Pub/Sub を試します。
 
Subscribe
コマンドは以下です。
| 1 | > SUBSCRIBE channel [channel ...] | 
チャンネル名は複数でも構いません。
コマンドを打つと listen状態に入り、指定したチャンネルからの配信待ちになります。
 
例)
| 1 2 3 4 5 | > subscribe mychannel Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "mychannel" 3) (integer) 1 | 
最初の応答は、subscribe できたというレスポンスで、
publish されたものではないので注意が必要です。
	
Publish
コマンドは以下です。
| 1 | > PUBLISH channel message | 
チャンネル名は Subscribe側と同じものを指定します。
 
例)
Publish側:
| 1 | > publish mychannel mymessage | 
Subscribe側:
| 1 2 3 4 5 6 | > subscribe mychannel Reading messages... (press Ctrl-C to quit) ... 1) "message" 2) "mychannel" 3) "mymessage" | 
Python で redis と pub/sub を利用する
redis-py モジュール
python 実装はいくつもあるようですが、
おそらく一番普及している redis-py モジュールを用いました。
Anaconda からは  
			$ conda install redis-py で入ります。
あとは、
| 1 2 3 | import redis rc=redis.Redis(host=myhost, port=...) | 
みたいな感じで生成できます。
 
Publish
Publish 側は簡単です。
先に生成したオブジェクトに対して、
| 1 | rc.publish("mychannel", "mymessage") | 
だけです。
	
Subscribe
Subscribe 側はちょっとだけ作業が必要です。
| 1 2 3 4 | ps=rc.pubsub() ps.subscribe("mychannel") ps.listen() ... | 
上記のようにすれば使えるには使えるのですが、
問題は listen が応答するまで掴んだままそこで止まってしまうという仕様です。
'mychannel' に何かが publish されて始めて、それ以降に進みます。
受信したメッセージを単発 print するだけみたいな簡単なコードならいいですが、
実際はそういう訳にもいかないので、大体以下のようにします(要所のみ)。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import redis from multiprocessing import Process class myClass:     def __init__(self, ...):         ...         self.rc=redis.Redis(host=...)         self.ps=self.rc.pubsub()         self.ps.subscribe('mychannel')         self.proc=Process(target=self.subsc_listen)         self.proc.start()     def subsc_listen(self):         for mes in self.ps.listen()             print(mes)             ... | 
上記のように Process 使って、listen() を別プロセスで走らせておきます。
これで Publish を待つ間、メインプロセスは別のことをしていることができます。
さらに redis に限らず、multiprocessing モジュールを使うと往々にしてなのですが、
上記のように listen 内容を print で永久に垂れ流すだけというのもそうないと思うので、
大体以下のようになります。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | import redis from multiprocessing import Value, Process from enum import Enum class myStatus(Enum):     INIT=0     ...     END statusList=['INIT', ..., 'END'] class myClass:     def __init__(self, ...):         ...         self.status=Value('i', myStatus.INIT.value)         self.subsc_established=False         ...         self.rc=redis.Redis(host=...)         self.ps=self.rc.pubsub()         self.ps.subscribe('mychannel')         self.proc=Process(target=self.subsc_listen)         self.proc.start()     def __del__(self):         self.proc.join()         ...     def subsc_listen(self):         for mes in self.ps.listen()             # the first message, successfully established??             if not self.subsc_established:                 if mes['data']==1:                     self.subsc_established=True                 else:                     ...                 continue             # main             self.mes=mes[data]             if type(self.mes)==type(b'byte'):                 self.mes=self.mes.decode()             if self.mes in statusList:                 self.status.value=myStatus[self.mes].value                 if self.status.value==myStatus.END.value:                     break             else:                 ... | 
プロセス間の共有メモリ Value/Array
随分増えましたが、一番のポイントは 14行目の Value です。
これはプロセス間の共有メモリです。
listen しているプロセスが値を書き換えて、それを元プロセスからも参照する場合などは
このように(或いは Arrayを使うか)しておく必要があります。
今回の例では、別プロセスで動く subsc_listen 関数内(41行目)で、
listen したメッセージを元に、status  を詰め替えしています。
 
最初のメッセージの例外処理
subsc_listen関数内 28行目から。
redis-cli でもそうでしたが、最初のメッセージは subscribe の成否です。
大体以降で自分が欲しい値とは異なるので、別処理を入れておく必要があります。
 
listen/subscribe を終わらせる
なにごとにも終わりは来るものですが、
listen を for で回すだけだと永久に終わりがきません。
上記の例では 'END' という status を受け取ると終了するようにしています(42行目)。
この処理で for ループを抜けると、subsc_listen が終わり、
この関数を動かしていた self.proc が join され、
晴れて myClass が del できるようになります。