Eclipse Paho MQTT Python Client 使用手册

原文地址:https://www.cooooder.com/archives/20210303

目录

  • 介绍
  • 环境
  • 准备
  • 快速开始
  • 常用API
    • Client
    • 回调函数
  • 示例

介绍

paho.mqtt.python 是一个MQTT客户端python库,能够让应用程序简单方便的连接到MQTT代理进行消息发布、订阅主题和消息接收。
目前 paho.mqtt.python-1.5.1 版本支持5.0、3.1.1和3.1 MQTT协议,同时支持Python 2.7.9+或3.5+。

环境

  • MQTT代理:EMQ X Broker 4.2.6
  • Python 3.9.0
  • paho-mqtt 1.5.1

准备

  1. 参照 EMQ X Broker安装启动教程
    成功启动EMQ后,可通过浏览器访问 http://localhost:18083 admin/public 进入EMQ控制台,在【工具 > Websocket】模块可方便进行客户端连接、订阅、消息接收、发布等测试和调试工作

  2. Python 安装省略

  3. paho-mqtt 安装

pip install paho-mqtt

快速开始

Python快速实现MQTT主题订阅和消息接收

import paho.mqtt.client as mq_ttdef on_connect(client, userdata, flags, rc):"""回调函数:当MQTT代理响应客户端连接请求时触发:param client: 回调返回的客户端实例:param userdata: Client()或user_data_set()中设置的私有用户数据:param flags: MQTT代理发送的响应标识:param rc: 连接结果0:连接成功1:连接被拒绝 - 协议版本2: 连接被拒绝 - 客户端标识符无效3:连接被拒绝 - 服务器不可用4:连接被拒绝 - 用户名或密码错误5:连接被拒绝 - 未授权6-255:当前未使用:return:"""print("Connected with result code "+str(rc))# 在on_connect()中进行消息订阅,是因为如果丢失连接进行重连,主题也会重新被订阅client.subscribe("testTopic/#")def on_message(client, userdata, message):"""回调函数:当接收到MQTT代理发布的消息时触发:param client: 回调返回的客户端实例:param userdata: Client()或user_data_set()中设置的私有用户数据:param message: MQTTMessage的一个实例,这是一个包含主题,有效负载,qos,retain的类:return:"""print(message.topic+" "+str(message.payload))mq_client = mq_tt.Client(client_id='www.cooooder.com')
mq_client.on_connect = on_connect
mq_client.on_message = on_message
# 连接到EMQX Broker MQTT代理
mq_client.connect("127.0.0.1", 1883, 60)# 阻塞式自动处理收发数据、自动处理重新连接,所有的数据处理逻辑都在预先设定好的回调函数中进行的
mq_client.loop_forever()

在 EMQ X Broker - 【客户端】可以看到客户端已连接
查看原图

在 EMQ X Broker - 【Websocket】发布testTopic主题消息
查看原图

Python程序打印出接收到的消息

Connected with result code 0
testTopic b'{ "msg": "Hello, World!" }'
testTopic b'{ "msg": "Hello, World2!" }'

常用API

Client

Client类实例常规用法流程如下:

  • 创建一个客户端实例
  • 使用任一 connect*() 方法连接到MQTT代理
  • 调用任一 loop*() 方法保持与MQTT代理通讯
  • 使用 subscribe() 方法订阅一个主题并接收消息
  • 使用 publish() 方法向MQTT代理发布消息
  • 使用 disconnect() 中断与MQTT代理的连接
client()
# 构造方法
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
  • client_id

    • 连接到MQTT代理时使用的唯一客户端ID字符串。如果为0或者为None,将随机生成分配一个,这种情况下clean_session参数必须为True
  • clean_session
    • 布尔值类型,用来确定客户端类型。如果为True,当断开连接时,MQTT代理将移除该客户端的所有信息;如果为False,客户端则为持久客户端,当断开连接时,订阅信息和消息队列将被MQTT保存
    • 当断开连接时,客户端不会丢弃自己发送的消息。调用 connect() 或者 reconnect() 将导致重新发送消息,只有使用 reinitialise() 可以将客户端重置为初始状态
  • userdata
    • 用户定义的任意类型数据作为 userdata 参数传递给回调函数,可以通过调用user_data_set() 方法进行更新,不过会有点延迟
  • protocol
    • 客户端使用的MQTT协议版本,可以是 MQTTv31 或 MQTTv311
  • transport
    • 传输形式,设置为websockets,则会通过websockets发送给MQTT,默认tcp
  • 示例
import paho.mqtt.client as mqttmqttc = mqtt.Client()
connect()
connect(host, port=1883, keepalive=60, bind_address="")

客户端连接MQTT代理,这是一个阻塞函数

  • host

    • 代理的主机名或者IP地址
  • port
    • 连接服务的端口,默认1883
  • keepalive
    • 心跳检测时长
  • bind_address
    • 绑定此客户端本地网络的IP地址
  • 回调函数
    • on_connect()
