前言

与消息发送紧密相关的几行代码:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那这几行代码执行时,背后都做了什么?

一. 首先是DefaultMQProducer.start

@Override

public void start() throws MQClientException {

this.defaultMQProducerImpl.start();

}

调用了默认生成消息的实现类 -- DefaultMQProducerImpl

调用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()会初始化得到MQClientInstance实例对象,MQClientInstance实例对象调用它自己的start方法会 ,启动一些服务,如拉去消息服务PullMessageService.Start()、启动负载平衡服务RebalanceService.Start(),比如网络通信服务MQClientAPIImpl.Start()

另外,还会执行与生产消息相关的信息,如注册produceGroup、new一个TopicPublishInfo对象并以默认TopicKey为键值,构成键值对存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,获取的MQClientInstance实例对象会调用sendHeartbeatToAllBroker()方法,不断向broker发送心跳包,yin'b可以使用下面一幅图大致描述DefaultMQProducerImpl.start()过程:

上图中的三个部分中涉及的内容:

1.1 初始化MQClientInstance

一个客户端只能产生一个MQClientInstance实例对象,产生方式使用了工厂模式与单例模式。MQClientInstance.start()方法启动一些服务,源码如下:

public void start() throws MQClientException {

synchronized (this) {

switch (this.serviceState) {

case CREATE_JUST:

this.serviceState = ServiceState.START_FAILED;

// If not specified,looking address from name server

if (null == this.clientConfig.getNamesrvAddr()) {

this.mQClientAPIImpl.fetchNameServerAddr();

}

// Start request-response channel

this.mQClientAPIImpl.start();

// Start various schedule tasks

this.startScheduledTask();

// Start pull service

this.pullMessageService.start();

// Start rebalance service

this.rebalanceService.start();

// Start push service

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

log.info("the client factory [{}] start OK", this.clientId);

this.serviceState = ServiceState.RUNNING;

break;

case RUNNING:

break;

case SHUTDOWN_ALREADY:

break;

case START_FAILED:

throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);

default:

break;

}

}

}

1.2 注册producer

该过程会将这个当前producer对象注册到MQClientInstance实例对象的的producerTable中。一个jvm(一个客户端)中一个producerGroup只能有一个实例,MQClientInstance操作producerTable大概有如下几个方法:

-- selectProducer

-- updateTopicRouteInfoFromNameServer

-- prepareHeartbeatData

-- isNeedUpdateTopicRouteInfo

-- shutdown

注:

根据不同的clientId,MQClientManager将给出不同的MQClientInstance;

根据不同的group,MQClientInstance将给出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定义:

public class DefaultMQProducerImpl implements MQProducerInner {

private final Logger log = ClientLogger.getLog();

private final Random random = new Random();

private final DefaultMQProducer defaultMQProducer;

private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap();

它是一个以topic为key的Map型数据结构,DefaultMQProducerImpl.start()时会默认创建一个key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 发送心跳包

MQClientInstance向broker发送心跳包时,调用sendHeartbeatToAllBroker( ),以及从MQClientInstance实例对象的brokerAddrTable中拿到所有broker地址,向这些broker发送心跳包。

sendHeartbeatToAllBroker会涉及到prepareHeartbeatData()方法,该方法会生成heartbeatData数据,发送心跳包时,heartbeatData作为心跳包的body。与producer相关的部分代码如下:

// Producer

for (Map.Entry entry : this.producerTable.entrySet()) {

MQProducerInner impl = entry.getValue();

if (impl != null) {

ProducerData producerData = new ProducerData();

producerData.setGroupName(entry.getKey());

heartbeatData.getProducerDataSet().add(producerData);

}

二、. SendResult sendResult = producer.send(msg)

首先会调用DefaultMQProducer.send(msg) ,继而调用sendDefaultImpl:

public SendResult send(Message msg,

long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);

}

sendDefaultImpl做了啥?

2.1. 获取topicPublishInfo

根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo,如果没有则更新路由信息,从nameserver端拉取最新路由信息。从nameserver端拉取最新路由信息大致为:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

2.2 选择消息发送的队列

普通消息:默认方式下,selectOneMessageQueue从topicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息,默认采用长轮询的方式选择队列 。

它的机制如下:正常情况下,顺序选择queue进行发送;如果某一个节点发生了超时,则下次选择queue时,跳过相同的broker。不同的队列选择策略形成了生产消息的几种模式,如顺序消息,事务消息。

顺序消息:将一组需要有序消费的消息发往同一个broker的同一个队列上即可实现顺序消息,假设相同订单号的支付,退款需要放到同一个队列,那么就可以在send的时候,自己实现MessageQueueSelector,根据参数arg字段来选择queue。

private SendResult sendSelectImpl(

Message msg,

MessageQueueSelector selector,

Object arg,

final CommunicationMode communicationMode,

final SendCallback sendCallback, final long timeout

) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事务消息:只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交,消息发送失败,直接发送回滚消息,进行回滚,具体如何实现后面会单独成文分析。

2.3 封装消息体通信包,发送数据包

首先,根据获取的MessageQueue中的getBrokerName,调用findBrokerAddressInPublish得到该消息存放对应的broker地址,如果没有找到则跟新路由信息,重新获取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知获取的broker均为master(id=0)

然后, 将与该消息相关信息打包成RemotingCommand数据包,其RequestCode.SEND_MESSAGE

根据获取的broke地址,将数据包到对应的broker,默认是发送超时时间为3s。

封装消息请求包的包头:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());

requestHeader.setTopic(msg.getTopic());

requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());

requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());

