一、MoP是啥

MoP 即MQTT on Pulsar ,是一个在Pulsar基础上实现的MQTT协议,git地址:https://github.com/streamnative/mop
通过MoP可以快速的搭建一个MQTT服务器,下面介绍一下MoP的主要的代码结构。

二、MQTT消息定义

MQTT的消息在MoP中的定义是MqttMesssage,其代码如下:

public class MqttMessage {private final MqttFixedHeader mqttFixedHeader;private final Object variableHeader;private final Object payload;private final DecoderResult decoderResult;public static final MqttMessage PINGREQ;public static final MqttMessage PINGRESP;public static final MqttMessage DISCONNECT;
}
  • (1)MqttFixedHeader(固定头) :存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
  • (2)variableHeader(可变头):存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
  • (3)消息体(payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

三、MoP主要代码模块

  • 1、MQTTProtocolHandler:代码入口,继承自ProtocolHandler。
  • 2、MQTTProxyService:采用netty的方式,实现网络通信
    3、MQTTCommonInboundHandler:继承自ChannelInboundHandlerAdapter,实现ChannelInboundHandlerAdapter中的channelRead方法,获取客户端发送过来的消息,根据消息类型进入不同的处理流程。
public class MQTTCommonInboundHandler extends ChannelInboundHandlerAdapter {protected ProtocolMethodProcessor processor;  //协议处理@Overridepublic void channelRead(ChannelHandlerContext ctx, Object message) {checkArgument(message instanceof MqttMessage);checkNotNull(processor);MqttMessage msg = (MqttMessage) message;try {checkState(msg);MqttMessageType messageType = msg.fixedHeader().messageType();if (log.isDebugEnabled()) {log.debug("Processing MQTT Inbound handler message, type={}", messageType);}switch (messageType) {case CONNECT:checkArgument(msg instanceof MqttConnectMessage);processor.processConnect((MqttConnectMessage) msg);break;case SUBSCRIBE:checkArgument(msg instanceof MqttSubscribeMessage);processor.processSubscribe((MqttSubscribeMessage) msg);break;case UNSUBSCRIBE:checkArgument(msg instanceof MqttUnsubscribeMessage);processor.processUnSubscribe((MqttUnsubscribeMessage) msg);break;case PUBLISH:checkArgument(msg instanceof MqttPublishMessage);processor.processPublish((MqttPublishMessage) msg);break;case PUBREC:processor.processPubRec(msg);break;case PUBCOMP:processor.processPubComp(msg);break;case PUBREL:processor.processPubRel(msg);break;case DISCONNECT:processor.processDisconnect(msg);break;case PUBACK:checkArgument(msg instanceof MqttPubAckMessage);processor.processPubAck((MqttPubAckMessage) msg);break;case PINGREQ:processor.processPingReq();break;default:throw new UnsupportedOperationException("Unknown MessageType: " + messageType);}} catch (Throwable ex) {ReferenceCountUtil.safeRelease(msg);log.error("Exception was caught while processing MQTT message, ", ex);ctx.close();}}

如上,先将消息解析成MqttMessage类型,再从mqttFixedHeader中获取消息类型messageType,然后根据不同类型进行不同的处理流程。

4、MQTTProxyInboundHandler:继承自MQTTCommonInboundHandler,代码如下:

public class MQTTProxyInboundHandler extends MQTTCommonInboundHandler {private final MQTTProxyService proxyService;public MQTTProxyInboundHandler(MQTTProxyService proxyService) {this.proxyService = proxyService;}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);this.processor = new MQTTProxyProtocolMethodProcessor(proxyService, ctx);}
}

MQTTProxyInboundHandler通过实现channelActive,初始化processor MQTTProxyProtocolMethodProcessor类型。即处理mqtt消息主要是在MQTTProxyProtocolMethodProcessor中进行。

5、MQTTProxyProtocolMethodProcessor:继承自AbstractCommonProtocolMethodProcessor,主要用来处理MQTT消息。下面以处理CONNECT消息和PUBLISH消息为例来说明一下主要的逻辑。

