先上一个图,大概说明一下moquette 的类之间的关系

一.ProtocolProcessor类
该类是moquette里面的最终要的类,负责所有报文的处理,持有所有各模块功能的实现对象的引用, 下面详细介绍

    protected ConnectionDescriptorStore connectionDescriptors;//所有的连接描述符文存储,即clientId与通道之间的映射集合
protected ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;//所有当前正在处理的订阅关系的存储,之所以有这个是过滤无效的订阅请求
private SubscriptionsDirectory subscriptions;//订阅目录,本质上是topic树
private ISubscriptionsStore subscriptionStore;//所有的订阅的集合
private boolean allowAnonymous;//是否允许匿名连接
private boolean allowZeroByteClientId;//是否允许clientId为空
private IAuthorizator m_authorizator; //对topic的读写权限认证private IMessagesStore m_messagesStore;//retainMessage的存储private ISessionsStore m_sessionsStore;//session 存储private IAuthenticator m_authenticator;//连接时候的鉴权认证
private BrokerInterceptor m_interceptor;//各个层面的拦截器private Qos0PublishHandler qos0PublishHandler;//qos0拦截器
private Qos1PublishHandler qos1PublishHandler;//qos1拦截器
private Qos2PublishHandler qos2PublishHandler;/qos2拦截器
private MessagesPublisher messagesPublisher;//分发消息,遗愿消息,以及集权间同步消息
private InternalRepublisher internalRepublisher;//保留消息,qos1,qos2消息重发器ConcurrentMap<String, WillMessage> m_willStore//遗愿消息存储几乎所有的功能的源头都在这个类里面

二.对14种报文的处理,都在ProtocolProcessor类,后面会分篇挨个讲解moquette对这14个报文的处理
具体哪14中文报文如下

名字                   值           报文流动方向                      描述

Reserved 0 禁止 保留
CONNECT 1 客户端到服务端 客户端请求连接服务端
CONNACK 2 服务端到客户端 连接报文确认
PUBLISH 3 两个方向都允许 发布消息
PUBACK 4 两个方向都允许 QoS 1消息发布收到确认
PUBREC 5 两个方向都允许 发布收到(保证交付第一步)
PUBREL 6 两个方向都允许 发布释放(保证交付第二步)
PUBCOMP 7 两个方向都允许 QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE 8 客户端到服务端 客户端订阅请求
SUBACK 9 服务端到客户端 订阅请求报文确认
UNSUBSCRIBE 10 客户端到服务端 客户端取消订阅请求
UNSUBACK 11 服务端到客户端 取消订阅报文确认
PINGREQ 12 客户端到服务端 心跳请求
PINGRESP 13 服务端到客户端 心跳响应
DISCONNECT 14 客户端到服务端 客户端断开连接
Reserved 15 禁止 保留

或者到这里看更详细的mqtt中文翻译
https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md
非常感谢作者的辛劳工作和无私分享

三.debug跟踪moquette 对CONNECT报文的处理
大概分为以下几步
1.验证协议版本,如果不是mqtt-3.1或者mqtt-3.1.1则拒绝连接
2.验证clientId是否为空,如果为空,但是配置的时候(在上篇介绍的moquette.cof里面配置)要求不允许唯恐,即上面的allowZeroByteClientId或者cleanSession为false即要求保存会话,则视为不合法,拒绝连接,否则由moquette生成clientId
3.验证是否有登录的权限
这里面贴上源码讲解一下
private boolean login(Channel channel, MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
failedCredentials(channel);
return false;
}
if (!m_authenticator.checkValid(clientId, msg.payload().userName(), pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}, password={}",
clientId, msg.payload().userName(), pwd);
failedCredentials(channel);
return false;
}
NettyUtils.userName(channel, msg.payload().userName());
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
failedCredentials(channel);
return false;
}
return true;
}

3.1.如果CONNETCT报文里面的可变头里面没有用户名,直接返回true
3.2.如果有用户名,同时有密码,从可变头取出密码,调用m_authenticator进行验证
3.3 如果有用户名,没有密码,认证失败,拒绝连接
3.4 如果没有用户名,同时配置为不允许匿名,则认证失败