connect_async()
connect_async(host, port=1883, keepalive=60, bind_address="")

与 loop_start() 结合使用以非阻塞的形式进行连接,在调用 loop_start() 之前,连接不会完成

disconnect()
disconnect()

彻底与MQTT代理断开,使用该方法断开连接不会让代理发送遗嘱消息

  • 回调函数

    • on_disconnect()
enable_logger()
enable_logger(logger=None)

使用标准的Python日志包启用日志记录,可以与on_log回调方法同时使用

reconnect()
reconnect()

使用之前的信息配置重新连接代理,在调用之前必须先调用 connect*() 方法

reinitialise()

重置客户端为初始化状态,参数与 client() 一致

  • 示例
mqttc.reinitialise()
loop()
loop(timeout=1.0, max_packets=1)

定期调用处理事件

  • timeout

    • 最大阻塞的秒数
  • max_packets
    • 已过期,不设置
  • 示例
while True:mqttc.loop()
loop_start() / loop_stop()
loop_start()
loop_stop(force=False)

这些函数实现了网络循环的线程接口,在执行connect*()之前或者之后调用一次 loop_start() ,后台会自动运行一个线程调用 loop() ,这样就释放了主线程去执行其它工作,避免发生阻塞,这个调用也处理重新连接到代理。调用 loop_stop() 停止后台线程

mqttc.connect("127.0.0.1")
mqttc.loop_start()while True:mqttc.publish("topicTest", 'test')
loop_forever()

阻塞式网络循环处理事件,直到客户端调用 disconnect() 才会返回,它会自动重连

publish()
publish(topic, payload=None, qos=0, retain=False)

客户端向MQTT代理发送一条消息

  • topic

    • 消息发布的主题,不能为None或者空字符
  • payload
    • 发送的消息内容,如果没有赋值或者赋值为None,则将使用零长度的消息。传递int或者float将会被转换为该数字的字符串, 如果想发送真正的int或者float数据,使用 struct.pack() 去创建
  • qos
    • 消息的服务质量等级,必须为0 or 1 or 2
  • retain
    • 设置为True,MQTT代理保留最后一条消息,以便分发给消息发布后的订阅者
  • 回调函数
    • on_publish()

Return MQTTMessageInfo对象

reconnect_delay_set()
reconnect_delay_set(min_delay=1, max_delay=120)

断开连接后,客户端将自动尝试连接,每次尝试间隔 [min_delay, max_delay] 秒,从min_delay开始逐渐加倍至max_delay,连接成功后,延迟重置为min_delay

subscribe()
subscribe(topic, qos=0)

订阅一个或多个主题,该方法有三种不同的调用方式:

# 1. 字符串和整数
subscribe("my/topic", 2)
# 2. 字符串和整数元组
subscribe(("my/topic", 1))
# 3. 字符串和整数元组的列表
# 单次调用多个主题,比多次调用subscribe更有效
subscribe([("my/topic", 0), ("another/topic", 2)])

Return 一个元组 (result, mid)

  • result

    • 成功:MQTT_ERR_SUCCESS
    • 失败:(MQTT_ERR_NO_CONN, None)
  • mid
    • 消息ID
  • 回调函数
    • on_subscribe()
unsubscribe()
unsubscribe(topic)

取消一个或多个主题

  • topic

    • 主题字符串或者字符串列表
      Return 一个元组 (result, mid)
  • result
    • 成功:MQTT_ERR_SUCCESS
    • 失败:(MQTT_ERR_NO_CONN, None)
  • mid
    • 消息ID
  • 回调函数
    • on_unsubscribe()
user_data_set()
user_data_set(userdata)

设置传递给回调函数的用户私有数据

username_pw_set()
username_pw_set(username,password = None)

设置用户名和密码(可选)供MQTT代理验证,必须在 connect*() 之前调用

will_set()
will_set(topic, payload=None, qos=0, retain=False)

设置遗嘱发送给MQTT代理,如果客户端在没有调用 disconnect() 的情况下断开连接,则MQTT代理将会代表它发送该消息

  • topic

    • 遗嘱消息发布的主题,不能为None或者空字符
  • payload
    • 遗嘱发送的消息内容,如果没有赋值或者赋值为None,则将使用零长度的消息作为遗嘱。传递int或者float将会被转换为该数字的字符串, 如果想发送真正的int或者float数据,使用 struct.pack() 去创建
  • qos
    • 遗嘱消息的服务质量等级,必须为0 or 1 or 2
  • retain
    • 设置为True,MQTT代理保留最后一条消息,以便分发给消息发布后的订阅者
回调函数
on_connect()
on_connect(client, userdata, flags, rc)

