快速学习-RocketMQ-“Request-Reply”特性
“Request-Reply”特性
1 使用场景
随着服务规模的扩大,单机服务无法满足性能和容量的要求,此时需要将服务拆分为更小粒度的服务或者部署多个服务实例构成集群来提供服务。在分布式场景下,RPC是最常用的联机调用的方式。
在构建分布式应用时,有些领域,例如金融服务领域,常常使用消息队列来构建服务总线,实现联机调用的目的。消息队列的主要场景是解耦、削峰填谷,在联机调用的场景下,需要将服务的调用抽象成基于消息的交互,并增强同步调用的这种交互逻辑。为了更好地支持消息队列在联机调用场景下的应用,rocketmq-4.7.0推出了“Request-Reply”特性来支持RPC调用。
2 设计思路
在rocketmq中,整个同步调用主要包括两个过程:
(1)请求方生成消息,发送给响应方,并等待响应方回包;
(2)响应方收到请求消息后,消费这条消息,并发出一条响应消息给请求方。
整个过程实质上是两个消息收发过程的组合。所以这里最关键的问题是如何将异步的消息收发过程构建成一个同步的过程。其中主要有两个问题需要解决:
2.1 请求方如何同步等待回包
这个问题的解决方案中,一个关键的数据结构是RequestResponseFuture。
public class RequestResponseFuture {private final String correlationId;private final RequestCallback requestCallback;private final long beginTimestamp = System.currentTimeMillis();private final Message requestMsg = null;private long timeoutMillis;private CountDownLatch countDownLatch = new CountDownLatch(1);private volatile Message responseMsg = null;private volatile boolean sendRequestOk = true;private volatile Throwable cause = null;
}
RequestResponseFuture中,利用correlationId来标识一个请求。如下图所示,Producer发送request时创建一个RequestResponseFuture,以correlationId为key,RequestResponseFuture为value存入map,同时请求中带上RequestResponseFuture中的correlationId,收到回包后根据correlationId拿到对应的RequestResponseFuture,并设置回包内容。
2.2 consumer消费消息后,如何准确回包
(1)producer在发送消息的时候,会给每条消息生成唯一的标识符,同时还带上了producer的clientId。当consumer收到并消费消息后,从消息中取出消息的标识符correlationId和producer的标识符clientId,放入响应消息,用来确定此响应消息是哪条请求消息的回包,以及此响应消息应该发给哪个producer。同时响应消息中设置了消息的类型以及响应消息的topic,然后consumer将消息发给broker,如下图所示。
(2)broker收到响应消息后,需要将消息发回给指定的producer。Broker如何知道发回给哪个producer?因为消息中包含了producer的标识符clientId,在ProducerManager中,维护了标识符和channel信息的对应关系,通过这个对应关系,就能把回包发给对应的producer。
响应消息发送和一般的消息发送流程区别在于,响应消息不需要producer拉取,而是由broker直接推给producer。同时选择broker的策略也有变化:请求消息从哪个broker发过来,响应消息也发到对应的broker上。
Producer收到响应消息后,根据消息中的唯一标识符,从RequestResponseFuture的map中找到对应的RequestResponseFuture结构,设置响应消息,同时计数器减一,解除等待状态,使请求方收到响应消息。
3 使用方法
同步调用的示例在example文件夹的rpc目录下。
3.1 Producer
Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);
调用接口替换为request即可。
3.2 Consumer
需要启动一个producer,同时在覆写consumeMessage方法的时候,自定义响应消息并发送。
@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes();// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 3000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
4 接口参数
4.1 public Message request(Message msg,long timeout)
msg:待发送的消息
timeout:同步调用超时时间
4.2 public void request(Message msg, final RequestCallback requestCallback, long timeout)
msg:待发送的消息
requestCallback:回调函数
timeout:同步调用超时时间
4.3 public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,final long timeout)
msg:待发送的消息
selector:消息队列选择器
arg:消息队列选择器需要的参数
timeout:同步调用超时时间
4.4 public void request(final Message msg, final MessageQueueSelector selector, final Object arg,final RequestCallback requestCallback, final long timeout)
msg:待发送的消息
selector:消息队列选择器
arg:消息队列选择器需要的参数
requestCallback:回调函数
timeout:同步调用超时时间
4.5 public Message request(final Message msg, final MessageQueue mq, final long timeout)
msg:待发送的消息
mq:目标消息队列
timeout:同步调用超时时间
4.6 public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
msg:待发送的消息
mq:目标消息队列
requestCallback:回调函数
timeout:同步调用超时时间
快速学习-RocketMQ-“Request-Reply”特性相关推荐
- 快速学习Java8新特性第七讲——Optional类
在<快速学习Java8新特性第五讲--强大的Stream API>这一讲中,我就已经提及到了Optional类.在这一讲中,我将对其做一个更加细致的讲解. Optional类是什么? Op ...
- 快速学习使用springmvc、strust2、strust1以及它们的对比
1.如何快速学习springmvc 首先,我们需要在复制spring相关的jar包到web-inf/lib里面去,然后在web.xml里面加入以下代码,相当于springmvc里面的servlet,这 ...
- 阿里巴巴Java开发手册快速学习
Java作为一门名副其实的工业级语言,语法友好,学习简单,大规模的应用给代码质量的管控带来了困难,特别是团队开发中,开发过程中的规范会直接影响最终项目的稳定性. 善医者"未有形而除之&quo ...
- ElasticSearch快速学习---30分钟入门ElasticSearch
ElasticSearch快速学习 ElasticSearch原理,30分钟入门ElasticSearch 目录 1 解析es的分布式架构 2 分片和副本机制 3 单节点环境下创建索引分析 4 两个 ...
- 如何自学python爬虫-小白如何快速学习Python爬虫?
原标题:小白如何快速学习Python爬虫? 很多同学想学习 爬虫 ,对于小白来说,爬虫可能是一件非常复杂.技术门槛很高的事情.而且爬虫是入门 Python 最好的方式,没有之一. 我们可以通过爬虫获取 ...
- 【软件开发底层知识修炼】十五 快速学习GDB调试二 使用GDB进行断点调试
上一篇文章我们学习了使用GDB的最基本方法:[软件开发底层知识修炼]十四 快速学习GDB调试一 入门使用 本篇文章将学习GDB的断点调试.断点调试是一种非常重要的调试方法. 文章目录 1 断点类型 2 ...
- vue中集合取第一个_快速学习Vue框架(知识点集合)
学习Vue的小伙伴速度看过来,快速学习Vue框架知识点集合贴来啦.建议收藏,尤其基础并不是很扎实的同学,本篇集合贴就是你日后工作的参考手册. 基础知识: ·vue的生命周期:beforeCreate/ ...
- 快速学习JS的思考方法、有用
当人们尝试学习 JavaScript , 或者其他编程技术的时候,常常会遇到同样的挑战: 有些概念容易混淆,特别是当你学习过其他语言的时候. 很难找到学习的时间(有时候是动力). 一旦当你理解了一些东 ...
- GitHub快速学习-一
title: GitHub快速学习(一) categories: GitHub tags: github abbrlink: 3997432119 date: 2019-06-26 19:06:32 ...
- Java基础 快速学习(一)
注意:本系列博客是在作者已经有一年多的C++经验的基础上学习Java的个人总结,可能并不适合所有看到这篇博客的人. 一.数据类型 1.整数:byte,short,int,long 分别对应1,2,4 ...
最新文章
- 模式的学习笔记----转摘自把爱好作为自己工作的人
- synchronized不能锁静态变量_多线程编程不可错过——彻底理解synchronized
- C++——拷贝构造函数
- [RHEL5企业级Linux服务攻略]--第9季 Squid服务全攻略之高级配置
- 课程直播|极致AI助力新经济时代个性化精准营销
- 【渝粤教育】国家开放大学2019年春季 1362应用语言学 参考试题
- Matlab如何求离散点的导数
- 解读Android 4.0 Camera原生应用程序的设计思路
- 初学网站建设,要学习些什么?
- 如何攻克 C++ 中复杂的类型转换?
- activemq部署
- PHP下载服务器上的文件
- ubuntu stardict词典安装
- 2022辽宁最新消防设施操作员模拟试题题库及答案
- vue antd的menu组件使用
- 大菠萝已经完全变味了
- SpringBoot总结(六)--连接oracle数据库demo
- python解决猴子偷桃问题_猴子偷桃蟠桃园土地是知道的,不举报是不敢吗?
- excel减法函数_数据工作中常用到的EXCEL技巧之文本分析类
- Python生成中文词云图(二):不可指定词云特定的形状和颜色。
热门文章
- [深度学习] 使用LSTM实现股票预测
- php 多语言cms,帝国cms模板实现多国语言切换
- dell服务器sd卡装系统,DELL服务器通过sd卡安装系统(iDRAC-Use-vFlash-).docx
- 无线路由登不上服务器怎么办,登录不了无线路由器的管理界面怎么办?
- 安全多方计算之四:比特承诺
- Python——用面相对象写奥特曼大战小怪兽
- linux的内存占用分析,Linux 内存占用分析
- httpclient请求webservice接口
- 《Mysql必知必会》-----笔记(2)
- python图片批量处理(水印、重命名)