requestHeader.setQueueId(mq.getQueueId());

requestHeader.setSysFlag(sysFlag);

requestHeader.setBornTimestamp(System.currentTimeMillis());

requestHeader.setFlag(msg.getFlag());

requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));

requestHeader.setReconsumeTimes(0);

requestHeader.setUnitMode(this.isUnitMode());

requestHeader.setBatch(msg instanceof MessageBatch);

发送消息包(普通消息默认为同步方式):

SendResult sendResult = null;

switch (communicationMode) {

case SYNC:

sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(

brokerAddr,

mq.getBrokerName(),

msg,

requestHeader,

timeout,

communicationMode,

context,

this);

break;

处理来自broker端的响应数据包:

private SendResult sendMessageSync(

final String addr,

final String brokerName,

final Message msg,

final long timeoutMillis,

final RemotingCommand request

) throws RemotingException, MQBrokerException, InterruptedException {

RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);

assert response != null;

return this.processSendResponse(brokerName, msg, response);

}

broker端处理request数据包后会将消息存储到commitLog,具体过程后续分析。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

java 本地 mq_java rocketmq--消息的产生(普通消息)相关推荐

  1. RocketMQ 源码分析 事务消息

    为什么80%的码农都做不了架构师?>>>    1. 概述 必须必须必须 前置阅读内容: <事务消息(阿里云)> 2. 事务消息发送 2.1 Producer 发送事务消 ...

  2. 消息队列之事务消息,RocketMQ 和 Kafka是如何做的?

    一说起事务相信大家都不陌生,脑海里蹦出来的就是 ACID. 通常我们理解的事务就是为了一些更新操作要么都成功,要么都失败,不会有中间状态的产生,而 ACID 是一个严格的事务实现的定义,不过在单体系统 ...

  3. RocketMQ的消费者消息重试和生产者消息重投

    详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分. 文章目录 1 生产者重试 2 消费者重试 2.1 异常重试 2.1.1 并发消费的重试 2 ...

  4. RocketMQ - 6 生产者,顺序消息

    Producer类型 Producer主要职能就是生产消息,发送消息.它可以对多个主题发送消息,甚至可以通过Tag定义些简单的过滤.更复杂的过滤可以使用filter组件来进行相应的业务操作 Rocke ...

  5. RocketMQ 源码阅读 ---- 消息消费(普通消息)

    RocketMQ Consumer 消费拉取的消息的方式有两种 1.      Push方式:rocketmq 已经提供了很全面的实现,consumer 通过长轮询拉取消息后回调 MessageLis ...

  6. java消息 框架_java 框架-消息队列ActiveMQ

    https://www.jianshu.com/p/ecdc6eab554c ActiveMQ从入门到精通(一) 22017.03.11 21:40:42字数 2650阅读 57286 这是关于消息中 ...

  7. java 消息队列服务_ActiveMQ 消息队列服务

    1 ActiveMQ简介 1.1 ActiveMQ是什么 ActiveMQ是一个消息队列应用服务器(推送服务器).支持JMS规范. 1.1.1 JMS概述 全称:Java Message Servic ...

  8. 消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ

    系列文章导航: Spring Cloud Alibaba微服务解决方案 常用MQ产品的选择 目前主流的MQ产品有kafka.RabbitMQ.ActiveMQ.RocketMQ等.在MQ选型时可以参照 ...

  9. Apache RocketMQ 正式开源分布式事务消息

    摘要: 近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事 ...

最新文章

  1. nginx环境的搭建
  2. php mysql orm_PHP ORM操作MySQL数据库
  3. java随机数语句_Java语言程序设计(七)Math类生成随机数及if语句
  4. spring boot : Invalid Keystore format Error 解决方法
  5. 命犯编程!因为名字特殊,她被苹果封了 6 个月
  6. c语言中二叉树中总结点,C语言二叉树的三种遍历方式的实现及原理
  7. 谈谈Java虚拟机——Class文件结构
  8. python nltk book_nltk book的下载
  9. dajngo电商数据库设计图,通用版本
  10. 峰值信噪比公式_关于 PSNR (Peak Signal-to-Noise Ratio) 峰值信噪比的个人理解
  11. Unity3D 颜色选择器
  12. Win10:修改电脑桌面路径
  13. BiLSTM文本分类实践
  14. ccf计算机认证考试题集,【计算机本科补全计划】CCF计算机职业资格认证 2017-03 试题初试...
  15. 扫码报修开启校园报修管理系统新时代
  16. python的开发者太负责任了_人生苦短,我用 Python
  17. 专业计算机能力考试 技巧,计算机二级考试复习技巧
  18. 告诉你什么是挖洞最清奇的脑回路
  19. [转帖]奋斗5年 从月薪3500到700万!
  20. jxl导入/导出Excel

热门文章

  1. 访问 GitHub 的速度很慢?试试这几种方法
  2. 自己动手写CPU(2)流水线数据相关问题
  3. 关于使用两个GTP/GTX出现[DRC RTSTAT-1]error([route 35-54] critical warning)的问题详解
  4. matlab获得帮助的途径,在MATLAB 中获得帮助的途径 ()。A、帮助浏览器B、help 命令C、lookfor 命令D、模糊查询...
  5. ios点击大头针气泡不弹出_画家双手抖不停,画不了画丢了工作,却迎合抖动创造出一个个奇迹...
  6. 监控url_熬夜之作:一文带你了解Cat分布式监控
  7. mysql基础表和修理表_MySQL基础知识——创建数据库和表
  8. F. 更改apache端口号
  9. js Math用法jquery是否为空对象判断
  10. Codeforces Round #420 E