4.创建连接描述符,连接描述符包括clientId,channel,isCleanSession,ConnectState,同时判断连接描述符集合里面是否包括该连接描述符,如果包含,代表该连接以及建立,断开连接
5.根据CONNECT报文里面的Keep Alive time 来设置tcp参数
6.根据CONNECT报文遗愿消息标志位,觉得是否存储遗愿消息
7.返回CONNACK报文,这里面把返回CONNACK报文单独讲解一下

        private boolean sendAck(ConnectionDescriptor descriptor, MqttConnectMessage msg, final String clientId) {LOG.info("Sending connect ACK. CId={}", clientId);final boolean success = descriptor.assignState(DISCONNECTED, SENDACK);if (!success) {return false;}MqttConnAckMessage okResp;ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);boolean isSessionAlreadyStored = clientSession != null;if (!msg.variableHeader().isCleanSession() && isSessionAlreadyStored) {okResp = connAckWithSessionPresent(CONNECTION_ACCEPTED);} else {okResp = connAck(CONNECTION_ACCEPTED);}if (isSessionAlreadyStored) {LOG.info("Cleaning session. CId={}", clientId);clientSession.cleanSession(msg.variableHeader().isCleanSession());}descriptor.writeAndFlush(okResp);LOG.info("The connect ACK has been sent. CId={}", clientId);return true;
}7.1 判断当前连接的状态,怎么判断的呢?这里面用了AtomicReference<ConnectionState>通过调用原子引用类  compareAndSet(DISCONNECTED, SENDACK)来解决并发修改连接状态的问题。7.2如果状态是disConnect,将状态修改为sendAck7.3 如果CONNETCT报文里面的CleanSession标识设置为0同时broker已经有了client的会话,将CONNACK报文里面的连接确认标志设为1,告诉客户端,broker已经有了响应的会话信息。否则将连接确认标志设为07.4 如果已经存在相应的client的会话,则根据新的连接,更新clientSession里面的是否清理session属性

8.唤醒拦截器记录连接事件
9.创建或者从新加载clientSession,这里面单独讲解一下

            private ClientSession createOrLoadClientSession(ConnectionDescriptor descriptor, MqttConnectMessage msg,String clientId) {final boolean success = descriptor.assignState(SENDACK, SESSION_CREATED);if (!success) {return null;}ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);boolean isSessionAlreadyStored = clientSession != null;if (!isSessionAlreadyStored) {clientSession = m_sessionsStore.createNewSession(clientId, msg.variableHeader().isCleanSession());}if (msg.variableHeader().isCleanSession()) {LOG.info("Cleaning session. CId={}", clientId);clientSession.cleanSession();}return clientSession;
}9.1 AtomicReference<ConnectionState>通过调用原子引用类  compareAndSet(SENDACK, SESSION_CREATED)将连接状态从sendAck修改为session_create9.2 session存储结合里面,是否已经存在会话信息,如果不存在,创建一个新的clientsession9.3 如果存在,根据CONNETCT报文里面的cleansession自动决定是否清理调旧的会话信息。

10.如果CONNETCT报文要求不清理会话信息(cleansession标志位为0),则重发QoS1 and QoS2 messages,同时将连接状态从session_create修改成message_republish
11.将连接状态从session_create修改成established

到此,broker和client直接的mqtt连接正式建立,后面client可以开始发送SUBSCRIBE或者PUBLISH报文了。
在这里再补充一点,对于broker来说,建立连接的过程中,连接状态会从disConnect->sendAck->session_create->message_republish->established,之所以要设置这些状态,是因为,每一步后面的操作都要基于前面的状态来决定是否需要真正执行,这里面用到了原子引用类来保证,状态的修改这个操作的原子行,确保了在并发的情况下,每一步操作都是条件满足的。

下面一篇将会讲解SUBSCRIBE报文的处理

转载于:https://blog.51cto.com/13579730/2073630

