介紹如何在 Python 中使用 Paho 函式庫實作 MQTT 協定的發布與訂閱架構,傳送與接收訊息。
Eclipse Paho 是一套開放原始碼的 MQTT 函式庫,有各種程式語言的實作版本,在 Python 中可使用 Eclipse Paho MQTT Python Client,以下是使用教學與範例。
基本 MQTT Client
以下是基本的 MQTT 訊息訂閱者(subscriber)指令稿 subscriber1.py
,使用帳號密碼登入指定的 MQTT 伺服器後,接收 hello/world
這個主題的訊息:
# 訂閱者(subscriber)指令稿 subscriber1.py import paho.mqtt.client as mqtt # 建立連線(接收到 CONNACK)的回呼函數 def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) # 每次連線之後,重新設定訂閱主題 client.subscribe("hello/world") # 接收訊息(接收到 PUBLISH)的回呼函數 def on_message(client, userdata, msg): print("[{}]: {}".format(msg.topic, str(msg.payload))) # 建立 MQTT Client 物件 client = mqtt.Client() # 設定建立連線回呼函數 client.on_connect = on_connect # 設定接收訊息回呼函數 client.on_message = on_message # 設定登入帳號密碼(若無則可省略) client.username_pw_set("myuser","mypassword") # 連線至 MQTT 伺服器(伺服器位址,連接埠) client.connect("mqtt.example.com", 1883) # 進入無窮處理迴圈 client.loop_forever()
這裡的 username_pw_set()
函數可以用來設定登入 MQTT 伺服器用的帳號與密碼,若自己的 MQTT 伺服器沒有設定帳號與密碼,就可以把這一行刪除。
以下則是基本的 MQTT 訊息發布者(publisher)指令稿 publisher1.py
,使用帳號密碼登入指定的 MQTT 伺服器後,將訊息發布至 hello/world
這個主題:
# 發布者(publisher)指令稿 publisher1.py import paho.mqtt.client as mqtt # 建立 MQTT Client 物件 client = mqtt.Client() # 設定登入帳號密碼(若無則可省略) client.username_pw_set("myuser","mypassword") # 連線至 MQTT 伺服器(伺服器位址,連接埠) client.connect("mqtt.example.com", 1883) # 發布訊息至 hello/world 主題 client.publish("hello/world", "test message")
在使用時要先執行訂閱者指令稿 subscriber1.py
,讓它等待 hello/world
這個主題的訊息:
# 執行訂閱者指令稿
python subscriber1.py
Connected with result code 0
再另外開啟一個終端機執行發布者指令稿 publisher1.py
,將訊息遞送至 hello/world
這個主題:
# 執行發布者指令稿
python publisher1.py
這樣訂閱者就可以收到發布者所傳送的訊息了:
[hello/world]: b'test message'
MQTTS 加密連線
如果自己的 MQTT 伺服器支援加密連線(SSL/TLS),就可以採用加密連線的方式來傳輸資料,可以增強資料傳輸的安全性。
以下是採用 MQTTS 加密連線(SSL/TLS)的訂閱者指令稿 subscriber2.py
,在實作上加密與非加密連線只有兩個小差異,一個是呼叫 tls_set()
設定加密連線,另一個是將連接埠更改為 MQTTS 用的 8883
:
# 訂閱者(subscriber)指令稿 subscriber2.py import paho.mqtt.client as mqtt # 建立連線(接收到 CONNACK)的回呼函數 def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) # 每次連線之後,重新設定訂閱主題 client.subscribe("hello/world") # 接收訊息(接收到 PUBLISH)的回呼函數 def on_message(client, userdata, msg): print("[{}]: {}".format(msg.topic, str(msg.payload))) # 建立 MQTT Client 物件 client = mqtt.Client() # 設定建立連線回呼函數 client.on_connect = on_connect # 設定接收訊息回呼函數 client.on_message = on_message # 設定登入帳號密碼(若無則可省略) client.username_pw_set("myuser","mypassword") # 設定 SSL/TLS 加密連線 client.tls_set() # 連線至 MQTT 伺服器(伺服器位址,連接埠) client.connect("mqtt.example.com", 8883) # 進入無窮處理迴圈 client.loop_forever()
以下則是採用 MQTTS 加密連線(SSL/TLS)的發布者指令稿 publisher2.py
:
# 發布者(publisher)指令稿 publisher2.py import paho.mqtt.client as mqtt # 建立 MQTT Client 物件 client = mqtt.Client() # 設定登入帳號密碼(若無則可省略) client.username_pw_set("myuser","mypassword") # 設定 SSL/TLS 加密連線 client.tls_set() # 連線至 MQTT 伺服器(伺服器位址,連接埠) client.connect("mqtt.example.com", 8883) # 發布訊息至 hello/world 主題 client.publish("hello/world", "test message")
採用加密連線的指令稿在使用上都跟未加密的版本相同。
發布一次性 MQTT 訊息
如果在 Python 指令稿中只需要發布一次 MQTT 訊息,後續就不再需要發布任何 MQTT 訊息,此時就可以改用 publish
這個輔助模組來處理,程式碼的寫法上會比較簡潔。
發送單筆 MQTT 訊息
若要發送單一筆 MQTT 訊息,可用 publish
模組的 single()
函數:
import paho.mqtt.publish as publish # 發布一則 MQTT 訊息 publish.single( topic="hello/world", payload="test message", hostname="mqtt.example.com", port=1883, auth={'username':'myuser','password':'mypassword'})
以下是使用 publish
發布一次性 MQTT 訊息的加密版本:
import paho.mqtt.publish as publish # 發布一則 MQTT 訊息(加密傳輸) publish.single( topic="hello/world", payload="test message", hostname="mqtt.example.com", port=8883, auth={'username':'myuser','password':'mypassword'}, tls={})
發送多筆 MQTT 訊息
若一次要發送多筆 MQTT 訊息,則可改用 publish
模組的 multiple()
函數:
import paho.mqtt.publish as publish # 準備要傳送的訊息 messages = [ {'topic':"hello/world", 'payload':"test message"}, {'topic':"hello/world", 'payload':"test message", 'qos':0, 'retain':False}, ("hello/world", "test message 2", 0, False) # topic, payload, qos, retain ] # 發布多則 MQTT 訊息 publish.multiple( messages, hostname="mqtt.example.com", port=1883, auth={'username':'myuser','password':'mypassword'})
以下是一次傳送多筆 MQTT 訊息的加密版本:
import paho.mqtt.publish as publish # 準備要傳送的訊息 messages = [ {'topic':"hello/world", 'payload':"test message"}, {'topic':"hello/world", 'payload':"test message", 'qos':0, 'retain':False}, ("hello/world", "test message 2", 0, False) # topic, payload, qos, retain ] # 發布一則 MQTT 訊息(加密傳輸) publish.multiple( messages, hostname="mqtt.example.com", port=8883, auth={'username':'myuser','password':'mypassword'}, tls={})
多執行緒
如果希望一個 Python 指令稿中,可以同時處理發送與接收 MQTT 訊息,可以採用多執行緒的方式來處理,在連線至 MQTT 伺服器之後,以 loop_start()
啟動背景的 loop 執行緒,負責接收與處理 MQTT 訊息,而主執行緒則可以負責產生資料,並發送 MQTT 訊息至 MQTT 伺服器:
# 多執行緒指令稿 multithread.py import paho.mqtt.client as mqtt import time, datetime def on_connect(client, userdata, flags, rc): client.subscribe("hello/world") def on_message(client, userdata, msg): print("[{}]: {}".format(msg.topic, str(msg.payload))) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.username_pw_set("myuser","mypassword") client.connect("mqtt.example.com", 1883) # 啟動 loop 執行緒 client.loop_start() # 主執行緒工作 while True: # 產生資料 msg = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S') # 發送 MQTT 訊息 client.publish("hello/world", msg) time.sleep(1)
這段指令稿在執行之後,就會每隔一秒發送包含時間的 MQTT 訊息,同時由背景的 loop 執行緒將這些訊息收回來,在實務上我們可以在主執行緒中收取感測器的資料,將處理好的資料連同時間一起發送出去。
由 loop_start()
所發起的背景 loop 執行緒,可以藉由呼叫 loop_stop()
將其終止。