MQTT代理响应客户端连接请求时( connect*() )调用

  • client

    • 回调返回的客户端实例
  • userdata
    • Client() 或 user_data_set() 中设置的私有用户数据
  • flags
    • MQTT代理发送的响应标识
  • rc
    • 连接结果

      • 0:连接成功
      • 1:连接被拒绝 - 协议版本
      • 2:连接被拒绝 - 客户端标识符无效
      • 3:连接被拒绝 - 服务器不可用
      • 4:连接被拒绝 - 用户名或密码错误
      • 5:连接被拒绝 - 未授权6-255:当前未使用
  • 示例
def on_connect(client, userdata, flags, rc):print("Connected with result code "+str(rc))mqttc.on_connect = on_connect
on_disconnect()

当客户端与MQTT代理断开连接时 (disconnect()) 调用

on_disconnect(client, userdata, rc)
  • client

    • 回调返回的客户端实例
  • userdata
    • Client() 或 user_data_set() 中设置的私有用户数据
  • rc
    • 断开结果,如果是 MQTT_ERR_SUCCESS(0),则是响应disconnect()调用
    • 如果是其它值,则是意外关闭
  • 示例
def on_disconnect(client, userdata, rc):if rc != 0:print("Unexpected disconnection.")mqttc.on_disconnect = on_disconnect
on_message()
on_message(client, userdata, message)

在客户端收到已订阅主题的消息,并且该消息没有被主题过滤器 message_callback_add() 匹配时调用

  • client

    • 回调返回的客户端实例
  • userdata
    • Client() 或 user_data_set() 中设置的私有用户数据
  • message
    • MQTTMessage实例,包含 topic、payload、qos、retain
  • 示例
def on_message(client, userdata, message):print("Received message '" + str(message.payload) + "' on topic '"+ message.topic + "' with QoS " + str(message.qos))mqttc.on_message = on_message
message_callback_add()
message_callback_add(sub, callback)

定义特定订阅主题传入的消息回调,包括通配符,比如:客户端订阅了 sensor/#主题,一个回调处理 sensor/temperature,另一个回调处理 sensor/humidity

  • sub

    • 待过滤的主题,只能定义一个回调
  • callback
    • 回调函数,与 on_message() 相同形式
  • 示例
# 处理温度消息回调
def temperature_callback(client, userdata, message):print(message.topic+" "+str(message.payload))# 处理湿度消息回调
def humidity_callback(client, userdata, message):print(message.topic+" "+str(message.payload))mqttc.subscribe('sensor/#')
mqttc.message_callback_add('sensor/temperature', temperature_callback)
mqttc.message_callback_add('sensor/humidity', humidity_callback)
message_callback_remove()
message_callback_remove(sub)

删除先前注册的主题/订阅特定回调

on_publish()
on_publish(client, userdata, mid)

当客户端调用 publish() 发布一条消息至MQTT代理后调用。Qos=1或2时,意味着客户端和代理完成握手,Qos=0时,仅表示消息离开客户端。

  • mid

    • mid变量与从相应的 publish() 返回的mid变量匹配,以允许跟踪传出的消息。

即使 publish() 调用返回,也不总意味着消息已发送

on_subscribe()
on_subscribe(client, userdata, mid, granted_qos)

当MQTT代理响应订阅请求时被调用

  • mid

    • mid变量匹配从相应的 subscribe() 返回的mid变量
  • granted_qos
    • 整数列表,它提供了代理为每个不同的订阅请求授予的QoS级别
on_unsubscribe()
on_unsubscribe(client, userdata, mid)

当代理响应取消订阅请求时调用

  • mid

    • mid匹配从相应的 unsubscribe() 返回的mid变量
on_log()
on_log(client, userdata, level, buf)

当客户端有日志信息时调用

  • level

    • 消息严重性

      • MQTT_LOG_INFO
      • MQTT_LOG_NOTICE
      • MQTT_LOG_WARNING
      • MQTT_LOG_ERR
      • MQTT_LOG_DEBUG
  • buf
    • 该消息本身就在buf里
      可以与标准的Python logging同时使用,通过enable_logger()方法启用

示例

import paho.mqtt.client as mq_tt
import logginglogging.basicConfig(level='DEBUG', format='%(asctime)s [%(name)s:%(lineno)d] [%(levelname)s]- %(message)s')def on_connect(client, userdata, flags, rc):print("Connected with result code "+str(rc))client.subscribe("topicTest/#")def topic_one_callback(client, userdata, message):print(message.topic+" "+str(message.payload))def topic_two_callback(client, userdata, message):print(message.topic+" "+str(message.payload))mq_client = mq_tt.Client(client_id='www.cooooder.com')
mq_client.enable_logger()
mq_client.on_connect = on_connect
mq_client.message_callback_add("topicTest/one", topic_one_callback)
mq_client.message_callback_add("topicTest/two", topic_two_callback)
# 连接到EMQX Broker MQTT代理
mq_client.connect("127.0.0.1", 1883, 60)mq_client.loop_start()
count=0
while True:if count == 0:print('1')count += 1