1)处理CONNECT消息

 @Overridepublic void processConnect(MqttConnectMessage msg) {MqttConnectPayload payload = msg.payload();MqttConnectMessage connectMessage = msg;final int protocolVersion = msg.variableHeader().version();final String username = payload.userName();String clientId = payload.clientIdentifier();if (log.isDebugEnabled()) {log.debug("process CONNECT message. CId={}, username={}", clientId, username);}// Check MQTT protocol version.if (!MqttUtils.isSupportedVersion(protocolVersion)) {log.error("MQTT protocol version is not valid. CId={}", clientId);channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().unsupportedVersion());channel.close();return;}if (!MqttUtils.isQosSupported(msg)) {channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().willQosNotSupport(protocolVersion));channel.close();return;}// Client must specify the client ID except enable clean session on the connection.if (StringUtils.isEmpty(clientId)) {if (!msg.variableHeader().isCleanSession()) {channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().identifierInvalid(protocolVersion));channel.close();log.error("The MQTT client ID cannot be empty. Username={}", username);return;}clientId = MqttMessageUtils.createClientIdentifier(channel);connectMessage = MqttMessageUtils.stuffClientIdToConnectMessage(msg, clientId);if (log.isDebugEnabled()) {log.debug("Client has connected with generated identifier. CId={}", clientId);}}String userRole = null;if (!authenticationEnabled) {if (log.isDebugEnabled()) {log.debug("Authentication is disabled, allowing client. CId={}, username={}", clientId, username);}} else {MQTTAuthenticationService.AuthenticationResult authResult = authenticationService.authenticate(payload);if (authResult.isFailed()) {channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().authFail(protocolVersion));channel.close();log.error("Invalid or incorrect authentication. CId={}, username={}", clientId, username);return;}userRole = authResult.getUserRole();}doProcessConnect(connectMessage, userRole);}

当MoP接收到connect类型消息时,调用processConnect方法,进行一系列的消息校验,然后调用 doProcessConnect(connectMessage, userRole)来创建连接

 public void doProcessConnect(MqttConnectMessage msg, String userRole) {connection = Connection.builder().protocolVersion(msg.variableHeader().version()).clientId(msg.payload().clientIdentifier()).userRole(userRole).cleanSession(msg.variableHeader().isCleanSession()).connectMessage(msg).keepAliveTime(msg.variableHeader().keepAliveTimeSeconds()).channel(channel).connectionManager(connectionManager).serverReceivePubMaximum(proxyConfig.getReceiveMaximum()).build();connection.sendConnAck();}

通过创建好的connection对象来和客户端通信,并发送消息ack通知客户端建立连接成功。

2)处理PUBLISH消息

//处理PUBLISH消息
public void processPublish(MqttPublishMessage msg) {if (log.isDebugEnabled()) {log.debug("[Proxy Publish] publish to topic = {}, CId={}",msg.variableHeader().topicName(), connection.getClientId());}final int packetId = msg.variableHeader().packetId();final String pulsarTopicName = PulsarTopicUtils.getEncodedPulsarTopicName(msg.variableHeader().topicName(),proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(),TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain()));CompletableFuture<InetSocketAddress> lookupResult = lookupHandler.findBroker(TopicName.get(pulsarTopicName));lookupResult.thenCompose(brokerAddress -> writeToBroker(brokerAddress, pulsarTopicName, msg)).exceptionally(ex -> {msg.release();log.error("[Proxy Publish] Failed to publish for topic : {}, CId : {}",msg.variableHeader().topicName(), connection.getClientId(), ex);MopExceptionHelper.handle(MqttMessageType.PUBLISH, packetId, channel, ex);return null;});}
//写入消息到broker中private CompletableFuture<Void> writeToBroker(InetSocketAddress mqttBroker, String topic, MqttMessage msg) {CompletableFuture<MQTTProxyExchanger> proxyExchanger = connectToBroker(mqttBroker, topic);return proxyExchanger.thenCompose(exchanger -> writeToBroker(exchanger, msg));}

从msg中获取topicName,再根据topicName获取broker地址,再根据broker地址、topicName将消息写入broker

