RocketMQ源码分析之request-reply特性
1.什么是request-reply?
RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息,类似rpc调用效果。
2. 使用场景
- 快速搭建服务总线,实现rpc框架
- 调用链追踪分析
- 跨网络区域实现系统间同步调用
3.使用方法
- producer端
producer端调用request(final Message msg, final long timeout)方法以同步方式等待consumer端消费完消息并返回响应消息;调用request(final Message msg, final RequestCallback requestCallback, final long timeout)方法以异步方式等待consumer端消费完消息并返回响应消息。
同步方式:
public class RequestProducer {public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "RequestTopic0218";String topic = "RequestTopic";long ttl = 300000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}
异步方式:
public class AsyncRequestProducer {private static final InternalLogger log = ClientLogger.getLog();public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "please_rename_unique_group_name";String topic = "AsynRequestTopic";long ttl = 3000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();producer.request(msg, new RequestCallback() {@Overridepublic void onSuccess(Message message) {long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);}@Overridepublic void onException(Throwable e) {System.err.printf("request to <%s> fail.", topic);}}, ttl);} catch (Exception e) {log.warn("", e);}/* shutdown after your request callback is finished */
// producer.shutdown();}
}
- consumer端
consumer端程序在原来的基础上会增加以下内容:
(1)创建producer用来发送消息
(2)在消费完消息后调用RocketMQ提供的MessageUtil.createReplyMessage(final Message requestMessage, final byte[] body)方法来构建响应消息
(3)调用send方法将响应消息发回给生产者
public class ResponseConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {String producerGroup = "ReplyProducer0218";String consumerGroup = "ResponseConsumer0218";String topic = "RequestTopic";// create a producer to send reply messageDefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);replyProducer.setNamesrvAddr("127.0.0.1:9876");replyProducer.start();// create consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// recommend client configsconsumer.setPullTimeDelayMillsWhenException(0L);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes();// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 300000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.subscribe(topic, "*");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();System.out.printf("Consumer Started.%n");}
}
- 源码分析
在RocketMQ中producer端可以通过调用以下两个方法发送消息并等待consumer端返回响应消息:
- request(final Message msg, final long timeout)
- request(final Message msg, final RequestCallback requestCallback, final long timeout)
下面以producer同步等待consumer响应消息为例分析整个request-reply的过程:
public Message request(Message msg,long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginTimestamp = System.currentTimeMillis();prepareSendRequest(msg, timeout);final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);try {final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);long cost = System.currentTimeMillis() - beginTimestamp;this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {requestResponseFuture.setSendRequestOk(true);}@Overridepublic void onException(Throwable e) {requestResponseFuture.setSendRequestOk(false);requestResponseFuture.putResponseMessage(null);requestResponseFuture.setCause(e);}}, timeout - cost);return waitResponse(msg, timeout, requestResponseFuture, cost);} finally {RequestFutureTable.getRequestFutureTable().remove(correlationId);}}
(1)获取系统当前时间,方便后续进行超时判断
(2)调用prepareSendRequest(final Message msg, long timeout)函数将待发送给broker的消息进行改造,具体改造如下:
- 调用CorrelationIdUtil.createCorrelationId()生成该消息的correlationId,并将correlationId添加到消息的扩展属性CORRELATION_ID
- 获取producer的clientId并将其添加到消息的扩展属性REPLY_TO_CLIENT,该属性的作用在于后续consumer端发送响应消息时broker知道将消息发送给哪个producer端
- 将超时时间添加到消息的扩展属性TTL
(3)构建RequestResponseFuture对象,这里需要详细解释RequestResponseFuture对象,RequestResponseFuture是实现request-reply特性的关键,producer发送的每条消息都会new一个RequestResponseFuture对象:
- correlationId是CorrelationIdUtil.createCorrelationId()方法随机生成的UUID字符串,correlationId是用来标识从发送每条消息到conumer端发送响应消息的请求
- requestMsg是consumer端返回的响应消息
- countDownLatch在消息发送时会阻塞producer线程(调用了await实现阻塞),等到响应消息返回时激活producer线程,最后返回consumer端响应消息,所以虽然在内部实现上是以异步方式发送消息但是结合countDownLatch达到了同步的效果
- 由于是同步发送所以requestCallback为null
public class RequestResponseFuture {private final String correlationId;private final RequestCallback requestCallback;private final long beginTimestamp = System.currentTimeMillis();private final Message requestMsg = null;private long timeoutMillis;private CountDownLatch countDownLatch = new CountDownLatch(1);private volatile Message responseMsg = null;private volatile boolean sendRequestOk = true;private volatile Throwable cause = null;
(4)将<correlationId, requestResponseFuture>添加到requestFutureTable,后续consumer向broker发送RequestCode.SEND_REPLY_MESSAGE_V2请求将响应消息发送到broker,broker在处理这个请求时会调用pushReplyMessage方法发送RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT请求给producer,此时producer端会根据响应消息中correlationId在requestFutureTable中获取其对应的requestResponseFuture,并且会将响应消息赋给requestResponseFuture中的responseMsg。
(5)调用sendDefaultImpl方法以异步的方式发送消息,虽然是以异步方式发送消息但是结合RequestResponseFuture中的countDownLatch到达了同步效果。此时producer发送了RequestCode.SEND_MESSAGE请求给broker,broker后续的处理过程与发送普通消息是一样的。
(6)consumer在正常消费完消息后,需要调用MessageUtil.createReplyMessage方法构建响应消息,该方法有两个参数,分别是producer发送消息和响应消息体内容,该方法会从producer发送的消息的扩展属性中获取“CLUSTER”、“REPLY_TO_CLIENT”、“CORRELATION_ID”和“TTL”,并根据这些扩展属性以及响应消息体内容构建响应消息。这里需要注意,新构建的响应消息的topic是由producer发送的消息的扩展属性中的CLUSTER与REPLY_TOPIC拼接起来,即“集群名称_REPLY_TOPIC”,这个是一个系统级别的topic,是由broker自己创建的。
public static Message createReplyMessage(final Message requestMessage, final byte[] body) throws MQClientException {if (requestMessage != null) {Message replyMessage = new Message();String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID);String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);replyMessage.setBody(body);if (cluster != null) {String replyTopic = MixAll.getReplyTopic(cluster);replyMessage.setTopic(replyTopic);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, replyTo);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);return replyMessage;} else {throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");}}throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null.");}
(7)调用send方法发送响应消息到broker,在发送的过程中会判断消息的类型,由于该消息是reply类型的,所以向broker发送的请求类型是RequestCode.SEND_REPLY_MESSAGE_V2
(8)broker处理RequestCode.SEND_REPLY_MESSAGE_V2请求的是ReplyMessageProcessor,具体操作如下:
- 根据请求中响应消息的topic、queueId、消息体内容、消息标记、消息的扩展属性、消息产生的时间、消息的来源等信息构建MessageExtBrokerInner对象
- 调用pushReplyMessage方法构建RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT请求,然后根据消息扩展属性REPLY_TO_CLIENT获取broker与producer连接的channel,最后将请求发送给producer。这里有个问题:RocketMQ如何保证请求原路返回?首先producer产生的消息会发送到broker上,此时broker中存储的producer产生的消息的扩展属性中是包含存储的broker的集群名称的,接着consumer消息该消息并根据该消息构造出响应消息,在构造响应消息时,其topic是“集群名称_REPLY_TOPIC”,这样就保证了consumer在发送响应消息到broker是原路返回,即这里的broker是与producer连接的broker。
- 判断broker端的配置文件中storeReplyMessageEnable配置项的值是否为true,如果为true,则会将响应消息存储在broker端。storeReplyMessageEnable的默认值是true。
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());PushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner);
this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt);if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);
}
(9)producer处理broker发送的RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT请求的是ClientRemotingProcessor,具体如下:
- 根据请求还原响应消息MessageExt
- 获取响应消息扩展属性CORRELATION_ID的值correlationId,在producer端的requestFutureTable中根据correlationId获取该消息对应的requestResponseFuture,然后将响应消息放入到requestResponseFuture中的responseMsg并将countDownLatch的值减一,此时producer端调用request方法的线程就激活了
- 从requestFutureTable中删除key为correlationId的数据项
private void processReplyMessage(MessageExt replyMsg) {final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);if (requestResponseFuture != null) {requestResponseFuture.putResponseMessage(replyMsg);RequestFutureTable.getRequestFutureTable().remove(correlationId);if (requestResponseFuture.getRequestCallback() != null) {requestResponseFuture.getRequestCallback().onSuccess(replyMsg);} else {requestResponseFuture.putResponseMessage(replyMsg);}} else {String bornHost = replyMsg.getBornHostString();log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s",correlationId, bornHost));}}
(10)producer端调用request方法线程激活后会调用waitResponse方法返回requestResponseFuture中的responseMsg,这里最终调用的waitResponseMessage方法中带有一个参数:超时时间,如果到了超时时间后consumer端的响应消息没有被producer端收到,线程也会被激活,这样的设置也是防止producer线程一直被阻塞。
参考资料: 官方视频链接.
RocketMQ源码分析之request-reply特性相关推荐
- 《RocketMQ源码分析》NameServer如何处理Broker的连接
<RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...
- RocketMQ源码分析之延迟消息
文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...
- rocketmq源码分析 -生产者
概念 生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口. 组件 Producer 消息生产者.在rocketmq中,生产者对应MQProducer接口: pub ...
- RocketMQ源码分析(十二)之CommitLog同步与异步刷盘
文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...
- RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)
在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...
- zuul源码分析之Request生命周期管理
为什么80%的码农都做不了架构师?>>> zuul核心框架 zuul是可以认为是一种API-Gateway.zuul的核心是一系列的filters, 其作用可以类比Servle ...
- javaweb_笔记2(Servlet源码分析;request详解;请求域;转发和重定向;WebServlet注解;jsp基础语法,JavaBean。)
1.HttpServlet源码分析 HttpServlet类是专门为HTTP协议准备的.比GenericServlet更加适合HTTP协议下的开发. HttpServlet在哪个包下? jakarta ...
- 【RocketMQ|源码分析】namesrv启动停止过程都做了什么
简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...
- RocketMQ 源码分析 —— 集成 Spring Boot
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...
最新文章
- mac https本地跨域配置
- sql语句练习(三):LeetCode
- python的数值类型_Python的数值类型
- java嵌套类与内部类
- 由浅入深理解----java反射技术
- HTML表div布局,html使用列表 以及div的布局和table的布局
- Linux下vsftpd基本配置和虚拟用户设置的安全方法
- MapReduce Python
- puppet详解(九)——puppet项目实战
- IE FF css兼容
- Denise God Mode for mac(磁带饱和器音频插件)
- Extended VINS-Mono: 大规模户外环境进行绝对和相对车辆定位的系统性方法(IROS2021)...
- Emacs 配置 latex
- 独木舟上的旅行(贪心算法)
- IntelliJ IDEA 文件只读
- 微信公众号图文如何添加PDF附件
- 针对RK3328平台搭建支持KVM的Linux环境
- 上众筹,智能手环走“全民路线”破局?
- 【0730】docker 入门(上)
- VR头戴显示器对健康有害吗?会引发晕动症、视觉辐辏调节冲突