介紹如何在 Python 中透過 Redis 資料庫,實作訊息的發布(publish)與訂閱(subscribe)模式。
安裝與測試 redis-py
模組
在 Python 中若要使用 Redis 資料庫,可以採用 redis-py
模組,此模組可以使用 pip
安裝:
# 安裝 redis-py 模組
pip install redis
安裝好之後,可以進行基本的 Redis 連線測試:
import redis # 建立 Redis 資料庫連線 r = redis.Redis(host='localhost', port=6379, db=0) # 連線測試 print(r.ping())
True
確認 Redis 資料庫可以正常連線之後,就可以開始撰寫發布與訂閱模式的程式了。
發布者
首先撰寫一個發布者指令稿,在建立 Redis 連線之後,使用 Redis 資料庫連線物件的 publish
方法函數,發布訊息至指定的頻道:
# 發布者指令稿 import redis # 建立 Redis 資料庫連線 r = redis.Redis(host='localhost', port=6379, db=0) # 發布訊息至 my-first-channel 頻道 r.publish('my-first-channel', 'some data')
訂閱者
另外再撰寫一個訂閱者指令稿,負責接收訊息。建立 Redis 資料庫連線之後,再建立一個 PubSub 物件,呼叫 PubSub 物件的 subscribe
方法函數,訂閱指定的頻道,最後透過 PubSub 物件的 get_message
方法函數取得訊息。
# 訂閱者指令稿 import redis import time # 建立 Redis 資料庫連線 r = redis.Redis(host='localhost', port=6379, db=0) # 建立 PubSub 物件 p = r.pubsub() # 訂閱 my-first-channel 頻道 p.subscribe('my-first-channel') # 取得訊息 while True: if msg := p.get_message(): print(msg) time.sleep(0.001)
{'type': 'subscribe', 'pattern': None, 'channel': b'my-first-channel', 'data': 1} {'type': 'message', 'pattern': None, 'channel': b'my-first-channel', 'data': b'some data'}
從 get_message
函數所取得的訊息是一個 dictionary,包含以下幾個欄位:
type
:訊息類型,可能的值有:subscribe
、unsubscribe
、psubscribe
、punsubscribe
、message
、pmessage
。channel
:訊息所屬頻道。pattern
:匹配頻道的 pattern,僅適用於pmessage
類型。data
:對於message
、pmessage
類型來說,此值為實際的訊息內容;對於subscribe
、unsubscribe
類型,此值則為目前訂閱頻道的數量。
在訂閱者接收到的訊息中,除了資料本身之外,還包含了許多訂閱確認的訊息,如果想要忽略訂閱確認訊息,只處理資料,可以在建立 PubSub 物件時加上 ignore_subscribe_messages=True
參數:
# 建立 PubSub 物件(忽略訂閱確認訊息,只處理資料) p = r.pubsub(ignore_subscribe_messages=True)
阻斷式接收訊息
在舊式的實作方式中,訂閱者會以阻斷式(block)的方式接收訊息,如果訂閱者除了接收訊息之外,並沒有其他的工作,那採用這種方式會比較簡單:
# 舊式接收訊息(阻斷式) for msg in p.listen(): print(msg)
多頻道發布與訂閱模式
若要同時訂閱多個頻道,一種方式是在呼叫 subscribe
函數時,指定多個頻道:
# 訂閱 my-first-channel 與 my-second-channel 兩個頻道 p.subscribe('my-first-channel', 'my-second-channel')
另一種方式是以指定 pattern 的方式,一次訂閱所有符合 pattern 的頻道:
# 訂閱所有由 my- 開頭的頻道 p.psubscribe('my-*')
取消訂閱頻道
若要取消訂閱頻道,可以呼叫 unsubscribe
方法函數:
# 取消訂閱 my-first-channel 頻道 p.unsubscribe('my-first-channel')
若呼叫 unsubscribe
方法函數,不帶任何參數,則會取消訂閱所有頻道:
# 取消訂閱所有頻道
p.unsubscribe()
若要取消以 pattern 訂閱的頻道,可以使用 punsubscribe
方法函數:
# 取消以 pattern 訂閱的頻道 p.punsubscribe('my-*')
當使用完 PubSub 物件之後,關閉其連線:
# 關閉 PubSub 連線
p.close()
訂閱者執行緒
發布者與訂閱者通常在實務上會是獨立的兩個程式,如果要在一個 Python 程式之內同時實作發布者與訂閱者,可以考慮使用多執行緒的方式,將發布者與訂閱者放在不同的執行緒中,分別執行不同的工作。
PubSub 本身就有支援執行緒的功能,讓訂閱者在獨立的執行緒中處理接收到的工作,而在主執行緒中發布者可以處理其他工作或是發布新訊息:
import redis # 建立 Redis 資料庫連線 r = redis.Redis(host='localhost', port=6379, db=0) # 建立 PubSub 物件 p = r.pubsub() # 自訂訂閱者處理函數 def my_handler(message): print('MY HANDLER: ', message['data']) # 訂閱 my-channel 頻道,指定訂閱者處理函數 p.subscribe(**{'my-channel': my_handler}) # 啟動訂閱者執行緒 thread = p.run_in_thread(sleep_time=0.001) # 發布訊息至 my-channel 頻道 r.publish('my-channel', 'some data') # 停止訂閱者執行緒 thread.stop()
MY HANDLER: b'some data'