Paho-MQTT是由Eclipse基金会开发的开源Python MQTT客户端。Paho-MQTT可以在任何支持Python的设备上运行。在本教程中,我们将使用 Paho 构建一个 MQTT 客户端。我将把库的每个功能添加到客户端程序中,并解释它是如何工作的。在本教程结束时,您将对库的工作原理有一个基本的了解。

如果您不熟悉 MQTT,最好先学习我的上一篇《MQTT基础知识及工作原理》

0. 安装 Python MQTT 客户端 Paho-MQTT

Paho MQTT 需要 Python 版本 3.4+。要进行安装,请打开主机或终端,然后输入:

pip install paho-mqtt

1. 建立与 MQTT 代理的连接

本教程将使用 Eclipse 提供的公共 MQTT 代理。

警告: 不要在生产环境中使用公有 MQTT 代理。

要在PC上运行代理,您可以安装mosquitto:
在Windows和Ubuntu/Debian上安装Mosquitto

建立与 MQTT 代理的连接

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)

这是怎么回事?

  1. 我们导入 paho 库,并将代理地址设置为 ,将端口号设置为 。iot.eclipse.org1883

1883是 MQTT 中所有未加密连接的缺省端口号。

  1. 我们创建一个 MQTT 客户端对象并调用它。我们将在下一节中看到有关 paho 客户端对象的更多信息。client

  2. 接下来,我们使用代理的地址和端口号调用函数。connect()

如果连接成功,该函数将返回 0。connect()

让我们分解一下客户端对象:

1.1 客户端对象

该对象创建 MQTT 客户机。它需要4个可选参数:client()

client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

client_id是客户端在连接到代理时给出的唯一字符串。如果您未提供客户端 ID,代理将向客户端分配一个客户端 ID。

每个客户端必须具有唯一的客户端 ID。代理使用客户机 ID 来唯一标识每个用户。如果使用相同的客户端 ID 连接第二个客户端,则第一个客户端将断开连接。

clean_session是默认设置为True的布尔值。

如果设置为False,则代理存储有关客户端的信息。
如果设置为True,代理将删除有关客户端的所有存储信息。
要了解干净会话,请参阅: MQTT 中的干净会话说明

userdata是可以作为参数发送到回调的数据。有关回调的更多信息,请参见第 4 节。

protocol可以是其中之一,也可以取决于您要使用的版本。MQTTv31MQTTv311

transport缺省值为 。如果要通过WebSockets发送消息,请设置为 。tcpwebsockets

为了干净地断开与代理的连接,我们可以使用函数。disconnect()

2. 发布消息

要发布消息,我们使用该函数。该函数采用 4 个参数:publish()

publish(topic, payload=None, qos=0, retain=False)

topic是包含主题名称的字符串。

主题名称区分大小写。

payload是一个字符串,其中包含将发布到主题的消息。订阅该主题的任何客户端都将看到有效负载消息。

这些是可选参数:

qos为 0、1 或 2。它默认为 0。服务质量是保证收到消息的级别。
要了解不同的 MQTT 服务级别质量,请参阅此帖子
: MQTT QoS 级别说明

retain是默认为False的布尔值。如果设置为 True,则它告诉代理将该主题上的该消息存储为"最后一条好消息"。
要了解 MQTT 中的消息保留,请参阅以下内容:MQTT 保留的消息

将发布函数添加到我们的代码中:

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)
client.publish(topic="TestingTopic", payload="TestingPayload", qos=0, retain=False)要检查消息是否已成功发布到某个主题,我们需要一个订阅该主题的客户端。## 3. 订阅主题要订阅主题,我们需要该函数。该函数采用 2 个参数subscribe()```powershell
subscribe("topicName", qos=0)

topic是包含主题名称的字符串。

注意:主题名称区分大小写。

qos为 0、1 或 2。它将降级为 0。

