Android使用的MQTT客户端,支持订阅、发送消息;

支持创建连接到本地保存;

支持话题消息筛选;

使用视频:https://dwz.cn/undJFEnq
小米应用商店也有 【蘑菇IoT】~

核心代码贴一下,做个记录


import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.os.IBinder;import androidx.annotation.Nullable;import com.annimon.stream.Collectors;
import com.annimon.stream.Stream;
import com.freddon.android.snackkit.extension.regex.RegexHelper;
import com.freddon.android.snackkit.extension.tools.NetSuit;
import com.freddon.android.snackkit.log.Loger;
import com.qiniu.util.StringUtils;
import com.sagocloud.ntworker.agent.App;
import com.sagocloud.ntworker.agent.RxEventBus;
import com.sagocloud.ntworker.mqtt.ActionEventType;
import com.sagocloud.ntworker.mqtt.EventType;
import com.sagocloud.ntworker.mqtt.bean.MQTTConnectUserEntity;
import com.sagocloud.ntworker.mqtt.bean.MQTTMessage;
import com.sagocloud.ntworker.mqtt.bean.MqttConnectPoint;
import com.sagocloud.ntworker.mqtt.event.MQTTClientActionEvent;
import com.sagocloud.ntworker.mqtt.event.MQTTTransferEvent;
import com.sagocloud.ntworker.mqtt.event.MQTTMessageEvent;
import com.sagocloud.ntworker.mqtt.event.MQTTStateEvent;
import com.sagocloud.ntworker.mqtt.event.MQTTTraceEvent;import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.android.service.MqttTraceHandler;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;public class MQTTService extends Service {public final static String CONN = "CON_MQTT_CF";private MqttAndroidClient mqttAndroidClient;private MQTTMessageEvent mQTTConnectEvent;private MQTTConnectUserEntity connectPoint;private MqttConnectOptions mMqttConnectOptions;private MqttCallback mqttCallback = new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {mQTTConnectEvent = new MQTTMessageEvent();mQTTConnectEvent.setType(EventType.connectionLost);Loger.e("?connectionLost:", cause.getMessage());RxEventBus.post(mQTTConnectEvent);RxEventBus.post(new MQTTStateEvent(App.mqttIsConnected = false));}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {mQTTConnectEvent = new MQTTMessageEvent();mQTTConnectEvent.setType(EventType.messageArrived);mQTTConnectEvent.setTopic(topic);mQTTConnectEvent.setMessage(message);Loger.e("✉️messageArrived:", topic);RxEventBus.post(mQTTConnectEvent);}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {mQTTConnectEvent = new MQTTMessageEvent();mQTTConnectEvent.setType(EventType.deliveryComplete);try {mQTTConnectEvent.setTopic(StringUtils.join(token.getTopics(), ","));mQTTConnectEvent.setMessage(token.getMessage());} catch (MqttException e) {e.printStackTrace();}Loger.e("?deliveryComplete:", token.toString());RxEventBus.post(mQTTConnectEvent);}};private IMqttActionListener iMqttActionListener = new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {RxEventBus.post(new MQTTTransferEvent(asyncActionToken, null));Loger.e("?onSuccess:", "" + Arrays.toString(asyncActionToken.getTopics()));App.mqttIsConnected = true;RxEventBus.post(new MQTTStateEvent(true));DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();disconnectedBufferOptions.setBufferEnabled(true);disconnectedBufferOptions.setBufferSize(100);disconnectedBufferOptions.setPersistBuffer(false);disconnectedBufferOptions.setDeleteOldestMessages(false);mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {RxEventBus.post(new MQTTTransferEvent(asyncActionToken, exception.getMessage()));Loger.e("?onFailure:", "" + exception.getMessage());App.mqttIsConnected = false;RxEventBus.post(new MQTTStateEvent(false));}};private CompositeDisposable subscription;private MqttTraceHandler traceCallback = new MqttTraceHandler() {@Overridepublic void traceDebug(String tag, String message) {Loger.e("?traceDebug:" + tag, "" + message);
//            LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.DEBUG, tag, message));RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.DEBUG, tag, message));}@Overridepublic void traceError(String tag, String message) {Loger.e("?traceError:" + tag, "" + message);
//            LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.ERROR, tag, message));RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.ERROR, tag, message));}@Overridepublic void traceException(String tag, String message, Exception e) {Loger.e("?traceException:" + tag, "" + message + e.getMessage());RxEventBus.post(new MQTTTraceEvent(MQTTTraceEvent.Type.EXCEPTION, tag, message));
//            LiveDataBus.post(MQTTTraceEvent.class,new MQTTTraceEvent(MQTTTraceEvent.Type.EXCEPTION, tag, message));}};private Disposable actSubscription;private Disposable actTimerSubscription;public static void startService(Context context, MQTTConnectUserEntity point) {Intent service = new Intent();service.setClass(context, MQTTService.class);service.putExtra(CONN, point);context.startService(service);}private void $prepareActionHandler() {if (subscription == null) {subscription = new CompositeDisposable();}subscription.clear();if (actSubscription != null && actSubscription.isDisposed()) {actSubscription.dispose();}if (actTimerSubscription != null && actTimerSubscription.isDisposed()) {actTimerSubscription.dispose();}actSubscription = RxEventBus.subscribeIOEvent(MQTTClientActionEvent.class,event -> {ActionEventType type = event.getEventType();Object payload = event.getPayload();switch (type) {case connect:connect();break;case publish:if (payload instanceof MQTTMessage) {publish((MQTTMessage) payload);}break;case subscribe:if (payload instanceof MQTTMessage) {subscribe((MQTTMessage) payload);}break;case unsubscribe:if (payload instanceof MQTTMessage) {unsubscribe((MQTTMessage) payload);}break;case unsubscribe_all:if (payload instanceof String[]) {unsubscribeAll((String[]) payload);}break;case close:disconnect();mqttAndroidClient = null;App.mqttIsConnected = false;RxEventBus.post(new MQTTStateEvent(false));stopSelf();break;}},error -> {Loger.d("error", error.getMessage());});actTimerSubscription = Observable.interval(2000, TimeUnit.MILLISECONDS).subscribe((i) -> {App.mqttIsConnected = mqttAndroidClient != null && mqttAndroidClient.isConnected();RxEventBus.post(new MQTTStateEvent(App.mqttIsConnected));});subscription.add(actSubscription);subscription.add(actTimerSubscription);}@Overridepublic void onDestroy() {disconnect();RxEventBus.unsubscribeEvent(subscription);super.onDestroy();}@Nullable@Overridepublic IBinder onBind(Intent intent) {return null;}@Overridepublic int onStartCommand(Intent intent, int flags, int startId) {if (intent != null) {connectPoint = intent.getParcelableExtra(CONN);if (connectPoint != null) {$prepareActionHandler();connect();} else {if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {connect();}}}return super.onStartCommand(intent, flags, startId);}private void $prepared(MqttConnectPoint connectPoint) {String serverURI = String.format(Locale.ENGLISH, "%s://%s:%s", connectPoint.isUseSSL() ? "ssl" : "tcp", connectPoint.getHost(), connectPoint.getPort());if (mqttAndroidClient != null) {disconnect();}mqttAndroidClient = new MqttAndroidClient(this, serverURI, connectPoint.getClientId());mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调mqttAndroidClient.setTraceEnabled(true);mqttAndroidClient.setTraceCallback(traceCallback);mMqttConnectOptions = new MqttConnectOptions();mMqttConnectOptions.setMqttVersion(connectPoint.getVersion());mMqttConnectOptions.setMaxInflight(connectPoint.getMaxInflight());mMqttConnectOptions.setAutomaticReconnect(connectPoint.isAutoReconnect());mMqttConnectOptions.setCleanSession(connectPoint.isClearSession()); //设置是否清除缓存mMqttConnectOptions.setConnectionTimeout(connectPoint.getConnectTimeout()); //设置超时时间,单位:秒mMqttConnectOptions.setKeepAliveInterval(connectPoint.getTickTime()); //设置心跳包发送间隔,单位:秒if (RegexHelper.isAllNotEmpty(connectPoint.getUserName(), connectPoint.getUserPasswort())) {mMqttConnectOptions.setUserName(connectPoint.getUserName()); //设置用户名mMqttConnectOptions.setPassword(connectPoint.getUserPasswort().toCharArray()); //设置密码}if (connectPoint.isUseSSL() && connectPoint.getSslProperties() != null) {mMqttConnectOptions.setSSLProperties(connectPoint.getSslProperties());}if (RegexHelper.isNotEmpty(connectPoint.getLwt())) {mMqttConnectOptions.setWill(connectPoint.getLwt().getTopic(), connectPoint.getLwt().getMessage().getBytes(), connectPoint.getLwt().getQos(), connectPoint.getLwt().isRetained());}}private void connect() {if (mqttAndroidClient == null) {$prepared(connectPoint);}if (!mqttAndroidClient.isConnected() && NetSuit.checkEnable(this)) {try {mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);} catch (MqttException e) {e.printStackTrace();}}}private void disconnect() {try {if (mqttAndroidClient == null) return;mqttAndroidClient.unregisterResources();mqttAndroidClient.disconnect();mqttAndroidClient.close();} catch (MqttException e) {e.printStackTrace();} finally {mqttAndroidClient = null;}}private void subscribe(MQTTMessage subscribe) {try {if (mqttAndroidClient == null || subscribe == null || subscribe.getTopic() == null)return;MqttMessage.validateQos(subscribe.getQos());List<MQTTMessage> sub = connectPoint.getSubTopics();if (sub == null) {sub = new ArrayList<>();}Long count = Stream.of(sub).filter(item -> subscribe.getTopic().equalsIgnoreCase(item.getTopic())).collect(Collectors.counting());if (count > 0) {return;}sub.add(subscribe);connectPoint.setSubTopics(sub);mqttAndroidClient.subscribe(subscribe.getTopic(), subscribe.getQos());} catch (IllegalArgumentException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}}private void subscribeAll(String[] topics, int[] qos) {if (RegexHelper.isAnyEmpty(topics, qos)) return;if (mqttAndroidClient == null) return;if (topics.length != qos.length) return;try {mqttAndroidClient.subscribe(topics, qos);} catch (MqttException e) {e.printStackTrace();}}private void unsubscribe(MQTTMessage subscribe) {try {if (mqttAndroidClient == null || connectPoint == null) return;List<MQTTMessage> sub = connectPoint.getSubTopics();if (sub != null) {List<MQTTMessage> filtered = Stream.of(sub).filter(item -> !subscribe.getTopic().equalsIgnoreCase(item.getTopic())).collect(Collectors.toList());connectPoint.setSubTopics(filtered);}mqttAndroidClient.unsubscribe(subscribe.getTopic());} catch (MqttException e) {e.printStackTrace();}}private void unsubscribeAll(String[] topics) {try {if (mqttAndroidClient == null) return;if (topics == null) mqttAndroidClient.unsubscribe("#");else {mqttAndroidClient.unsubscribe(topics);}if (connectPoint != null) {connectPoint.setSubTopics(null);}} catch (MqttException e) {e.printStackTrace();}}private void publish(MQTTMessage subscribe) {try {MqttMessage.validateQos(subscribe.getQos());mqttAndroidClient.publish(subscribe.getTopic(), subscribe.getMessage().getBytes(), subscribe.getQos(), subscribe.isRetained());} catch (IllegalArgumentException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}}}

Android使用的MQTT客户端相关推荐

  1. Android Studio实现MQTT客户端

    初学Android Studio,在实现MQTT客户端的过程中遇到了很多坑,而在查阅博文的时候发现各个博文能提供的解决方法很零碎,我也是结合了诸多博文才最终解决了问题,于是打算做一个小总结 我用的版本 ...

  2. 基于FreeRTOS与MQTT的物联网技术应用系列——步进电机控制(七)基于CrossApp跨平台框架的MQTT客户端控制应用android版

    本文在前一篇基础上,详细介绍以CrossApp跨平台框架为基础,利用mosquito库和easySQLite库设计实现了基于MQTT协议的android版步进电机控制客户端. 一.开发环境的准备 编译 ...

  3. Android实现MQTT客户端

    java代码 package com.example.myapplication;import androidx.appcompat.app.AppCompatActivity;import andr ...

  4. android paho框架,Android Mqtt 客户端paho使用心得

    Android mqtt 客户端实现一般使用以下两个库: implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1 ...

  5. 一起Talk Android吧(第四百九十四回:在Android中使用MQTT通信四)

    文章目录 问题概述 解决办法 经验总结 各位看官们大家好,这一回中咱们说的例子是" 在Android中使用MQTT通信四",本章回内容与前后章节内容无关联.闲话休提,言归正转,让我 ...

  6. 结合Amazon Cognito服务限制接入AWS IoT平台的MQTT客户端的clientId

    AWS IoT 支持使用四种身份委托人进行身份验证: X.509 证书 IAM 用户.组和角色 Amazon Cognito 身份 联合身份 通常,AWS IoT 设备使用 X.509 证书,移动应用 ...

  7. 基于STM32C8T6、ESP8266-01S、JavaWeb、JSP、Html、JavaScript、Android、服务器和客户端设计、上位机和下位机设计等技术融合的物联网智能监控系统设计与实现

    系列文章目录 第一章ESP8266的java软件仿真测试 第二章ESP8266硬件与软件测试 第三章ESP8266客户端与Java后台服务器联调 第四章ESP8266客户端与JavaWeb服务器联调 ...

  8. 一种MQTT客户端消息队列的设计

    MQTT 简介 MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议.它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟 ...

  9. MQTT协议学习:3、MQTT客户端实例

    MQTT协议学习:3.MQTT客户端实例 文章目录 MQTT协议学习:3.MQTT客户端实例 1. 前言 2. Paho MQTT (1). Go客户端实例 (2). Python客户端实例 (3). ...

最新文章

  1. java servlet 多线程_Servlet的多线程和线程安全
  2. MATLAB 成绩排序
  3. 请写出sfr和sbit的语句格式_最新最全 Oracle ORA-01861: 文字与格式字符串不匹配
  4. 一篇故事讲述了计算机网络里的基本概念:网关,DHCP,IP寻址,ARP欺骗,路由,DDOS等
  5. 《大话数据结构》读后总结(九)
  6. 2.1 script 元素
  7. Linux-5.10.13内核完全注释之工作队列
  8. Linux内核等待队列wait_queue学习
  9. C++各种常用名词的意思
  10. summernote 富文本编辑器上传七牛云服务器
  11. 让机器人更安全——(5.总结与展望)
  12. Postman代理设置
  13. java实现的PC小说下载器+阅读器
  14. 项目管理(课程总结1)Week1 澳大利亚维多利亚大学VIT1203 Introduction to Project Management
  15. [python][转载]opencv-python横向纵向拼接图片
  16. 计算机adminstor用户不见了,Win10管理员账户不见了如何用Administrator登录
  17. Android 接入支付宝在手机未安装支付宝客户端的情况下掉不起支付宝sdk的h5页面
  18. 利用分支限界法解决01背包和货郎担问题
  19. [复选框] 获取checkbox选中的值
  20. 安卓10 来电流程梳理

热门文章

  1. flutter中 dp的理解
  2. 浏览器去除烦人的黑白滤镜
  3. 基于深度学习的显著性目标检测方法综述
  4. 4、关于step的设置
  5. 阿里2019社招内推!阿里云高级专家(P8)帮内推!投递简历邮箱看正文!
  6. matlab中牛顿下山法实例,非线性方程的数值解法牛顿下山法matlab.docx
  7. 如何组织一场安全、可靠、高效的网络实战攻防演习?
  8. nginx的安装和html部署问题
  9. 针对移动式和无线物联网设备的低压电机控制系统的设计
  10. 在西安,1000万人的城市,有多少家IT培训机构?