Categories: Python

Python 使用 Paho 函式庫發布、訂閱 MQTT 訊息教學與範例

介紹如何在 Python 中使用 Paho 函式庫實作 MQTT 協定的發布與訂閱架構,傳送與接收訊息。

Eclipse Paho 是一套開放原始碼的 MQTT 函式庫,有各種程式語言的實作版本,在 Python 中可使用 Eclipse Paho MQTT Python Client,以下是使用教學與範例。

基本 MQTT Client

以下是基本的 MQTT 訊息訂閱者(subscriber)指令稿 subscriber1.py,使用帳號密碼登入指定的 MQTT 伺服器後,接收 hello/world 這個主題的訊息:

# 訂閱者(subscriber)指令稿 subscriber1.py
import paho.mqtt.client as mqtt

# 建立連線(接收到 CONNACK)的回呼函數
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # 每次連線之後,重新設定訂閱主題
    client.subscribe("hello/world")

# 接收訊息(接收到 PUBLISH)的回呼函數
def on_message(client, userdata, msg):
    print("[{}]: {}".format(msg.topic, str(msg.payload)))

# 建立 MQTT Client 物件
client = mqtt.Client()

# 設定建立連線回呼函數
client.on_connect = on_connect

# 設定接收訊息回呼函數
client.on_message = on_message

# 設定登入帳號密碼(若無則可省略)
client.username_pw_set("myuser","mypassword")

# 連線至 MQTT 伺服器(伺服器位址,連接埠)
client.connect("mqtt.example.com", 1883)

# 進入無窮處理迴圈
client.loop_forever()

這裡的 username_pw_set() 函數可以用來設定登入 MQTT 伺服器用的帳號與密碼,若自己的 MQTT 伺服器沒有設定帳號與密碼,就可以把這一行刪除。

以下則是基本的 MQTT 訊息發布者(publisher)指令稿 publisher1.py,使用帳號密碼登入指定的 MQTT 伺服器後,將訊息發布至 hello/world 這個主題:

# 發布者(publisher)指令稿 publisher1.py
import paho.mqtt.client as mqtt

# 建立 MQTT Client 物件
client = mqtt.Client()

# 設定登入帳號密碼(若無則可省略)
client.username_pw_set("myuser","mypassword")

# 連線至 MQTT 伺服器(伺服器位址,連接埠)
client.connect("mqtt.example.com", 1883)

# 發布訊息至 hello/world 主題
client.publish("hello/world", "test message")

在使用時要先執行訂閱者指令稿 subscriber1.py,讓它等待 hello/world 這個主題的訊息:

# 執行訂閱者指令稿
python subscriber1.py
Connected with result code 0

再另外開啟一個終端機執行發布者指令稿 publisher1.py,將訊息遞送至 hello/world 這個主題:

# 執行發布者指令稿
python publisher1.py

這樣訂閱者就可以收到發布者所傳送的訊息了:

[hello/world]: b'test message'

MQTTS 加密連線

如果自己的 MQTT 伺服器支援加密連線(SSL/TLS),就可以採用加密連線的方式來傳輸資料,可以增強資料傳輸的安全性。

以下是採用 MQTTS 加密連線(SSL/TLS)的訂閱者指令稿 subscriber2.py,在實作上加密與非加密連線只有兩個小差異,一個是呼叫 tls_set() 設定加密連線,另一個是將連接埠更改為 MQTTS 用的 8883

# 訂閱者(subscriber)指令稿 subscriber2.py
import paho.mqtt.client as mqtt

# 建立連線(接收到 CONNACK)的回呼函數
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # 每次連線之後,重新設定訂閱主題
    client.subscribe("hello/world")

# 接收訊息(接收到 PUBLISH)的回呼函數
def on_message(client, userdata, msg):
    print("[{}]: {}".format(msg.topic, str(msg.payload)))

# 建立 MQTT Client 物件
client = mqtt.Client()

# 設定建立連線回呼函數
client.on_connect = on_connect

# 設定接收訊息回呼函數
client.on_message = on_message

# 設定登入帳號密碼(若無則可省略)
client.username_pw_set("myuser","mypassword")

# 設定 SSL/TLS 加密連線
client.tls_set()

# 連線至 MQTT 伺服器(伺服器位址,連接埠)
client.connect("mqtt.example.com", 8883)

# 進入無窮處理迴圈
client.loop_forever()

以下則是採用 MQTTS 加密連線(SSL/TLS)的發布者指令稿 publisher2.py

# 發布者(publisher)指令稿 publisher2.py
import paho.mqtt.client as mqtt

# 建立 MQTT Client 物件
client = mqtt.Client()

