先来看下producer核心的类设计,如下图:

1、核心发布消息的类DefaultMQProducer,继承自MQProducer接口,此接口定义了一系列发送消息的方法,如普通消息,顺序消息,延时消息等,最终进行网络通信会交给MQClientAPIImpl处理。

2、rocketmq从4.1.3版本开始又支持了事务消息,由TransactionMQProducer类提供(之后会有专门的文章进行详细解读事务消息)

producer之配置

我们看到DefaultMQProducer继承了一个客户端的公共配置类ClientConfig(与consumer公用),其实就是一个普通的javaBean,既可以代码中设置属性,也可以集成spring来配置

参数名 默认值 说明
namesrvAddr nameserver的地址列表,用分号隔开
clientIP 本机ip地址 客户端ip地址,有时候无法识别,需要手动配置
instanceName DEFAULT 客户端实例名称,客户端创建的多个 Producer、Consumer 实际是共用一个内部实例(这个实例包含网络连接、线程资源等)
clientCallbackExecutorThreads cpu核数 通信层客户端处理请求的线程数
pollNameServerInterval 30000 轮询nameserver的时间间隔,单位ms
heartbeatBrokerInterval 30000 向broker发送心跳的时间间隔,单位ms
persistConsumerOffsetInterval 5000 持久化 Consumer 消费进度间隔时间,单位ms

producer独有的配置:

参数名 默认值 说明
producerGroup DEFAULT_PRODUCER Producer组名,相同分组的producer应该有相同的发送消息逻辑
createTopicKey AUTO_CREATE_TOPIC_KEY 自动创建topic时,以此默认topic为模板创建指定topic
defaultTopicQueueNums 4 自动创建topic队列数量
sendMsgTimeout 3000 发送消息的超时时间,单位ms
compressMsgBodyOverHowmuch 4098 消息体超过多大会进行压缩,单位字节
retryTimesWhenSendFailed 2 同步发送消息,发送失败重试次数
retryTimesWhenSendAsyncFailed 2 异步发送消息,发送失败的重试次数
retryAnotherBrokerWhenNotStoreOK false 同步发送消息,消息存储失败是否重试其他broker
maxMessageSize 4194304 客户端限制消息的大小,默认4M
TransactionListener 事务消息时,必须设置的回查监听器

producer之group概念

我们在创建producer时必须要指定一个group,这里有两个作用:

  • 生产者一般会是集群部署的,group用来标识一类生产者,相同group的生产者一般要有相同的发送逻辑。
  • 在发送事务消息时,当事务消息异常,broker端来回查事务状态时,需要知道是由哪类生产者发送的事务消息,生产端会根据group名称来查找对应的producer来执行相应的回查逻辑。

producer的启动流程

简单说明下整个启动流程:

1、首先在DefaultMQProducerImpl中会做一些参数校验,如group是否合法;然后会创建MQClientInstance实例,此实例包含网络连接、线程资源等,相同的clientId会共享此实例,所以通过MQClientManager来管理。

2、核心的启动流程在MQClientInstance类中,如果nameserver地址没有配置的话,会先通过静态的http服务器地址去抓取nameserver的地址;再则启动netty客户端。

3、启动一些定时任务,跟producer有关的如下几个:

  • 如果producer没有配置nameserver地址,启动定时抓取nameserver的地址的定时任务,任务延时10s开始,每隔2分支执行一次。
  • 轮询nameserver定时任务,主要是定时更新topic的路由信息,任务延时10ms开始,每隔30s执行一次。
  • 清除下线的broker和向broker发送心跳,任务延时1s执行,每隔30s执行一次

Producer如何寻址

RocketMQ 有多种配置方式可以令客户端找到 NameServer, 然后通过 NameServer 再找到 Broker,分别如下,
优先级由高到低,高优优先级会覆盖低优先级

1、代码中指定 Name Server 地址

producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");

2、启动参数指定

-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876

3、环境变量指定 Name Server 地址

export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876

4、HTTP 静态服务器寻址(默认)

如果以上三种都没有设置name server的地址,客户端启动后先会访问一个静态http服务器获取name server的地址,然后会启动一个定时任务访问这个静态 HTTP 服务器,地址如下:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

这是默认的地址,当然你也可以更改,做如下设置:

代码:

System.setProperty("rocketmq.namesrv.domain","localhost");
System.setProperty("rocketmq.namesrv.domain.subgroup","nameServer")

或者启动参数指定:

-Drocketmq.namesrv.domain=localhost
-Drocketmq.namesrv.domain.subgroup=nameServer

以上设置后http服务器地址就变成:

http://localhsot:8080/rocketmq/nameServer

这个 URL 的返回内容格式如下:

192.168.0.1:9876;192.168.0.2:9876

客户端每隔 2 分钟访问一次这个 HTTP 服务器,并更新本地的 Name Server 地址。

