一、写在前面

在对接项目中IoT时,发现目前有对MQTT做了接入,这里主要从实现细节出发;对具体的需求以及配套的技术方案进行整理,以供参考。

一、IoT与MQTT

提到 IoT(Internet of Things)、IIoT(Industrial IoT ) 不得不说 MQTT,其被广泛的应用在物联网以及工业物联网之中,是一种消息传递协议。不同于我们所认识的平时常见的一些智能设备,如手机、电脑、平板等;这些设备一般都有着很好的计算能力,所依赖的网络环境很优质。但是一般的硬件设备性能较差,网络环境不稳定,而MQTT则是专门针对于硬件性能,网络状态不稳定场景下而生的。有着天然的优势。

二、什么是MQTT

MQTT是用于物联网的最常用的消息传递协议 (IoT)。MQTT代表MQ遥测运输。该协议是一组规则,它定义了IoT设备如何通过Internet发布订阅数据。用于IoT和工业IoT(IIT)设备(例如嵌入式设备,传感器,工业PLC等)之间的消息传递和数据交换。协议是事件驱动的,并使用发布/订阅(PUB / SUB)模式连接设备。发布者和接收器(订阅者)通过主题通信,并彼此分离。它们之间的连接由MQTT代理处理。MQTT代理过滤所有传入消息并将其正确分发给订阅者。

三、与传统Http的区别
  • MQTT以数据为中心,底层基于TCP链接,直接操作轻量级的二进制数据,并且数据包很小(可以小到一个字符,两个字节)。划重点,由于这个特性,其对于网络环境状态要求没有HTTP那么高,这也是为什么广泛应用于IoT设备的原因之一。
  • MQTT基于发布/订阅模式,区别于HTTP的请求/回调模式,这就决定了一个同一个设备即可以是客户端(Client)同时也可以是服务端(Server),回想发布订阅模式,消息的发布可以是1toN(N>=0),而HTTP则是1to1。
  • MQTT的发布/订阅架构决定了其无法基于UDP(面向无链接),而HTPP底层可以是基于TCP或者UDP。
  • 消息体量的区别,MQTT数据包很小,而HTTP数据量一般较大。
四、MQTT构成部分
1.Publish&Subscribe

MQTT对于发布订阅做了自身的解耦处理,主要是从三个维度出发,1.空间解耦:发布者和订阅者不需要相互了解(例如,没有IP地址和端口的交换)。2.时间解耦:发布者和订阅者不需要同时运行。3.同步解耦:在发布或接收期间,两个组件的操作不需要中断。

2.Topics&Best Practices

主要需要注意Topics的匹配规则,分为单项通配符,与多项通配符。单项以 + 连接:this/is/+/single,其中仅仅 + 部分可以被替换为单个路径(以 / 分割)。多项通配符仅支持在尾端支持:this/is/multi/#,并且是多级的。

3.Keep Alive

保活时效,包括其他的字段,官方文档都给出了很详细的解释,认真了解一项技术实现。这里主要基本认识MQTT是个什么东西,具体的实现细节与规范也不是一两句话可以说的清楚的,且可能存在误导的风险。

五、MQTT实际项目中的使用
1.实现什么需求?

以实际的项目为例,现需要实现的功能有:

  • 服务端下发消息通知到IoT设备,消息以Type区分,不同的消息需对应不同的处理措施。
  • 根据消息的不同,有语音播放、叫号、本地数据更新、服务端配置下发。

功能相对很简单,总结就是服务端推送消息,设备根据消息做出响应。

2.具体实现方案

导入依赖

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

主要分为几个类:a.主体请求Client,b.数据返回的回调dataCallback,c.链接状态回调connectCallback,d.具体消息处理策略IHandler。方案主要就包括这几个大类,逐步实现各个细节。

3.数据接口回调IDataCallback
interface IDataCallback {fun connectionError(cause: Throwable?)fun dataMessage(dataMessage: String)
}
4.连接状态回调IConnectionCallback
interface IConnectionCallback {fun connectSuccess()fun connectFail(reason: Throwable?)
}
5.CMqttClient链接类

在实现之前,列举几个关键的参数,参数配置在MqttConnectOptions

