1.什么是request-reply?

  RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息,类似rpc调用效果。
2. 使用场景

  • 快速搭建服务总线,实现rpc框架
  • 调用链追踪分析
  • 跨网络区域实现系统间同步调用

3.使用方法

  • producer端

  producer端调用request(final Message msg, final long timeout)方法以同步方式等待consumer端消费完消息并返回响应消息;调用request(final Message msg, final RequestCallback requestCallback, final long timeout)方法以异步方式等待consumer端消费完消息并返回响应消息。

同步方式:

public class RequestProducer {public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "RequestTopic0218";String topic = "RequestTopic";long ttl = 300000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}

异步方式:

public class AsyncRequestProducer {private static final InternalLogger log = ClientLogger.getLog();public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "please_rename_unique_group_name";String topic = "AsynRequestTopic";long ttl = 3000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();producer.request(msg, new RequestCallback() {@Overridepublic void onSuccess(Message message) {long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);}@Overridepublic void onException(Throwable e) {System.err.printf("request to <%s> fail.", topic);}}, ttl);} catch (Exception e) {log.warn("", e);}/* shutdown after your request callback is finished */
//        producer.shutdown();}
}
  • consumer端

  consumer端程序在原来的基础上会增加以下内容:

  (1)创建producer用来发送消息

  (2)在消费完消息后调用RocketMQ提供的MessageUtil.createReplyMessage(final Message requestMessage, final byte[] body)方法来构建响应消息

  (3)调用send方法将响应消息发回给生产者

public class ResponseConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {String producerGroup = "ReplyProducer0218";String consumerGroup = "ResponseConsumer0218";String topic = "RequestTopic";// create a producer to send reply messageDefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);replyProducer.setNamesrvAddr("127.0.0.1:9876");replyProducer.start();// create consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// recommend client configsconsumer.setPullTimeDelayMillsWhenException(0L);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes();// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 300000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.subscribe(topic, "*");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();System.out.printf("Consumer Started.%n");}
}
  1. 源码分析

在RocketMQ中producer端可以通过调用以下两个方法发送消息并等待consumer端返回响应消息:

  • request(final Message msg, final long timeout)
  • request(final Message msg, final RequestCallback requestCallback, final long timeout)

下面以producer同步等待consumer响应消息为例分析整个request-reply的过程:

public Message request(Message msg,long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginTimestamp = System.currentTimeMillis();prepareSendRequest(msg, timeout);final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);try {final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);long cost = System.currentTimeMillis() - beginTimestamp;this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {requestResponseFuture.setSendRequestOk(true);}@Overridepublic void onException(Throwable e) {requestResponseFuture.setSendRequestOk(false);requestResponseFuture.putResponseMessage(null);requestResponseFuture.setCause(e);}}, timeout - cost);return waitResponse(msg, timeout, requestResponseFuture, cost);} finally {RequestFutureTable.getRequestFutureTable().remove(correlationId);}}

(1)获取系统当前时间,方便后续进行超时判断

(2)调用prepareSendRequest(final Message msg, long timeout)函数将待发送给broker的消息进行改造,具体改造如下:

  • 调用CorrelationIdUtil.createCorrelationId()生成该消息的correlationId,并将correlationId添加到消息的扩展属性CORRELATION_ID
  • 获取producer的clientId并将其添加到消息的扩展属性REPLY_TO_CLIENT,该属性的作用在于后续consumer端发送响应消息时broker知道将消息发送给哪个producer端
  • 将超时时间添加到消息的扩展属性TTL

(3)构建RequestResponseFuture对象,这里需要详细解释RequestResponseFuture对象,RequestResponseFuture是实现request-reply特性的关键,producer发送的每条消息都会new一个RequestResponseFuture对象:

  • correlationId是CorrelationIdUtil.createCorrelationId()方法随机生成的UUID字符串,correlationId是用来标识从发送每条消息到conumer端发送响应消息的请求
  • requestMsg是consumer端返回的响应消息
  • countDownLatch在消息发送时会阻塞producer线程(调用了await实现阻塞),等到响应消息返回时激活producer线程,最后返回consumer端响应消息,所以虽然在内部实现上是以异步方式发送消息但是结合countDownLatch达到了同步的效果
  • 由于是同步发送所以requestCallback为null
