概念

生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口。

组件

Producer

消息生产者。在rocketmq中,生产者对应MQProducer接口:

public interface MQProducer extends MQAdmin {//启动void start() throws MQClientException;//关闭void shutdown();List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;//同步发送消息SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;//异步发送消息void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;//oneway形式发送消息,相较于异步发送,其实就是没有注册回调函数void sendOneway(final Message msg) throws MQClientException, RemotingException,InterruptedException;//发送事务消息TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg) throws MQClientException;//批量发送消息SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;//还有各种形式的重载的发送消息的方法,省略了。。。//for rpcMessage request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,RemotingException, MQBrokerException, InterruptedException;void request(final Message msg, final RequestCallback requestCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException, MQBrokerException;Message request(final Message msg, final MessageQueueSelector selector, final Object arg,final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,InterruptedException;void request(final Message msg, final MessageQueueSelector selector, final Object arg,final RequestCallback requestCallback,final long timeout) throws MQClientException, RemotingException,InterruptedException, MQBrokerException;Message request(final Message msg, final MessageQueue mq, final long timeout)throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}

有两个实现:

  • DefaultMQProducer:默认实现。没有实现发送事务消息的方法
  • TransactionMQProducer:继承自DefaultMQProducer,实现了发送事务消息的方法

Message

在rocketmq中,Message类就代表着生产者产出的消息。一次看看它的属性:

  • topic:主题

  • flag:一些特殊的消息标记,int类型。标记的含义定义在MessageSysFlag中:

    public final static int COMPRESSED_FLAG = 0x1;
    public final static int MULTI_TAGS_FLAG = 0x1 << 1;
    public final static int TRANSACTION_NOT_TYPE = 0;
    public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
    public final static int BORNHOST_V6_FLAG = 0x1 << 4;
    public final static int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;
    
  • properties:额外的一些属性,Map类型。已经使用的扩展属性有:

    • tags:消息过滤用
    • keys:索引,可以有多个
    • waitStoreMsgOK
    • delayTimeLevel:消息延迟级别,用于定时消息或者消息重试
  • body:消息的内容

  • transactionId:事务消息用

MQClientInstance

见:MQClientInstance

MQFaultStrategy

SendMessageHook

发送消息时的hook函数,可以再消息发送前后做一些业务操作。接口定义如下:

public interface SendMessageHook {//命名String hookName();//发送消息前调用void sendMessageBefore(final SendMessageContext context);//发送消息后调用void sendMessageAfter(final SendMessageContext context);
}

使用

实现

本篇文章,我们会逐个分析下列过程:

  • 生产者的启动
  • 发送消息

(代码有删减,去除了try catch和日志等不太要紧的部分)

生产者的启动

在创建好Producer之后,使用它来发消息之前,需要先启动它,即调用它的start()方法,代码如下:

public void start() throws MQClientException {this.setProducerGroup(withNamespace(this.producerGroup));//DefaultMQProducerImpl的启动this.defaultMQProducerImpl.start();if (null != traceDispatcher) {//TraceDispatcher相关见:traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());}
}

本文主要看DefaultMQProducerImpl的启动,代码如下

public void start(final boolean startFactory) throws MQClientException {//一些配置校验this.checkConfig();//如果没有特别指定producerGroup,就会把instanceName设置为进程idif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}//创建一个MQClientInstance实例this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);//注册到MQClientInstanceboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {//启动MQClientInstance,见[MQClientInstance](https://blog.csdn.net/yuxiuzhiai/article/details/103828284)mQClientFactory.start();}//发送心跳消息给brokerthis.mQClientFactory.sendHeartbeatToAllBrokerWithLock();//启动线程定时清理超时的请求  this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {RequestFutureTable.scanExpiredRequest();}}, 1000 * 3, 1000);
}

发送消息

已最普通的同步消息发送为例。主要实现在DefaultMQProducerImpl.sendDefaultImpl()方法中。进入方法

private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback,long timeout){final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;//1.查找主题的路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;//最大可能的发送次数,同步的话就是重试次数+1,异步或者oneway就是只发一次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];//重试机制for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();//2.选择消息队列MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}//3.发送消息sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} //4.异常处理机制catch (各种异常 e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);exception = e;continue;}}if (sendResult != null) {return sendResult;}//如果sendResult是null,则说明有异常,进行异常处理}
}

1.查找主题的路由信息