推荐使用 HTTP 静态服务器寻址方式,好处是客户端部署简单,且 Name Server 集群可以热升级。

发送消息时如何获取路由信息

1、broker在启动的时候通过参数autoCreateTopicEnable设置是否自动创建topic,默认为true,此时会创建一个名为TBW102(4.3版本已经改名为AUTO_CREATE_TOPIC_KEY)的topic(参见类TopicConfigManager),broker在向namesrv注册时会把默认的topic注册上去。如果设置false,则不会注册。

2、producer在发送消息时会在本地获取路由信息,第一次发送的话本地肯定没有,就会去namesrv获取,如果此时namesrv也没有,则会获取TBW102的topic信息(参见DefaultMQProducerImpl.tryToFindTopicPublishInfo),以此为模板创建topic,然后选择topic下的一台broker发,broker创建后,会通过心跳注册到namesrv上。

3、如果autoCreateTopicEnable设置false的话,producer发送消息会报找不到路由的异常,此时必须手动创建topic。

  • 建议autoCreateTopicEnable设置false,基于以上第二步,自动创建topic后,以后所有该TOPIC的消息,都将发送到刚才选择的这台broke上,达不到负载均衡的目的。所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。
  • 可以通过管理工具mqadmin来手动创建topic

    sh mqadmin updateTopic -c [集群名称] -n [nameserver地址] -t [topic名称] -w [写队列数] -r [读队列数]
  • 手动创建了Topic后,producer就可以轮询的发送到不同的broker了。

topic的队列数

这里讲一下自动创建的topic的队列数如何设置,首先broker创建的模板topic=AUTO_CREATE_TOPIC_KEY的队列是8,参见类TopicConfigManager:

public TopicConfigManager(BrokerController brokerController) { //省略无关代码if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;TopicConfig topicConfig = new TopicConfig(topic);this.systemTopicList.add(topic);topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;topicConfig.setPerm(perm);this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);}//省略无关代码
}

BrokerConfig:

private int defaultTopicQueueNums = 8;

DefaultMQProducer端默认知道要创建的topic的队列数是4

private volatile int defaultTopicQueueNums = 4;

MQClientInstance类的方法updateTopicRouteInfoFromNameServer中有这样一段逻辑:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {//省略无关代码for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}//省略无关代码}

创建队列是取两者最小的一个,也就是4,所以要设置topic的队列数量,很明显了设置broker的defaultTopicQueueNums的值和DefaultMQProducer的defaultTopicQueueNums值就可以了。这是自动创建Topic时队列数的设置方法,上面也提到生成环境一般不会开启自动创建Topic的功能,可以通过上面的手动创建Topic的指令来设置读写队列数。你可能注意到了Topic下有读写队两个队列数,分别代表上面意思呢?读写队列其实是个逻辑概念,一个broker下topic的总队列数是以写队列为准,而读队列意思是允许多少队列可以被消费者消费,也就是说读多写少的情况下,没有问题,队列都可以被消费掉,如果写多读少的话,那么就会存在队列不会被消费的情况。

消息发送

前面我们讲到了如何获取topic的路由信息,如何创建topic的队列数,一个topic下有多个队列,又可以分布在不同的broker上面,所以topic的总队列数应该是所有broker上的topic下队列数的总和。

备注:如果手动在每个broker上分别创建topic的话,相同topic在不同broker上的队列数可以不一样。

那么问题来了,在发送消息时根据怎么样的策略来选择一个队列发送呢?rocketmq提供了一个MQFaultStrategy策略类来负责选择队列,这里会有一个参数sendLatencyFaultEnable是否开启延迟故障,

  • 该值默认为false,在不开启的情况下,相同线程发送消息是轮询topic下的所有队列,不同线程发送是随机的,核心代码如下:

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {//省略不必要的代码......}return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    //以上代码逻辑参见类MQFaultStrategy.selectOneMessageQueue
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {//省略不必要的代码......}
    }
    public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.getAndIncrement();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos);
    }
    //以上代码逻辑参见类TopicPublishInfo
    public int getAndIncrement() {Integer index = this.threadLocalIndex.get();//ThreadLocal中获取if (null == index) {//为空,随机生成一个index = Math.abs(random.nextInt());if (index < 0)index = 0;this.threadLocalIndex.set(index);}index = Math.abs(index + 1);if (index < 0)index = 0;this.threadLocalIndex.set(index);return index;
    }
    //以上代码参见类ThreadLocalIndex

    每次获取index的时候都是从本地线程变量ThreadLocal中获取,没有的情况下就是随机生成一个,加1取绝对值后返回,再对队列列表的长度取模,所以在同一线程中,会轮训的从队列列表获取队列。而如果是不同线程的话,index是随机生成的,所以就是随机从队列列表中获取。如下图所示:

    可以看到选择队列方法的入参有一个lastBrokerName的入参,此参数的目的是在发送消息失败的情况下,producer会重试再次发送,而再次发送选择的队列需要另选一个broker,lastBrokerName就是要过滤掉失败的broker,选择下一个broker的队列进行发送消息。

  • 开启延迟故障,每当发送完一次消息,不管成功还是失败,都会把此次存储消息的broker给保存下来,记录故障情况下此broker需要延长多长时间才能再次发送,目前看到在代码里面写死了,故障下30s之内是不能再向此broker发送消息了。

