源码地址:https://gitee.com/pengyd950812/rocket

一、消息发送方式

RocketMQ 支持常见的三种发送方式:SYNC、ASYNC、ONEWAY

SYNC : 同步的发送方式,会等待发送结果后才返回。

ASYNC : 异步的发送方式,发送完后,立刻返回。

ONEWAY : 发出去后,什么都不管直接返回。

二、RocketMQ生产者发送消息流程

三、跟着源码阅读其实现过程

1.核心实现方法 - DefaultMQProducerImpl.sendDefaultImpl()

private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,  // 发送类别final SendCallback sendCallback, // 如果是异步发送方式,则需要实现SendCallback回调final long timeout // 超时时间) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 合法性检查this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();// 开始发消息时间long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 选择发送topic信息,此步骤数据由·MQClientInstance·中启动的定时任务维护// topicPublishInfo里面维护了该topic相关的broker和队列信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;// 发送消息次数,同步3次,其他都是一次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;// 第几次发送int times = 0;// 存储每次发送消息选择的broker名字String[] brokersSent = new String[timesTotal];//循环重复发送几次for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();// 根据topic路由表及broker名称,获取一个messageQueueMessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}// 发送消耗的时间long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;// 超时中断break;}// 发送消息到选中的队列sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();// 更新本次borker可用性(容错)this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);// 如果SYNC模式下发送失败进行重试,ASYNC和ONEWAY模式下直接返回nullswitch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}

这里我们关注一下方法的入参:

Message msg: 需要发送的消息
CommunicationMode communicationMode:发送类别是个枚举类(SYNC、ASYNC、ONEWAY)
SendCallback sendCallback:如果是异步发送方式,则需要实现SendCallback回调
long timeout: 超时时间

2.选择发送topic信息 - tryToFindTopicPublishInfo()

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {// 一个Topic可能含有多个Broker上的多个可写的MessageQueue// 从缓存的topic路由表中获取topic路由TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {// 不存在则向NameServer发起查找this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 根据topic获取路由信息,从nameserver中获取,并更新本地缓存this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}// 路由表中存在路由信息if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {// 如果nameServer中还是没有,则会使用默认的topic "TBW102"去获取路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}

Apache RocketMQ源码学习之生产者发送消息相关推荐

  1. RocketMQ源码解析:Producer发送消息+Broker消息存储

    文章目录 1. Producer 发送消息 2. Broker 接收消息 1. Producer 发送消息 先上一段简单的生产者代码 public static void main(String[] ...

  2. RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

    RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...

  3. RocketMQ源码学习

    RocketMQ源码学习 文章目录 RocketMQ源码学习 Producer 是怎么将消息发送至 Broker 的? 同步发送 异步发送 队列选择器 事务消息 原理 Broker 是怎么处理客户端发 ...

  4. RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic

    此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...

  5. RocketMQ源码学习(六)-Name Server

    问题列表: Name Server 的作用是什么? Name Server 存储了Broker的什么信息? Name Server 为Producer的提供些什么信息? Name Server 为Co ...

  6. RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析

    深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...

  7. RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】

    基于RocketMQ release-4.9.3,深入的介绍了Broker 的消息刷盘源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异 ...

  8. rocketmq源码分析 -生产者

    概念 生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口. 组件 Producer 消息生产者.在rocketmq中,生产者对应MQProducer接口: pub ...

  9. Apache log4j-1.2.17源码学习笔记

    (1)Apache log4j-1.2.17源码学习笔记 http://blog.csdn.net/zilong_zilong/article/details/78715500 (2)Apache l ...

  10. RocketMQ 源码分析 —— 集成 Spring Boot

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

最新文章

  1. GRE核心词汇助记与精练-List10感觉,感情
  2. 常用CSS元素div ul dl dt ol的简单解释
  3. 假设有50瓶饮料,喝完3个空瓶可以换一瓶饮料,依次类推,请问总共喝了多少瓶饮料???
  4. 32位网卡驱动 2008_DPDK之网卡收包流程
  5. 简单的vue入门案例
  6. 递归算法 流程图_什么是算法?如何学习算法?算法入门的学习路径
  7. Python语音信号处理
  8. docker启动停止操作命令
  9. mpu6050 重力加速度_MPU6050抄底解读
  10. ASP.NET加密解密
  11. justauth对接facebook、linkedin、twitter登陆
  12. Intel Optane 内存个人安装与使用步骤 Dell 灵越
  13. 大学计算机之软件设计程序框图,《程序框图、顺序结构》教学设计
  14. Xubuntu Linux发行版放弃即时消息软件Pidgin
  15. Could not find acceptable representation 原因探究
  16. adb一打开就闪退_常用的adb命令
  17. mysql的altertable_【编程词典】mysql ALTER TABLE语句
  18. 谷歌地球飞行模式使用教程
  19. 数控系统的计算机仿真论文,计算机仿真技术及其在数控加工中的应用数控仿真技术论文.doc...
  20. c语言课程设计(学生籍贯管理系统)学完c语言你可以做的案例

热门文章

  1. Linu修改系统时间
  2. windows7下面利用docker搭建jitsi-meet测试环境
  3. Redis Cluster 原生搭建(二)meet
  4. 香橙派3LTS部署ROS2阿克曼开源平台
  5. 计算机网络结构化布线的六个子系统,结构化网络综合布线系统有六大子系统组成...
  6. 发布工程到私有仓库maven
  7. android+美拍加表情,美拍怎么添加表情文字在哪
  8. 研究生综合英语unit4 前四段
  9. 常用的common function库(三)
  10. java算法合集-九阳神功第三式滑动窗口