RabbitMQ指南(下)
在上一小节中我们改进了log系统,由于使用fanout
类型的exchange只能进行全局的广播,因此我们使用direct
类型的exchange做了代替, 使得我们可以选择性的接收消息。尽管使用fanout exchange改进了log系统,但它仍然有限制——不能基于多个条件做路由。
Topics
在log系统中可能不只是基于不同的日志级别作订阅,也可能会基于日志的来源。你也许听过Unix下名为syslog
的工具, 它把日志按照严重级别(info/warn/crit…)和设备(auth/cron/ker…)进行路由。
这会给我们许多的灵活性,也许我们只想监听’cron’中的’critical’级别的错误日志,以及所有’kern’中的日志。 为了实现这种日志系统,我们需要学习一个更复杂的topic
类型的exchange。
Topic exchange
发送到topic exchange中的消息不能有一个任意的routing_key
——它必须是一个使用点分隔的单词列表。单词可以是任意的, 但是通常会指定消息的一些特定。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。 routing key的长度限制为255个字节数。
binding key也必须是相同的形式。topic exchange背后的逻辑类似于direct——一条使用特定的routing key发送的消息将会被传递至所有使用与该routing key相同的binding key进行绑定的队列中。 然而,对binding key来说有两种特殊的情况:
- *(star)可以代替任意一个单词
- #(hash)可以代替0个或多个单词
使用一张图可以很简单地来说明:
在图中,我们将要发送被描述的动物的消息。消息的routing key将由三个单词组成(通过两个点分隔)。routing key中的第一个单词将描述速度, 第二个是颜色,第三个是物种:"<speed>.<colour>.<species>"
。
我们创建三个绑定:Q1使用binding key"*.orange.*"
来绑定,Q2使用"*.*.rabbit"
以及lazy.#
绑定。
这些绑定可以被总结为:
- Q1对所有橘色的的动物感兴趣
- Q2想要接收所有关于兔子的消息以及所有关于lazy的动物的消息
一条使用routing key"quick.orange.rabbit"
发送的消息将被同时传递到两个队列中。消息"lazy.orange.elephant"
同样如此。 另一方面,"quick.orange.fox"
只会被第一个queue接收,"lazy.brown.fox"
只会被第二个queue接收。 "lazy.pink.rabbit"
只会被传递到Q2一次,即使它对两个binding key都匹配。"quick.brown.fox"
与两个queue的binding key都不匹配, 因此将被丢弃。
如果打破我们的约定,使用一个单词或者四个单词的routing key例如"orange"
,"quick.orange.male.rabbit"
发送消息将会发生什么? 这些消息不会匹配任何绑定,因此会丢失。
但是对于"lazy.orange.male.rabbit"
,即使它有四个单词,但是它与第二个queue的binding key匹配,因此将会被发送到第二个queue中。
当一个queue使用"#"
(hash)作为binding key,那么它将会接收所有的消息,忽略routing key,就好像使用了fanout exchange。 当特殊字符”*“(star)和”#“(hash)在绑定中没有用到,topic exchange将会与direct exchange的行为相同。
了解了topic exchange之后,我们将它用在我们的log系统中,我们定义的routing key将会有两个单词组成:"<facility>.<severity>"
。
完成的EmitLogTopic.java
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs" ;
public static void main(String[] argv)
throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost( "localhost" );
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic" );
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null , message.getBytes());
System.out.println( " [x] Sent '" + routingKey + "':'" + message + "'" );
connection.close();
}
//...
}
|
完整的ReceiveLogsTopic.java
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs" ;
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost( "localhost" );
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic" );
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1 ) {
System.err.println( "Usage: ReceiveLogsTopic [binding_key]..." );
System.exit( 1 );
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println( " [*] Waiting for messages. To exit press CTRL+C" );
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte [] body) throws IOException {
String message = new String(body, "UTF-8" );
System.out.println( " [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" );
}
};
channel.basicConsume(queueName, true , consumer);
}
}
|
运行的时候从命令行中输入binding key来进行绑定,接收不同的消息。
Remote procedure call (RPC)
在第二小节中我们学习了如何使用Work Queues
来在多个workers中分发耗时的任务。但是如果我们需要调用远程计算机上的一个函数并等待结果返回呢? 这就是另外一个故事了。这种模式通常称为远程过程调用或RPC。
在这一小节我们将使用RabbitMQ来构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有实际的耗时任务用来分发, 因此我们将创建一个虚拟的RPC服务返回Fibonacci数。
Client interface
为了说明RPC服务是如何使用的,我们将创建一个简单的客户端类。它将暴露一个名为call
的方法发送一次RPC请求并且阻塞直到结果返回:
1
2
3
|
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call( "4" );
System.out.println( "fib(4) is " + result);
|
Callback queue
使用RabbitMQ来进行RPC是非常简单的。客户端发送一个请求到服务端,服务端接收后返回响应的消息。为了接收到响应的消息,我们需要在请求中发送一个callback 的queue地址。我们可以使用默认的queue(在Java的client中它是exclusive的)。
1
2
3
4
5
6
7
8
9
10
|
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish( "" , "rpc_queue" , props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
|
Message properties
AMQP协议预定义了消息的14种属性。大部分的都很少使用,除了以下这些:
deliveryMode
:标记一条消息是持久化的(使用值2)还是非持久化的(使用其它值)。在第二节中有过介绍。contentType
:用来描述mime类型的编码。例如使用JSON的话就这样设置属性:application/json
。replyTo
:一般用来命名一个回调queue。correlationId
:用来关联RPC的请求和响应。
我们需要导入新的类:
1
|
import com.rabbitmq.client.AMQP.BasicProperties;
|
Correlation Id
在之前的方法中我们建议为每个RPC请求创建一个回调queue。这显得有点影响性能,幸运的是有一种更好的方式——每个客户端只创建一个回调queue。 但这产生了一个新问题,无法将相应的Response和Request对应起来。这个时候就需要用到correlationId
属性。对于每个请求它都将有一个唯一的值。 当我们在回调queue中接收到消息之后,检查该属性,看是否与Request匹配。如果是一个未知的correlationId
值,那么我们可以安全的忽略这条消息, 因为它不属于我们的请求。
你也许会问,为什么我们应该忽略回调queue中未知的消息而不是抛出异常?这是因为服务端可能会出现竞争条件。尽管不太常见,但是也有可能RPC server在发送响应后挂了, 并且也没有接收到客户端发送的ack。如果发生了这种情况,RPC server在重启后将会重新处理这个请求。这就是为什么在客户端我们需要优雅的处理重复的响应, RPC应该是幂等的。
Summary
我们的RPC整个过程是这样的:
- 当客户端启动,它创建一个匿名的并且是exclusive的回调queue。
- 在一次RPC请求中,客户端发送的消息有两个属性:
replyTo
,放置的是回调queue的信息。correlationId
,放置的是每个请求唯一的值。 - 请求被发送到一个rpc_queue中。
- RPC服务端在queue的另一端等待请求。当请求到来时,它处理任务并将消息的结果发送回客户端,使用
replyTo
中设置的queue。 - 客户端在回调queue中等待响应的数据,当消息出现时,它先检查
correlationId
属性。如果匹配的话就将结果返回到应用中。
最后来看一下完整的代码实现。
Fibonacci函数:
1
2
3
4
5
|
private static int fib( int n) throws Exception {
if (n == 0 ) return 0 ;
if (n == 1 ) return 1 ;
return fib(n- 1 ) + fib(n- 2 );
}
|
完整的RPCServer.java
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
private static final String RPC_QUEUE_NAME = "rpc_queue" ;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost( "localhost" );
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false , false , false , null );
channel.basicQos( 1 );
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false , consumer);
System.out.println( " [x] Awaiting RPC requests" );
while ( true ) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println( " [.] fib(" + message + ")" );
String response = "" + fib(n);
channel.basicPublish( "" , props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false );
}
|
server端的代码非常直观:
- 首先创建一个连接、channel和声明一个queue。
- 我们也许想要运行不止一个服务端进程。为了在多个server间做到负载均衡,通过channel.basicQos设置
prefetchCount
。 - 我们使用
basicConsume
来进入queue。然后使用无限循环来等待请求的消息,处理之后再返回响应。
完整的RPCClient.java
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue" ;
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost( "localhost" );
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true , consumer);
}
public String call(String message) throws Exception {
String response = null ;
String corrId = java.util.UUID.randomUUID().toString();
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish( "" , requestQueueName, props, message.getBytes());
while ( true ) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break ;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
|
客户端代码有一点点的复杂:
- 我们创建连接和channel,以及声明一个exclusive的回调queue用来接收响应的消息。
- 订阅回调queue,这样就可以接收到RPC服务端响应的消息。
- call方法发出一个RPC请求。
- 我们首先生成一个唯一的
correlationId
数字并且保存它——在while循环中使用它来匹配相应的response。 - 下一步,发送请求的消息,使用两个属性:
replyTo
和correlationId
。 - 之后就是等待响应的消息返回。
- 在while循环中做了一些简单的工作,检查响应的消息的
correlationId
是否与Request相匹配。如果是的话,则保存响应。 - 最终向用户返回响应。
发送客户端请求:
1
2
3
4
5
6
7
|
RPCClient fibonacciRpc = new RPCClient();
System.out.println( " [x] Requesting fib(30)" );
String response = fibonacciRpc.call( "30" );
System.out.println( " [.] Got '" + response + "'" );
fibonacciRpc.close();
|
这样就通过RabbitMQ简单的实现了RPC的通信。
from: http://www.importnew.com/24329.html
RabbitMQ指南(下)相关推荐
- RabbitMQ指南(三) 队列
RabbitMQ指南(三) 队列 3.1 持久化 3.2 排他 3.3 自动删除 3.4 其他参数 声明队列使用的queueDeclare()方法包含一系列入参,这些入参定义了队列的属性.需要注意 ...
- java 并发指南_并发编程 :Concurrent 用户指南 ( 下 )
原标题:并发编程 :Concurrent 用户指南 ( 下 ) 来源:高广超, www.jianshu.com/p/8cb5d816cb69 20. 锁 Lock java.util.concurre ...
- i.MX Yocto项目用户指南 -- 下
i.MX Yocto项目用户指南 – 下 5映像构建 本节提供了构建映像的详细信息和过程. 5.1构建配置 i. MX提供了一个脚本fsl-setup-release.sh,它简化了i.MX机器的设置 ...
- RabbitMQ指南之二:工作队列(Work Queues)
在上一章的指南中,我们写了一个命名队列:生产者往该命名队列发送消息.消费从从该命名队列中消费消息.在本章中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务.工作队列(即任务队列)的主要思想 ...
- RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)
在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者.本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式. 为了阐述这个 ...
- 数据科学家成长指南(下)
点击上方"Datawhale",选择"星标"公众号 第一时间获取价值内容 本文是数据科学家学习路径的的完结篇,算上<数据科学家成长指南(上)>和&l ...
- linux7 yum安装rabbitmq,CentOS7linux下yum安装RabbitMQ以及使用顶
CentOS7 linux下yum安装RabbitMQ以及使用,亲测可用!亲测可用!亲测可用! 系统版本:CentOS 7 RabbitMQ-Server:3.5.1 一.安装erlang 1.安装准 ...
- RabbitMQ指南(中)
原文出处: Listen 在上一篇文章中,介绍了使用RabbitMQ的Hello World例子, 以及如何创建一个work queue.在work queue的例子中每条消息都只会被传递到一个wor ...
- RabbitMQ指南(上)
原文出处: Listen RabbitMQ是一个消息中间件,在一些需要异步处理.发布/订阅等场景的时候,使用RabbitMQ可以完成我们的需求. 下面是我在学习RabbitMQ的过程中的一些记录,内容 ...
最新文章
- 江西财经大学第一届程序设计竞赛 G题小Q的口袋校园
- libiconv2.dll
- RTX操作系统库方式移植
- AB1601中断的问题
- php 比较,PHP类型比较
- 字符串系列之最长回文子串
- OAuth1.0介绍
- wtf是转换ttf_WTF入门,它是终端的仪表板
- 一行一个链接代码_AI最优论文+代码查找神器:966个ML任务、8500+论文任你选
- TestNG安装及配置
- c# MVC在WEB.Config中配置MIME
- uniapp底部导航栏中间凸起_PPT导航栏,真的非常好用!
- java jdbc 连接mysql 数据库
- 用口诀背英语单词绿色版简介
- stata计量之前数据清洗的必备步骤
- Java练习题之循环输出数字图形。例如1 22 333 4444 55555每输出一种数字换行。
- 考拉消息中心消息盒子处理重构(策略模式)
- 2021年全国安全生产月 安全知识网络竞赛 链工宝“测测你的安全力”题库 三百多道真题含答案
- 5 降维 Dimention Reduction
- android qq协议分析工具,分析Androidqq协议之收到qq群消息
热门文章
- oracle pde文件导入
- ubuntu下安装vue-cli后输入vue提示无法找到vue命令解决办法
- 4. Nest :module (Model)
- java同一包protect_Java基础知识 - 欢迎来到夜故事,一个人的故事 - OSCHINA - 中文开源技术交流社区...
- matlab抓取股票数据,Matlab经过sina web接口获取个数即时股票数据函数实现代码
- UnicodeDecodeError: ‘utf-8‘ codec can‘t decode byte 0x8b in position 1: invalid start byte
- 华为M2 e服务器信息解析失败,关于华为:华为审核被拒检查HMS-Core更新失败
- 安卓屏幕适配方案(根据今日头条方案,升级版)
- android 创建选项卡,Android用Fragment创建选项卡
- java类函数默认的保护级别_事件说明