欢迎加入ThingsBoard技术交流群


这里可复制Q群号:69998183

关注“云腾五洲”:获取二开ThingsBoard物联网平台演示

交流社区:ThingsKit-ThingsBoard社区

ThingsBoard源码分析5-如何接收MQTT连接

1. MQTT server

需要接收设备的MQTT连接,那么thingsboard中必然有MQTT服务器,MQTT服务器创建的类是MqttTransportService

基于netty的mqtt server,添加了MqttTransportServerInitializer的处理类,并向ChannelPipeline添加了netty的MqttDecoderMqttEncoder让我们可以忽略MQTT消息的编解码工作,重要的是添加了MqttTransportHandler

2. MqttTransportHandler处理连接

此例中,我们首先需要创建租户,租户管理员,并添加设备,使用MQTT Box模拟硬件设备,拷贝ACCESS TOKEN做为MQTT Box的Username开始连接我们的thingsboard后台

如果图片看不清楚,请点击:

  • 标准:https://cdn.iotschool.com/photo/2020/00e26598-e91a-4a08-b557-18b204bec6c9.png?x-oss-process=image/resize,w_1920
  • 高清:https://p.pstatp.com/origin/137b60001339a846253dd

由于没有使用ssl,收到连接请求以后,便会调用

private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {String userName = msg.payload().userName();log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);if (StringUtils.isEmpty(userName)) {ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));ctx.close();} else {//取出userName,构造protobuf的类(方便传输与解析),交给transportService处理。此时会使用到源码解析第三篇DefaultTransportService的解析的相关信息了解process的处理。参阅下方①的详细解析。transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {@Overridepublic void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {onValidateDeviceResponse(msg, ctx);}@Overridepublic void onError(Throwable e) {log.trace("[{}] Failed to process credentials: {}", address, userName, e);ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));ctx.close();}});}
}
  1. DefaultTransportServiceprocess方法构造了异步任务,成功调用onSuccessConsumer,失败调用onFailureConsumer

  2. 将验证用户的任务交由transportApiRequestTemplate.send

public ListenableFuture<Response> send(Request request) {if (tickSize > maxPendingRequests) {return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));}UUID requestId = UUID.randomUUID();request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));//由第三篇文章的分析得出,此topic时tb_transport.api.responses.localHostNamerequest.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));//参阅第一篇基础知识的介绍,来自谷歌的库,settableFuture,可设置结果的完成SettableFuture<Response> future = SettableFuture.create();ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);//将future放到pendingRequests中②pendingRequests.putIfAbsent(requestId, responseMetaData);log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);//将消息发送给消息队列topic是tb_transport.api.requestsrequestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {@Overridepublic void onSuccess(TbQueueMsgMetadata metadata) {log.trace("[{}] Request sent: {}", requestId, metadata);}@Overridepublic void onFailure(Throwable t) {pendingRequests.remove(requestId);future.setException(t);}});return future;
}
  1. 根据第三篇TbCoreTransportApiService的分析,我们发现DefaultTbQueueResponseTemplate的成员变量requestTemplateconsumer刚好是订阅的tb_transport.api.requests的消息:
......
requests.forEach(request -> {long currentTime = System.currentTimeMillis();long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));if (requestTime + requestTimeout >= currentTime) {byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);if (requestIdHeader == null) {log.error("[{}] Missing requestId in header", request);return;}//获取response的topic,可以做到消息从哪来,处理好以后回哪里去,此时的topic是tb_transport.api.responses.localHostNamebyte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);if (responseTopicHeader == null) {log.error("[{}] Missing response topic in header", request);return;}UUID requestId = bytesToUuid(requestIdHeader);String responseTopic = bytesToString(responseTopicHeader);try {pendingRequestCount.getAndIncrement();//调用handler进行处理消息AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),response -> {pendingRequestCount.decrementAndGet();response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));//handler.hande处理的结果返回给发送方topic是tb_transport.api.responses.localHostNameresponseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);},e -> {pendingRequestCount.decrementAndGet();if (e.getCause() != null && e.getCause() instanceof TimeoutException) {log.warn("[{}] Timeout to process the request: {}", requestId, request, e);} else {log.trace("[{}] Failed to process the request: {}", requestId, request, e);}},requestTimeout,timeoutExecutor,callbackExecutor);.......
  1. 具体验证逻辑:
@Override
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();// protobuf构造的类中判定是否包含需要验证的信息块if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();//调用validateCredentials,具体内容就是查询deviceInfo,并将结果交由第二个Function进行进一步处理return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());} ......
  1. 当通过设备的acess token找到了deviceInfo,便会通过消息中间件将DeviceInfo发出来,topic是tb_transport.api.responses.localHostName,在第三篇的分析中,DefaultTransportServicetransportApiRequestTemplate即订阅此topic:
List<Response> responses = responseTemplate.poll(pollInterval);
if (responses.size() > 0) {log.trace("Polling responses completed, consumer records count [{}]", responses.size());
} else {continue;
}
responses.forEach(response -> {byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);UUID requestId;if (requestIdHeader == null) {log.error("[{}] Missing requestId in header and body", response);} else {requestId = bytesToUuid(requestIdHeader);log.trace("[{}] Response received: {}", requestId, response);//参见上②,将验证的future放入到pendingRequests中,现在通过设置的requestId取出来ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);if (expectedResponse == null) {log.trace("[{}] Invalid or stale request", requestId);} else {//设置settableFuture的结果expectedResponse.future.set(response);}}
......
  1. DefaultTransportServiceprocess异步请求获得了返回的结果,此时调用onSuccess回调,即调用MqttTransportHandleronValidateDeviceResponse
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {if (!msg.hasDeviceInfo()) {ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));ctx.close();} else {deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());sessionInfo = SessionInfoProto.newBuilder().setNodeId(context.getNodeId()).setSessionIdMSB(sessionId.getMostSignificantBits()).setSessionIdLSB(sessionId.getLeastSignificantBits()).setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB()).setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB()).setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()).setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()).setDeviceName(msg.getDeviceInfo().getDeviceName()).setDeviceType(msg.getDeviceInfo().getDeviceType()).build();//创建SessionEvent.OPEN的消息,调用sendToDeviceActor方法,包含sessionInfotransportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {.......
  1. sendToDeviceActor的实现:
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {//创建tpi,此时会选择一个固定的partition Id,组成的topic是tb_core, fullTopicName是tb_core.(int) 如: tb_core.1TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
......//使用tbCoreMsgProducer发送到消息队列,设置了toDeviceActorMsgtbCoreMsgProducer.send(tpi,new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?new TransportTbQueueCallback(callback) : null);
}
  1. 此时第二篇基于DefaultTbCoreConsumerService可以知道DefaultTbCoreConsumerService 的消费者订阅该主题的消息:
try {ToCoreMsg toCoreMsg = msg.getValue();if (toCoreMsg.hasToSubscriptionMgrMsg()) {log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);} else if (toCoreMsg.hasToDeviceActorMsg()) {log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());//交由此方法进行处理forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);}
  1. forwardToDeviceActor对消息的处理

    private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {if (statsEnabled) {stats.log(toDeviceActorMsg);}//创建type为TRANSPORT_TO_DEVICE_ACTOR_MSG的消息,并交给AppActor处理actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
    }
    
  2. 通过第四篇的总结3,我们可以直接去看AppActordoProcess方法对此类型消息的处理,跟踪发现AppActor将消息转给了TenantActor, TenantActor创建了DeviceActor,并将消息转给了DeviceActor;

  3. DeviceActor拿到此类型的消息,进行了如下的处理:

    protected boolean doProcess(TbActorMsg msg) {switch (msg.getMsgType()) {case TRANSPORT_TO_DEVICE_ACTOR_MSG://包装成TransportToDeviceActorMsgWrapper交由processor处理,并继续调用processSessionStateMsgsprocessor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);break;case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
    
  4. processSessionStateMsgs的处理:

    private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {UUID sessionId = getSessionId(sessionInfo);if (msg.getEvent() == SessionEvent.OPEN) {.....sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));if (sessions.size() == 1) {// 将调用pushRuleEngineMessage(stateData, CONNECT_EVENT);reportSessionOpen();}//将调用pushRuleEngineMessage(stateData, ACTIVITY_EVENT);systemContext.getDeviceStateService().onDeviceActivity(deviceId, System.currentTimeMillis());dumpSessions();}
    ....
    
  5. 由于CONNECT_EVENTACTIVITY_EVENT仅仅类型不同,以下暂时只分析CONNECT_EVENT

    public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {if (tenantId.isNullUid()) {if (entityId.getEntityType().equals(EntityType.TENANT)) {tenantId = new TenantId(entityId.getId());} else {log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);return;}}//和第7点类似,创建的tpi的fullTopicName的例子 tb_rule_engine.main.1TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTenantIdMSB(tenantId.getId().getMostSignificantBits()).setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).setTbMsg(TbMsg.toByteString(tbMsg)).build();producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);toRuleEngineMsgs.incrementAndGet();
    }
    
  6. 通过第二篇的分析DefaultTbRuleEngineConsumerService订阅了此topic: tb_rule_engine.main.1的消息,收到消息以后,调用forwardToRuleEngineActor方法,包裹成QUEUE_TO_RULE_ENGINE_MSG类型的消息,交由AppActor进行分发处理;

  7. AppActor交给TenantActor处理,TenantActor交给RootRuleChain处理,RuleChainActor交给firstRuleNode处理,也就是某一个RuleNodeActor;

  8. 打开前端RULE CHAINS的界面,会发现,MESSAGE TYPE SWITCH是接收input的第一个节点,其实数据库的配置中,rule_chain表中配置的first_rule_node_id就是TbMsgTypeSwitchNode

  9. 进入TbMsgTypeSwitchNodeonMsg方法(实际上所有的ruleNode处理消息的方法都是onMsg),发现根据messageType(此时是CONNECT_EVENT)定义了relationtype并调用ctx.tellNext(msg, relationType);

  10. 此时DefaultTbContext创建一个RuleNodeToRuleChainTellNextMsg,类型是RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,交给RuleChainActor处理;

  11. 接下来将会进入到RuleChainActorMessageProcessoronTellNext方法:

    private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {try {checkActive(msg);//消息来源EntityId entityId = msg.getOriginator();//创建一个tpi,可能会使用TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);//查询有关系的RuleNode,其实就是从relation表中查询,该消息来源的id,relation_type和在TbMsgTypeSwitchNode定义的relationType一直的节点id,如上Connect Event就没有找到相应的relation的RuleNodeIdList<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream().filter(r -> contains(relationTypes, r.getType())).collect(Collectors.toList());int relationsCount = relations.size();//Connect Event就没有找到相应的relation的RuleNodeId,消息通过规则引擎,已经处理完成if (relationsCount == 0) {log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());if (relationTypes.contains(TbRelationTypes.FAILURE)) {RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);if (ruleNodeCtx != null) {msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));} else {log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));}} else {msg.getCallback().onSuccess();}//举例:Post telemetry的type可以找到相应的ruleNode,实现类是:TbMsgTimeseriesNode,那么此消息将会交给TbMsgTimeseriesNode处理} else if (relationsCount == 1) {for (RuleNodeRelation relation : relations) {log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());pushToTarget(tpi, msg, relation.getOut(), relation.getType());}} else {MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);for (RuleNodeRelation relation : relations) {EntityId target = relation.getOut();putToQueue(tpi, msg, callbackWrapper, target);}}} catch (RuleNodeException rne) {msg.getCallback().onFailure(rne);} catch (Exception e) {msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));}
    }
    

    What’s more:

    如上面的举例,比如是遥测数据Post telemetry,将会使用TbMsgTimeseriesNodeonMsg做进一步的处理,比如存储数据,再通过webSocket进行数据的更新如果有webSocket的session的话,或者其他通知消息,就不详细展开了。

总结:

  1. 处理MQTT的连接其实就是走完了整个规则引擎的逻辑,其他类型的消息,比如遥测数据,属性更新,RPC请求发送与接收,大体流程大同小异;

  2. 在处理消息流向的时候,我们一定要清楚其订阅或者发布的主题是什么,这样我们才不会丢失方向;

  3. Actor的模型就是根据消息的类型,使用AppActor进行一步步的分发,最终交由合适的RuleNode进行处理;

  4. Protobuf类型的消息容易序列化传输与解析,所以在thingsboard中大量使用,但是生成的类可读性不是很高,可以选择直接读queue.proto文件,对类有感性的认知。

    ​ 由于作者水平有限,只是梳理了大致的流程,文章难免出现纰漏,望谅解并指正。

ThingsBoard 二次开发之源码分析 5-如何接收 MQTT 连接相关推荐

  1. 一步步实现windows版ijkplayer系列文章之二——Ijkplayer播放器源码分析之音视频输出——视频篇...

    一步步实现windows版ijkplayer系列文章之一--Windows10平台编译ffmpeg 4.0.2,生成ffplay 一步步实现windows版ijkplayer系列文章之二--Ijkpl ...

  2. glibc-2.23学习笔记(二)—— free部分源码分析

    glibc-2.23学习笔记(二)-- free部分源码分析 _libc_free _int_free 函数定义 局部变量 start fast bins部分 unsorted bins部分 mmap ...

  3. 循序渐进,探寻Excel二次开发.NET源码(3)-ExcelBase类

    循序渐进,探寻Excel二次开发.NET源码(3)-ExcelBase类 --Excel打开关闭打印预览 作者:长江支流 关键字:.NET.Excel.Excel打开.Excel关闭.Excel打印预 ...

  4. 类似爱美刻 右糖 轻剪辑 捷映 秀展网 秀多多 来画 创视网 传影 影大师 闪剪源码 技术源头 二次开发 提供源码 逗拍 趣推 飞推 美册 搞定视频 简影 剪影 爱字幕 幸福相册 八角星

    需要源码的下面评论 介绍 类似爱美刻 右糖 轻剪辑 捷映 秀展网 秀多多 来画 创视网 传影 影大师 闪剪源码 技术源头 二次开发 提供源码. 类似 逗拍 趣推 飞推 美册 搞定视频 简影 剪影 爱字 ...

  5. python接入Vissim二次开发,源码

    python接入Vissim二次开发,源码,刚开始学习,为了写论文,一样的朋友可以一块研究代码 代码地址 DQN VISSIM4.3 tensorflow1.2.0 https://github.co ...

  6. 仿爱奇艺视频,腾讯视频,搜狐视频首页推荐位轮播图(二)之SuperIndicator源码分析

    转载请把头部出处链接和尾部二维码一起转载,本文出自逆流的鱼:http://blog.csdn.net/hejjunlin/article/details/52510431 背景:仿爱奇艺视频,腾讯视频 ...

  7. 以太坊ETH-智能合约开发-solidity源码分析-truffle进阶

    0. 背景 上一篇文章我们从合约编写.编译.部署.交互等几个方面介绍了truffle的大致用法. 本篇主要继续深入地介绍truffle的高级用法 + 合约源码分析 1. 将合约部署到测试网Ropste ...

  8. 【图像】【OpenCV鱼眼矫正】二、fisheye::initUndistortRectifyMap()源码分析

    目录 一.fisheye::initUndistortRectifyMap() 之 功能介绍 二.fisheye::initUndistortRectifyMap() 之 源码分析 1. 源码分析 2 ...

  9. Nimbus二storm启动nimbus源码分析-nimbus.clj

    nimbus是storm集群的"控制器",是storm集群的重要组成部分.我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &a ...

最新文章

  1. lstm timestep一般是多少_请问rnn和lstm中batchsize和timestep的区别是什么?
  2. Python | 一万多条拼车数据,看春运的迁徙图
  3. 实例——在编程过程中进行单元测试
  4. HDU 2955 Robberies
  5. php 注册树,php常用设计模式(单例,工厂,注册树模式)
  6. 垂直居中及容器内图片垂直居中的CSS解决方法
  7. jsonp和CORS跨域实现
  8. 【远程沟通】“云答辩”“云招聘”双管齐下,解救“最难毕业生”
  9. 深度学习——用softmax函数来规范可变参数
  10. echarts源码打包_Echarts模块v1.5更新【更新支持多线程,封装大量快速方法,增加史上最详细示例】...
  11. fun在c语言中意义与用法,fun的用法
  12. Win10任务栏卡死,无响应,一直转圈,点不动
  13. postgresSQL的FDE加密
  14. Android6.0运行时权限处理
  15. 北斗短报文和北斗定位入门篇
  16. 计算机学猫叫音乐,抖音学猫叫音乐 抖音学猫叫什么歌
  17. tensorflow 中文字体训练集_TensorFlow与中文手写汉字识别
  18. 非参数检验-Wilcoxon,Wilcoxon-Mann-Whitney符号秩检验以及Pearson,Spearman秩,Kendall τ相关检验(附带实例-R实现)
  19. 带你全面掌握高级知识点!java如何实现登录注册
  20. linux优化网页加载过程,【zz】Linux起步过程中硬件模块的加载

热门文章

  1. 《自律100天,穿越人生盲点》读书笔记
  2. 解放双手——Android的自动化构建及发布
  3. 说说抖音和小红书的交互和界面设计,您更喜欢哪一个?
  4. 海外SDK之----------苹果支付
  5. 为您的DC/DC 转换器选择最佳开关频率
  6. Android解析服务器Json数据实例
  7. 电脑怎么录制屏幕视频,3种方法,轻松弄懂
  8. 《数字图像处理》读书笔记2:数字图像处理基础
  9. Arduino框架下STM32F1/F4系列HID模式程序烧录教程
  10. java毕业设计软件工程专业教辅平台课程子系统mybatis+源码+调试部署+系统+数据库+lw