如果是第一次,则会从namesrv获取topic元数据,获取后会缓存下来,以后从缓存中获取

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {//尝试从缓存获取TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);//如果本地缓存没有或者有问题,则从namesrv获取if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}//获取到了,返回if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//默认topic:TBW102this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
}

2.选择消息队列

在rocketmq中,选择消息队列,大体上有两种方案。根据sendLatencyFaultEnable(故障延迟机制)属性值来判断(默认为false)。如果sendLatencyFaultEnable = true:

2.1. 开启了故障延迟机制下的MessageQueue选择

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {//对应这一段代码//大概意思就是根据MessageQueue的数量,round-robin负责均衡int index = tpInfo.getSendWhichQueue().getAndIncrement();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}//如果正常的话,是走不到这里的。走到这里说明故障延迟机制下没有可用的brokerName//这个时候就强行挑一个发送final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}return tpInfo.selectOneMessageQueue();}//见2.2.2的分析return tpInfo.selectOneMessageQueue(lastBrokerName);
}

故障延迟机制见故障延迟机制

2.2. 默认机制(未开启故障延迟机制)

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {//这条消息是否是首次发送,如果不是的话,则lastBrokerName就是上次发送失败的broker的nameif (lastBrokerName == null) {//如果这条消息是第一次发送,则直接用一种round-robin方式挑选MessageQueuereturn selectOneMessageQueue();} else {//如果消息不是第一次发送,则本次挑选MessageQueue的时候尽量避免上次失败的brokerint index = this.sendWhichQueue.getAndIncrement();for (int i = 0; i < this.messageQueueList.size(); i++) {int pos = Math.abs(index++) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}//如果就一个broker,那避免不了,就硬着头皮随机选一个return selectOneMessageQueue();}
}

3.发送消息

发送消息的入口方法:DefaultMQProducerImpl.sendKernelImpl()

private SendResult sendKernelImpl( Message msg, MessageQueue mq, CommunicationMode communicationMode,SendCallback sendCallback, TopicPublishInfo topicPublishInfo, long timeout){long beginStartTime = System.currentTimeMillis();//根据topic的路由信息拿到broker的地址String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;//有个是否启用vip通道的配置项,如果开启了,则会使用broker的另一个端口发送消息brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();//设置一个唯一的idif (!(msg instanceof MessageBatch)) {//给消息加上一个属性UNIQ_KEYMessageClientIDSetter.setUniqID(msg);}//namespace,还没搞懂boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}int sysFlag = 0;boolean msgBodyCompressed = false;//根据消息长度看看是不是要压缩一下if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}//判断是否是事务消息final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();//设置各种属性checkForbiddenContext.setNameSrvAddr/Group/CommunicationMode/BrokerAddr/Message/Mq/UnitMode..();this.executeCheckForbiddenHook(checkForbiddenContext);}//SendMessageHook是一个用于自定义在消息发送前后做一些自定义处理的接口if (this.hasSendMessageHook()) {context = new SendMessageContext();//设置各种属性context.setProducer/ProducerGroup/CommunicationMode ..();String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}//判断是否是延时消息if (msg.getProperty("__STARTDELIVERTIME") != null ||msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null){context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}//构建SendMessageRequestHeaderSendMessageRequestHeader requestHeader = new SendMessageRequestHeader();//省略设置各种属性的代码if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC://异步发送消息Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(..);break;case ONEWAY:case SYNC://同步或者oneway发送消息long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(..);break;default:assert false;break;}//SendMessageHook发送消息后的回调if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;
}

4.异常处理机制

可以看到,当发送消息出现异常时,都有一句:this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false)。具体分析请看故障延迟机制

批量发送消息

DefaultMQProducer.send(Collection msgs)方法定义如下:

public SendResult send(Collection<Message> msgs){//主要逻辑就是batch()方法return this.defaultMQProducerImpl.send(batch(msgs));
}

batch()方法的实现:

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {//将批量的消息转化为一个MessageBatch对象MessageBatch msgBatc = MessageBatch.generateFromList(msgs);for (Message message : msgBatch) {//为每一条单独的message设置uniq key、topicValidators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}//批量消息跟普通消息的发送没有啥差别,只是将消息序列化成字节数组的时候有点不一样msgBatch.setBody(msgBatch.encode());msgBatch.setTopic(withNamespace(msgBatch.getTopic()));return msgBatch;
}

序列化MessageBatch的过程:

public static byte[] encodeMessages(List<Message> messages) {//TO DO refactor, accumulate in one buffer, avoid copiesList<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());int allSize = 0;for (Message message : messages) {//按照固定的格式序列化每一条messagebyte[] tmp = encodeMessage(message);encodedMessages.add(tmp);allSize += tmp.length;}byte[] allBytes = new byte[allSize];int pos = 0;for (byte[] bytes : encodedMessages) {System.arraycopy(bytes, 0, allBytes, pos, bytes.length);pos += bytes.length;}return allBytes;
}

单独一条消息的序列化过程:

public static byte[] encodeMessage(Message message) {byte[] body = message.getBody();int bodyLen = body.length;String properties = messageProperties2String(message.getProperties());byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);//note properties length must not more than Short.MAXshort propertiesLength = (short) propertiesBytes.length;int sysFlag = message.getFlag();int storeSize = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCOD+ 4 // 3 BODYCRC+ 4 // 4 FLAG+ 4 + bodyLen // 4 BODY+ 2 + propertiesLength;ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);// 1 TOTALSIZEbyteBuffer.putInt(storeSize);// 2 MAGICCODEbyteBuffer.putInt(0);// 3 BODYCRCbyteBuffer.putInt(0);// 4 FLAGint flag = message.getFlag();byteBuffer.putInt(flag);// 5 BODYbyteBuffer.putInt(bodyLen);byteBuffer.put(body);// 6 propertiesbyteBuffer.putShort(propertiesLength);byteBuffer.put(propertiesBytes);return byteBuffer.array();
}

结语

(参考丁威、周继峰<<RocketMQ技术内幕>>。水平有限,最近在看rocketmq源码,记录学习过程,也希望对各位有点微小的帮助。如有错误,请指正~)

rocketmq源码分析 -生产者相关推荐

  1. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  2. 《RocketMQ源码分析》NameServer如何处理Broker的连接

    <RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...

  3. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  4. RocketMQ源码分析之request-reply特性

    1.什么是request-reply?   RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返 ...

  5. RocketMQ源码分析(十二)之CommitLog同步与异步刷盘

    文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...

  6. 【RocketMQ|源码分析】namesrv启动停止过程都做了什么

    简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...

  7. RocketMQ 源码分析 —— 集成 Spring Boot

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  8. mq消费者组_「架构师MQ进阶」RocketMQ源码分析(四)- 源代码包结构分析

    在前面第一篇中已经将源代码下载到本地了,本篇主要是介绍代码中相关模块到作用.036.Rocket-MQ-Source-code-cover.png 一.源码结构 RocketMQ源码组织方式基于Mav ...

  9. RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

    RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache ...

最新文章

  1. 爱情也许是最忧伤的童话
  2. windows下expdp自动备份脚本
  3. C#全局键盘监听(Hook)
  4. python django开发工具_利用pyCharm编辑器创建Django项目开发环境-python开发工具第一篇...
  5. c语言动态双端栈的原理,数据结构(C语言版)例题(第三章:栈和队列)
  6. CCF CSP201903-2二十四点
  7. Docker Image执行流程
  8. PDF文件打开密码解密
  9. web项目的中英文切换功
  10. centos离线安装docker-ce 18.03.0-ce
  11. 单点登录SSO:可一键运行的完整代码
  12. 创业半年回顾(没饭吃了,我再也不想创业了)
  13. 手机连上蓝牙耳机没有声音
  14. Perforce使用中文教程: p4 client
  15. Banner信息收集和美杜莎使用(9.26 第十二天)
  16. android 位于底部的tab,GitHub - DevinFu/BottomTabBar: Android应用中位于底部的tab栏
  17. 数据库系统概论 实验报告答案 实验二:创建及管理数据表
  18. TCP/IP 网络模型
  19. latex中——misplaced \noalign. \hline,\bottomrule,\midrule问题
  20. C#之AES加密解密

热门文章

  1. Python精美地理可视化绘制——以中国历年GDP数据为例
  2. 五、框架协议——合同
  3. Java IDE MyEclipse 使用教程:使用 MyEclipse 应用服务器(一)
  4. 自适应图片九宫格 css,一张背景实现自适应九宫格
  5. 专门用于管理企业与自己客户之间所有信息的客户管理系统
  6. AndroidStudio 3.1 截取手机屏幕
  7. 霏凡年度十大原创精华集最终版+随书附件包【最终版】
  8. python需要掌握的词汇量_北大保安,词汇量15000、会Python编程,网友:当保安都不够格!...
  9. DM3730学习日记-写在前面
  10. STC15系列单片机SPI使用教程(一)