让我们修改我们的程序以订阅要发布到的主题:

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)
注意:如果您希望客户端订阅多个主题,则可以将它们放在元组列表中。示例:client.subscribe([('topicName1', 1),('topicName2', 1)])元组的格式为 [(主题名称,QoS 级别)]

当您执行此程序时,您会注意到已发布的消息仍未显示在控制台/终端上。

订阅某个主题会告诉代理向您发送发布到该主题的消息。我们已经订阅了该主题,但我们需要一个回调函数来处理这些消息。

  1. 回调
    回调是在事件发生时执行的函数。在 paho 中,这些事件包括连接、断开连接、订阅、取消订阅、发布、接收消息、日志记录。

在对程序实现回调之前,我们需要首先了解程序如何调用这些回调。为此,我们将使用 paho 中可用的不同循环函数。

4.1 循环函数

循环是客户端对象的函数。当客户端收到消息时,该消息将存储在接收缓冲区中。当要将消息从客户端发送到代理时,该消息将存储在发送缓冲区中。循环函数用于处理缓冲区中的任何消息并调用相应的回调函数。它们还将尝试在断开连接时重新连接到代理。

大多数循环函数都是异步运行的,这意味着当调用循环函数时,它将在单独的线程上运行。

有 3 种类型的循环函数:

4.1.1 loop_forever():这是一个阻塞循环函数。这意味着循环函数将继续运行,并且在调用此函数后无法执行任何其他行。如果要无限期地运行程序,并且程序中有单个客户端构造函数,请使用此功能。

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_forever()

4.1.2 loop_start()/loop_stop(): 这是一个非阻塞循环函数,这意味着你可以调用这个函数,并在函数调用后继续执行代码。顾名思义,loop_start() 用于启动循环函数,loop_stop() 用于停止循环函数。如果需要在同一程序中创建多个客户端对象,则可以使用 loop_start()。

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_start()
#Some Executable Code Here
client.loop_stop()
#Client Loop Stops After loop_stop() is called

4.1.3 loop():这是一个阻塞循环函数。loop() 和 loop_forever() 之间的区别在于,如果调用 loop() 函数,则必须手动处理重新连接,这与后者不同。loop_forever() & loop_start() 函数将在断开连接时自动尝试重新连接到代理。除非在特殊情况下,否则不建议使用 loop()。

现在回到回调。在下一节中,我们将在程序中实现每个回调。

4.2 on_connect

每次客户端连接/重新连接到代理时都会调用回调。让我们将回调添加到我们的程序中。on_connect()

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  def on_connect(client, userdata, flags, rc):print("Connected With Result Code: {}".format(rc))  def on_disconnect(client, userdata, rc):print("Client Got Disconnected")  client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_forever()

必须将回调分配给客户端对象。如果不是,则回调将不会执行

这是怎么回事?

我们创建on_connect回调函数。它需要4个参数:客户端对象,用户数据,标志,rc。
对象。client
userdata是在客户端构造函数中声明的自定义数据。如果要将自定义数据传递到回调中,则需要它。
flags是一个字典对象,用于检查是否已将客户端对象的 clean 会话设置为 True 或 False。
rc这是result code,用于检查连接状态。不同的结果代码是:

0:连接成功 1:连接被拒绝 – 协议不正确 版本 2:连接被拒绝 – 客户端标识符无效 3:连接被拒绝 – 服务器不可用 4:连接被拒绝 – 用户名或密码错误 5:连接被拒绝 – 未授权 6-255:当前未使用。

该程序的输出是:

Connected With Result Code 0

这意味着连接成功。

4.3 on_disconnect
当客户端与代理断开连接时,将调用回调。on_disconnect()

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  def on_connect(client, userdata, flags, rc):print("Connected With Result Code " (rc))  def on_disconnect(client, userdata, rc):print("Client Got Disconnected")  client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_forever()

不要忘记将回调函数分配给客户端对象!
client.on_disconnect = on_disconnect

