目录

前言

js demo

参数

new Paho.Client 创建对象

onConnectionLost  连接丢失回调

onMessageArrived  监听数据

disconnect() :关闭链接

connect (connectOptions)将此消息客户端连接到其服务器。

mqtt 频繁断开和重连问题

小程序实践

单例模式 mqtt 封装

initMqtt文件

页面创建链接


前言

P2P,顾名思义,是一对一的消息收发模式,即只有一个消息发送者和一个消息接收者。而Pub/Sub模式通常用于一对多或多对多的消息群发场景,即拥有一个或多个消息发送者和多个消息接收者的场景。

在P2P模式中,发送者发送消息时已经明确该消息预期的接收者信息,并明确该消息只需要被特定的单个客户端消费。发送者发送消息时通过Topic信息直接指定接收者,接收者无需提前订阅即可获取该消息。

P2P模式不仅可以为接收者节省注册订阅关系的成本,此外,由于收发消息的链路有单独的优化,还可以降低推送延迟。

参考文档 :P2P消息收发模式(MQTT)

https://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.2.13.229f42caHWXK5fhttps://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.2.13.229f42caHWXK5f参考文档 :P2P消息收发模式 .Client.

https://www.eclipse.org/paho/files/jsdoc/Paho.MQTT.Client.htmlhttps://www.eclipse.org/paho/files/jsdoc/Paho.MQTT.Client.html参考文档  paho-mqtt基础库

https://github.com/AwakenCN/InChat/blob/paho-mqtt/wechat-client/utils/paho-mqtt.jshttps://github.com/AwakenCN/InChat/blob/paho-mqtt/wechat-client/utils/paho-mqtt.js

参考文档  阿里云安全库

https://cdnjs.cloudflare.com/ajax/libs/crypto-js/3.1.9-1/crypto-js.jshttps://cdnjs.cloudflare.com/ajax/libs/crypto-js/3.1.9-1/crypto-js.js

js demo

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN""http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><title>Aliyun Mqtt Websockets</title><meta name="viewport" content="width=device-width, initial-scale=1.0"><script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script><script src="https://cdnjs.cloudflare.com/ajax/libs/crypto-js/3.1.9-1/crypto-js.js" type="text/javascript"></script><script type="text/javascript">instanceId = 'XXXX';//实例 ID,购买后从控制台获取host = 'XXXX.mqtt.aliyuncs.com';// 设置当前用户的接入点域名,接入点获取方法请参考接入准备章节文档,先在控制台创建实例port = 80;//WebSocket 协议服务端口,如果是走 HTTPS,设置443端口topic = 'XXXX';//需要操作的 Topic,第一级父级 topic 需要在控制台申请useTLS = false;//是否走加密 HTTPS,如果走 HTTPS,设置为 trueaccessKey = 'XXXXX';//账号的 AccessKey,在阿里云控制台查看secretKey = 'XXXXX';//账号的的 SecretKey,在阿里云控制台查看cleansession = true;groupId = 'GID_XXXX';//MQTT GroupID,创建实例后从 MQTT 控制台创建clientId = groupId + '@@@00001';//GroupId@@@DeviceId,由控制台创建的 Group ID 和自己指定的 Device ID 组合构成var mqtt;var reconnectTimeout = 2000;var username = 'Signature|' + accessKey + '|' + instanceId;//username和 Password 签名模式下的设置方法,参考文档 https://help.aliyun.com/document_detail/48271.html?spm=a2c4g.11186623.6.553.217831c3BSFry7var password = CryptoJS.HmacSHA1(clientId, secretKey).toString(CryptoJS.enc.Base64);function MQTTconnect() {mqtt = new Paho.MQTT.Client(host,//MQTT 域名port,//WebSocket 端口,如果使用 HTTPS 加密则配置为443,否则配置80clientId//客户端 ClientId);var options = {timeout: 3,onSuccess: onConnect,mqttVersion: 4,cleanSession: cleansession,onFailure: function (message) {setTimeout(MQTTconnect, reconnectTimeout);}};mqtt.onConnectionLost = onConnectionLost;mqtt.onMessageArrived = onMessageArrived;if (username != null) {options.userName = username;options.password = password;options.useSSL = useTLS;//如果使用 HTTPS 加密则配置为 true}mqtt.connect(options);}function onConnect() {// Connection succeeded; subscribe to our topic//发送 P2P 消息,topic 设置方式参考https://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.6.586.694f7cb4oookL7message = new Paho.MQTT.Message("Hello mqtt P2P Msg!!");//set bodymessage.destinationName = topic + "/p2p/" + clientId;// set topicmqtt.send(message);}function onConnectionLost(response) {setTimeout(MQTTconnect, reconnectTimeout);};function onMessageArrived(message) {var topic = message.destinationName;var payload = message.payloadString;console.log("recv msg : " + topic + "   " + payload);};MQTTconnect();</script>
</head>
</html>