mqtt协议-broker之moqutte源码研究二之Connect报文处理相关推荐

  1. tcp/ip 协议栈Linux源码分析二 IPv4分片报文重组分析二

    继续接着上篇讲,之前我们说过,收到分片报文后首先会检查分片报文所占内存是否过大,如果超过阈值的话就要调用ip_evictor函数去释放一些旧的分片队列,关于如何释放分片队列资源上一篇已经总结完成,接下 ...

  2. 物联网协议之MQTT源码分析(二)

    此篇文章继上一篇物联网协议之MQTT源码分析(一)而写的第二篇MQTT发布消息以及接收Broker消息的源码分析,想看MQTT连接的小伙伴可以去看我上一篇哦. juejin.im/post/5cd66 ...

  3. WebRTC源码研究(4)web服务器工作原理和常用协议基础

    文章目录 WebRTC源码研究(4)web服务器工作原理和常用协议基础 前言 做WebRTC 开发为啥要懂服务器开发知识 1. Web 服务器简介 2. Web 服务器的类型 3. Web 服务器的工 ...

  4. WebRTC源码研究(4)web服务器工作原理和常用协议基础(转载)

    前言 前面3篇博客分别对WebRTC框架的介绍,WebRTC源码目录,WebRTC的运行机制进行了介绍,接下来讲解一点关于服务器原理的知识.后面博客会写关于WebRTC服务器相关的开发,目前git上面 ...

  5. Apache Camel源码研究之Rest

    本文以Camel2.24.3 + SpringBoot2.x 为基础简单解读Camel中的Rest组件的源码级实现逻辑. 0. 目录 1. 前言 2. 源码解读 2.1 启动时 2.1.1 `Rest ...

  6. WebRTC源码研究(7)创建简单的HTTPS服务

    文章目录 WebRTC源码研究(7)创建简单的HTTPS服务 1. HTTPS简介 2. HTTPS 协议 3. HTTPS 证书 4. 创建简单的HTTPS服务 4.1 生成HTTPS证书 4.2 ...

  7. WebRTC源码研究(1)WebRTC架构

    文章目录 WebRTC源码研究(1)WebRTC架构 1. WebRTC简介 2. WebRTC的能力 2.1 抓住属于WebRTC的5G时代风口 2.1.1 浏览器的支持情况 2.1.2 大厂的加入 ...

  8. 物联网云监控平台设备管理iot源码,MQTT/ONENET带APP端源码

    大型物联网平台全套源码 物联网云监控IOT设备管理源码带APP端 开发语言:PHP 数据库:MYSQL 开发工具:phpstrom 源码类型:全开源免费分享 物联网云监控WEB设备管理iot源码,MQ ...

  9. 二、Neo4j源码研究系列 - 单步调试

    二.Neo4j源码研究系列 - 单步调试 一.背景介绍 上一篇我们已经把了neo4j的源码准备以及打包流程完成了,本篇将讲解如何对neo4j进行单步调试.对于不了解如何编译打包neo4j的读者,请阅读 ...

  10. 一起谈.NET技术,.NET Framework源码研究系列之---万法归宗Object

    经过前面三篇关于.NET Framework源码研究系列的随笔,相信大家都发现其实.NET Framework的实现其实并不复杂,也许跟我们自己做的项目开发差不多.本人也是这样的看法.不过,经过仔细深 ...

最新文章

  1. python棋盘放米循环结构_Python递归法计算棋盘上所有路径总奖品最大值(京东2016编程题)...
  2. Java高级技术笔记
  3. 程序员面试题精选100题(40)-扑克牌的顺子[算法]
  4. 数据库系统概念总结:第八章 关系数据库设计
  5. vue-resource jsonp跨域问题解决方法
  6. C ++ 指针 | 指针与三维数组_6
  7. Google 公开被利用了两年的 iOS 漏洞;微博新推社交产品“绿洲”;微软全新终端 v0.4 发布 | 极客头条...
  8. 图像处理自学(三):CAMERAM处理流程总结
  9. 计算机组成原理试卷分析,《计算机组成原理与汇编语言》试卷分析报告.doc.docx...
  10. Atitit Queue consum algo 队列消费算法fifo lifo ro目录1. 队列消费算法 11.1. FIFO 先入先出 11.2. LIFO 后入先出 不能多开 1
  11. 山东大学软件学院2022数据化企业期末复习总结
  12. 新浪通行证在线申诉找回密码业务逻辑错误导致严重安全漏洞
  13. 在c语言中 函数的作用是什么,C语言程序中函数的定义
  14. MP3音频编解码芯片 VS1053B-L
  15. C++ Cstring类型使用
  16. Android Application Fundamentals——Android应用程序基础知识
  17. 超声波测距仪编程_Arduino轻松学Mixly编程第9课 超声波测距仪
  18. excel几个表合成一张_如何将几个excel文件合并_多个excel表合并成一个的方法
  19. 软件开发过程培训总结
  20. Java基于网络爬虫的股票信息收集软件

热门文章

  1. python to datetime_Python中缺少datetime.timedelta.to_seconds()-float?
  2. unity和python哪个好学_纠结学习Python还是unity3d_课课家教育
  3. java是什么_Java是什么?Java的特点有哪些?
  4. ai疾病风险因素识别_克服AI的“蠕动因素”
  5. colab 数据集_Google Colab上的YOLOv4:轻松训练您的自定义数据集(交通标志)
  6. pytorch深度学习入门_立即学习AI:01 — Pytorch入门
  7. python:遍历文件夹下的所有文件
  8. android studio线性渐变,使用Kotlin实现文字渐变TextView的代码
  9. Python删除文件中含有特定值的行
  10. python3 ValueError: The shape of the input to Flatten is not fully defined (got (0, 6, 80)