运行该程序,断开互联网连接,然后查看是否打印了消息"客户端已断开连接"。此外,重新连接到互联网以查看客户端是否重新连接到代理。

可以将多个客户端对象附加到单个回调函数。当您的程序连接到多个代理时,这很有用。

4.4 on_message

回到尽管订阅了主题但仍未显示消息的问题:回调用于处理发布到已订阅主题的消息。on_message()

在我们的程序中,我们需要做3件事:1.
订阅我们要发布的主题。
2. 使用回调处理已发布的消息。在我们的程序中,我们将简单地打印消息。
3. 将回调函数分配给客户端对象的属性。on_message

让我们创建另一个 mqtt 客户端。此新程序将向已订阅现有程序的主题发布消息。

sub.py(接收客户端)

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  def on_connect(client, userdata, flags, rc):print("Connected With Result Code "+rc)  def on_message(client, userdata, message):print("Message Recieved: "+message.payload.decode())  client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_forever()

pub.py(发布客户端)

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)

on_message() 回调有 3 个参数:、 & 。我已经解释了本节中的前两个。
该对象有 4 个属性:、 、 、 。clientuserdatamessageon_connect()messagetopicpayloadqosretain

如果您注意到对象的每个属性也是我们在客户端函数中使用的参数。messagepublish()

先运行,然后运行 。输出将显示消息和主题。sub.pypub.py

如果邮件未打印出来,请检查是否:

您已将该函数添加到客户端对象的属性中。on_message()
如果您已订阅 了 中的回调中的主题。on_connect()sub.py
如果已分别在 和 函数中正确输入主题名称。subscribe()publish()sub.pypub.py
如果要在函数中打印消息。on_message()
如何组织和处理消息?

如果您从不同的主题收到多条消息,并且需要以不同的方式处理每个主题的消息,那么我们必须对消息进行排序。

执行此操作的一种方法是使用该属性检查将消息发布到哪个主题。然后,您可以创建 if 条件来相应地处理消息。message.topic

更好的方法是使用对象的功能。这将创建多个回调来处理来自不同主题的消息。
参数采用主题名称,参数采用将处理该主题消息的回调的名称。message_callback_add(sub, callback)clientsubcallback

例:

sub.py

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  def on_connect(client, userdata, flags, rc):print("Connected With Result Code " (rc))  def on_message_from_kitchen(client, userdata, message):print("Message Recieved from Kitchen: "+message.payload.decode())  def on_message_from_bedroom(client, userdata, message):print("Message Recieved from Bedroom: "+message.payload.decode())  def on_message(client, userdata, message):print("Message Recieved from Others: "+message.payload.decode())  client = mqtt.Client()
client.on_connect = on_connect
#To Process Every Other Message
client.on_message = on_message
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)
client.subscribe("KitchenTopic", qos=1)
client.subscribe("BedroomTopic", qos=1)
client.message_callback_add("KitchenTopic", on_message_from_kitchen)
client.message_callback_add("BedroomTopic", on_message_from_bedroom)  client.loop_forever()

在向其添加回调之前,请确保您已订阅这两个主题。

默认情况下,来自其他主题的消息将由 on_message() 处理

您可以通过发布到"KitchenTopic"和"卧室主题"并查看它是否单独处理来尝试上述程序。

4.5 on_publish

执行函数时调用该函数。这将返回一个元组 。on_publish()publish()(result, mid)

result是错误代码。错误代码 0 表示消息已成功发布。

mid代表消息 ID。它是一个整数,是客户端分配的唯一消息标识符。如果使用 QoS 级别 1 或 2,则客户端循环将使用 标识尚未发送的消息。mid

4.6 on_subscribe()/on_unsubscribe()

当客户端订阅主题时,将调用回调。例:on_subscribe()on_subscribe(client, userdata, mid, granted_qos)

当客户端取消订阅某个主题时,将调用回调。例:on_unsubscribe()on_unsubscribe(client, userdata, mid)