消息重试

producer的send方法本身支持内部重试,重试逻辑如下:

1、最大重试次数默认2次,可以通过参数retryTimesWhenSendFailed设置

2、发送失败,则轮询到下一个broker,如果此时只有一个broker在线呢?那就会轮训这个broker下的其他队列。

3、这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认为3s。

如果发送消息,broker返回结果超时,这种超时不会进行重试了;如果是方法本身耗时超过sendMsgTimeout ,还未来得及调用发送消息,此时的超时也不会重试。

以上策略其实也很难保证同步发送消息一定成功,如果应用要保证消息不丢失,最好先把消息存储到db,后台启线程定时重试,确保消息一定存储到broker。

rocketmq之producer解析相关推荐

  1. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  2. RocketMQ的Producer详解之分布式事务消息(代码实现以及过程分析)

    执行流程 1. 发送方向 MQ 服务端发送消息. 2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. 3. 发送方开始执行本地事务逻辑. 4. ...

  3. 从源码分析RocketMQ系列-Producer的SendResult来自哪里?

    导语   对于消息中间件大家都应该不陌生,现在比较主流的消息中间件有Kafka.RabbitMQ.RocketMQ.ActiveMQ等等.前段时间花了很长时间分析了关于RocketMQ源码,之前也分享 ...

  4. RocketMQ架构原理解析(一):整体架构

    一.概述 RocketMQ作为一个apache的顶级项目,拥有将近16K的star,它稳定的系统及强悍的性能,无疑在国内已经成为了企业消息队列的首选.接下来的一段时间,鄙人准备出一系列文章,从源码出发 ...

  5. RocketMQ学习笔记(8)----RocketMQ的Producer API简介

    在RocketMQ中提供了三种发送消息的模式: 1.NormalProducer(普通) 2.OrderProducer(顺序) 3.TransactionProducer(事务) 下面来介绍一下pr ...

  6. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  7. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  8. RocketMQ源码解析-Producer启动

    RocketMQ中生产者通过DefaultProducer来创建. protected final transient DefaultMQProducerImpl defaultMQProducerI ...

  9. RocketMQ源码解析-Producer消息发送

    首先以默认的异步消息发送模式作为例子.DefaultMQProducer中的send()方法会直接调用DefaultMQProducerImpl的send()方法,在DefaultMQProducer ...

最新文章

  1. oracle recyclebin详解,oracle recyclebin详解
  2. LeetCode Super Ugly Number
  3. JS 星号 * 处理手机号和名称
  4. 帆软报表多行多条数据写入表_在线报表FineReport中多数据集如何实现层式报表...
  5. jzoj3189-解密【字符串hash】
  6. 关于Linux线程的线程栈以及TLS
  7. 试列出种计算机组生产率的公式,农业机械化生产学思考题
  8. Mybatis-01-简介及入门
  9. markdown的基本使用方法
  10. python程序编写应注意哪些问题_Python程序员鲜为人知但你应该知道的17个问题
  11. java编写一个web 留言板_Java Web在线留言板
  12. 【Matlab优化预测】贝叶斯网络优化LSTM预测【含源码 1158期】
  13. 引进博士:130平米住房+20万经费+15万年薪;硕士24万引进费+可聘讲师
  14. 用3DSMAX制作室内效果图的九大步骤
  15. 基于springBoot+MyBatis+Vue的前后端分离旅游管理系统
  16. 苏州企业如何免费办理软件著作权
  17. 程序员要不要懂底层原理
  18. LC-恢复二叉搜索树(JavaScript实现)
  19. AttributeError: module ‘win32com.gen_py.00020905-0000-4B30-A977-D214852036FF
  20. ubuntu 14.04.5 firefox 浏览器flash插件安装

热门文章

  1. CentOS下Docker安装
  2. 编写程序,子进程通过管道向父进程发出字符串ok.
  3. (原创)无废话C#设计模式之十二:Bridge
  4. python image 转成字节_(推荐)谈谈Python生态圈图像格式转换问题:含实例解析
  5. 【洛谷】【动态规划/二维背包】P1855 榨取kkksc03
  6. 兼容ie8 rgba()用法
  7. 洛谷 P3376 【模板】网络最大流
  8. 创建一个windows服务的小程序及注意事项
  9. 不要在同一客户端同时使用超过两个的 HTTP 长连接
  10. 使用ASP.NET Atlas开发随输入内容自动调整行数的textarea(转)