Apache RocketMQ源码学习之生产者发送消息
源码地址:https://gitee.com/pengyd950812/rocket
一、消息发送方式
RocketMQ 支持常见的三种发送方式:SYNC、ASYNC、ONEWAY
SYNC : 同步的发送方式,会等待发送结果后才返回。
ASYNC : 异步的发送方式,发送完后,立刻返回。
ONEWAY : 发出去后,什么都不管直接返回。
二、RocketMQ生产者发送消息流程
三、跟着源码阅读其实现过程
1.核心实现方法 - DefaultMQProducerImpl.sendDefaultImpl()
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode, // 发送类别final SendCallback sendCallback, // 如果是异步发送方式,则需要实现SendCallback回调final long timeout // 超时时间) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 合法性检查this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();// 开始发消息时间long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 选择发送topic信息,此步骤数据由·MQClientInstance·中启动的定时任务维护// topicPublishInfo里面维护了该topic相关的broker和队列信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;// 发送消息次数,同步3次,其他都是一次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;// 第几次发送int times = 0;// 存储每次发送消息选择的broker名字String[] brokersSent = new String[timesTotal];//循环重复发送几次for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();// 根据topic路由表及broker名称,获取一个messageQueueMessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}// 发送消耗的时间long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;// 超时中断break;}// 发送消息到选中的队列sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();// 更新本次borker可用性(容错)this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);// 如果SYNC模式下发送失败进行重试,ASYNC和ONEWAY模式下直接返回nullswitch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}
这里我们关注一下方法的入参:
Message msg: 需要发送的消息
CommunicationMode communicationMode:发送类别是个枚举类(SYNC、ASYNC、ONEWAY)
SendCallback sendCallback:如果是异步发送方式,则需要实现SendCallback回调
long timeout: 超时时间
2.选择发送topic信息 - tryToFindTopicPublishInfo()
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {// 一个Topic可能含有多个Broker上的多个可写的MessageQueue// 从缓存的topic路由表中获取topic路由TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {// 不存在则向NameServer发起查找this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 根据topic获取路由信息,从nameserver中获取,并更新本地缓存this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}// 路由表中存在路由信息if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {// 如果nameServer中还是没有,则会使用默认的topic "TBW102"去获取路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}
Apache RocketMQ源码学习之生产者发送消息相关推荐
- RocketMQ源码解析:Producer发送消息+Broker消息存储
文章目录 1. Producer 发送消息 2. Broker 接收消息 1. Producer 发送消息 先上一段简单的生产者代码 public static void main(String[] ...
- RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?
RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...
- RocketMQ源码学习
RocketMQ源码学习 文章目录 RocketMQ源码学习 Producer 是怎么将消息发送至 Broker 的? 同步发送 异步发送 队列选择器 事务消息 原理 Broker 是怎么处理客户端发 ...
- RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic
此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...
- RocketMQ源码学习(六)-Name Server
问题列表: Name Server 的作用是什么? Name Server 存储了Broker的什么信息? Name Server 为Producer的提供些什么信息? Name Server 为Co ...
- RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析
深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...
- RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】
基于RocketMQ release-4.9.3,深入的介绍了Broker 的消息刷盘源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异 ...
- rocketmq源码分析 -生产者
概念 生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口. 组件 Producer 消息生产者.在rocketmq中,生产者对应MQProducer接口: pub ...
- Apache log4j-1.2.17源码学习笔记
(1)Apache log4j-1.2.17源码学习笔记 http://blog.csdn.net/zilong_zilong/article/details/78715500 (2)Apache l ...
- RocketMQ 源码分析 —— 集成 Spring Boot
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...
最新文章
- GRE核心词汇助记与精练-List10感觉,感情
- 常用CSS元素div ul dl dt ol的简单解释
- 假设有50瓶饮料,喝完3个空瓶可以换一瓶饮料,依次类推,请问总共喝了多少瓶饮料???
- 32位网卡驱动 2008_DPDK之网卡收包流程
- 简单的vue入门案例
- 递归算法 流程图_什么是算法?如何学习算法?算法入门的学习路径
- Python语音信号处理
- docker启动停止操作命令
- mpu6050 重力加速度_MPU6050抄底解读
- ASP.NET加密解密
- justauth对接facebook、linkedin、twitter登陆
- Intel Optane 内存个人安装与使用步骤 Dell 灵越
- 大学计算机之软件设计程序框图,《程序框图、顺序结构》教学设计
- Xubuntu Linux发行版放弃即时消息软件Pidgin
- Could not find acceptable representation 原因探究
- adb一打开就闪退_常用的adb命令
- mysql的altertable_【编程词典】mysql ALTER TABLE语句
- 谷歌地球飞行模式使用教程
- 数控系统的计算机仿真论文,计算机仿真技术及其在数控加工中的应用数控仿真技术论文.doc...
- c语言课程设计(学生籍贯管理系统)学完c语言你可以做的案例