Redis の Pub/Sub を Python から使ってみたのでまとめてみる。
Contents
Pub/Sub メッセージングモデルとは?
"Pub" は Publish, "Sub" は Subscribe の略で、
日本語だと出版と購読などと訳されます。
予め仲介者に subscribe したい "チャンネル" を登録しておくと、
誰かがその "チャンネル" に向かって publish を行うと、
全ての subscriber に向かってその内容が配信されます。
pub-sub が1対1である必要はなく、1-多数 でも 複数-複数 でも問題ありません。
pub/sub の双方は、相手がどこの誰であるかを知っている必要もありません。
今回はこの pub/sub を redis を介して行います。
Redis における Pub/Sub
まずは redis-cli からコマンドベースで Pub/Sub を試します。
Subscribe
コマンドは以下です。
1 |
> SUBSCRIBE channel [channel ...] |
チャンネル名は複数でも構いません。
コマンドを打つと listen状態に入り、指定したチャンネルからの配信待ちになります。
例)
1 2 3 4 5 |
> subscribe mychannel Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "mychannel" 3) (integer) 1 |
最初の応答は、subscribe できたというレスポンスで、
publish されたものではないので注意が必要です。
Publish
コマンドは以下です。
1 |
> PUBLISH channel message |
チャンネル名は Subscribe側と同じものを指定します。
例)
Publish側:
1 |
> publish mychannel mymessage |
Subscribe側:
1 2 3 4 5 6 |
> subscribe mychannel Reading messages... (press Ctrl-C to quit) ... 1) "message" 2) "mychannel" 3) "mymessage" |
Python で redis と pub/sub を利用する
redis-py モジュール
python 実装はいくつもあるようですが、
おそらく一番普及している redis-py モジュールを用いました。
Anaconda からは
$ conda install redis-py で入ります。
あとは、
1 2 3 |
import redis rc=redis.Redis(host=myhost, port=...) |
みたいな感じで生成できます。
Publish
Publish 側は簡単です。
先に生成したオブジェクトに対して、
1 |
rc.publish("mychannel", "mymessage") |
だけです。
Subscribe
Subscribe 側はちょっとだけ作業が必要です。
1 2 3 4 |
ps=rc.pubsub() ps.subscribe("mychannel") ps.listen() ... |
上記のようにすれば使えるには使えるのですが、
問題は listen が応答するまで掴んだままそこで止まってしまうという仕様です。
'mychannel' に何かが publish されて始めて、それ以降に進みます。
受信したメッセージを単発 print するだけみたいな簡単なコードならいいですが、
実際はそういう訳にもいかないので、大体以下のようにします(要所のみ)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import redis from multiprocessing import Process class myClass: def __init__(self, ...): ... self.rc=redis.Redis(host=...) self.ps=self.rc.pubsub() self.ps.subscribe('mychannel') self.proc=Process(target=self.subsc_listen) self.proc.start() def subsc_listen(self): for mes in self.ps.listen() print(mes) ... |
上記のように Process 使って、listen() を別プロセスで走らせておきます。
これで Publish を待つ間、メインプロセスは別のことをしていることができます。
さらに redis に限らず、multiprocessing モジュールを使うと往々にしてなのですが、
上記のように listen 内容を print で永久に垂れ流すだけというのもそうないと思うので、
大体以下のようになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
import redis from multiprocessing import Value, Process from enum import Enum class myStatus(Enum): INIT=0 ... END statusList=['INIT', ..., 'END'] class myClass: def __init__(self, ...): ... self.status=Value('i', myStatus.INIT.value) self.subsc_established=False ... self.rc=redis.Redis(host=...) self.ps=self.rc.pubsub() self.ps.subscribe('mychannel') self.proc=Process(target=self.subsc_listen) self.proc.start() def __del__(self): self.proc.join() ... def subsc_listen(self): for mes in self.ps.listen() # the first message, successfully established?? if not self.subsc_established: if mes['data']==1: self.subsc_established=True else: ... continue # main self.mes=mes[data] if type(self.mes)==type(b'byte'): self.mes=self.mes.decode() if self.mes in statusList: self.status.value=myStatus[self.mes].value if self.status.value==myStatus.END.value: break else: ... |
プロセス間の共有メモリ Value/Array
随分増えましたが、一番のポイントは 14行目の Value です。
これはプロセス間の共有メモリです。
listen しているプロセスが値を書き換えて、それを元プロセスからも参照する場合などは
このように(或いは Arrayを使うか)しておく必要があります。
今回の例では、別プロセスで動く subsc_listen 関数内(41行目)で、
listen したメッセージを元に、status を詰め替えしています。
最初のメッセージの例外処理
subsc_listen関数内 28行目から。
redis-cli でもそうでしたが、最初のメッセージは subscribe の成否です。
大体以降で自分が欲しい値とは異なるので、別処理を入れておく必要があります。
listen/subscribe を終わらせる
なにごとにも終わりは来るものですが、
listen を for で回すだけだと永久に終わりがきません。
上記の例では 'END' という status を受け取ると終了するようにしています(42行目)。
この処理で for ループを抜けると、subsc_listen が終わり、
この関数を動かしていた self.proc が join され、
晴れて myClass が del できるようになります。