Eclipse Paho MQTT Python Client 使用手册
Eclipse Paho MQTT Python Client 使用手册
原文地址:https://www.cooooder.com/archives/20210303
目录
- 介绍
- 环境
- 准备
- 快速开始
- 常用API
- Client
- 回调函数
- 示例
介绍
paho.mqtt.python 是一个MQTT客户端python库,能够让应用程序简单方便的连接到MQTT代理进行消息发布、订阅主题和消息接收。
目前 paho.mqtt.python-1.5.1 版本支持5.0、3.1.1和3.1 MQTT协议,同时支持Python 2.7.9+或3.5+。
环境
- MQTT代理:EMQ X Broker 4.2.6
- Python 3.9.0
- paho-mqtt 1.5.1
准备
参照 EMQ X Broker安装启动教程
成功启动EMQ后,可通过浏览器访问 http://localhost:18083 admin/public 进入EMQ控制台,在【工具 > Websocket】模块可方便进行客户端连接、订阅、消息接收、发布等测试和调试工作Python 安装省略
paho-mqtt 安装
pip install paho-mqtt
快速开始
Python快速实现MQTT主题订阅和消息接收
import paho.mqtt.client as mq_ttdef on_connect(client, userdata, flags, rc):"""回调函数:当MQTT代理响应客户端连接请求时触发:param client: 回调返回的客户端实例:param userdata: Client()或user_data_set()中设置的私有用户数据:param flags: MQTT代理发送的响应标识:param rc: 连接结果0:连接成功1:连接被拒绝 - 协议版本2: 连接被拒绝 - 客户端标识符无效3:连接被拒绝 - 服务器不可用4:连接被拒绝 - 用户名或密码错误5:连接被拒绝 - 未授权6-255:当前未使用:return:"""print("Connected with result code "+str(rc))# 在on_connect()中进行消息订阅,是因为如果丢失连接进行重连,主题也会重新被订阅client.subscribe("testTopic/#")def on_message(client, userdata, message):"""回调函数:当接收到MQTT代理发布的消息时触发:param client: 回调返回的客户端实例:param userdata: Client()或user_data_set()中设置的私有用户数据:param message: MQTTMessage的一个实例,这是一个包含主题,有效负载,qos,retain的类:return:"""print(message.topic+" "+str(message.payload))mq_client = mq_tt.Client(client_id='www.cooooder.com')
mq_client.on_connect = on_connect
mq_client.on_message = on_message
# 连接到EMQX Broker MQTT代理
mq_client.connect("127.0.0.1", 1883, 60)# 阻塞式自动处理收发数据、自动处理重新连接,所有的数据处理逻辑都在预先设定好的回调函数中进行的
mq_client.loop_forever()
在 EMQ X Broker - 【客户端】可以看到客户端已连接
查看原图
在 EMQ X Broker - 【Websocket】发布testTopic主题消息
查看原图
Python程序打印出接收到的消息
Connected with result code 0
testTopic b'{ "msg": "Hello, World!" }'
testTopic b'{ "msg": "Hello, World2!" }'
常用API
Client
Client类实例常规用法流程如下:
- 创建一个客户端实例
- 使用任一 connect*() 方法连接到MQTT代理
- 调用任一 loop*() 方法保持与MQTT代理通讯
- 使用 subscribe() 方法订阅一个主题并接收消息
- 使用 publish() 方法向MQTT代理发布消息
- 使用 disconnect() 中断与MQTT代理的连接
client()
# 构造方法
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
- client_id
- 连接到MQTT代理时使用的唯一客户端ID字符串。如果为0或者为None,将随机生成分配一个,这种情况下clean_session参数必须为True
- clean_session
- 布尔值类型,用来确定客户端类型。如果为True,当断开连接时,MQTT代理将移除该客户端的所有信息;如果为False,客户端则为持久客户端,当断开连接时,订阅信息和消息队列将被MQTT保存
- 当断开连接时,客户端不会丢弃自己发送的消息。调用 connect() 或者 reconnect() 将导致重新发送消息,只有使用 reinitialise() 可以将客户端重置为初始状态
- userdata
- 用户定义的任意类型数据作为 userdata 参数传递给回调函数,可以通过调用user_data_set() 方法进行更新,不过会有点延迟
- protocol
- 客户端使用的MQTT协议版本,可以是 MQTTv31 或 MQTTv311
- transport
- 传输形式,设置为websockets,则会通过websockets发送给MQTT,默认tcp
- 示例
import paho.mqtt.client as mqttmqttc = mqtt.Client()
connect()
connect(host, port=1883, keepalive=60, bind_address="")
客户端连接MQTT代理,这是一个阻塞函数
- host
- 代理的主机名或者IP地址
- port
- 连接服务的端口,默认1883
- keepalive
- 心跳检测时长
- bind_address
- 绑定此客户端本地网络的IP地址
- 回调函数
- on_connect()
connect_async()
connect_async(host, port=1883, keepalive=60, bind_address="")
与 loop_start() 结合使用以非阻塞的形式进行连接,在调用 loop_start() 之前,连接不会完成
disconnect()
disconnect()
彻底与MQTT代理断开,使用该方法断开连接不会让代理发送遗嘱消息
- 回调函数
- on_disconnect()
enable_logger()
enable_logger(logger=None)
使用标准的Python日志包启用日志记录,可以与on_log回调方法同时使用
reconnect()
reconnect()
使用之前的信息配置重新连接代理,在调用之前必须先调用 connect*() 方法
reinitialise()
重置客户端为初始化状态,参数与 client() 一致
- 示例
mqttc.reinitialise()
loop()
loop(timeout=1.0, max_packets=1)
定期调用处理事件
- timeout
- 最大阻塞的秒数
- max_packets
- 已过期,不设置
- 示例
while True:mqttc.loop()
loop_start() / loop_stop()
loop_start()
loop_stop(force=False)
这些函数实现了网络循环的线程接口,在执行connect*()之前或者之后调用一次 loop_start() ,后台会自动运行一个线程调用 loop() ,这样就释放了主线程去执行其它工作,避免发生阻塞,这个调用也处理重新连接到代理。调用 loop_stop() 停止后台线程
mqttc.connect("127.0.0.1")
mqttc.loop_start()while True:mqttc.publish("topicTest", 'test')
loop_forever()
阻塞式网络循环处理事件,直到客户端调用 disconnect() 才会返回,它会自动重连
publish()
publish(topic, payload=None, qos=0, retain=False)
客户端向MQTT代理发送一条消息
- topic
- 消息发布的主题,不能为None或者空字符
- payload
- 发送的消息内容,如果没有赋值或者赋值为None,则将使用零长度的消息。传递int或者float将会被转换为该数字的字符串, 如果想发送真正的int或者float数据,使用 struct.pack() 去创建
- qos
- 消息的服务质量等级,必须为0 or 1 or 2
- retain
- 设置为True,MQTT代理保留最后一条消息,以便分发给消息发布后的订阅者
- 回调函数
- on_publish()
Return MQTTMessageInfo对象
reconnect_delay_set()
reconnect_delay_set(min_delay=1, max_delay=120)
断开连接后,客户端将自动尝试连接,每次尝试间隔 [min_delay, max_delay] 秒,从min_delay开始逐渐加倍至max_delay,连接成功后,延迟重置为min_delay
subscribe()
subscribe(topic, qos=0)
订阅一个或多个主题,该方法有三种不同的调用方式:
# 1. 字符串和整数
subscribe("my/topic", 2)
# 2. 字符串和整数元组
subscribe(("my/topic", 1))
# 3. 字符串和整数元组的列表
# 单次调用多个主题,比多次调用subscribe更有效
subscribe([("my/topic", 0), ("another/topic", 2)])
Return 一个元组 (result, mid)
- result
- 成功:MQTT_ERR_SUCCESS
- 失败:(MQTT_ERR_NO_CONN, None)
- mid
- 消息ID
- 回调函数
- on_subscribe()
unsubscribe()
unsubscribe(topic)
取消一个或多个主题
- topic
- 主题字符串或者字符串列表
Return 一个元组 (result, mid)
- 主题字符串或者字符串列表
- result
- 成功:MQTT_ERR_SUCCESS
- 失败:(MQTT_ERR_NO_CONN, None)
- mid
- 消息ID
- 回调函数
- on_unsubscribe()
user_data_set()
user_data_set(userdata)
设置传递给回调函数的用户私有数据
username_pw_set()
username_pw_set(username,password = None)
设置用户名和密码(可选)供MQTT代理验证,必须在 connect*() 之前调用
will_set()
will_set(topic, payload=None, qos=0, retain=False)
设置遗嘱发送给MQTT代理,如果客户端在没有调用 disconnect() 的情况下断开连接,则MQTT代理将会代表它发送该消息
- topic
- 遗嘱消息发布的主题,不能为None或者空字符
- payload
- 遗嘱发送的消息内容,如果没有赋值或者赋值为None,则将使用零长度的消息作为遗嘱。传递int或者float将会被转换为该数字的字符串, 如果想发送真正的int或者float数据,使用 struct.pack() 去创建
- qos
- 遗嘱消息的服务质量等级,必须为0 or 1 or 2
- retain
- 设置为True,MQTT代理保留最后一条消息,以便分发给消息发布后的订阅者
回调函数
on_connect()
on_connect(client, userdata, flags, rc)
MQTT代理响应客户端连接请求时( connect*() )调用
- client
- 回调返回的客户端实例
- userdata
- Client() 或 user_data_set() 中设置的私有用户数据
- flags
- MQTT代理发送的响应标识
- rc
- 连接结果
- 0:连接成功
- 1:连接被拒绝 - 协议版本
- 2:连接被拒绝 - 客户端标识符无效
- 3:连接被拒绝 - 服务器不可用
- 4:连接被拒绝 - 用户名或密码错误
- 5:连接被拒绝 - 未授权6-255:当前未使用
- 连接结果
- 示例
def on_connect(client, userdata, flags, rc):print("Connected with result code "+str(rc))mqttc.on_connect = on_connect
on_disconnect()
当客户端与MQTT代理断开连接时 (disconnect()) 调用
on_disconnect(client, userdata, rc)
- client
- 回调返回的客户端实例
- userdata
- Client() 或 user_data_set() 中设置的私有用户数据
- rc
- 断开结果,如果是 MQTT_ERR_SUCCESS(0),则是响应disconnect()调用
- 如果是其它值,则是意外关闭
- 示例
def on_disconnect(client, userdata, rc):if rc != 0:print("Unexpected disconnection.")mqttc.on_disconnect = on_disconnect
on_message()
on_message(client, userdata, message)
在客户端收到已订阅主题的消息,并且该消息没有被主题过滤器 message_callback_add() 匹配时调用
- client
- 回调返回的客户端实例
- userdata
- Client() 或 user_data_set() 中设置的私有用户数据
- message
- MQTTMessage实例,包含 topic、payload、qos、retain
- 示例
def on_message(client, userdata, message):print("Received message '" + str(message.payload) + "' on topic '"+ message.topic + "' with QoS " + str(message.qos))mqttc.on_message = on_message
message_callback_add()
message_callback_add(sub, callback)
定义特定订阅主题传入的消息回调,包括通配符,比如:客户端订阅了 sensor/#主题,一个回调处理 sensor/temperature,另一个回调处理 sensor/humidity
- sub
- 待过滤的主题,只能定义一个回调
- callback
- 回调函数,与 on_message() 相同形式
- 示例
# 处理温度消息回调
def temperature_callback(client, userdata, message):print(message.topic+" "+str(message.payload))# 处理湿度消息回调
def humidity_callback(client, userdata, message):print(message.topic+" "+str(message.payload))mqttc.subscribe('sensor/#')
mqttc.message_callback_add('sensor/temperature', temperature_callback)
mqttc.message_callback_add('sensor/humidity', humidity_callback)
message_callback_remove()
message_callback_remove(sub)
删除先前注册的主题/订阅特定回调
on_publish()
on_publish(client, userdata, mid)
当客户端调用 publish() 发布一条消息至MQTT代理后调用。Qos=1或2时,意味着客户端和代理完成握手,Qos=0时,仅表示消息离开客户端。
- mid
- mid变量与从相应的 publish() 返回的mid变量匹配,以允许跟踪传出的消息。
即使 publish() 调用返回,也不总意味着消息已发送
on_subscribe()
on_subscribe(client, userdata, mid, granted_qos)
当MQTT代理响应订阅请求时被调用
- mid
- mid变量匹配从相应的 subscribe() 返回的mid变量
- granted_qos
- 整数列表,它提供了代理为每个不同的订阅请求授予的QoS级别
on_unsubscribe()
on_unsubscribe(client, userdata, mid)
当代理响应取消订阅请求时调用
- mid
- mid匹配从相应的 unsubscribe() 返回的mid变量
on_log()
on_log(client, userdata, level, buf)
当客户端有日志信息时调用
- level
- 消息严重性
- MQTT_LOG_INFO
- MQTT_LOG_NOTICE
- MQTT_LOG_WARNING
- MQTT_LOG_ERR
- MQTT_LOG_DEBUG
- 消息严重性
- buf
- 该消息本身就在buf里
可以与标准的Python logging同时使用,通过enable_logger()方法启用
- 该消息本身就在buf里
示例
import paho.mqtt.client as mq_tt
import logginglogging.basicConfig(level='DEBUG', format='%(asctime)s [%(name)s:%(lineno)d] [%(levelname)s]- %(message)s')def on_connect(client, userdata, flags, rc):print("Connected with result code "+str(rc))client.subscribe("topicTest/#")def topic_one_callback(client, userdata, message):print(message.topic+" "+str(message.payload))def topic_two_callback(client, userdata, message):print(message.topic+" "+str(message.payload))mq_client = mq_tt.Client(client_id='www.cooooder.com')
mq_client.enable_logger()
mq_client.on_connect = on_connect
mq_client.message_callback_add("topicTest/one", topic_one_callback)
mq_client.message_callback_add("topicTest/two", topic_two_callback)
# 连接到EMQX Broker MQTT代理
mq_client.connect("127.0.0.1", 1883, 60)mq_client.loop_start()
count=0
while True:if count == 0:print('1')count += 1
Eclipse Paho MQTT Python Client 使用手册相关推荐
- Paho MQTT Python客户端常用API、安装与使用
MQTT(Message Queuing Telemetry Transport)是一种轻量级的即时通信协议,相关介绍可见:MQTT简介. Paho 是Eclipse的开源 MQTT 客户端项目,提供 ...
- paho架构_GitHub - yanzhangfeng/paho-mqtt: Eclipse Paho MQTT C/C++ client for Embedded platforms
paho-mqtt 1.介绍 Paho MQTT 是 Eclipse 实现的基于 MQTT 协议的客户端,本软件包是在 Eclipse paho-mqtt 源码包的基础上设计的一套 MQTT 客户端程 ...
- paho mqtt client调试记录
官网:http://www.eclipse.org/paho/clients/c/ 编译流程: git clone https://github.com/eclipse/paho.mqtt.c.git ...
- Python模拟智能开关设备MQTT接入阿里云物联网平台 - PyCharm paho.mqtt
概要 Python 使用 paho.mqtt 库,利用阿里云物联网平台的设备证书:productKey.deviceName.deviceSecret,自动合成 userName.passWord.以 ...
- paho mqtt java_MQTT之Eclipse.Paho源码(一)--建立连接
写在前面 通过上一个章节MQTT系列---Java端实现消息发布与订阅的介绍,我们已经基本构建出一个可以简单通信的MQTT生产和消费服务,并且具备基本的发布/订阅消息功能.那么从本章开始,我们将从源代 ...
- MQTT C Client实现消息推送(入门指南)
我自己搭建了博客,以后可能不太在CSDN上发博文了,https://www.qingdujun.com/ . MQTT(Message Queuing Telemetry Transport,消息队列 ...
- linux 编译mqtt静态库_编译MQTT C++ Client
nmake -f ms\nt.mak(这是静态库,动态库是ntdll.mak) nmake -f ms\nt.mak test(测试命令,如果成功则最后显示"passed all t ...
- MQTT客户端paho.mqtt.XXX
1. MQTT客户端C代码库 C语言库:https://github.com/eclipse/paho.mqtt.c 1.1 C源码下载构建 # centos7 OS 方法一 $ git clone ...
- MQTT-Eclipse paho mqtt源码分析-连接MQTT Broker
Eclipse paho mqtt源码分析 MQTT paho mqtt 源码分析 org.eclipse.paho.client.mqttv3.MqttClient MQTT MQTT(消息队列遥测 ...
最新文章
- 远程办公,为什么一直不被公司普遍接受?
- 如何实现一个连接池?一文带你深入浅出,彻底搞懂!
- 你应该知道的五种IO模型
- 乐观锁、悲观锁简单分析,回忆旧(新)知识...
- 一次 Druid 连接池泄露引发的血案!
- php 工商银行公众号支付代码_微信支付PHP SDK之微信公众号支付代码详解
- python房地产爬虫_房产中介网站爬虫实战(Python BS4+多线程)(一)
- 计算机通信网络学什么软件,通信工程专业需要用到的电脑软件有哪些
- eclipse 2018 安装html、jsp、JavaScript编辑器
- 端口已经被占用 (Port 8081 already in use)解决方法
- 法力无边的stage-0
- java 二进制 表示负数_java中的负数表示
- 字节跳动推出在线医疗App“小荷” 品牌域名或要另辟蹊径?
- C#对XML、JSON等格式的解析
- 这些常见的漏洞和修复方法你知道吗?
- 解决新版Chrome无法将单个标签页静音的问题
- win10c语言乱码修复方法,软件乱码 教你win10系统打开软件乱码的修复技巧
- 计算机网络云怎么连接网络设置方法,华为云电脑如何连网,华为云电脑使用教程...
- Xilinx Aurora 8B/10B IP核详解和仿真
- 2022-2028全球氢化镁行业调研及趋势分析报告