mid是函数中讨论的消息 id。on_publish()

granted_qos是订阅时该主题的 qos 级别。如果需要,您可以在程序中尝试一下。

5. 其他有用的 Paho-MQTT 功能

Paho还有一些有用的函数,可以在程序中使用它们来执行函数,而不必创建客户端构造函数并经历创建回调的麻烦。

5.1 单() / 多() 发布

单个或多个函数用于将单个消息或多条消息发布到代理上的主题,而无需创建客户机对象。

single(topic, payload=None, qos=0, retain=False, hostname=broker_url, port=broker_port, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")

topic是唯一必需的参数。这是一个包含主题名称的字符串。

auth是包含用户名和密码的字典(如果代理需要)。(eclipse 代理不需要身份验证)。
例:auth = {‘username’:“username”, ‘password’:""}

tls如果我们使用 TLS/SSL 加密,则需要。
例:dict = {‘ca_certs’:"", ‘certfile’:"", ‘keyfile’:"", ‘tls_version’:"", ‘ciphers’:"<ciphers">}

transport接受 2 个值:和 。如果您不想使用websockets,请将其设置为 。websocketstcptcp

其他参数类似于客户端对象中的参数。

multiple(msgs, hostname=broker_url, port=broker_port, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")

msgs是必需的参数。它包含要发布的消息的列表。每条消息都可以是字典或元组。

字典必须采用以下格式:
msg = {‘topic’:"< topicname >", ‘payload’:"", ‘qos’:‘0’, ‘retain’:‘False’}

元组必须采用以下格式:
("< topicname >", “< payload >”, qos, retain)

在这两种格式中,名称都必须存在。topic

5.2 简单() / 回调()

simple 是一个阻止函数,它订阅一个主题或一个主题列表,并返回发布到该主题的消息。

simple(topics, qos=0, msg_count=1, retained=False, hostname=“localhost”, port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)

topics是唯一必需的参数。这可以是单个主题的字符串,也可以是多个主题的列表。

msg_count是在断开与代理的连接之前必须返回的消息数。对于 msg_count > 1,该函数将返回消息列表。

其他参数已经在 single() / multiple() 部分中进行了解释。

callback与 相似,唯一的区别是它需要一个额外的参数,即回调。简单函数仅返回来自主题的消息,回调函数将返回的消息发送到任何函数进行处理。simple
callback(callback, topics, qos=0, userdata=None, hostname=“iot.eclipse.org”, port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)

6. Will_set()

这是一个非常有用的功能。当客户端连接到代理时,它会告诉代理,如果它断开连接,它必须向主题发布消息。它是客户端构造函数的一个属性。

client.will_set(topic, payload=None, qos=0, retain=False)

要了解如何以及何时在MQTT中使用Last Will & Testament,请参阅这篇文章
:MQTT Last Will and Testament(用示例解释)

您的反馈对我来说非常重要。如果本教程对您有所帮助,或者,如果文章中的某些部分有错误或解释不正确,请在下面的评论中告诉我。

使用 Python MQTT 客户端 Paho-MQTT 的初学者指南相关推荐

  1. MQTT客户端paho.mqtt.XXX

    1. MQTT客户端C代码库 C语言库:https://github.com/eclipse/paho.mqtt.c 1.1 C源码下载构建 # centos7 OS 方法一 $ git clone ...

  2. MQTT 客户端收发 MQTT 消息

    本文主要介绍如何使用 MQTT 客户端收发 MQTT 消息,并给出示例代码供前期开发测试参考,包括资源创建.环境准备.示例代码.注意事项等. 注意: 本文给出的实例均基于 Eclipse Paho J ...

  3. android paho框架,Android Mqtt 客户端paho使用心得

    Android mqtt 客户端实现一般使用以下两个库: implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1 ...

  4. MQTT与paho.mqtt

    MQTT协议 采用发布/订阅消息模式 使用TCP/IP提供网络连接 三种消息发布服务质量QoS,消息推送的原则,服务器维护难度递增 至多一次 至少一次 只有一次 主题的分割符与通配符 分割符" ...

  5. MQTT客户端 Paho Java 使用

    文章目录 01.maven 依赖 02.代码 1-publisher 发布者 2-订阅者 subscriber 01.maven 依赖 <dependency><groupId> ...

  6. Qt实现mqtt客户端和mqtt服务器搭建

    下载qtmqtt源码 下载地址:https://github.com/qt/qtmqtt,选择跟自己使用Qt版本一致的分支 使用qmake编译qtmqtt源码 编译环境:Ubuntu1804+Qt5. ...

  7. Paho MQTT Python客户端常用API、安装与使用

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的即时通信协议,相关介绍可见:MQTT简介. Paho 是Eclipse的开源 MQTT 客户端项目,提供 ...

  8. paho | 支持10种语言编写mqtt客户端,总有一款适合你!

    1. 轻量级物联网协议 - MQTT MQTT全称 Message Queuing Telemetry Transport,即消息队列遥测传输协议,是一种基于发布/订阅(publish/subscri ...

  9. C语言基于paho实现MQTT客户端实战案例

    C语言基于paho实现MQTT客户端实战案例 目标 说明 项目代码 make文件 mqttClient.pro 项目入口 main.c 链表 list.h 消息队列 queue.h 消息队列 queu ...

  10. MQTT协议学习:3、MQTT客户端实例

    MQTT协议学习:3.MQTT客户端实例 文章目录 MQTT协议学习:3.MQTT客户端实例 1. 前言 2. Paho MQTT (1). Go客户端实例 (2). Python客户端实例 (3). ...

最新文章

  1. 多种树,兔子才会撞上来
  2. html怎么让图标动起来,让ICON生动起来 纯CSS实现带动画的天气图标
  3. 微生物 研究_微生物监测如何工作,为何如此重要
  4. web前端开发技术期末考试_智慧树来我校开展WEB前端开发微专业导学
  5. linux 内核重定位,Linux 内核学习笔记:预备知识之“目标文件”
  6. finereport报表设计中模板数据集的sql语句中if的用法_报表工具中动态参数的灵活运用...
  7. 【QA5】【mysql问题】ERROR 1045 (28000): Access denied for...
  8. C语言 signal
  9. 方正飞腾4.0视频教程
  10. 9月,重磅推出Linux、数据结构、领域驱动等10本程序员新书
  11. Sam Altman专访:GPT-4没太让我惊讶,ChatGPT则让我喜出望外
  12. 什么东西可以改善睡眠,这些东西应该能帮到你
  13. 汽车维修店如何挖掘潜在客户资源
  14. 真的不建议学Python,煞笔才学习Python,学Python难?两个小时足够搞定
  15. 7种常见分布的数学期望及其证明
  16. v5行为验证使用介绍(三)- 程序接入流程
  17. 博士毕业选择回老家县城大专任教!事业编、副教授待遇、外加几十万安家费......
  18. DB2ADVIS returning error -220
  19. python中a除以b_Python中的除法
  20. Spark:大数据的电花火石!

热门文章

  1. MYSQL数据库同步工具
  2. 操作高通QXDM5,点击重置按钮出现报错
  3. LSTM神经网络和GRU
  4. ISO9000 质量管理和质量保证系列国际标准
  5. 一字之差——手机中的“拼”音输入法和“注”音输入法
  6. 学前教育怎么利用计算机思维,乐高教育全新推出编程启蒙小火车锻炼孩子计算机思维...
  7. iMX8 Android SDK 下载
  8. linux efi分区安装grub2,编译UEFI版本Grub2引导多系统文件efi
  9. iec611313标准下载_IEC 61730-1-2016
  10. 领睿s1pro的黑苹果EFI及黑苹果教程