mqtt协议-broker之moqutte源码研究二之Connect报文处理
先上一个图,大概说明一下moquette 的类之间的关系
一.ProtocolProcessor类
该类是moquette里面的最终要的类,负责所有报文的处理,持有所有各模块功能的实现对象的引用, 下面详细介绍
protected ConnectionDescriptorStore connectionDescriptors;//所有的连接描述符文存储,即clientId与通道之间的映射集合
protected ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;//所有当前正在处理的订阅关系的存储,之所以有这个是过滤无效的订阅请求
private SubscriptionsDirectory subscriptions;//订阅目录,本质上是topic树
private ISubscriptionsStore subscriptionStore;//所有的订阅的集合
private boolean allowAnonymous;//是否允许匿名连接
private boolean allowZeroByteClientId;//是否允许clientId为空
private IAuthorizator m_authorizator; //对topic的读写权限认证private IMessagesStore m_messagesStore;//retainMessage的存储private ISessionsStore m_sessionsStore;//session 存储private IAuthenticator m_authenticator;//连接时候的鉴权认证
private BrokerInterceptor m_interceptor;//各个层面的拦截器private Qos0PublishHandler qos0PublishHandler;//qos0拦截器
private Qos1PublishHandler qos1PublishHandler;//qos1拦截器
private Qos2PublishHandler qos2PublishHandler;/qos2拦截器
private MessagesPublisher messagesPublisher;//分发消息,遗愿消息,以及集权间同步消息
private InternalRepublisher internalRepublisher;//保留消息,qos1,qos2消息重发器ConcurrentMap<String, WillMessage> m_willStore//遗愿消息存储几乎所有的功能的源头都在这个类里面
二.对14种报文的处理,都在ProtocolProcessor类,后面会分篇挨个讲解moquette对这14个报文的处理
具体哪14中文报文如下
名字 值 报文流动方向 描述
Reserved 0 禁止 保留
CONNECT 1 客户端到服务端 客户端请求连接服务端
CONNACK 2 服务端到客户端 连接报文确认
PUBLISH 3 两个方向都允许 发布消息
PUBACK 4 两个方向都允许 QoS 1消息发布收到确认
PUBREC 5 两个方向都允许 发布收到(保证交付第一步)
PUBREL 6 两个方向都允许 发布释放(保证交付第二步)
PUBCOMP 7 两个方向都允许 QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE 8 客户端到服务端 客户端订阅请求
SUBACK 9 服务端到客户端 订阅请求报文确认
UNSUBSCRIBE 10 客户端到服务端 客户端取消订阅请求
UNSUBACK 11 服务端到客户端 取消订阅报文确认
PINGREQ 12 客户端到服务端 心跳请求
PINGRESP 13 服务端到客户端 心跳响应
DISCONNECT 14 客户端到服务端 客户端断开连接
Reserved 15 禁止 保留
或者到这里看更详细的mqtt中文翻译
https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md
非常感谢作者的辛劳工作和无私分享
三.debug跟踪moquette 对CONNECT报文的处理
大概分为以下几步
1.验证协议版本,如果不是mqtt-3.1或者mqtt-3.1.1则拒绝连接
2.验证clientId是否为空,如果为空,但是配置的时候(在上篇介绍的moquette.cof里面配置)要求不允许唯恐,即上面的allowZeroByteClientId或者cleanSession为false即要求保存会话,则视为不合法,拒绝连接,否则由moquette生成clientId
3.验证是否有登录的权限
这里面贴上源码讲解一下
private boolean login(Channel channel, MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
failedCredentials(channel);
return false;
}
if (!m_authenticator.checkValid(clientId, msg.payload().userName(), pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}, password={}",
clientId, msg.payload().userName(), pwd);
failedCredentials(channel);
return false;
}
NettyUtils.userName(channel, msg.payload().userName());
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
failedCredentials(channel);
return false;
}
return true;
}
3.1.如果CONNETCT报文里面的可变头里面没有用户名,直接返回true
3.2.如果有用户名,同时有密码,从可变头取出密码,调用m_authenticator进行验证
3.3 如果有用户名,没有密码,认证失败,拒绝连接
3.4 如果没有用户名,同时配置为不允许匿名,则认证失败
4.创建连接描述符,连接描述符包括clientId,channel,isCleanSession,ConnectState,同时判断连接描述符集合里面是否包括该连接描述符,如果包含,代表该连接以及建立,断开连接
5.根据CONNECT报文里面的Keep Alive time 来设置tcp参数
6.根据CONNECT报文遗愿消息标志位,觉得是否存储遗愿消息
7.返回CONNACK报文,这里面把返回CONNACK报文单独讲解一下
private boolean sendAck(ConnectionDescriptor descriptor, MqttConnectMessage msg, final String clientId) {LOG.info("Sending connect ACK. CId={}", clientId);final boolean success = descriptor.assignState(DISCONNECTED, SENDACK);if (!success) {return false;}MqttConnAckMessage okResp;ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);boolean isSessionAlreadyStored = clientSession != null;if (!msg.variableHeader().isCleanSession() && isSessionAlreadyStored) {okResp = connAckWithSessionPresent(CONNECTION_ACCEPTED);} else {okResp = connAck(CONNECTION_ACCEPTED);}if (isSessionAlreadyStored) {LOG.info("Cleaning session. CId={}", clientId);clientSession.cleanSession(msg.variableHeader().isCleanSession());}descriptor.writeAndFlush(okResp);LOG.info("The connect ACK has been sent. CId={}", clientId);return true;
}7.1 判断当前连接的状态,怎么判断的呢?这里面用了AtomicReference<ConnectionState>通过调用原子引用类 compareAndSet(DISCONNECTED, SENDACK)来解决并发修改连接状态的问题。7.2如果状态是disConnect,将状态修改为sendAck7.3 如果CONNETCT报文里面的CleanSession标识设置为0同时broker已经有了client的会话,将CONNACK报文里面的连接确认标志设为1,告诉客户端,broker已经有了响应的会话信息。否则将连接确认标志设为07.4 如果已经存在相应的client的会话,则根据新的连接,更新clientSession里面的是否清理session属性
8.唤醒拦截器记录连接事件
9.创建或者从新加载clientSession,这里面单独讲解一下
private ClientSession createOrLoadClientSession(ConnectionDescriptor descriptor, MqttConnectMessage msg,String clientId) {final boolean success = descriptor.assignState(SENDACK, SESSION_CREATED);if (!success) {return null;}ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);boolean isSessionAlreadyStored = clientSession != null;if (!isSessionAlreadyStored) {clientSession = m_sessionsStore.createNewSession(clientId, msg.variableHeader().isCleanSession());}if (msg.variableHeader().isCleanSession()) {LOG.info("Cleaning session. CId={}", clientId);clientSession.cleanSession();}return clientSession;
}9.1 AtomicReference<ConnectionState>通过调用原子引用类 compareAndSet(SENDACK, SESSION_CREATED)将连接状态从sendAck修改为session_create9.2 session存储结合里面,是否已经存在会话信息,如果不存在,创建一个新的clientsession9.3 如果存在,根据CONNETCT报文里面的cleansession自动决定是否清理调旧的会话信息。
10.如果CONNETCT报文要求不清理会话信息(cleansession标志位为0),则重发QoS1 and QoS2 messages,同时将连接状态从session_create修改成message_republish
11.将连接状态从session_create修改成established
到此,broker和client直接的mqtt连接正式建立,后面client可以开始发送SUBSCRIBE或者PUBLISH报文了。
在这里再补充一点,对于broker来说,建立连接的过程中,连接状态会从disConnect->sendAck->session_create->message_republish->established,之所以要设置这些状态,是因为,每一步后面的操作都要基于前面的状态来决定是否需要真正执行,这里面用到了原子引用类来保证,状态的修改这个操作的原子行,确保了在并发的情况下,每一步操作都是条件满足的。
下面一篇将会讲解SUBSCRIBE报文的处理
转载于:https://blog.51cto.com/13579730/2073630
mqtt协议-broker之moqutte源码研究二之Connect报文处理相关推荐
- tcp/ip 协议栈Linux源码分析二 IPv4分片报文重组分析二
继续接着上篇讲,之前我们说过,收到分片报文后首先会检查分片报文所占内存是否过大,如果超过阈值的话就要调用ip_evictor函数去释放一些旧的分片队列,关于如何释放分片队列资源上一篇已经总结完成,接下 ...
- 物联网协议之MQTT源码分析(二)
此篇文章继上一篇物联网协议之MQTT源码分析(一)而写的第二篇MQTT发布消息以及接收Broker消息的源码分析,想看MQTT连接的小伙伴可以去看我上一篇哦. juejin.im/post/5cd66 ...
- WebRTC源码研究(4)web服务器工作原理和常用协议基础
文章目录 WebRTC源码研究(4)web服务器工作原理和常用协议基础 前言 做WebRTC 开发为啥要懂服务器开发知识 1. Web 服务器简介 2. Web 服务器的类型 3. Web 服务器的工 ...
- WebRTC源码研究(4)web服务器工作原理和常用协议基础(转载)
前言 前面3篇博客分别对WebRTC框架的介绍,WebRTC源码目录,WebRTC的运行机制进行了介绍,接下来讲解一点关于服务器原理的知识.后面博客会写关于WebRTC服务器相关的开发,目前git上面 ...
- Apache Camel源码研究之Rest
本文以Camel2.24.3 + SpringBoot2.x 为基础简单解读Camel中的Rest组件的源码级实现逻辑. 0. 目录 1. 前言 2. 源码解读 2.1 启动时 2.1.1 `Rest ...
- WebRTC源码研究(7)创建简单的HTTPS服务
文章目录 WebRTC源码研究(7)创建简单的HTTPS服务 1. HTTPS简介 2. HTTPS 协议 3. HTTPS 证书 4. 创建简单的HTTPS服务 4.1 生成HTTPS证书 4.2 ...
- WebRTC源码研究(1)WebRTC架构
文章目录 WebRTC源码研究(1)WebRTC架构 1. WebRTC简介 2. WebRTC的能力 2.1 抓住属于WebRTC的5G时代风口 2.1.1 浏览器的支持情况 2.1.2 大厂的加入 ...
- 物联网云监控平台设备管理iot源码,MQTT/ONENET带APP端源码
大型物联网平台全套源码 物联网云监控IOT设备管理源码带APP端 开发语言:PHP 数据库:MYSQL 开发工具:phpstrom 源码类型:全开源免费分享 物联网云监控WEB设备管理iot源码,MQ ...
- 二、Neo4j源码研究系列 - 单步调试
二.Neo4j源码研究系列 - 单步调试 一.背景介绍 上一篇我们已经把了neo4j的源码准备以及打包流程完成了,本篇将讲解如何对neo4j进行单步调试.对于不了解如何编译打包neo4j的读者,请阅读 ...
- 一起谈.NET技术,.NET Framework源码研究系列之---万法归宗Object
经过前面三篇关于.NET Framework源码研究系列的随笔,相信大家都发现其实.NET Framework的实现其实并不复杂,也许跟我们自己做的项目开发差不多.本人也是这样的看法.不过,经过仔细深 ...
最新文章
- python棋盘放米循环结构_Python递归法计算棋盘上所有路径总奖品最大值(京东2016编程题)...
- Java高级技术笔记
- 程序员面试题精选100题(40)-扑克牌的顺子[算法]
- 数据库系统概念总结:第八章 关系数据库设计
- vue-resource jsonp跨域问题解决方法
- C ++ 指针 | 指针与三维数组_6
- Google 公开被利用了两年的 iOS 漏洞;微博新推社交产品“绿洲”;微软全新终端 v0.4 发布 | 极客头条...
- 图像处理自学(三):CAMERAM处理流程总结
- 计算机组成原理试卷分析,《计算机组成原理与汇编语言》试卷分析报告.doc.docx...
- Atitit Queue consum algo 队列消费算法fifo lifo ro目录1. 队列消费算法 11.1. FIFO 先入先出 11.2. LIFO 后入先出 不能多开 1
- 山东大学软件学院2022数据化企业期末复习总结
- 新浪通行证在线申诉找回密码业务逻辑错误导致严重安全漏洞
- 在c语言中 函数的作用是什么,C语言程序中函数的定义
- MP3音频编解码芯片 VS1053B-L
- C++ Cstring类型使用
- Android Application Fundamentals——Android应用程序基础知识
- 超声波测距仪编程_Arduino轻松学Mixly编程第9课 超声波测距仪
- excel几个表合成一张_如何将几个excel文件合并_多个excel表合并成一个的方法
- 软件开发过程培训总结
- Java基于网络爬虫的股票信息收集软件
热门文章
- python to datetime_Python中缺少datetime.timedelta.to_seconds()-float?
- unity和python哪个好学_纠结学习Python还是unity3d_课课家教育
- java是什么_Java是什么?Java的特点有哪些?
- ai疾病风险因素识别_克服AI的“蠕动因素”
- colab 数据集_Google Colab上的YOLOv4:轻松训练您的自定义数据集(交通标志)
- pytorch深度学习入门_立即学习AI:01 — Pytorch入门
- python:遍历文件夹下的所有文件
- android studio线性渐变,使用Kotlin实现文字渐变TextView的代码
- Python删除文件中含有特定值的行
- python3 ValueError: The shape of the input to Flatten is not fully defined (got (0, 6, 80)