val options = MqttConnectOptions()
//默认为true,表示非持久订阅,无论服务端或者客户端重启,不会保持状态,重启后指定消息也无法送达
//设置为false,表示持久订阅,服务端与客户端重启或重链,指定消息可以送达
options.isCleanSession = false
//链接的用户名(账号名)
options.userName = user
//链接的用户密码(账户密码)
options.password = password.toCharArray()
//链接超时值s,默认30s,为0时,等待网络状态,即成功或失败
options.connectionTimeout = connectTimeout
//默认60s,检测服务端是否可用,为0时则禁止客户端保活,保活间隔内,没有消息的情况下客户端会通过ping来检测链接是否保持
options.keepAliveInterval = keepAliveInterval
//是否开启自动重连接,初始尝试重连是等待1s,失败情况下,延迟加倍,直到2分钟。
options.isAutomaticReconnect = true

关于自动重新连接有三个必要条件,cleanSession需要设置为falseisAutomaticReconnect需要设置为true,并且初始已经连接过。划重点,这里就要求,MQTT虽然可以自动重试连接当时必须有这三个前提,那么首次由于网络等其他原因未能连接的,这层的重试机制是需要我们自身去实现的,也就是需要保证首次能够连接到服务端。源码以及注释:

//Reconnect Only appropriate if cleanSession is false and we were connected. Declare as synchronized to avoid multiple calls to this method to send connect multiple times
synchronized void reconnect() {//....
}
//注释说的很明确三个条件,1.cleanSession需设置为false,2.isAutomaticReconnect需设置为true,并且是之前是已经连接过了。
class CMqttClient {//正在连接服务器private var connecting = falseprivate var mqttAndroidClient: MqttAndroidClient? = null//连接到服务器fun connectToServer(context: Context,host: String,clientId: String,accountName: String,accountPsw: String,connectTimeout: Int = 20,keepAliveTime: Int = 60,connectionCallback: IConnectionCallback? = null,dataCallback: IDataCallback? = null) {if(null == mqttAndroidClient) {mqttAndroidClient = MqttAndroidClient(context, host, clientId)mqttAndroidClient?.setCallback(object: MqttCallback {override fun connectionLost(cause: Throwable?) {dataCallback?.connectionError(cause)}override fun messageArrived(topic: String?, message: MqttMessage?) {message?.let {val payLoad = String(it.payload)Log.d(xxxxxxxxx)dataCallback?.dataMessage(payLoad)}}override fun deliveryComplete(token: IMqttDeliveryToken?) {//do something}})}connecting = trueval options = MqttConnectOptions()options.isCleanSession = falseoptions.userName = useroptions.password = password.toCharArray()options.connectionTimeout = connectTimeoutoptions.keepAliveInterval = keepAliveIntervaloptions.isAutomaticReconnect = truemqttAndroidClient?.connect(options, null, object: IMqttActionListener{override fun onSuccess(asyncActionToken: IMqttToken?) {connecting = falseconnectionCallback?.connectSuccess()}override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {connecting = falseconnectionCallback?.connectFail(exception)}})}//是否已经建立链接fun hasConnected(): Boolean {return try {mqttAndroidClient?.isConnected == true} catch {false}}//断开与服务端连接fun disConnectFromServer() {if(mqttAndroidClient?.isConnected == true) {mqttAndroidClient?.disconnect()}connecting = false}//释放资源fun relaseClient() {mqttAndroidClient?.close()mqttAndroidClient = nullconnecting = false}//订阅消息fun subscribe(topics: Array<String>, qos: IntArray, timeOut: Long = 2000): Boolean {return mqttAndroidClient?.let {try{val mqttToken = it.subscribe(topics, qos)mqttToken.waitForCompletion(timeout)mqttToken.isCompletetrue} catch {Log.d(xxxx)false}}?: false}//取消订阅fun unSubscribe(): Boolean {//省略}
}

需要注意的是这里的ClientId,是唯一性的,像IoT设备以设备deviceId作为ClientId,如果换成用户userId,当在多设备登录的情况下,那么重试等其他一些机制会影响预期结果,给排查问题带来一定的难度。

5.消息处理接口IHandler
interface IHandler {fun handlerMessage(message: String)
}

消息体中会包含不同的type,根据不同的type实现不同的处理器,当然为了灵活还要借助注解机制

6.注解模版
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.SOURCE)
annotation class MQTTHandler {val groupName: Stringval type: String
}

通过反射的方式加载对应的IHandler实现类,核心代码

