文章目录

  • 1.回顾RocketMQ的工作流程
  • 2.从nameserv获取topic信息
  • 3.创建topic

1.回顾RocketMQ的工作流程

在我们之前的文章中,我们讲过RocketMQ的一个整体流程,我们先要启动nameserv,然后再启动broker,broker启动的时候会向nameserv注册,然后我们消息生产者在启动的时候会启动很多的后台任务,其中有一个任务就是定时更新topic的信息,定时去nameserv上面获取对应topic的信息,这个任务默认的请求间隔是30s,还有检查broker状态,发送心跳的任务等等,在消息生产者发送消息的时候会从topic对应的若干个MessageQueue中选择一个合适的使用,然后进行封装消息发送,我们说过发送消息的模式有三种,分别是同步发送,异步发送,单向发送。

虽然本篇主要是介绍下topic详细信息的获取,这里获取topic信息其实是有两种触发时机,一个是生产者它有个定时任务,然后定时去nameserv上面获取,获取到了之后,更新下本地缓存的topic table,其实就是一个map,还有个就是生产在发送消息的时候,会根据topic去本地的 topic table中获取 对应topic 的信息,如果本地缓存中没有,它就会将topic 创建一个对应的topicPublishInfo对象,这个对象是空的,然后请求更新这个topic信息

可以大体看下这个代码,其实就是这个意思。再介绍完topic 获取之后,然后才会介绍发送消息的几种模式。

2.从nameserv获取topic信息

好了,下面就正式介绍消息消费者怎样去nameserv获取topic信息。

这个就是它的定时任务,默认是30s执行一次的

它是将consumer 与producer 所有topic 放到一个topic集合中,然后遍历这个集合,一个一个topic请求更新。
接下来我们来看下这个updateTopicRouteInfoFromNameServer 方法,这方法有点长,我们一部分一部分的介绍,大体上分为2个部分吧,第一个部分是获取对应topic的信息,然后第二部分就是更新本地的topic table 缓存。
先来看下获取部分代码:

if是true这段是你没有topic 然后需要创建topic 的时候干的事,它会向nameserv获取topic 为TBW102的值,然后获取它对应的那个topicRouteData。
fasle的时候进入else里面,这个就是拿着topic去nameserv那获取对应的topicRouteData。
这里需要解释下这个TopicRouteData,可以理解为里面存了两部分内容,一是broker 地址信息,二是messagequeue 对应哪个broker上面
稍微看下:

接着我们追踪下从nameserv获取topic信息的代码:

我们可以看到,先是封装了一个获取路由的请求头,然后将topic 设置到请求头里面,然后又将请求头封装成RemotingCommand,其实不管是requestHeader还是RemotingCommand,其实都是存储信息的一些实体,只是代表的含义不一样而已,你可以把RemotingCommand 看作是http协议的格式,想想http有请求行,请求头,请求体,然后他这个requestHeader算是个请求头,然后RemotingCommand里面还有body属性可以看作是请求体,需要注意的是,它设置了一个RequestCode是GET_ROUTEINTO_BY_TOPIC,其实nameserv就是根据这个请求code判断出来你要干什么,只有你把GET_ROUTEINTO_BY_TOPIC 告诉nameserv的时候,nameserv才能知道你要从我这里获取这个topic的路由信息。
接着就是调用remotingClient发送消息了,它这里用的是同步发送的方式(这里暂时先不说了,其实就是使用netty client 发送消息, 后面有提到),也就是阻塞等着响应过来,超时时间默认是3s,再往下看如果这个响应code是success的话,就把body弄出来然后反序列化话成 TopicRouteData对象。

其实这里有个问题,就是我们有多个nameserv ,比如说我们有3台nameserv ,那么生产者是找谁获取的呢?其实它是这个样子的,你上次用的那个nameserv要是还ok的话,也就是连接没断的话,它会继续使用你上次用的个,如果你是第一次发起这种请求,没有与nameserv创建过连接或者是上次创建的那个连接不好使了,这个时候就会有个计数器,轮询的使用 ,也就是计数器值+1%namserv地址个数的形式,如果不理解的同学可以找个轮训算法看下,其实都是使用计数器+1 % 列表的个数,这样能够选出来一个列表的位置来,再根据这个位置去列表中获取一下具体的值,好了这个轮询算法我们先解释这么多 ,如果能够正常创建连接,直接使用这个连接了就,如果不能使用,也就是创建连接失败,访问不通等等,这时候继续循环使用这个轮询算法获取下一个地址,然后创建连接,如果能够创建成功,返回对应的channel就可以了,然后client可以往这个channel上发送请求了,这个channel的话可以看作两端的通道,管道都可以。如果不成功继续循环,他这里循环次数是你配置nameserv地址的个数。

好了,这里我们就把获取路由信息的部分看完了,接着就是解析这个TopicRouteData 然后放到生产者本地缓存起来了。
这块内容也是比较长,我们一段一段看下