# 設定登入帳號密碼(若無則可省略)
client.username_pw_set("myuser","mypassword")

# 設定 SSL/TLS 加密連線
client.tls_set()

# 連線至 MQTT 伺服器(伺服器位址,連接埠)
client.connect("mqtt.example.com", 8883)

# 發布訊息至 hello/world 主題
client.publish("hello/world", "test message")

採用加密連線的指令稿在使用上都跟未加密的版本相同。

發布一次性 MQTT 訊息

如果在 Python 指令稿中只需要發布一次 MQTT 訊息,後續就不再需要發布任何 MQTT 訊息,此時就可以改用 publish 這個輔助模組來處理,程式碼的寫法上會比較簡潔。

發送單筆 MQTT 訊息

若要發送單一筆 MQTT 訊息,可用 publish 模組的 single() 函數:

import paho.mqtt.publish as publish

# 發布一則 MQTT 訊息
publish.single(
  topic="hello/world",
  payload="test message",
  hostname="mqtt.example.com",
  port=1883,
  auth={'username':'myuser','password':'mypassword'})

以下是使用 publish 發布一次性 MQTT 訊息的加密版本:

import paho.mqtt.publish as publish

# 發布一則 MQTT 訊息(加密傳輸)
publish.single(
  topic="hello/world",
  payload="test message",
  hostname="mqtt.example.com",
  port=8883,
  auth={'username':'myuser','password':'mypassword'},
  tls={})

發送多筆 MQTT 訊息

若一次要發送多筆 MQTT 訊息,則可改用 publish 模組的 multiple() 函數:

import paho.mqtt.publish as publish

# 準備要傳送的訊息
messages = [
  {'topic':"hello/world", 'payload':"test message"},
  {'topic':"hello/world", 'payload':"test message", 'qos':0, 'retain':False},
  ("hello/world", "test message 2", 0, False) # topic, payload, qos, retain
]

# 發布多則 MQTT 訊息
publish.multiple(
  messages,
  hostname="mqtt.example.com",
  port=1883,
  auth={'username':'myuser','password':'mypassword'})

以下是一次傳送多筆 MQTT 訊息的加密版本:

import paho.mqtt.publish as publish

# 準備要傳送的訊息
messages = [
  {'topic':"hello/world", 'payload':"test message"},
  {'topic':"hello/world", 'payload':"test message", 'qos':0, 'retain':False},
  ("hello/world", "test message 2", 0, False) # topic, payload, qos, retain
]

# 發布一則 MQTT 訊息(加密傳輸)
publish.multiple(
  messages,
  hostname="mqtt.example.com",
  port=8883,
  auth={'username':'myuser','password':'mypassword'},
  tls={})

多執行緒

如果希望一個 Python 指令稿中,可以同時處理發送與接收 MQTT 訊息,可以採用多執行緒的方式來處理,在連線至 MQTT 伺服器之後,以 loop_start() 啟動背景的 loop 執行緒,負責接收與處理 MQTT 訊息,而主執行緒則可以負責產生資料,並發送 MQTT 訊息至 MQTT 伺服器:

# 多執行緒指令稿 multithread.py
import paho.mqtt.client as mqtt
import time, datetime

def on_connect(client, userdata, flags, rc):
    client.subscribe("hello/world")

def on_message(client, userdata, msg):
    print("[{}]: {}".format(msg.topic, str(msg.payload)))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set("myuser","mypassword")
client.connect("mqtt.example.com", 1883)

# 啟動 loop 執行緒
client.loop_start()

# 主執行緒工作
while True:
    # 產生資料
    msg = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')

    # 發送 MQTT 訊息
    client.publish("hello/world", msg)

    time.sleep(1)

這段指令稿在執行之後,就會每隔一秒發送包含時間的 MQTT 訊息,同時由背景的 loop 執行緒將這些訊息收回來,在實務上我們可以在主執行緒中收取感測器的資料,將處理好的資料連同時間一起發送出去。

loop_start() 所發起的背景 loop 執行緒,可以藉由呼叫 loop_stop() 將其終止。

參考資料

Share
Published by
Office Guide

Recent Posts

Python 使用 PyAutoGUI 自動操作滑鼠與鍵盤

本篇介紹如何在 Python ...

1 年 ago

Ubuntu Linux 以 WireGuard 架設 VPN 伺服器教學與範例

本篇介紹如何在 Ubuntu ...

1 年 ago

Linux 網路設定 ip 指令用法教學與範例

本篇介紹如何在 Linux 系...

1 年 ago

Linux 以 Cryptsetup、LUKS 加密 USB 隨身碟教學與範例

介紹如何在 Linux 系統中...

1 年 ago