MQTT协议框架MOP代码结构解析相关推荐

  1. Android蓝牙系统框架和代码结构

    Android蓝牙系统框架和代码结构 概述 在 Android 4.2版本中,谷歌公司和博通合作,引入了博通的 BTE/BTA 协议栈,重构了蓝牙子系统.新的蓝牙协议栈被命名为 BlueDroid.它 ...

  2. smali代码结构解析

    smali代码结构解析 一.开头(类的声明) 1.声明一个类继承自某个类 java代码如下 public class MainActivity extends AppCompatActivity 对应 ...

  3. Android之高仿雅虎天气(二)---代码结构解析

    版本已升级至1.0.1 源码地址: GitHub:https://github.com/way1989/WayHoo OsChina:http://git.oschina.net/way/WayHoo ...

  4. xfire框架内部基本结构解析

    1 概述 xfire是webservice的一个实现框架,是apache旗下CXF的前身,是一个比较被广泛使用的webservice框架,网上有很多关于如何使用xfire或cxf的hello worl ...

  5. 合泰32-Onenet-WiFi模块-合泰单片机通过MQTT协议数据上云(二)

    本篇目标 一.学习MQTT协议 二.发送温度湿度数据到OneNET 准备材料 感觉麻烦的小伙伴请移步到文末,那里有整个工程的链接. (关于串口和WiFi部分的内容,可以查看前一篇博文,链接:这个是传送 ...

  6. 物联网平台通信协议之 MQTT 协议

    物联网平台通信协议之 MQTT 协议 文章目录 物联网平台通信协议之 MQTT 协议 MQTT 概述 MQTT 数据格式 MQTT 概述 MQTT( Message Queuing Telemetry ...

  7. DDD 实战 (2):看看代码结构长啥样(值得收藏)

    Talking is easy, show me your code 真正开始 DDD 旅程前,我想让您看到经过 DDD 设计之后的代码长啥样.我想,这是所有本着"talking is ea ...

  8. DDD实战(2):看看代码结构长啥样

    Talking is easy, show me your code 真正开始 DDD 旅程前,我想让您看到经过 DDD 设计之后的代码长啥样.我想,这是所有本着"talking is ea ...

  9. MQTT协议详解及开发教程(一)MQTT协议概述

    推荐一款稳定的基于C编写的MQTT Client开源库<cMQTT> 一 概述 MQTT协议目前在物联网技术中应用非常广泛,各种公有云的IOT平台通信基本上都是按照该协议来实现的,这里先简 ...

最新文章

  1. yii mailer 扩展发送邮件
  2. pip 安装报错,is not a supported wheel on this platform
  3. Android怎么实现选课功能,选课系统android
  4. [工具]微软的学习平台Microsoft Learn很好用,推荐一下
  5. linux下的驱动大小,(转)Linux驱动开发需要注意的点/KO大小/内存管理
  6. mysql的每隔1分钟定时_深入研究MySQL(四)、备份与恢复
  7. 大数据,并非一蹴而就
  8. 设计模式之--单例模式
  9. 微课|《Python编程基础与案例集锦(中学版)》第4章例题讲解(1)
  10. 机试题:地图定位、拍照并显示、录制视频并播放
  11. GitHub使用笔记
  12. 知识竞赛时,竞赛活动主题及环节主题如何修改?
  13. java jsp高校贫困生助学贷款系统ssm框架
  14. 360云服务器合作,360云主机速度(云服务器)
  15. 教师计算机培训汇报ppt,教师培训工作总结ppt模板
  16. hdu4114.Disney's FastPass
  17. 微型计算机原理实验二,微机原理实验2
  18. python-测试代码
  19. 力扣(Leetcode)695. 岛屿的最大面积(Java)带注释
  20. 2011-2020年股票的收益率上下波动比率数据集

热门文章

  1. 5G PRB和RBG关系
  2. APP界面保持屏幕常亮方法
  3. Docker服务正常运行一段时间后突然无法访问问题排查
  4. 多种常见传感器工作原理
  5. 爱好-文化-冢-象冢:《最后一头战象》沈石溪
  6. iphone个系列尺寸_苹果iPhone 12系列手机遭曝光 解决信号问题将实锤
  7. 一个向上帝买了挂的男人
  8. 【自动化】火车头采集器
  9. 关闭计算机系统英语,电脑系统英文肿么关机
  10. windows找不到文件请确定文件名是否正确怎么办?