他这里先是从 topic table中获取对应topic 的老数据,然后 拿老的 与新请求的进行对比,判断一下有没有变动,如果没有变动的话,就调用isNeedUpdateTopicRouteInfo 方法再判断一下需要更新,这个方法其实就是遍历所有的producer 或者是consumer,然后看看他们的topic table里面是不是都有这个topic 没有的话就需要更新下。

这里首先是更新了一下brokerAddrTable这个map ,这map里面然后就是缓存着broker name----》 broker地址的集合
接着就是将 topicRouteData转成topicPublishInfo ,然后 haveTopicRouterInfo设置成true,就是说明它这个topicPublishInfo 里面存着对应的topicRouteData信息。

这里就是将返回的topicRouteData 转成对应的topicPublishInfo,这个topicPublishInfo其实里面就是MessageQueue,比如说我topicRouteData 里面返回2个broker ,然后每个broker的writeQueueNums 个数是4个,这个时候它生成的MessageQueue就是8个,然后每个broker对应着4个MessageQueue。
接着就是遍历更新各个producer的topicPublishInfoTable 对应topic信息。
好了,到这我们的更新topic信息的解析就结束了。

3.创建topic

有这么一个场景:
我发送某个消息的时候指定的那个topic不存在(就是之前没有创建过)消息生产者是怎样处理的,默认的话如果topic不存在的话,消息生产者会先去nameserv拉下topic信息(这里就是走上面第二小节:从nameserv获取topic信息流程),要是还不存在的话,

这里就走红色框的代码了,其实还是调用的updateTopicRouteInfoFromNameServer重载方法,这里这个isDefault 是true了。这个时候就获取一下默认topic的路由信息,这个默认topic是TBW102,发送消息就选择TBW102这个topic的broker发送,broker收到消息后会自动创建这个topic,这里需要注意的是broker得支持自动创建,这里是有个参数配置的autoCreateTopicEnable 设置成true就可以了。
topic我们一般是不会在producer中自动创建的,一般使用RocketMQ的可视化控制台,然后创建topic,指定对应的queue num,指定broker等等,类似下面这个东西

RocketMQ源码解析之消息生产者(获取topic路由信息)相关推荐

  1. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  2. RocketMQ源码解析-Producer消息发送

    首先以默认的异步消息发送模式作为例子.DefaultMQProducer中的send()方法会直接调用DefaultMQProducerImpl的send()方法,在DefaultMQProducer ...

  3. RocketMQ源码解析-事务消息的二阶段提交

    在生产者producer当中,通过sendMessageInTransaction()方法来发送事务消息,但是在一开始向Broker发送的事务消息的时候,具体的事务操作还并没有进行处理,而是相当于向B ...

  4. RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析

    深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...

  5. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  6. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  7. Spring源码解析 -- SpringWeb请求参数获取解析

    Spring源码解析 – SpringWeb请求参数获取解析 简介 在文章:Spring Web 请求初探中,我们看到最后方法反射调用的相关代码,本篇文章就探索其中的参数是如何从请求中获取的 概览 方 ...

  8. RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】

    基于RocketMQ release-4.9.3,深入的介绍了Broker 的消息刷盘源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异 ...

  9. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  10. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

最新文章

  1. Find Large Files in Linux
  2. loadrunner11下载地址
  3. 解决pjax加载页面不执行js插件的问题
  4. 做人应该知道的十个道理
  5. 阿里云直播PHP SDK如何使用
  6. 李宏毅机器学习作业1:预测PM2.5(含训练数据)
  7. HTML如何添加锚点,总结到位
  8. oracle+内存错误,oracle使用内存的错误,ORA-27102: out of memory
  9. 9206 课堂笔记 综合演练 添加数据与非空验证
  10. [thinkphp 5.0源码阅读] 缓存(一)
  11. 《JavaScript语言精粹》笔记
  12. 计算机组成原理白朔飞,计算机组成原理(第四章复习).ppt
  13. 如何给数组用fill函数和memset函数给数组赋初值
  14. php遗漏,PHP被遗漏的执行函数
  15. eclipse-阶段三-Server被关闭后消失,如何打开
  16. 微信小程序源码打包合集 游戏商城抽奖转盘预约点餐等-1
  17. java电表抄表器接口_远程抄表系统接线图和远程抄表电表安装图及实例
  18. (C++后台面经)网络编程1
  19. 创建txt文本文档快捷键设置
  20. Fall 2020 Berkeley cs61a Projects Ants答案

热门文章

  1. Android圆盘刻度,类似体重测试仪,效果不错哦
  2. [c++]数组的逆输出
  3. Linux ftrace 2.3、kprobe event的使用
  4. 硬件和软件的32位与64位区别
  5. 评《宁可放弃五十万也要逼你去读书》,作者来自另外一个星球?
  6. sns.pairplot()用法
  7. xlsx文件怎么打开?3种方法:Excel+WPS+兼容包来搞定
  8. 微软大战Google Earth的利器-虚拟3D地球“Virtual Earth3D”!
  9. 牛客练习赛53E 老瞎眼 pk 小鲜肉(线段树)
  10. AndrOid系统亭子运行,细讲Android系统下的Preference