interface IHandlerProvide {fun provideHandler(handlerType: String): IHandler?fun release()
}class XXXHandlerProvider: IHandlerProvide {override fun provideHandler(handlerType: String): IHandler? {//find the imp IHandler}override fun release() {//release the source}
}override fun provideHandler(handlerType: String): IHandler? {val handlerClass = Maneger.instance.findIHandlersByGroup(GroupName)?.get(handlerType)
}

使用时,直接加上注解:

@MQTTHandler(groupName = groupxxxx, type = typeNamexxxx)
class TestHandler: IHandler {override fun handlerMessage(message: String) {//do something what you want}
}

整个流程的主要部分已经给出,核心是通过不同的消息type查找出对应的处理器;当然这部分主要是由注解完成的,对于处理器的查找则是通过反射的方式来进行匹配的。

来自:https://juejin.cn/post/7070081254548307999

Android基于MQTT来实现消息通知相关推荐

  1. 「Android基于MQTT实现消息通知」

    「Android基于MQTT实现消息通知」 一.写在前面 在对接项目中IoT时,发现目前有对MQTT做了接入,这里记录一下,官方的资料比较详细,这里主要从实现细节出发:对具体的需求以及配套的技术方案进 ...

  2. 基于mqtt协议的消息推送服务器,基于 MQTT 协议的推送服务

    一.简述 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级& ...

  3. Android 监听系统中消息通知事件

    0. 学习文章 参考了下面Blog 完全没有任何多余的代码 https://blog.csdn.net/wanghang1208/article/details/49905403 原来百度卫士的通知栏 ...

  4. 基于MQTT的消息发布订阅python实现

    简介: MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)是一种基于发布/订阅范式的"轻量级"消息协议.该协议构建于TCP ...

  5. 基于MQTT的消息推送

    这段时间学习了推送技术,对xmpp和mqtt 协议做了下比较. xmpp基于xml信息传递,所以传输信息量比较大,在保持长链接情况下功耗会比较大. 可能还是比较适合用来做聊天之类的通讯应用,而对于智能 ...

  6. Java中集成极光推送实现给Android提送消息通知(附代码下载)

    场景 Android中集成极光推送实现推送消息通知与根据别名指定推送附示例代码下载: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details ...

  7. Android中集成Jpush实现推送消息通知与根据别名指定推送附示例代码下载

    场景 经常会有后台服务向Android推送消息通知的情况. 实现 首先在Android Studio中新建一个Android应用 在Project根目录的build.gradle中配置了jcenter ...

  8. 于金刚消息引擎服务器,基于MQTT的安全通信服务器的研究与实现

    摘要: 随着智能终端的普及和移动互联网的深入发展,以消息推送系统为核心部件的企业社交平台在企业办公自动化系统中发挥着越来越重要的作用.一方面,受限于移动互联网带宽资源和移动终端计算存储资源,传统的互联 ...

  9. Android学习—Notification消息通知

    最近在项目中需要使用消息通知,自己把它封装成了一个方法,需要的时候方便调用, 下面对Notification类中的一些常量,字段,方法简单介绍一下: 常量: DEFAULT_ALL    使用所有默认 ...

最新文章

  1. 领结婚证了,新的人生开始了!
  2. MATLAB画高斯曲线
  3. java 注解 Annontation
  4. 为安卓应用添加手势密码功能,遇到的一些问题以及解决方法
  5. 解决文字与下划线重叠的问题
  6. [BZOJ1322]Destroying The Graph
  7. 机器学习系统设计——误差矩阵
  8. mybatis Example 使用方法
  9. 【ACM】最少乘法次数 - 树
  10. 河南大学计算机组成原理,河南大学计算机组成原理考点
  11. Git SSH key配置
  12. runloop - 面试题
  13. 日志显示TypeError: Failed to fetch报错与TypeError: NetworkError when attempting to fetch resource报错
  14. Flutter 是移动应用程序开发的未来?
  15. 【文文殿下】浅谈KMP算法next数组与循环节的关系
  16. SQL进阶六:字符串函数
  17. 【转】.NET Interop入门-P/Invoke和Reverse P/Invoke
  18. 模拟器打开开发者模式
  19. 陪玩行业怎么找客户?想做线上引流?这篇文章打开你的思路!
  20. 服务器两个内存为何只显示4g_win10系统插入2个4G内存条却只显示4G的解决方法

热门文章

  1. 小程序 账本小记 统计月收入 月支出 源码分享
  2. 把照片变漫画的方法有哪些
  3. 一曲相思(Cover:阿悠悠)完整SQ版mp3 免费下载
  4. 设置你的SuperFetch
  5. RK平台之mpp编解码
  6. 实战|如何优雅地自定义Prometheus监控指标
  7. c++研发暑期实习面试总结(微软/intel/阿里/百度)
  8. 说词——浣溪沙 苏轼
  9. aspectj weaver记录
  10. Python产生batch数据的方法