参数

new Paho.Client 创建对象

JavaScript应用程序使用Paho.MQTT.Client对象与服务器通信。

大多数应用程序只创建一个Client对象,然后调用其connect()方法,但是如果需要,应用程序可以创建多个Client对象。在这种情况下,每个客户端对象的主机、端口和clientId属性的组合必须不同。

发送、订阅和取消订阅方法被实现为异步JavaScript方法(即使底层协议交换本质上可能是同步的)。这意味着它们通过调用应用程序(通过应用程序提供的有关方法的成功或失败回调函数)来发出完成的信号。这种回调在每个方法调用中最多调用一次,并且不会持续到调用脚本的生命周期之外。

相比之下,Paho.MQTT.Client对象上定义了一些回调函数,尤其是onMessageArrived。这些方法可能会被多次调用,并且与客户端进行的特定方法调用没有直接关系。

  • host字符串消息服务器的地址,作为完全限定的WebSocket URI,作为DNS名称或虚线十进制IP地址。
  • port number要连接到的端口号-仅当主机不是URI时才需要
  • path字符串要连接到的主机上的路径-仅当主机不是URI时使用。默认值:'/mqt'。
  • clientId字符串消息传递客户端标识符,长度在1到23个字符之间。

onConnectionLost  连接丢失回调

当连接丢失时调用。connect()方法成功后。建立连接丢失时使用的回调。连接可能会丢失,因为客户端启动了断开连接,或者因为服务器或网络导致客户端断开连接。例如,如果客户端无法连接,则可以在不调用connectionComplete回调的情况下调用disconnect回调。将单个响应对象参数传递给onConnectionLost回调,该回调包含以下字段:

  • errorCode 错误代码
  • errorMessage 错误消息

onMessageArrived  监听数据

当消息到达此Paho.MQTT.客户端时调用。传递给onMessageArrived回调的参数为

  • destinationName强制消息要发送到的目的地的名称(对于即将发送的消息)或接收消息的目的地名称。(用于onMessage功能接收的消息)。
  • payloadString :只读如果有效负载包含有效的UTF-8字符,则将有效负载作为字符串
  • payloadBytes :只读作为ArrayBuffer的有效负载
  • qs:用于传递消息的服务质量。0最大努力(默认)。1至少一次。2正好一次。
  • retained:如果为true,则服务器将保留该消息,并将其传递给当前订阅和未来订阅。如果为false,则服务器仅将消息传递给当前订户,这是新消息的默认值。如果消息发布时保留的布尔值设置为true,并且在消息发布后进行了订阅,则收到的消息将保留的布尔设置为true。
  • duplicate :只读如果为true,则此消息可能与已接收的消息重复。这仅在从服务器接收的消息上设置。

disconnect() :关闭链接

connect (connectOptions)将此消息客户端连接到其服务器。

connectOptions用于连接的属性

  1. timeout : 如果在该秒数内连接未成功,则视为连接失败。默认值为30秒
  2. userName 此连接的身份验证用户名
  3. password:此连接的身份验证密码。
  4. onSuccess:当从服务器接收到连接确认时调用。单个响应对象参数传递给onSuccess回调,该回调包含以下字段: invocationContext传递给connectOptions中的onSuccess方法。
  5. cleanSession:清除会话  cleanif true(默认值)成功连接时删除客户端和服务器的持久状态。
  6. mqttVersion 用于连接到MQTT代理的MQTT版本。 3or4
  7. useSSL 如果存在且为真,请使用SSL Websocket连接。{如果使用 HTTPS 加密则配置为 true}
  8. onFailure:当连接请求失败或超时时调用。单个响应对象参数传递给onFailure回调,该回调包含以下字段:
  • 传入connectOptions中onFailure方法的invocationContext。
  • errorCode表示错误性质的数字。
  • error描述错误的消息文本。

mqtt 频繁断开和重连问题

mqtt.js底层没有对你调用它方法传入的callback函数做异常捕获(你传入callback函数里面不做异常捕获,会导致mqtt底层代码逻辑异常,导致频繁断连&重连问题发生),所以你所有的callback函数都需要增加try..catch..方法捕获异常(比如发布,订阅,监听等方法调用的时候的第二个callback函数)

