Python

Python 以 Redis 資料庫實作發布 Publish 訂閱 Subscribe 模式教學與範例

介紹如何在 Python 中透過 Redis 資料庫,實作訊息的發布(publish)與訂閱(subscribe)模式。

安裝與測試 redis-py 模組

在 Python 中若要使用 Redis 資料庫,可以採用 redis-py 模組,此模組可以使用 pip 安裝:

# 安裝 redis-py 模組
pip install redis

安裝好之後,可以進行基本的 Redis 連線測試:

import redis

# 建立 Redis 資料庫連線
r = redis.Redis(host='localhost', port=6379, db=0)

# 連線測試
print(r.ping())
True

確認 Redis 資料庫可以正常連線之後,就可以開始撰寫發布與訂閱模式的程式了。

發布者

首先撰寫一個發布者指令稿,在建立 Redis 連線之後,使用 Redis 資料庫連線物件的 publish 方法函數,發布訊息至指定的頻道:

# 發布者指令稿
import redis

# 建立 Redis 資料庫連線
r = redis.Redis(host='localhost', port=6379, db=0)

# 發布訊息至 my-first-channel 頻道
r.publish('my-first-channel', 'some data')

訂閱者

另外再撰寫一個訂閱者指令稿,負責接收訊息。建立 Redis 資料庫連線之後,再建立一個 PubSub 物件,呼叫 PubSub 物件的 subscribe 方法函數,訂閱指定的頻道,最後透過 PubSub 物件的 get_message 方法函數取得訊息。

# 訂閱者指令稿
import redis
import time

# 建立 Redis 資料庫連線
r = redis.Redis(host='localhost', port=6379, db=0)

# 建立 PubSub 物件
p = r.pubsub()

# 訂閱 my-first-channel 頻道
p.subscribe('my-first-channel')

# 取得訊息
while True:
    if msg := p.get_message():
        print(msg)
    time.sleep(0.001)
{'type': 'subscribe', 'pattern': None, 'channel': b'my-first-channel', 'data': 1}
{'type': 'message', 'pattern': None, 'channel': b'my-first-channel', 'data': b'some data'}

get_message 函數所取得的訊息是一個 dictionary,包含以下幾個欄位:

  • type:訊息類型,可能的值有:subscribeunsubscribepsubscribepunsubscribemessagepmessage
  • channel:訊息所屬頻道。
  • pattern:匹配頻道的 pattern,僅適用於 pmessage 類型。
  • data:對於 messagepmessage 類型來說,此值為實際的訊息內容;對於 subscribeunsubscribe 類型,此值則為目前訂閱頻道的數量。

在訂閱者接收到的訊息中,除了資料本身之外,還包含了許多訂閱確認的訊息,如果想要忽略訂閱確認訊息,只處理資料,可以在建立 PubSub 物件時加上 ignore_subscribe_messages=True 參數:

# 建立 PubSub 物件(忽略訂閱確認訊息,只處理資料)
p = r.pubsub(ignore_subscribe_messages=True)

阻斷式接收訊息

在舊式的實作方式中,訂閱者會以阻斷式(block)的方式接收訊息,如果訂閱者除了接收訊息之外,並沒有其他的工作,那採用這種方式會比較簡單:

# 舊式接收訊息(阻斷式)
for msg in p.listen():
    print(msg)

多頻道發布與訂閱模式

若要同時訂閱多個頻道,一種方式是在呼叫 subscribe 函數時,指定多個頻道:

# 訂閱 my-first-channel 與 my-second-channel 兩個頻道
p.subscribe('my-first-channel', 'my-second-channel')

另一種方式是以指定 pattern 的方式,一次訂閱所有符合 pattern 的頻道:

# 訂閱所有由 my- 開頭的頻道
p.psubscribe('my-*')

取消訂閱頻道

若要取消訂閱頻道,可以呼叫 unsubscribe 方法函數:

# 取消訂閱 my-first-channel 頻道
p.unsubscribe('my-first-channel')

若呼叫 unsubscribe 方法函數,不帶任何參數,則會取消訂閱所有頻道:

# 取消訂閱所有頻道
p.unsubscribe()

若要取消以 pattern 訂閱的頻道,可以使用 punsubscribe 方法函數:

# 取消以 pattern 訂閱的頻道
p.punsubscribe('my-*')

當使用完 PubSub 物件之後,關閉其連線:

# 關閉 PubSub 連線
p.close()

訂閱者執行緒

發布者與訂閱者通常在實務上會是獨立的兩個程式,如果要在一個 Python 程式之內同時實作發布者與訂閱者,可以考慮使用多執行緒的方式,將發布者與訂閱者放在不同的執行緒中,分別執行不同的工作。

PubSub 本身就有支援執行緒的功能,讓訂閱者在獨立的執行緒中處理接收到的工作,而在主執行緒中發布者可以處理其他工作或是發布新訊息:

import redis

# 建立 Redis 資料庫連線
r = redis.Redis(host='localhost', port=6379, db=0)

# 建立 PubSub 物件
p = r.pubsub()

# 自訂訂閱者處理函數
def my_handler(message):
    print('MY HANDLER: ', message['data'])

# 訂閱 my-channel 頻道,指定訂閱者處理函數
p.subscribe(**{'my-channel': my_handler})

# 啟動訂閱者執行緒
thread = p.run_in_thread(sleep_time=0.001)

# 發布訊息至 my-channel 頻道
r.publish('my-channel', 'some data')

# 停止訂閱者執行緒
thread.stop()
MY HANDLER:  b'some data'
Share
Published by
Office Guide
Tags: Redis

Recent Posts

Python 使用 PyAutoGUI 自動操作滑鼠與鍵盤

本篇介紹如何在 Python ...

1 年 ago

Ubuntu Linux 以 WireGuard 架設 VPN 伺服器教學與範例

本篇介紹如何在 Ubuntu ...

1 年 ago

Linux 網路設定 ip 指令用法教學與範例

本篇介紹如何在 Linux 系...

1 年 ago

Linux 以 Cryptsetup、LUKS 加密 USB 隨身碟教學與範例

介紹如何在 Linux 系統中...

1 年 ago