Eclipse Paho MQTT Python Client 使用手册相关推荐

  1. Paho MQTT Python客户端常用API、安装与使用

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的即时通信协议,相关介绍可见:MQTT简介. Paho 是Eclipse的开源 MQTT 客户端项目,提供 ...

  2. paho架构_GitHub - yanzhangfeng/paho-mqtt: Eclipse Paho MQTT C/C++ client for Embedded platforms

    paho-mqtt 1.介绍 Paho MQTT 是 Eclipse 实现的基于 MQTT 协议的客户端,本软件包是在 Eclipse paho-mqtt 源码包的基础上设计的一套 MQTT 客户端程 ...

  3. paho mqtt client调试记录

    官网:http://www.eclipse.org/paho/clients/c/ 编译流程: git clone https://github.com/eclipse/paho.mqtt.c.git ...

  4. Python模拟智能开关设备MQTT接入阿里云物联网平台 - PyCharm paho.mqtt

    概要 Python 使用 paho.mqtt 库,利用阿里云物联网平台的设备证书:productKey.deviceName.deviceSecret,自动合成 userName.passWord.以 ...

  5. paho mqtt java_MQTT之Eclipse.Paho源码(一)--建立连接

    写在前面 通过上一个章节MQTT系列---Java端实现消息发布与订阅的介绍,我们已经基本构建出一个可以简单通信的MQTT生产和消费服务,并且具备基本的发布/订阅消息功能.那么从本章开始,我们将从源代 ...

  6. MQTT C Client实现消息推送(入门指南)

    我自己搭建了博客,以后可能不太在CSDN上发博文了,https://www.qingdujun.com/ . MQTT(Message Queuing Telemetry Transport,消息队列 ...

  7. linux 编译mqtt静态库_编译MQTT C++ Client

    nmake  -f  ms\nt.mak(这是静态库,动态库是ntdll.mak) nmake  -f  ms\nt.mak test(测试命令,如果成功则最后显示"passed all t ...

  8. MQTT客户端paho.mqtt.XXX

    1. MQTT客户端C代码库 C语言库:https://github.com/eclipse/paho.mqtt.c 1.1 C源码下载构建 # centos7 OS 方法一 $ git clone ...

  9. MQTT-Eclipse paho mqtt源码分析-连接MQTT Broker

    Eclipse paho mqtt源码分析 MQTT paho mqtt 源码分析 org.eclipse.paho.client.mqttv3.MqttClient MQTT MQTT(消息队列遥测 ...

最新文章

  1. 远程办公,为什么一直不被公司普遍接受?
  2. 如何实现一个连接池?一文带你深入浅出,彻底搞懂!
  3. 你应该知道的五种IO模型
  4. 乐观锁、悲观锁简单分析,回忆旧(新)知识...
  5. 一次 Druid 连接池泄露引发的血案!
  6. php 工商银行公众号支付代码_微信支付PHP SDK之微信公众号支付代码详解
  7. python房地产爬虫_房产中介网站爬虫实战(Python BS4+多线程)(一)
  8. 计算机通信网络学什么软件,通信工程专业需要用到的电脑软件有哪些
  9. eclipse 2018 安装html、jsp、JavaScript编辑器
  10. 端口已经被占用 (Port 8081 already in use)解决方法
  11. 法力无边的stage-0
  12. java 二进制 表示负数_java中的负数表示
  13. 字节跳动推出在线医疗App“小荷” 品牌域名或要另辟蹊径?
  14. C#对XML、JSON等格式的解析
  15. 这些常见的漏洞和修复方法你知道吗?
  16. 解决新版Chrome无法将单个标签页静音的问题
  17. win10c语言乱码修复方法,软件乱码 教你win10系统打开软件乱码的修复技巧
  18. 计算机网络云怎么连接网络设置方法,华为云电脑如何连网,华为云电脑使用教程...
  19. Xilinx Aurora 8B/10B IP核详解和仿真
  20. 2022-2028全球氢化镁行业调研及趋势分析报告

热门文章

  1. POPPER 的设计和优化
  2. 山东大学软件学院2022年春算法设计与分析考试
  3. Android 启动广告页
  4. java 调用大漠插件2
  5. vip163邮箱好用吗?如何注册vip163邮箱?
  6. JavaWeb细节——jstl导入eclipse报错
  7. 实现分享功能插件2---jiathis分享插件应用
  8. 六级备考23天|CET-6|翻译技巧5|2019年12月真题|翻译荷花lotus|11:05-12:05
  9. 读书笔记_《当下的力量》_精华书摘
  10. 支付宝微信砸130亿补贴刷脸支付,代理商已率先入场