public class RequestResponseFuture {private final String correlationId;private final RequestCallback requestCallback;private final long beginTimestamp = System.currentTimeMillis();private final Message requestMsg = null;private long timeoutMillis;private CountDownLatch countDownLatch = new CountDownLatch(1);private volatile Message responseMsg = null;private volatile boolean sendRequestOk = true;private volatile Throwable cause = null;

(4)将<correlationId, requestResponseFuture>添加到requestFutureTable,后续consumer向broker发送RequestCode.SEND_REPLY_MESSAGE_V2请求将响应消息发送到broker,broker在处理这个请求时会调用pushReplyMessage方法发送RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT请求给producer,此时producer端会根据响应消息中correlationId在requestFutureTable中获取其对应的requestResponseFuture,并且会将响应消息赋给requestResponseFuture中的responseMsg。

(5)调用sendDefaultImpl方法以异步的方式发送消息,虽然是以异步方式发送消息但是结合RequestResponseFuture中的countDownLatch到达了同步效果。此时producer发送了RequestCode.SEND_MESSAGE请求给broker,broker后续的处理过程与发送普通消息是一样的。

(6)consumer在正常消费完消息后,需要调用MessageUtil.createReplyMessage方法构建响应消息,该方法有两个参数,分别是producer发送消息和响应消息体内容,该方法会从producer发送的消息的扩展属性中获取“CLUSTER”、“REPLY_TO_CLIENT”、“CORRELATION_ID”和“TTL”,并根据这些扩展属性以及响应消息体内容构建响应消息。这里需要注意,新构建的响应消息的topic是由producer发送的消息的扩展属性中的CLUSTER与REPLY_TOPIC拼接起来,即“集群名称_REPLY_TOPIC”,这个是一个系统级别的topic,是由broker自己创建的。

public static Message createReplyMessage(final Message requestMessage, final byte[] body) throws MQClientException {if (requestMessage != null) {Message replyMessage = new Message();String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID);String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);replyMessage.setBody(body);if (cluster != null) {String replyTopic = MixAll.getReplyTopic(cluster);replyMessage.setTopic(replyTopic);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, replyTo);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);return replyMessage;} else {throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");}}throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null.");}

(7)调用send方法发送响应消息到broker,在发送的过程中会判断消息的类型,由于该消息是reply类型的,所以向broker发送的请求类型是RequestCode.SEND_REPLY_MESSAGE_V2

(8)broker处理RequestCode.SEND_REPLY_MESSAGE_V2请求的是ReplyMessageProcessor,具体操作如下:

  • 根据请求中响应消息的topic、queueId、消息体内容、消息标记、消息的扩展属性、消息产生的时间、消息的来源等信息构建MessageExtBrokerInner对象
  • 调用pushReplyMessage方法构建RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT请求,然后根据消息扩展属性REPLY_TO_CLIENT获取broker与producer连接的channel,最后将请求发送给producer。这里有个问题:RocketMQ如何保证请求原路返回?首先producer产生的消息会发送到broker上,此时broker中存储的producer产生的消息的扩展属性中是包含存储的broker的集群名称的,接着consumer消息该消息并根据该消息构造出响应消息,在构造响应消息时,其topic是“集群名称_REPLY_TOPIC”,这样就保证了consumer在发送响应消息到broker是原路返回,即这里的broker是与producer连接的broker。
  • 判断broker端的配置文件中storeReplyMessageEnable配置项的值是否为true,如果为true,则会将响应消息存储在broker端。storeReplyMessageEnable的默认值是true。
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());PushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner);
this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt);if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);
}

(9)producer处理broker发送的RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT请求的是ClientRemotingProcessor,具体如下:

  • 根据请求还原响应消息MessageExt
  • 获取响应消息扩展属性CORRELATION_ID的值correlationId,在producer端的requestFutureTable中根据correlationId获取该消息对应的requestResponseFuture,然后将响应消息放入到requestResponseFuture中的responseMsg并将countDownLatch的值减一,此时producer端调用request方法的线程就激活了
  • 从requestFutureTable中删除key为correlationId的数据项
