rocketmq之producer解析
先来看下producer核心的类设计,如下图:
1、核心发布消息的类DefaultMQProducer
,继承自MQProducer接口,此接口定义了一系列发送消息的方法,如普通消息,顺序消息,延时消息等,最终进行网络通信会交给MQClientAPIImpl
处理。
2、rocketmq从4.1.3版本开始又支持了事务消息,由TransactionMQProducer
类提供(之后会有专门的文章进行详细解读事务消息)
producer之配置
我们看到DefaultMQProducer
继承了一个客户端的公共配置类ClientConfig
(与consumer公用),其实就是一个普通的javaBean,既可以代码中设置属性,也可以集成spring来配置
参数名 | 默认值 | 说明 |
---|---|---|
namesrvAddr | 无 | nameserver的地址列表,用分号隔开 |
clientIP | 本机ip地址 | 客户端ip地址,有时候无法识别,需要手动配置 |
instanceName | DEFAULT | 客户端实例名称,客户端创建的多个 Producer、Consumer 实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
clientCallbackExecutorThreads | cpu核数 | 通信层客户端处理请求的线程数 |
pollNameServerInterval | 30000 | 轮询nameserver的时间间隔,单位ms |
heartbeatBrokerInterval | 30000 | 向broker发送心跳的时间间隔,单位ms |
persistConsumerOffsetInterval | 5000 | 持久化 Consumer 消费进度间隔时间,单位ms |
producer独有的配置:
参数名 | 默认值 | 说明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer组名,相同分组的producer应该有相同的发送消息逻辑 |
createTopicKey | AUTO_CREATE_TOPIC_KEY | 自动创建topic时,以此默认topic为模板创建指定topic |
defaultTopicQueueNums | 4 | 自动创建topic队列数量 |
sendMsgTimeout | 3000 | 发送消息的超时时间,单位ms |
compressMsgBodyOverHowmuch | 4098 | 消息体超过多大会进行压缩,单位字节 |
retryTimesWhenSendFailed | 2 | 同步发送消息,发送失败重试次数 |
retryTimesWhenSendAsyncFailed | 2 | 异步发送消息,发送失败的重试次数 |
retryAnotherBrokerWhenNotStoreOK | false | 同步发送消息,消息存储失败是否重试其他broker |
maxMessageSize | 4194304 | 客户端限制消息的大小,默认4M |
TransactionListener | 事务消息时,必须设置的回查监听器 |
producer之group概念
我们在创建producer时必须要指定一个group,这里有两个作用:
- 生产者一般会是集群部署的,group用来标识一类生产者,相同group的生产者一般要有相同的发送逻辑。
- 在发送事务消息时,当事务消息异常,broker端来回查事务状态时,需要知道是由哪类生产者发送的事务消息,生产端会根据group名称来查找对应的producer来执行相应的回查逻辑。
producer的启动流程
简单说明下整个启动流程:
1、首先在DefaultMQProducerImpl
中会做一些参数校验,如group是否合法;然后会创建MQClientInstance
实例,此实例包含网络连接、线程资源等,相同的clientId会共享此实例,所以通过MQClientManager
来管理。
2、核心的启动流程在MQClientInstance
类中,如果nameserver地址没有配置的话,会先通过静态的http服务器地址去抓取nameserver的地址;再则启动netty客户端。
3、启动一些定时任务,跟producer有关的如下几个:
- 如果producer没有配置nameserver地址,启动定时抓取nameserver的地址的定时任务,任务延时10s开始,每隔2分支执行一次。
- 轮询nameserver定时任务,主要是定时更新topic的路由信息,任务延时10ms开始,每隔30s执行一次。
- 清除下线的broker和向broker发送心跳,任务延时1s执行,每隔30s执行一次
Producer如何寻址
RocketMQ 有多种配置方式可以令客户端找到 NameServer, 然后通过 NameServer 再找到 Broker,分别如下,
优先级由高到低,高优优先级会覆盖低优先级
1、代码中指定 Name Server 地址
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
2、启动参数指定
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
3、环境变量指定 Name Server 地址
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
4、HTTP 静态服务器寻址(默认)
如果以上三种都没有设置name server的地址,客户端启动后先会访问一个静态http服务器获取name server的地址,然后会启动一个定时任务访问这个静态 HTTP 服务器,地址如下:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
这是默认的地址,当然你也可以更改,做如下设置:
代码:
System.setProperty("rocketmq.namesrv.domain","localhost");
System.setProperty("rocketmq.namesrv.domain.subgroup","nameServer")
或者启动参数指定:
-Drocketmq.namesrv.domain=localhost
-Drocketmq.namesrv.domain.subgroup=nameServer
以上设置后http服务器地址就变成:
http://localhsot:8080/rocketmq/nameServer
这个 URL 的返回内容格式如下:
192.168.0.1:9876;192.168.0.2:9876
客户端每隔 2 分钟访问一次这个 HTTP 服务器,并更新本地的 Name Server 地址。
推荐使用 HTTP 静态服务器寻址方式,好处是客户端部署简单,且 Name Server 集群可以热升级。
发送消息时如何获取路由信息
1、broker在启动的时候通过参数autoCreateTopicEnable
设置是否自动创建topic,默认为true,此时会创建一个名为TBW102
(4.3版本已经改名为AUTO_CREATE_TOPIC_KEY
)的topic(参见类TopicConfigManager
),broker在向namesrv注册时会把默认的topic注册上去。如果设置false,则不会注册。
2、producer在发送消息时会在本地获取路由信息,第一次发送的话本地肯定没有,就会去namesrv获取,如果此时namesrv也没有,则会获取TBW102的topic信息(参见DefaultMQProducerImpl.tryToFindTopicPublishInfo),以此为模板创建topic,然后选择topic下的一台broker发,broker创建后,会通过心跳注册到namesrv上。
3、如果autoCreateTopicEnable设置false的话,producer发送消息会报找不到路由的异常,此时必须手动创建topic。
- 建议autoCreateTopicEnable设置false,基于以上第二步,自动创建topic后,以后所有该TOPIC的消息,都将发送到刚才选择的这台broke上,达不到负载均衡的目的。所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。
可以通过管理工具mqadmin来手动创建topic
sh mqadmin updateTopic -c [集群名称] -n [nameserver地址] -t [topic名称] -w [写队列数] -r [读队列数]
- 手动创建了Topic后,producer就可以轮询的发送到不同的broker了。
topic的队列数
这里讲一下自动创建的topic的队列数如何设置,首先broker创建的模板topic=AUTO_CREATE_TOPIC_KEY
的队列是8,参见类TopicConfigManager:
public TopicConfigManager(BrokerController brokerController) { //省略无关代码if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;TopicConfig topicConfig = new TopicConfig(topic);this.systemTopicList.add(topic);topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;topicConfig.setPerm(perm);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}//省略无关代码
}
BrokerConfig:
private int defaultTopicQueueNums = 8;
DefaultMQProducer端默认知道要创建的topic的队列数是4
private volatile int defaultTopicQueueNums = 4;
在MQClientInstance
类的方法updateTopicRouteInfoFromNameServer
中有这样一段逻辑:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {//省略无关代码for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}//省略无关代码}
创建队列是取两者最小的一个,也就是4,所以要设置topic的队列数量,很明显了设置broker的defaultTopicQueueNums的值和DefaultMQProducer的defaultTopicQueueNums值就可以了。这是自动创建Topic时队列数的设置方法,上面也提到生成环境一般不会开启自动创建Topic的功能,可以通过上面的手动创建Topic的指令来设置读写队列数。你可能注意到了Topic下有读写队两个队列数,分别代表上面意思呢?读写队列其实是个逻辑概念,一个broker下topic的总队列数是以写队列为准,而读队列意思是允许多少队列可以被消费者消费,也就是说读多写少的情况下,没有问题,队列都可以被消费掉,如果写多读少的话,那么就会存在队列不会被消费的情况。
消息发送
前面我们讲到了如何获取topic的路由信息,如何创建topic的队列数,一个topic下有多个队列,又可以分布在不同的broker上面,所以topic的总队列数应该是所有broker上的topic下队列数的总和。
备注:如果手动在每个broker上分别创建topic的话,相同topic在不同broker上的队列数可以不一样。
那么问题来了,在发送消息时根据怎么样的策略来选择一个队列发送呢?rocketmq提供了一个MQFaultStrategy
策略类来负责选择队列,这里会有一个参数sendLatencyFaultEnable
是否开启延迟故障,
该值默认为
false
,在不开启的情况下,相同线程发送消息是轮询topic下的所有队列,不同线程发送是随机的,核心代码如下:public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {//省略不必要的代码......}return tpInfo.selectOneMessageQueue(lastBrokerName); } //以上代码逻辑参见类MQFaultStrategy.selectOneMessageQueue public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {//省略不必要的代码......} } public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.getAndIncrement();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos); } //以上代码逻辑参见类TopicPublishInfo public int getAndIncrement() {Integer index = this.threadLocalIndex.get();//ThreadLocal中获取if (null == index) {//为空,随机生成一个index = Math.abs(random.nextInt());if (index < 0)index = 0;this.threadLocalIndex.set(index);}index = Math.abs(index + 1);if (index < 0)index = 0;this.threadLocalIndex.set(index);return index; } //以上代码参见类ThreadLocalIndex
每次获取index的时候都是从本地线程变量ThreadLocal中获取,没有的情况下就是随机生成一个,加1取绝对值后返回,再对队列列表的长度取模,所以在同一线程中,会轮训的从队列列表获取队列。而如果是不同线程的话,index是随机生成的,所以就是随机从队列列表中获取。如下图所示:
可以看到选择队列方法的入参有一个
lastBrokerName
的入参,此参数的目的是在发送消息失败的情况下,producer会重试再次发送,而再次发送选择的队列需要另选一个broker,lastBrokerName
就是要过滤掉失败的broker,选择下一个broker的队列进行发送消息。- 开启延迟故障,每当发送完一次消息,不管成功还是失败,都会把此次存储消息的broker给保存下来,记录故障情况下此broker需要延长多长时间才能再次发送,目前看到在代码里面写死了,故障下30s之内是不能再向此broker发送消息了。
消息重试
producer的send方法本身支持内部重试,重试逻辑如下:
1、最大重试次数默认2次,可以通过参数retryTimesWhenSendFailed
设置
2、发送失败,则轮询到下一个broker,如果此时只有一个broker在线呢?那就会轮训这个broker下的其他队列。
3、这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认为3s。
如果发送消息,broker返回结果超时,这种超时不会进行重试了;如果是方法本身耗时超过sendMsgTimeout ,还未来得及调用发送消息,此时的超时也不会重试。
以上策略其实也很难保证同步发送消息一定成功,如果应用要保证消息不丢失,最好先把消息存储到db,后台启线程定时重试,确保消息一定存储到broker。
rocketmq之producer解析相关推荐
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- RocketMQ的Producer详解之分布式事务消息(代码实现以及过程分析)
执行流程 1. 发送方向 MQ 服务端发送消息. 2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. 3. 发送方开始执行本地事务逻辑. 4. ...
- 从源码分析RocketMQ系列-Producer的SendResult来自哪里?
导语 对于消息中间件大家都应该不陌生,现在比较主流的消息中间件有Kafka.RabbitMQ.RocketMQ.ActiveMQ等等.前段时间花了很长时间分析了关于RocketMQ源码,之前也分享 ...
- RocketMQ架构原理解析(一):整体架构
一.概述 RocketMQ作为一个apache的顶级项目,拥有将近16K的star,它稳定的系统及强悍的性能,无疑在国内已经成为了企业消息队列的首选.接下来的一段时间,鄙人准备出一系列文章,从源码出发 ...
- RocketMQ学习笔记(8)----RocketMQ的Producer API简介
在RocketMQ中提供了三种发送消息的模式: 1.NormalProducer(普通) 2.OrderProducer(顺序) 3.TransactionProducer(事务) 下面来介绍一下pr ...
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- RocketMQ源码解析之消息消费者(consume Message)
原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...
- RocketMQ源码解析-Producer启动
RocketMQ中生产者通过DefaultProducer来创建. protected final transient DefaultMQProducerImpl defaultMQProducerI ...
- RocketMQ源码解析-Producer消息发送
首先以默认的异步消息发送模式作为例子.DefaultMQProducer中的send()方法会直接调用DefaultMQProducerImpl的send()方法,在DefaultMQProducer ...
最新文章
- oracle recyclebin详解,oracle recyclebin详解
- LeetCode Super Ugly Number
- JS 星号 * 处理手机号和名称
- 帆软报表多行多条数据写入表_在线报表FineReport中多数据集如何实现层式报表...
- jzoj3189-解密【字符串hash】
- 关于Linux线程的线程栈以及TLS
- 试列出种计算机组生产率的公式,农业机械化生产学思考题
- Mybatis-01-简介及入门
- markdown的基本使用方法
- python程序编写应注意哪些问题_Python程序员鲜为人知但你应该知道的17个问题
- java编写一个web 留言板_Java Web在线留言板
- 【Matlab优化预测】贝叶斯网络优化LSTM预测【含源码 1158期】
- 引进博士:130平米住房+20万经费+15万年薪;硕士24万引进费+可聘讲师
- 用3DSMAX制作室内效果图的九大步骤
- 基于springBoot+MyBatis+Vue的前后端分离旅游管理系统
- 苏州企业如何免费办理软件著作权
- 程序员要不要懂底层原理
- LC-恢复二叉搜索树(JavaScript实现)
- AttributeError: module ‘win32com.gen_py.00020905-0000-4B30-A977-D214852036FF
- ubuntu 14.04.5 firefox 浏览器flash插件安装
热门文章
- CentOS下Docker安装
- 编写程序,子进程通过管道向父进程发出字符串ok.
- (原创)无废话C#设计模式之十二:Bridge
- python image 转成字节_(推荐)谈谈Python生态圈图像格式转换问题:含实例解析
- 【洛谷】【动态规划/二维背包】P1855 榨取kkksc03
- 兼容ie8 rgba()用法
- 洛谷 P3376 【模板】网络最大流
- 创建一个windows服务的小程序及注意事项
- 不要在同一客户端同时使用超过两个的 HTTP 长连接
- 使用ASP.NET Atlas开发随输入内容自动调整行数的textarea(转)