小程序实践

 const Paho = require('../../utils/mqtt');const CryptoJS = require('../../utils/crypto-js.js');let instanceId = 'XXXX';//实例 ID,购买后从控制台获取let host = 'XXXX.mqtt.aliyuncs.com';// 设置当前用户的接入点域名,接入点获取方法请参考接入准备章节文档,先在控制台创建实例let port = 80;//WebSocket 协议服务端口,如果是走 HTTPS,设置443端口let topic = 'XXXX';//需要操作的 Topic,第一级父级 topic 需要在控制台申请let useTLS = false;//是否走加密 HTTPS,如果走 HTTPS,设置为 truelet accessKey = 'XXXXX';//账号的 AccessKey,在阿里云控制台查看let secretKey = 'XXXXX';//账号的的 SecretKey,在阿里云控制台查看let cleansession = true;let groupId = 'GID_XXXX';//MQTT GroupID,创建实例后从 MQTT 控制台创建let clientId = groupId + '@@@00001';//GroupId@@@DeviceId,由控制台创建的 Group ID 和自己指定的 Device ID 组合构成var mqtt;var reconnectTimeout = 2000;var username = 'Signature|' + accessKey + '|' + instanceId;//username和 Password 签名模式下的设置方法,参考文档 https://help.aliyun.com/document_detail/48271.html?spm=a2c4g.11186623.6.553.217831c3BSFry7var password = CryptoJS.HmacSHA1(clientId, secretKey).toString(CryptoJS.enc.Base64);onLoad(options) {this.MQTTconnect()
},
// MQTT连接MQTTconnect() {try {let that = this//创建mqtt对象mqtt = new Paho.Client(host,//MQTT 域名port,//WebSocket 端口,如果使用 HTTPS 加密则配置为443,否则配置80clientId//客户端 ClientId);// 准备链接属性var options = {timeout: 3, //超时时间onSuccess: that.onConnect, //当从服务器接收到连接确认时调用mqttVersion: 4,//用于连接到MQTT代理的MQTT版本。cleanSession: cleansession,//清除会话  cleanif true(默认值)成功连接时删除客户端和服务器的持久状态。onFailure: function (message) {//当连接请求失败或超时时调用。console.log(message, "连接请求失败或超时。");setTimeout(that.MQTTconnect, reconnectTimeout);}};mqtt.onConnectionLost = that.onConnectionLost;  //连接丢失回调mqtt.onMessageArrived = that.onMessageArrived;  //监听数据回调if (username != null) {options.userName = username;//此连接的身份验证密码。options.password = password;//此连接的身份验证用户名soptions.useSSL = useTLS;//如果使用 HTTPS 加密则配置为 true}//链接mqtt.connect(options);} catch (error) {}},//当从服务器接收到连接确认时调用onConnect() {try {// Connection succeeded; subscribe to our topic//发送 P2P 消息,topic 设置方式参考https://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.6.586.694f7cb4oookL7message = new Paho.Message("Hello mqtt P2P Msg!!");//set bodymessage.destinationName = topic + "/p2p/" + clientId;// set topicmqtt.send(message);} catch (error) {}},//连接丢失回调onConnectionLost(response) {try {let that = thisconsole.log("连接丢失了:" + response.errorCode + response.errorMessage)setTimeout(that.MQTTconnect, reconnectTimeout);} catch (error) {}},//监听数据onMessageArrived(message) {try {var topic = message.destinationName;var payload = message.payloadString;console.log("recv msg : " + topic + "   " + payload);} catch (error) {}},

单例模式 mqtt 封装

保证只有一个链接对象,便于复用,页使用面重写 onConnectionLost & onMessageArrived

  • getInstance 创建链接
  • onConnect  服务器确认链接回调
  • disconnect 关闭链接
  • onConnectionLost 连接丢失回调
  • onMessageArrived 监听数据
  • onFailure 创建链接失败回调 页面传入callback
  • info 链接信息

initMqtt文件