private void processReplyMessage(MessageExt replyMsg) {final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);if (requestResponseFuture != null) {requestResponseFuture.putResponseMessage(replyMsg);RequestFutureTable.getRequestFutureTable().remove(correlationId);if (requestResponseFuture.getRequestCallback() != null) {requestResponseFuture.getRequestCallback().onSuccess(replyMsg);} else {requestResponseFuture.putResponseMessage(replyMsg);}} else {String bornHost = replyMsg.getBornHostString();log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s",correlationId, bornHost));}}

(10)producer端调用request方法线程激活后会调用waitResponse方法返回requestResponseFuture中的responseMsg,这里最终调用的waitResponseMessage方法中带有一个参数:超时时间,如果到了超时时间后consumer端的响应消息没有被producer端收到,线程也会被激活,这样的设置也是防止producer线程一直被阻塞。

参考资料: 官方视频链接.

RocketMQ源码分析之request-reply特性相关推荐

  1. 《RocketMQ源码分析》NameServer如何处理Broker的连接

    <RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...

  2. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  3. rocketmq源码分析 -生产者

    概念 生产者producer,用于生产消息,在rocketmq中对应着MQProducer接口. 组件 Producer 消息生产者.在rocketmq中,生产者对应MQProducer接口: pub ...

  4. RocketMQ源码分析(十二)之CommitLog同步与异步刷盘

    文章目录 版本 简介 FlushCommitLogService 同步刷盘 GroupCommitService 异步刷盘 CommitRealTimeService FlushRealTimeSer ...

  5. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

  6. zuul源码分析之Request生命周期管理

    为什么80%的码农都做不了架构师?>>>    zuul核心框架 zuul是可以认为是一种API-Gateway.zuul的核心是一系列的filters, 其作用可以类比Servle ...

  7. javaweb_笔记2(Servlet源码分析;request详解;请求域;转发和重定向;WebServlet注解;jsp基础语法,JavaBean。)

    1.HttpServlet源码分析 HttpServlet类是专门为HTTP协议准备的.比GenericServlet更加适合HTTP协议下的开发. HttpServlet在哪个包下? jakarta ...

  8. 【RocketMQ|源码分析】namesrv启动停止过程都做了什么

    简介 namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos.它的主要作用是为消息生产者和消息消费者 ...

  9. RocketMQ 源码分析 —— 集成 Spring Boot

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

最新文章

  1. mac https本地跨域配置
  2. sql语句练习(三):LeetCode
  3. python的数值类型_Python的数值类型
  4. java嵌套类与内部类
  5. 由浅入深理解----java反射技术
  6. HTML表div布局,html使用列表 以及div的布局和table的布局
  7. Linux下vsftpd基本配置和虚拟用户设置的安全方法
  8. MapReduce Python
  9. puppet详解(九)——puppet项目实战
  10. IE FF css兼容
  11. Denise God Mode for mac(磁带饱和器音频插件)
  12. Extended VINS-Mono: 大规模户外环境进行绝对和相对车辆定位的系统性方法(IROS2021)...
  13. Emacs 配置 latex
  14. 独木舟上的旅行(贪心算法)
  15. IntelliJ IDEA 文件只读
  16. 微信公众号图文如何添加PDF附件
  17. 针对RK3328平台搭建支持KVM的Linux环境
  18. 上众筹,智能手环走“全民路线”破局?
  19. 【0730】docker 入门(上)
  20. VR头戴显示器对健康有害吗?会引发晕动症、视觉辐辏调节冲突

热门文章

  1. 七月的风,八月的雨,卑微的我喜欢遥远的你。
  2. 开源php商城系统选择
  3. wordpress教程之函数讲解
  4. 使用香橙派zero2及其他单网口开发板搭建UU加速盒
  5. jQuery的css()如何修改背景图片
  6. luoguP3799 妖梦拼木棒
  7. 2022企业邮箱移动oa办公系统使用攻略
  8. 销售易和纷享销客的“生存经”
  9. KISSY基础篇乄KISSY之Seed
  10. KISSY基础篇乄KISSY之HelloWorld