1、类层次结构

2、producer的启动

首先设置组,及NameServer,然后调用start启动。启动关键逻辑主要在MQClientInstance中。

(1)启动NettyRemotingClient,创建请求响应处理channel

(2)启动周期性任务

每2分钟获取NameServerAddr

默认每30秒从nameserver中拉取topic信息

默认每30秒清除离线的broker,发送心跳给所有的broker

默认每5秒持久化consumeroffset

默认每1分钟调节核心线程数

(3)开启pullmessage线程服务

(4)开启rebanlance线程服务

public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}private void startScheduledTask() {if (null == this.clientConfig.getNamesrvAddr()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();} catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();} catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.adjustThreadPool();} catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);}}}, 1, 1, TimeUnit.MINUTES);}

其序列图为

3、producer发送消息

支持同步、异步、单向发送三种类型。主要的发送逻辑在DefaultMqProducerImpl类sendDefaultImpl方法中,首先会调用tryToFindTopicPublishInfo查找得到topic的发布信息,然后调用MQFaultStrategy类选择一个消息队列,如果开启发送消息延时容错,会调用LatencyFaultTolerance选择队列,否则直接调用TopicPublishInfo选择队列,然后调用sendKernalImpl发送消息。

其时序图为

4、事务消息

基于两阶段提交和定时事务状态回查来决定消息最终是提交还是回滚。

producer调用rocketmq消息发送接口,发送状态为preprare的消息。消息发送成功后,会调用本地事件监听程序,记录消息的本地事务状态,该相关标记与本地业务操作同属一个事务,确保消息发送与本地事务的原子性。

broker在收到类型为prepare消息时,会首先备份消息的原主题与原消息消费队列,然后将消息存储在主题 为RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中。

broker会开启一个线程TransactionalMessageCheckService,来消费RMQ_SYS_TRANS_HALF_TOPIC的消息,向消息发送端发起事务状态回查,producer根据保存的事务状态回馈消息服务器事务的状态。如果是提交或回滚,则broker提交或回滚消息。如果是未知,待下一次回查,broker允许设置一条消息的回查间隔与回查次数,如果在超过回查次数后依赖无法获知消息的事务状态,则默认回滚消息。

其处理交互图为

4.1 消息发送

事务消息发送是通过TransactionMQProcuder,其类结构为

TransactionListener transactionListener事务监听器,主要定义实现本地事务状态执行、本地事务状态回查

ExecutorService executorService:事务状态回查异步执行线程池。

消息发送步骤为

  1. 添加属性PROPERTY_TRANSCATION_PREPARED和PROPERTY_PRODUCER_GROUP,分别表示消息为prepare消息,消息所属消息生产者组。设置消息生产者组的目的是在查询事务消息本地事务状态时,从该生产者组中随机选择一个消息生产者即可,然后通过同步单向调用方式发送消息。
  2. 根据消息发送结果执行相应操作,如果发送成功,执行TransactionListener#executeLocalTransaction方法,记录事务消息的本地事务状态。如果发送失败,设置事务状态为ROLLBACK_MESSAGE
  3. 结束事务,根据上步的事务状态进行提交、回滚或暂时不处理事务。
broker在收到消息存储请求时,SendMessageProcessor#asyncSendMessage,如果消息为prepare消息,执行TransactionalMessageService#asyncPrepareMessage,否则执行普通消息的存储流程。

事务消息备份消息的原主题与原消息消费队列,然后将主题修改为RMQ_SYS_TRANS_HALF_TOPIC,消费队列修改为0,然后消息按照普通消息存储在commitlog文件进而转发到RMQ_SYS_TRANS_HALF_TOPIC主题对应的消息消费队列。通过TransactionalMessageCheckService线程定时去消费该主题,然后将该消息在满足特定条件下恢复消息主题 ,进而被消费者消费。

4.2 事务提交或者回滚

事务提交或者回滚是通过DefaultMQProducerImpl#endTransaction,根据消息的消息队列获取broker的ip和端口号,然后发送结束事务命令,通过本地执行事务的状态分别提交、回滚或者不处理。

