今回は
MQTT(Message Queuing Teremetory Transport)のお勉強のためローカル環境で、publisher/broker/subscriberを構成して、メッセージの送受信確認をやってみました。
準備
機器構成
Win10上のWSL2(debian)とpi4b(debian)を同一ネットワーク上に接続して環境を構成した。pub/subのプログラムはお手軽さを優先してpythonにしました。
pi4b(broker)
brokerには、mosquittoを使います。debianだとsudo apt-get install mosquittoでインストールとサービス起動ができます。brokerの仕込みはこれだけです。ポート1883で動いてます。
wsl2(pub/sub)
Win10のwsl2はdebianです。pythonのmqttライブラリは、paho-mqttというのがメジャーなようです。pip3 install paho-mqttでインストールできます。
動作確認
broker(pi4b)
mosquittoがサービスとして動いていれば、他にやることはありません。ポート1883を待ち受けて仕事してくれます。
pi@pi4b:~ $ systemctl status mosquitto.service
mosquitto.service - Mosquitto MQTT v3.1/v3.1.1 Broker
Loaded: loaded (/lib/systemd/system/mosquitto.service; enabled; vendor preset
Active: active (running) since Thu 2021-06-17 08:59:19 JST; 1h 21min ago
Docs: man:mosquitto.conf(5)
man:mosquitto(8)
Main PID: 2101 (mosquitto)
Tasks: 1 (limit: 4915)
CGroup: /system.slice/mosquitto.service
└─2101 /usr/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf
6月 17 08:59:19 pi4b systemd[1]: Starting Mosquitto MQTT v3.1/v3.1.1 Broker...
6月 17 08:59:19 pi4b systemd[1]: Started Mosquitto MQTT v3.1/v3.1.1 Broker.
pub/sub(pi4b)
publisherとsubscriberのpythonコードをそれぞれ作成します。データを入れる箱をtopicsと言うらしいですが"test/001"という名前にしておきます。
送信データは、提携メッセージに数字を1つづつ増加させたものを引っ付けてpubします。
publisher側の状況
publish: 3146
publish: 3147
publish: 3148
publish: 3149
publish: 3150
publish: 3151
publish: 3152
publish: 3153
publish: 3154
publish: 3155
subscriber側の状況
Received message 'b'Hello, from paho_sub!3138'' on topic 'test/001' with QoS 0
Received message 'b'Hello, from paho_sub!3139'' on topic 'test/001' with QoS 0
Received message 'b'Hello, from paho_sub!3140'' on topic 'test/001' with QoS 0
Received message 'b'Hello, from paho_sub!3141'' on topic 'test/001' with QoS 0
Received message 'b'Hello, from paho_sub!3142'' on topic 'test/001' with QoS 0
Received message 'b'Hello, from paho_sub!3143'' on topic 'test/001' with QoS 0
Received message 'b'Hello, from paho_sub!3144'' on topic 'test/001' with QoS 0
Received message 'b'Hello, from paho_sub!3145'' on topic 'test/001' with QoS 0
Received message 'b'Hello, from paho_sub!3146'' on topic 'test/001' with QoS 0
Received message 'b'Hello, from paho_sub!3147'' on topic 'test/001' with QoS 0
subscriberを増やしても、publisherは影響を受けないというのがmqttの特徴だそうです。subscribe.pyを2つ動かすと確かに同じメッセージを受け取ることができます。
VScode上にbashを2つ起動して、sub.pyを2つ動かした状況です。ちょいネタ:mqtt携帯アプリ
ちょっと蛇足ですが。androidアプリにも「mqtt client」というのがあるので試しにインストールしてみました。WiFiオンにしてローカルネットワークに接続し、brokerのIPアドレスとtopics名"test/001"を設定するとWin10のdebianからpublishされた文字列がsubscribeできます。またこの端末から任意の文字列をpublishも出来ることを確認しました。へーーー。
subscribeのスクリーンショットです。今日のまとめ
MQTTのお勉強のため、片方向の送受信確認をしました。送受信データを使ってモータ制御などにも応用できるようです。倒立ロボのパラメータ送信や状態確認に使えると嬉しいのですが。更にマイコンからの確認や送受信を往復できるように引き続き詰めていきたいと思います。
参考リンク
#
# pahoを使ったpublisher
#
import paho.mqtt.client as mqtt # MQTTのライブラリをインポート
from time import sleep # 3秒間のウェイトのために使う
BROKER_HOST = "x.x.x.x" # brokerのIPアドレス
# ブローカーに接続できたときの処理
def on_connect(client, userdata, flag, rc):
print("Connected with result code " + str(rc))
# ブローカーが切断したときの処理
def on_disconnect(client, userdata, flag, rc):
if rc != 0:
print("Unexpected disconnection.")
# publishが完了したときの処理
def on_publish(client, userdata, mid):
print("publish: {0}".format(mid))
# メイン関数 この関数は末尾のif文から呼び出される
def main():
client = mqtt.Client() # クラスのインスタンス(実体)の作成
client.on_connect = on_connect # 接続時のコールバック関数を登録
client.on_disconnect = on_disconnect # 切断時のコールバックを登録
client.on_publish = on_publish # メッセージ送信時のコールバック
client.connect(BROKER_HOST, 1883, 60) # 接続先とポート番号
# 通信処理スタート
client.loop_start() # subはloop_forever()だが,pubはloop_start()で起動だけさせる
# 永久に繰り返す
count = 1
while count > 0:
client.publish("test/001","Hello, from paho_sub!" + str(count)) # トピック名とメッセージを決めて送信
count+=1
sleep(2) # 3秒待つ
if __name__ == '__main__': # importされないときだけmain()を呼ぶ
main() # メイン関数を呼び出す
-------
# # pahoを使ったsubscriber # import paho.mqtt.client as mqtt # MQTTのライブラリをインポート BROKER_HOST = "x.x.x.x"
# brokerのIPアドレス
# ブローカーに接続できたときの処理 def on_connect(client, userdata, flag, rc): print("Connected with result code " + str(rc)) # 接続できた旨表示 client.subscribe("test/001") # subするトピックを設定 # ブローカーが切断したときの処理 def on_disconnect(client, userdata, flag, rc): if rc != 0: print("Unexpected disconnection.") # メッセージが届いたときの処理 def on_message(client, userdata, msg): # msg.topicにトピック名が,msg.payloadに届いたデータ本体が入っている print("Received message '" + str(msg.payload) + "' on topic '" + msg.topic + "' with QoS " + str(msg.qos)) # MQTTの接続設定 client = mqtt.Client() # クラスのインスタンス(実体)の作成 client.on_connect = on_connect # 接続時のコールバック関数を登録 client.on_disconnect = on_disconnect # 切断時のコールバックを登録 client.on_message = on_message # メッセージ到着時のコールバック client.connect(BROKER_HOST, 1883, 60) # 接続先とポート番号 client.loop_forever() # 永久ループして待ち続ける