Eclipse paho mqtt源码分析

  • MQTT
  • paho mqtt
  • 源码分析
    • org.eclipse.paho.client.mqttv3.MqttClient

MQTT

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

以上说明来源与百度:百度关于MQTT的解释说明

paho mqtt

paho mqtt是IBM根据MQTT协议编写的client jar

源码分析

org.eclipse.paho.client.mqttv3.MqttClient

MqttClient的类图,我们发现其非常简单,就是实现了IMqttClient接口类

我们主要看connect()方法,我们发现MqttClient中的connect()方法调用的是MqttAsyncClient类中的connect()方法,我们在一路像下走,会发现真正的连接方法在ClientComms类中

 /** @see IMqttClient#connect(MqttConnectOptions)*/public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException {aClient.connect(options, null, null).waitForCompletion(getTimeToWait());}

在MqttAsyncClient的connect方法中,程序会创建对应的NetworkModule来进行与服务端的连接。NetworkModule一共有4种,分别是:TCPNetworkModule、SSLNetworkModule、WebSocketSecureNetworkModule、WebSocketNetworkModule。具体选择哪种实现,是通过Uri的secheme来实现。具体进行connect连接是在ClientComms的connect方法中。

 /** (non-Javadoc)* * @see* org.eclipse.paho.client.mqttv3.IMqttAsyncClient#connect(org.eclipse.paho.* client.mqttv3.MqttConnectOptions, java.lang.Object,* org.eclipse.paho.client.mqttv3.IMqttActionListener)*/public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)throws MqttException, MqttSecurityException {final String methodName = "connect";if (comms.isConnected()) {throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);}if (comms.isConnecting()) {throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);}if (comms.isDisconnecting()) {throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);}if (comms.isClosed()) {throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);}if (options == null) {options = new MqttConnectOptions();}this.connOpts = options;this.userContext = userContext;final boolean automaticReconnect = options.isAutomaticReconnect();// @TRACE 103=cleanSession={0} connectionTimeout={1} TimekeepAlive={2}// userName={3} password={4} will={5} userContext={6} callback={7}log.fine(CLASS_NAME, methodName, "103",new Object[] { Boolean.valueOf(options.isCleanSession()), Integer.valueOf(options.getConnectionTimeout()),Integer.valueOf(options.getKeepAliveInterval()), options.getUserName(),((null == options.getPassword()) ? "[null]" : "[notnull]"),((null == options.getWillMessage()) ? "[null]" : "[notnull]"), userContext, callback });//创建network module 并对comms设置networkmodules,此处通过spi技术,加载所有的NetworkModuleFactory接口实现,并通过uri的scheme进行匹配,来选择对应的NetworkModuleFactory,这里我们以TCPNetworkModuleFactory为例//networkmodule有://org.eclipse.paho.client.mqttv3.internal.TCPNetworkModuleFactory//org.eclipse.paho.client.mqttv3.internal.SSLNetworkModuleFactory//org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketNetworkModuleFactory//org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketSecureNetworkModuleFactory             comms.setNetworkModules(createNetworkModules(serverURI, options));comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));// Insert our own callback to iterate through the URIs till the connect// succeedsMqttToken userToken = new MqttToken(getClientId());ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,userToken, userContext, callback, reconnecting);userToken.setActionCallback(connectActionListener);userToken.setUserContext(this);// If we are using the MqttCallbackExtended, set it on the// connectActionListenerif (this.mqttCallback instanceof MqttCallbackExtended) {connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);}comms.setNetworkModuleIndex(0);connectActionListener.connect();return userToken;}

我们来看createNetworkModules方法的实现,它本身并没有什么复杂的逻辑。主要是构建参数,并调用createNetworkModule方法,在createNetworkModule方法种,它有调用了NetworkModuleService类中的createInstance方法。