brokere的结束事务处理器为EndTranscationPRocessor。如果是提交事务

  • 首先从结束事务请求命令中获取消息的物理像偏移量(commitLogOffset),其实现是由TransactionalMessageService#commitMessage完成
  • 恢复消息主题、消费队列,构造新的消息对象,由EndTransactionProcessor#endMessageTransaction完成。
  • 将消息再次存储在commitlog文件中,此时的消息主题则为业务方发送的消息,将被转发到对应的消息消费队列,供消息消费者消费。由EndTransactionProcessor#sendFinalMessage完成。
  • 消息存储后,删除prepare消息,并不是真正的删除,将prepare消息存放到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中,表示该事务已经处理过(提交或者回滚),为未处理的事务进行事务回查提供查找依据。

4.3 事务消息回查

其相关类图为

事务消息存储在消息服务器时主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,执行完本地事务返回本地事务状态为UN_KNOW时,结束事务时不做任何处理,而是通过事务状态定时回查以得到发送端明确的事务操作(提交事务或者回滚事务)。通过TransactionalMessageCheckService线程定时检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事务状态,默认检测频率为1分钟。

transactionTimeOut:事务过期时间,只有当消息的存储 时间加上过期时间大于系统当前时间,才对消息执行事务状态回查,否则在下一次周期中执行事务回查操作。

transactionCheckMax:事务回查最大检测次数,如果超过最大检测次数还是无法获知消息的事务状态,broker将不会继续对消息进行事务回查,而是直接丢弃,即回滚事务。

其实现类为TransactionalMessageServiceImpl#check,获取RMQ_SYS_TRANS_HALF_TOPIC主题下的所有消息队列,根据事务消息队列获取与之对应的消息队列,其实就是获取已处理消息的消息消息队列,其主题为RMQ_SYS_TRANS_OP_HALF_TOPIC。fillOpRemoveMap根据当前的处理进度依次从已处理队列拉取32条消息,方便判断当前处理的消息是否已经处理过,如果处理过㞱则无须再次发送事务状态回查请求,避免重复发送事务回查请求。

在执行事务回查之前,把该消息存储在commitlog文件,新的消息设置最新的物理偏移量。因为下文 的发送事务消息是异步处理的,无法立刻知道其处理结果,为了简化prepare消息队列和处理队列的消息消费进度处理,先存储,然后消费进度向前推进,重复发送的消息在事务回查之前会判断是否处理过。另外一个目的就是需要修改消息的检查次数,rocketmq的存储设计采用顺序写,支修改已存储的消息,其性能无法高性能。

发送具体的事务回查命令,使用线程池来异步发送回查消息,为了回查消息进度保存的简化,只要发送了回查消息,当前回查进度会向前推进,如果回查失败,上一步新增的消息将可以再次发送回查消息。首先构造事务状态回查请求消息,核心参数消息offsetId,消息id(索引),消息事务id,事务消息队列中的偏移量,消息主题,消息队列,然后根据消息的生产组,从中随机选择一个消息发送者。最后向消息发送者发送事务回查命令。

事务回查命令的处理者为ClientRemotingProcessor#processRequest方法,将任务提交到TransactionMQProducer的线程池中执行,最终交给TransactinListener#checkLocalTransaction方法,返回事务状态,根据事务状态调用 processTransactionState

5、消息id

在发送消息时,会设置属性PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,如果没有设置,通过MessageClientIDSetter.setUniqID来设置。













												