var single = (function () {let mqtt, instanceId, host, port, accessKey, secretKey, clientId, username, password, topic;let i = 0let cleansession = true; // true(默认值)成功连接时删除客户端和服务器的持久状态。let useTLS = true;//是否走加密 HTTPS,如果走 HTTPS,设置为 true// 创建链接function getInstance(callback, info) {try {console.log('getInstance执行', mqtt, i++);if (mqtt === undefined) {console.log('创建', info);instanceId = info.instanceId;//实例 ID,购买后从控制台获取host = info.host;// 设置当前用户的接入点域名,接入点获取方法请参考接入准备章节文档,先在控制台创建实例port = 443;//WebSocket 协议服务端口,如果是走 HTTPS,设置443端口accessKey = info.accessKey;//账号的 AccessKey,在阿里云控制台查看secretKey = info.secretKey;//账号的的 SecretKey,在阿里云控制台查看clientId = info.client_id + info.xmid//GroupId@@@DeviceId,由控制台创建的 Group ID 和自己指定的 Device ID 组合构成username = 'Signature|' + accessKey + '|' + instanceId;//username和 Password 签名模式下的设置方法,参考文档 https://help.aliyun.com/document_detail/48271.html?spm=a2c4g.11186623.6.553.217831c3BSFry7password = CryptoJS.HmacSHA1(clientId, secretKey).toString(CryptoJS.enc.Base64);topic = info.topic;//需要操作的 Topic,第一级父级 topic 需要在控制台申请//创建mqtt对象mqtt = new Paho.Client(host,//MQTT 域名port,//WebSocket 端口,如果使用 HTTPS 加密则配置为443,否则配置80clientId//客户端 ClientId);}// 准备链接属性var options = {timeout: 3, //超时时间onSuccess: onConnect, //当从服务器接收到连接确认时调用mqttVersion: 4,//用于连接到MQTT代理的MQTT版本。cleanSession: cleansession,//清除会话  cleanif true(默认值)成功连接时删除客户端和服务器的持久状态。onFailure: callback//当连接请求失败或超时时调用};mqtt.onConnectionLost = onConnectionLost;  //连接丢失回调mqtt.onMessageArrived = onMessageArrived;  //监听数据回调if (username != null) {options.userName = username;//此连接的身份验证密码。options.password = password;//此连接的身份验证用户名soptions.useSSL = useTLS;//如果使用 HTTPS 加密则配置为 true}//链接mqtt.connect(options);} catch (error) { }return mqtt;}//服务器确认链接回调function onConnect() {try {// Connection succeeded; subscribe to our topic//发送 P2P 消息,topic 设置方式参考https://help.aliyun.com/document_detail/96176.html?spm=a2c4g.11186623.6.586.694f7cb4oookL7message = new Paho.Message("Hello mqtt P2P Msg!!");//set bodymessage.destinationName = topic + "/p2p/" + clientId;// set topicmqtt.send(message);} catch (error) { }}// 关闭链接function Disconnect() {console.log('通道关闭 js文件');mqtt.disconnect()mqtt = undefined}// 连接丢失回调function onConnectionLost() {console.log("连接丢失回调 js文件");}// 监听数据function onMessageArrived() {console.log("监听数据js 文件");}return {getInstance: getInstance,Disconnect: Disconnect}})();

页面创建链接

  onShow() {let that = thislet mqtt = single.getInstance(that.onFailure)mqtt.onConnectionLost = this.onConnectionLostmqtt.onMessageArrived = this.onMessageArrived},// mqtt连接超时的回调onFailure(response) {let that = thisconsole.log("连接超时:" + response.errorCode + response.errorMessage)if (response.errorCode != 8) {//8通道关闭setTimeout(function () {let mqtt = single.getInstance(that.onFailure)mqtt.onConnectionLost = that.onConnectionLostmqtt.onMessageArrived = that.onMessageArrived}, 2000);}},//监听数据onMessageArrived(message) {try {var topic = message.destinationName;var payload = JSON.parse(message.payloadString);console.log(payload, '通知');} catch (error) {}},// 连接丢失回调onConnectionLost(response) {try {let that = thisconsole.log("连接丢失了:" + response.errorCode + response.errorMessage)if (response.errorCode != 0) {//0 链接关闭setTimeout(function () {let mqtt = single.getInstance(that.onFailure)mqtt.onConnectionLost = that.onConnectionLostmqtt.onMessageArrived = that.onMessageArrived}, 2000);}} catch (error) {}},

微信小程序--P2P消息收发模式(MQTT)相关推荐

  1. 微信小程序开发消息推送配置教程

    微信小程序开发消息推送配置教程 微信小程序开发消息推送配置这一块网上都是PHP居多,由于用egg.js写了一套验证方法. 第一步:填写服务器配置 登录微信小程序官网后,在小程序官网的"设置- ...

  2. 微信小程序模板消息(服务通知消息)原始post工具封装(不使用jar包--坑比较多),解决47001(JSON格式)和中文乱码问题

    微信小程序模板消息(服务通知消息)原始post工具封装(不使用jar包--坑比较多),解决47001(JSON格式)和中文乱码问题 参考文章: (1)微信小程序模板消息(服务通知消息)原始post工具 ...

  3. 微信小程序模板消息群发解决思路

    基于微信的通知渠道,微信为开发者提供了可以高效触达用户的模板消息能力,以便实现服务的闭环并提供更佳的体验.(微信6.5.2及以上版本支持模板功能.低于该版本将无法收到模板消息.) 模板推送位置:服务通 ...

  4. 化繁为简,我用”知晓推送”开发微信小程序订阅消息

    知晓云在2019年十月份左右就上线了微信小程序订阅消息这个服务,后来迭代升级,又相继提供了相应的sdk插件,然而这之前,我却没有很认真,花精力去使用这些服务,刚好国庆几天假,我就熬了两个通宵,将这个小 ...

  5. 微信小程序订阅消息定时发送消息

    微信小程序订阅消息定时发送消息 本人专注使用云开发,实现一个前端可以做后端以及整个项目的部署与上线. 如果觉得我讲的好就可以给我点个赞.也可以加我微信了解详情. 1.我们先要了解什么是订阅消息 而现在 ...

  6. 微信小程序模板消息群发、无限制推送相关讲解

    模版消息推送是微信小程序采用的通知形式,用户本人在小程序页面有交互行为后,可触发下发通知,通过微信聊天列表中的服务通知可快捷进入查看消息.此外,点击查看详情还能跳转到下发消息的小程序的指定页面.但是为 ...

  7. 微信小程序开发—消息推送

    微信小程序的消息推送简单的说就是发送一条微信通知给用户,用户点开消息可以查看消息内容,可以链接进入到小程序的指定页面. 微信小程序消息推送需要用户触发动作才能发送消息,比如用户提交订单.支付成功.一次 ...

  8. 微信小程序订阅消息失败

    微信小程序订阅消息失败 之前测试微信小程序订阅消息都是正常的,并且支持开发工具的调试,但是同样的方法换到另一个页面通过表单提交触发就没有效果.调试之后发现报错信息,errMsg: "requ ...

  9. 使用Java实现微信小程序订阅消息

    首先到微信小程序的官网,选择合适自己的订阅消息模板. 寻找到适合自己的模板之后,记住模板ID,点开详情,记住每个字段id 微信小程序订阅消息官网文档介绍地址:小程序订阅消息 | 微信开放文档 (qq. ...

最新文章

  1. javaScript中变量作用域
  2. python整数和浮点数相乘_python中整数除法和浮点数到整数转换之间的区别是什么原因?...
  3. 今天,我要教妹子学会Spring:Aware、异步编程、计划任务
  4. graphpad prism画折线图_如何用Graphpad Prism 8作折线图
  5. bzoj:1026: [SCOI2009]windy数(数位dp)
  6. php的Snoopy类
  7. ZeroMQ的一些配置
  8. sqlserver2008r2 还原bak文件
  9. FRR BGP协议分析10 -- 路由衰减
  10. html怎样让页面居中显示,HTML怎么让页面居中
  11. 海量数据处理技巧-转载
  12. C++模拟手机通信录管理系统
  13. Pytorch 深度学习入门与实践 第二章 pytorch快速入门 (1)
  14. php云打印类,PHP应用:PHP云打印类完整示例
  15. maven 导入jar包失败编译代码失败解决方案
  16. KBU808-ASEMI适配大功率开关电源整流桥
  17. Word 中给公式自动编号
  18. kaggle中关于图像的比赛整理
  19. 赶紧来修炼内功~字符串函数详解大全(二)
  20. 国内物流公司通用Material Number Range

热门文章

  1. 【MySQL】使用MySQL
  2. redis可持续化存储的时候出现Can t save in background fork Cannot allocate memory
  3. 【Python】openpyxl单元格合并
  4. 推荐系统的常用算法,选择,漫谈,推荐系统开源软件汇总
  5. java银器锁,银器保值吗?S925、S990、S999又是什么银……
  6. 微服务应用性能分析实战14 互通有无:如何设计跨语言的 APM 交互协议?
  7. 一种能人类大脑信息解读成声音信息的人工智能
  8. Godfather POJ - 3107 (求树的重心)
  9. python下载谷歌地图瓦片_python抓取天地图瓦片
  10. AMESim2020.1仿真编译失败解决方法之一