public static NetworkModule createInstance(String address, MqttConnectOptions options, String clientId)throws MqttException, IllegalArgumentException{try {URI brokerUri = new URI(address);//应用RFC3986的权限补丁applyRFC3986AuthorityPatch(brokerUri);String scheme = brokerUri.getScheme().toLowerCase();//NetworkModule是通过Factory来进行创建的,我们在看FACTORY_SERVICE_LOADER属性,发现它用到了SPI机制,什么是SPI呢,我们会在后面的博客中进行讲解for (NetworkModuleFactory factory : FACTORY_SERVICE_LOADER) {if (factory.getSupportedUriSchemes().contains(scheme)) {return factory.createNetworkModule(brokerUri, options, clientId);}}/** To throw an IllegalArgumentException exception matches the previous behavior of* MqttConnectOptions.validateURI(String), but it would be nice to provide something more meaningful.*/throw new IllegalArgumentException(brokerUri.toString());} catch (URISyntaxException e) {throw new IllegalArgumentException(address, e);}}

在这里我们分析一下ClientComms的connect方法,在此方法中并不会真证进行tcp/ip的连接,具体的连接是通过内部类(ConnectBG)启动线程的方式进行创建。我们可以发现在paho mqtt中,基本上都是通过线程进行启动的。在线程中会同时启动:CommsReceiver(数据接收)、CommsSender(发送)、CommsCallback(回调)。

 private class ConnectBG implements Runnable {ClientComms    clientComms = null;MqttToken       conToken;MqttConnect    conPacket;private String threadName;ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket, ExecutorService executorService) {clientComms = cc;conToken   = cToken;conPacket     = cPacket;threadName = "MQTT Con: "+getClient().getClientId();}void start() {if (executorService == null) {new Thread(this).start();} else {executorService.execute(this);}}public void run() {Thread.currentThread().setName(threadName);final String methodName = "connectBG:run";MqttException mqttEx = null;//@TRACE 220=>log.fine(CLASS_NAME, methodName, "220");try {// Reset an exception on existing delivery tokens.// This will have been set if disconnect occurred before delivery was// fully processed.MqttDeliveryToken[] toks = tokenStore.getOutstandingDelTokens();for (MqttDeliveryToken tok : toks) {tok.internalTok.setException(null);}// Save the connect token in tokenStore as failure can occur before sendtokenStore.saveToken(conToken,conPacket);// Connect to the server at the network level e.g. TCP socket and then// start the background processing threads before sending the connect// packet.NetworkModule networkModule = networkModules[networkModuleIndex];//这里真正进行tcp/ip的连接//启动网络模块,进行网络连接networkModule.start();//数据的接收,启动完成后,负责从broker接收消息receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);//数据发送,启动完成后,负责发送消息到brokersender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());sender.start("MQTT Snd: "+getClient().getClientId(), executorService);//回调服务,在连接时,指定的Callback服务callback.start("MQTT Call: "+getClient().getClientId(), executorService);//发送一条遗言消息,如果在连接客户端代码中有设置的话internalSend(conPacket, conToken);} catch (MqttException ex) {//@TRACE 212=connect failed: unexpected exceptionlog.fine(CLASS_NAME, methodName, "212", null, ex);mqttEx = ex;} catch (Exception ex) {//@TRACE 209=connect failed: unexpected exceptionlog.fine(CLASS_NAME, methodName, "209", null, ex);mqttEx =  ExceptionHelper.createMqttException(ex);}if (mqttEx != null) {//如果发现异常,进行连接的shutdown操作shutdownConnection(conToken, mqttEx);}}}

以上就是paho mqtt连接MQTT服务器的流程,代码其实非常简单。

MQTT-Eclipse paho mqtt源码分析-连接MQTT Broker相关推荐

  1. SOFA 源码分析 — 连接管理器

    前言 RPC 框架需要维护客户端和服务端的连接,通常是一个客户端对应多个服务端,而客户端看到的是接口,并不是服务端的地址,服务端地址对于客户端来讲是透明的. 那么,如何实现这样一个 RPC 框架的网络 ...

  2. Libevent源码分析-----连接监听器evconnlistener

    出处:http://blog.csdn.net/luotuo44/article/details/38800363 使用evconnlistener: 基于event和event_base已经可以写一 ...

  3. 《源码分析转载收藏向—数据库内核月报》

    月报原地址: 数据库内核月报 现在记录一下,我可能需要参考的几篇文章吧,不然以后还得找: MySQL · 代码阅读 · MYSQL开源软件源码阅读小技巧 MySQL · 源码分析 · 聚合函数(Agg ...

  4. ThingsBoard 二次开发之源码分析 5-如何接收 MQTT 连接

    欢迎加入ThingsBoard技术交流群 这里可复制Q群号:69998183 关注"云腾五洲":获取二开ThingsBoard物联网平台演示 交流社区:ThingsKit-Thin ...

  5. 物联网协议之MQTT源码分析(二)

    此篇文章继上一篇物联网协议之MQTT源码分析(一)而写的第二篇MQTT发布消息以及接收Broker消息的源码分析,想看MQTT连接的小伙伴可以去看我上一篇哦. juejin.im/post/5cd66 ...

  6. paho架构_MQTT系列最终章-Paho源码分析(三)-心跳与重连机制

    写在前面 通过之前MQTT系列-Eclipse.Paho源码分析(二)-消息的发送与接收的介绍,相信仔细阅读过的小伙伴已经对Eclipse.Paho内部发送和订阅消息的流程有了一个较为清晰的认识,今天 ...

  7. 14.QueuedConnection和BlockingQueuedConnection连接方式源码分析

    QT信号槽直连时的时序和信号槽的连接方式已经在前面的文章中分析过了,见https://blog.csdn.net/Master_Cui/article/details/109011425和https: ...

  8. 【Android 电量优化】JobScheduler 相关源码分析 ( ConnectivityController 底层源码分析 | 构造函数 | 追踪任务更新 | 注册接收者监听连接变化 )

    文章目录 一.ConnectivityController 连接控制器引入 二.ConnectivityController 构造方法解析 ( 注册接收者 ) 三.mConnectivityRecei ...

  9. socket.io-client源码分析——建立socket连接

    介绍 socket.io是一种用于服务端和客户端的双向通信的js库,提供了长轮询和websocket这两种实现方式socket.io-client是其在客户端的实现.socket.io-client通 ...

最新文章

  1. 在CMD窗口连接到Mysql
  2. python重复执行_关于计时器:在Python中每x秒重复执行一次函数的最佳方法是什么?...
  3. DISCUZ中判断当前页是否是门户首页
  4. 如何免费申请并使用SAP Marketing Cloud测试系统
  5. OPENWRT传感器实验
  6. DOMContentLoaded、readystatechange、load、ready详谈
  7. 线性插值 多项式插值 样条插值 牛顿插值总结
  8. linux 卸载JDK(rpm 方式安装的)
  9. LEACH算法改进 SEP算法源代码
  10. u盘中的android文件夹图标不显示,怎么解决u盘图标变成文件夹,教您解决方法
  11. php里用钢笔画曲线,ps钢笔工具怎么抠图
  12. 不让用计算机怎么回怼,当别人怼你时,如何优雅地怼回去
  13. 表格-table 样式
  14. 【计算机网络】第三话·浅谈OSI和TCP/IP模型
  15. 大学html5毕业设计任务书,2021届本科生毕业设计(论文)工作的通知
  16. Web开发入门不得不看
  17. 全面解析增资扩股不属于股权转让的原因
  18. 如何改变outlook的ost数据文件存储位置
  19. 学计算机的男孩情商高吗,2021情商高的男孩适合学什么专业
  20. 第一台电子计算机采用哪位科学家,计算机基础知识参考试题及答案解析

热门文章

  1. GAN domian adaptation
  2. 常用Unity平台解释
  3. 线性代数中满足乘法交换律的运算-行列式与迹
  4. Second season seventh episode,Ross finds out Rachel like him,what will he do???
  5. 商业模拟游戏:柠檬汁杰克项目
  6. Code Sign error: Provisioning profile '6805769A-5085-4BE7-B9D1-2859CD2CBE9E' can't be found
  7. 硬笔书法的产生与兴起
  8. 数字网络监控和传统模拟监控的区别
  9. 山石防火墙--飞塔防火墙间GRE配置
  10. 【python绘图】seaborn可视化+鼠标滑动自动标注数据