rocketmq中producer设计与实现相关推荐

  1. rocketmq中consumer设计与实现

    0.结构图 1.类层次图 2.Consumer消息队列分配 消息队列分配使用模板方法模式,在ReblanceImpl中定义处理框架,对于变动部分提炼出抽象方法,交给子类来实现 是通过Rebalance ...

  2. 详解RocketMQ中的Producer

    上一篇博客讲解了如何安装RocketMQ,并且也简单的介绍了一下相关RocketMq的概念,那么这篇博客,来剖析一下MQ中的producer的角色,看看它是来干什么的? 上图就是MQ中Producer ...

  3. 消息中间件系列(九):详解RocketMQ的架构设计、关键特性、与应用场景

    内容大纲: RocketMQ的简介与演进 RocketMQ的架构设计 RocketMQ的关键特性 RocketMQ的应用场景 RocketMQ的简介 RocketMQ一个纯java.分布式.队列模型的 ...

  4. 深入理解RocketMQ中的NameServer

    本文来说下RocketMQ中的NameServer 文章目录 NameServer介绍 NameServer的作用 为什么要使用NameServer NameServer如何保证数据的最终一致 路由注 ...

  5. RocketMQ的架构设计详解

    本文来说下RocketMQ的架构设计 文章目录 RocketMQ的简介 RocketMQ的架构设计 RocketMQ的核心组件 RocketMQ的消息领域模型 RocketMQ的关键特性 消息的顺序 ...

  6. 消息中间件学习总结(5)——RocketMQ之Apache RocketMQ背后的设计思路与最佳实践

    摘要:为了更好地让开发者们更加深入了解阿里开源,阿里云云栖社区在3月1号了举办"阿里开源项目最佳实践"在线技术峰会,直播讲述了当前阿里新兴和经典开源项目实战经验以及背后的开发思路. ...

  7. RocketMQ 中Topic、Tag、GroupName基本概念介绍

    本文主要介绍RocketMQ中Topic.Tag.GroupName的概念.设计初衷以及使用方法. 一.Topic 首先看看官方的定义: Topic是生产者在发送消息和消费者在拉取消息的类别.Topi ...

  8. RocketMQ:Producer启动流程与消息发送源码分析

    文章目录 Producer 1.方法和属性 2.启动流程 3.消息发送 3.1验证消息 3.2查找路由 3.3选择队列 3.4发送消息 3.5发送批量消息 Producer 在RocketMQ中,消息 ...

  9. RocketMQ消息轨迹-设计篇

    RocketMQ 消息轨迹主要包含两篇文章:设计篇与源码分析篇,本节将详细介绍RocketMQ消息轨迹-设计相关. RocketMQ消息轨迹,主要跟踪消息发送.消息消费的轨迹,即详细记录消息各个处理环 ...

最新文章

  1. Android 错误: 找不到符号 符号: 类 x 位置: 类 MainActivity
  2. 七内部排序算法汇总(插入排序、Shell排序、冒泡排序、请选择类别、、高速分拣合并排序、堆排序)...
  3. 君康人寿2019年排名_君康人寿易主后 内部提出五年上市计划
  4. Python--day26--封装和@property
  5. ATM取款机java的实现--练习代码
  6. 究竟是谁弄脏了我们的世界
  7. 航信3.0开票模拟系统
  8. 主板温度过高的原因是什么?主板温度高的原因和处理办法
  9. 计算机课做名片怎么做,第7课制作个人小名片(教学设计)
  10. 唐诗学习系统-java课程设计
  11. 咸阳强生告诉你吃什么食物养胃效果好
  12. ctf实验吧天网管理系统
  13. STM32CubeMX SDRAM的使用(二)
  14. [ Python ] 爬虫类库学习之 xpath,爬取彼岸图网的 小姐姐 图片
  15. 谷歌显示不安全连接到服务器地址,教您解决Chrome浏览器提示“网站连接不安全”的方法...
  16. macos U盘引导安装
  17. Hbase安装~Hbase安装过程中常见的问题
  18. 将数字转换成科学计数法
  19. 18年6月六级翻译词汇
  20. Arduino——正点原子sim800c模块

热门文章

  1. Canvas做股票数据走势图实践分享(一)
  2. VisualStudio2013 如何打开之前版本开发的(.vdproj )安装项目
  3. 绝地求生自定义服务器租用,绝地求生自定义服务器怎么开 自定义服务器设置方法...
  4. java list用法_java list的用法详解
  5. 哪本python入门书内容最详细-重磅 | 由浅入深的 AI 学习路线,最详细的资源整理!...
  6. 爬虫python下载-如何用Python爬虫实现百度图片自动下载?
  7. python怎么样才算入门编程-编程零基础应当如何开始学习 Python?
  8. 有道精品课python-115批量转存与提取sha1工具2020下载
  9. python 创建txt文件并写入字符串-python创建txt文件
  10. python timer使用-关于定时器的两种使用方法