rocketmq源码分析 -生产者
概念
生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口。
组件
Producer
消息生产者。在rocketmq中,生产者对应MQProducer接口:
public interface MQProducer extends MQAdmin {//启动void start() throws MQClientException;//关闭void shutdown();List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;//同步发送消息SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;//异步发送消息void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;//oneway形式发送消息,相较于异步发送,其实就是没有注册回调函数void sendOneway(final Message msg) throws MQClientException, RemotingException,InterruptedException;//发送事务消息TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg) throws MQClientException;//批量发送消息SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;//还有各种形式的重载的发送消息的方法,省略了。。。//for rpcMessage request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,RemotingException, MQBrokerException, InterruptedException;void request(final Message msg, final RequestCallback requestCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException, MQBrokerException;Message request(final Message msg, final MessageQueueSelector selector, final Object arg,final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,InterruptedException;void request(final Message msg, final MessageQueueSelector selector, final Object arg,final RequestCallback requestCallback,final long timeout) throws MQClientException, RemotingException,InterruptedException, MQBrokerException;Message request(final Message msg, final MessageQueue mq, final long timeout)throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
有两个实现:
- DefaultMQProducer:默认实现。没有实现发送事务消息的方法
- TransactionMQProducer:继承自DefaultMQProducer,实现了发送事务消息的方法
Message
在rocketmq中,Message类就代表着生产者产出的消息。一次看看它的属性:
topic:主题
flag:一些特殊的消息标记,int类型。标记的含义定义在MessageSysFlag中:
public final static int COMPRESSED_FLAG = 0x1; public final static int MULTI_TAGS_FLAG = 0x1 << 1; public final static int TRANSACTION_NOT_TYPE = 0; public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2; public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2; public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; public final static int BORNHOST_V6_FLAG = 0x1 << 4; public final static int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;
properties:额外的一些属性,Map类型。已经使用的扩展属性有:
- tags:消息过滤用
- keys:索引,可以有多个
- waitStoreMsgOK
- delayTimeLevel:消息延迟级别,用于定时消息或者消息重试
body:消息的内容
transactionId:事务消息用
MQClientInstance
见:MQClientInstance
MQFaultStrategy
SendMessageHook
发送消息时的hook函数,可以再消息发送前后做一些业务操作。接口定义如下:
public interface SendMessageHook {//命名String hookName();//发送消息前调用void sendMessageBefore(final SendMessageContext context);//发送消息后调用void sendMessageAfter(final SendMessageContext context);
}
使用
实现
本篇文章,我们会逐个分析下列过程:
- 生产者的启动
- 发送消息
(代码有删减,去除了try catch和日志等不太要紧的部分)
生产者的启动
在创建好Producer之后,使用它来发消息之前,需要先启动它,即调用它的start()方法,代码如下:
public void start() throws MQClientException {this.setProducerGroup(withNamespace(this.producerGroup));//DefaultMQProducerImpl的启动this.defaultMQProducerImpl.start();if (null != traceDispatcher) {//TraceDispatcher相关见:traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());}
}
本文主要看DefaultMQProducerImpl的启动,代码如下
public void start(final boolean startFactory) throws MQClientException {//一些配置校验this.checkConfig();//如果没有特别指定producerGroup,就会把instanceName设置为进程idif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}//创建一个MQClientInstance实例this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);//注册到MQClientInstanceboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {//启动MQClientInstance,见[MQClientInstance](https://blog.csdn.net/yuxiuzhiai/article/details/103828284)mQClientFactory.start();}//发送心跳消息给brokerthis.mQClientFactory.sendHeartbeatToAllBrokerWithLock();//启动线程定时清理超时的请求 this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {RequestFutureTable.scanExpiredRequest();}}, 1000 * 3, 1000);
}
发送消息
已最普通的同步消息发送为例。主要实现在DefaultMQProducerImpl.sendDefaultImpl()方法中。进入方法
private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback,long timeout){final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;//1.查找主题的路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;//最大可能的发送次数,同步的话就是重试次数+1,异步或者oneway就是只发一次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];//重试机制for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();//2.选择消息队列MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}//3.发送消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} //4.异常处理机制catch (各种异常 e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);exception = e;continue;}}if (sendResult != null) {return sendResult;}//如果sendResult是null,则说明有异常,进行异常处理}
}
1.查找主题的路由信息
如果是第一次,则会从namesrv获取topic元数据,获取后会缓存下来,以后从缓存中获取
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {//尝试从缓存获取TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);//如果本地缓存没有或者有问题,则从namesrv获取if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}//获取到了,返回if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//默认topic:TBW102this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
}
2.选择消息队列
在rocketmq中,选择消息队列,大体上有两种方案。根据sendLatencyFaultEnable(故障延迟机制)属性值来判断(默认为false)。如果sendLatencyFaultEnable = true:
2.1. 开启了故障延迟机制下的MessageQueue选择
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {//对应这一段代码//大概意思就是根据MessageQueue的数量,round-robin负责均衡int index = tpInfo.getSendWhichQueue().getAndIncrement();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}//如果正常的话,是走不到这里的。走到这里说明故障延迟机制下没有可用的brokerName//这个时候就强行挑一个发送final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}return tpInfo.selectOneMessageQueue();}//见2.2.2的分析return tpInfo.selectOneMessageQueue(lastBrokerName);
}
故障延迟机制见故障延迟机制
2.2. 默认机制(未开启故障延迟机制)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {//这条消息是否是首次发送,如果不是的话,则lastBrokerName就是上次发送失败的broker的nameif (lastBrokerName == null) {//如果这条消息是第一次发送,则直接用一种round-robin方式挑选MessageQueuereturn selectOneMessageQueue();} else {//如果消息不是第一次发送,则本次挑选MessageQueue的时候尽量避免上次失败的brokerint index = this.sendWhichQueue.getAndIncrement();for (int i = 0; i < this.messageQueueList.size(); i++) {int pos = Math.abs(index++) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}//如果就一个broker,那避免不了,就硬着头皮随机选一个return selectOneMessageQueue();}
}
3.发送消息
发送消息的入口方法:DefaultMQProducerImpl.sendKernelImpl()
private SendResult sendKernelImpl( Message msg, MessageQueue mq, CommunicationMode communicationMode,SendCallback sendCallback, TopicPublishInfo topicPublishInfo, long timeout){long beginStartTime = System.currentTimeMillis();//根据topic的路由信息拿到broker的地址String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;//有个是否启用vip通道的配置项,如果开启了,则会使用broker的另一个端口发送消息brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();//设置一个唯一的idif (!(msg instanceof MessageBatch)) {//给消息加上一个属性UNIQ_KEYMessageClientIDSetter.setUniqID(msg);}//namespace,还没搞懂boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}int sysFlag = 0;boolean msgBodyCompressed = false;//根据消息长度看看是不是要压缩一下if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}//判断是否是事务消息final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();//设置各种属性checkForbiddenContext.setNameSrvAddr/Group/CommunicationMode/BrokerAddr/Message/Mq/UnitMode..();this.executeCheckForbiddenHook(checkForbiddenContext);}//SendMessageHook是一个用于自定义在消息发送前后做一些自定义处理的接口if (this.hasSendMessageHook()) {context = new SendMessageContext();//设置各种属性context.setProducer/ProducerGroup/CommunicationMode ..();String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}//判断是否是延时消息if (msg.getProperty("__STARTDELIVERTIME") != null ||msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null){context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}//构建SendMessageRequestHeaderSendMessageRequestHeader requestHeader = new SendMessageRequestHeader();//省略设置各种属性的代码if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC://异步发送消息Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(..);break;case ONEWAY:case SYNC://同步或者oneway发送消息long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(..);break;default:assert false;break;}//SendMessageHook发送消息后的回调if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;
}
4.异常处理机制
可以看到,当发送消息出现异常时,都有一句:this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false)。具体分析请看故障延迟机制
批量发送消息
DefaultMQProducer.send(Collection msgs)方法定义如下:
public SendResult send(Collection<Message> msgs){//主要逻辑就是batch()方法return this.defaultMQProducerImpl.send(batch(msgs));
}
batch()方法的实现:
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {//将批量的消息转化为一个MessageBatch对象MessageBatch msgBatc = MessageBatch.generateFromList(msgs);for (Message message : msgBatch) {//为每一条单独的message设置uniq key、topicValidators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}//批量消息跟普通消息的发送没有啥差别,只是将消息序列化成字节数组的时候有点不一样msgBatch.setBody(msgBatch.encode());msgBatch.setTopic(withNamespace(msgBatch.getTopic()));return msgBatch;
}
序列化MessageBatch的过程:
public static byte[] encodeMessages(List<Message> messages) {//TO DO refactor, accumulate in one buffer, avoid copiesList<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());int allSize = 0;for (Message message : messages) {//按照固定的格式序列化每一条messagebyte[] tmp = encodeMessage(message);encodedMessages.add(tmp);allSize += tmp.length;}byte[] allBytes = new byte[allSize];int pos = 0;for (byte[] bytes : encodedMessages) {System.arraycopy(bytes, 0, allBytes, pos, bytes.length);pos += bytes.length;}return allBytes;
}
单独一条消息的序列化过程:
public static byte[] encodeMessage(Message message) {byte[] body = message.getBody();int bodyLen = body.length;String properties = messageProperties2String(message.getProperties());byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);//note properties length must not more than Short.MAXshort propertiesLength = (short) propertiesBytes.length;int sysFlag = message.getFlag();int storeSize = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCOD+ 4 // 3 BODYCRC+ 4 // 4 FLAG+ 4 + bodyLen // 4 BODY+ 2 + propertiesLength;ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);// 1 TOTALSIZEbyteBuffer.putInt(storeSize);// 2 MAGICCODEbyteBuffer.putInt(0);// 3 BODYCRCbyteBuffer.putInt(0);// 4 FLAGint flag = message.getFlag();byteBuffer.putInt(flag);// 5 BODYbyteBuffer.putInt(bodyLen);byteBuffer.put(body);// 6 propertiesbyteBuffer.putShort(propertiesLength);byteBuffer.put(propertiesBytes);return byteBuffer.array();
}
结语
(参考丁威、周继峰<<RocketMQ技术内幕>>。水平有限,最近在看rocketmq源码,记录学习过程,也希望对各位有点微小的帮助。如有错误,请指正~)
rocketmq源码分析 -生产者相关推荐
- RocketMQ源码分析之延迟消息
文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...
- 《RocketMQ源码分析》NameServer如何处理Broker的连接
<RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...
- RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)
在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...
- RocketMQ源码分析之request-reply特性
1.什么是request-reply? RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返 ...
- RocketMQ源码分析(十二)之CommitLog同步与异步刷盘
文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...
- 【RocketMQ|源码分析】namesrv启动停止过程都做了什么
简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...
- RocketMQ 源码分析 —— 集成 Spring Boot
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...
- mq消费者组_「架构师MQ进阶」RocketMQ源码分析(四)- 源代码包结构分析
在前面第一篇中已经将源代码下载到本地了,本篇主要是介绍代码中相关模块到作用.036.Rocket-MQ-Source-code-cover.png 一.源码结构 RocketMQ源码组织方式基于Mav ...
- RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想
RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache ...
最新文章
- 爱情也许是最忧伤的童话
- windows下expdp自动备份脚本
- C#全局键盘监听(Hook)
- python django开发工具_利用pyCharm编辑器创建Django项目开发环境-python开发工具第一篇...
- c语言动态双端栈的原理,数据结构(C语言版)例题(第三章:栈和队列)
- CCF CSP201903-2二十四点
- Docker Image执行流程
- PDF文件打开密码解密
- web项目的中英文切换功
- centos离线安装docker-ce 18.03.0-ce
- 单点登录SSO:可一键运行的完整代码
- 创业半年回顾(没饭吃了,我再也不想创业了)
- 手机连上蓝牙耳机没有声音
- Perforce使用中文教程: p4 client
- Banner信息收集和美杜莎使用(9.26 第十二天)
- android 位于底部的tab,GitHub - DevinFu/BottomTabBar: Android应用中位于底部的tab栏
- 数据库系统概论 实验报告答案 实验二:创建及管理数据表
- TCP/IP 网络模型
- latex中——misplaced \noalign. \hline,\bottomrule,\midrule问题
- C#之AES加密解密
热门文章
- Python精美地理可视化绘制——以中国历年GDP数据为例
- 五、框架协议——合同
- Java IDE MyEclipse 使用教程:使用 MyEclipse 应用服务器(一)
- 自适应图片九宫格 css,一张背景实现自适应九宫格
- 专门用于管理企业与自己客户之间所有信息的客户管理系统
- AndroidStudio 3.1 截取手机屏幕
- 霏凡年度十大原创精华集最终版+随书附件包【最终版】
- python需要掌握的词汇量_北大保安,词汇量15000、会Python编程,网友:当保安都不够格!...
- DM3730学习日记-写在前面
- STC15系列单片机SPI使用教程(一)