文章目录

    • 1.MQ引言
      • 1.1 什么是MQ
      • 1.2 主流MQ以及其特点
        • ActiveMQ
        • Kafka
        • RocketMQ
        • RabbitMQ
      • 1.3 MQ的作用
    • 2.RabbitMQ 的引言
      • 2.1 RabbitMQ 原理解析
      • 2.2 RabbitMQ 的安装
      • 2.3 RabbitMQ 的模型架构
    • 3.RabbitMQ 实践
      • 3.1 引入依赖
      • 3.2 第一种模型(直连)
        • 1. 开发生产者
        • 2. 开发消费者
      • 3.4 第二种模型(work quene)
        • 1. 开发生产者
        • 2.开发消费者-1
        • 3.开发消费者-2
        • 4.测试结果
        • 5.消息自动确认机制
      • 3.5 第三种模型(fanout)
        • 1. 开发生产者
        • 2. 开发消费者-1
        • 3. 开发消费者-2
        • 4.开发消费者-3
        • 5. 测试结果
      • 3.6 第四种模型(Routing)
        • 4.6.1 Routing 之订阅模型-Direct(直连)
          • 1. 开发生产者
          • 2.开发消费者-1
          • 3.开发消费者-2
          • 4.测试生产者发送Route key为error的消息时
          • 5.测试生产者发送Route key为info的消息时
        • 4.6.2 Routing 之订阅模型-Topic
          • 1.开发生产者
          • 2.开发消费者-1
          • 3.开发消费者-2
          • 4.测试结果
  • 总结

1.MQ引言

1.1 什么是MQ

MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者消费者模型,生产者和消费者都是异步向队列中生产消息或者消费消息,俩者之间不直接通信且无业务逻辑的入侵,注重消息的生产和消费,隐藏了RPC以及网络通信协议的细节。通过高效可靠的消息传递提供了以低耦合的集成应用程序的一种机制。

1.2 主流MQ以及其特点

当今市面上有很多主流的消息中间件,如老牌的ActiveMQRabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。这里简单介绍其主要特点。

ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!,支持JDBC持久化到数据库。
API成熟、集群模式成熟,但是吞吐量较差,适合中小性企业。

Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,合产生大量数据的互联网服务的数据收集业务。
吞吐量高,语言丰富多平台,但性能稳定性较差,有消息丢失。适合大量数据且允许极少数据丢失的业务场景。

RocketMQ

RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
消息都是持久化的、模型简单、吞吐量高、源码是java语言可以深究,但其成熟度并不高,且性能不稳定,与kafka类似。

RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息(MOM)、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
spring默认支持的rabbitmq,能很好的整合、吞吐量中等、稳定性极高。能做到消息不丢失,而且也有图形化管理界面。适合应用于安全性极高的应用场景,比如涉及钱相关的系统。

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

可参考常见消息中间件的对比、rabbitmq相关知识

1.3 MQ的作用

解耦易扩展:消费者和生产者不直接联系,无业务逻辑干涉,只关注消息的产生和消费。

持久化:消息的处理过程有可能会失败,那么MQ可以将数据持久化,多次反复处理,直到消息被完全处理。

削峰填谷:在访问量剧增的情况,消息中间件能够把“这个峰值给削平”,不会造成宕机或者动用其他资源来满足这个峰值的消耗。

异步通信:在前面就说到过,消费者和生产者都是异步消费或生产消息,不需要立即处理这个消息,而是在需要的时候慢慢处理,比如下单的时候会提示客户下单成功再增加会员积分,这个积分就可以异步处理,而不是非得登积分加了后再提示下单成功。

扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容 易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。


2.RabbitMQ 的引言

2.1 RabbitMQ 原理解析

基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

官网: https://www.rabbitmq.com/

官方教程: https://www.rabbitmq.com/#getstarted

AMQP 协议
AMQP(advanced message queuing protocol)`在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:


这张图也是RabbitMQ的底层原理图,一个一个解析。
server:相当于RabbitMQ的一个节点或者说RabbitMQ的容器,用来接收和分发消息的。
virtual host:出于安全因素设计的,把基本组件(下面组件)划分到一个虚拟的分组中,类似于xml中的namespace概念,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分多个virtual host,“相互隔离的div”。
connection:这个组件在上图中没有描写出来,它是publisher以及consumer与server建立的一个连接,它是一个总称,其内部有多个channel组件。
channel:如果每次访问RabbitMQ都建立一个connection,在大量消息的时候,建立大量的connection,这是巨消耗性能的,所以有了channel,每个线程创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端的和server能够识别channel,所以channel之间是完全隔离的。
Exchange:当message到达server的第一站,根据不同的发放规则,匹配表中的routing key,分发到不同的消息queue队列中去。
queue:消息最终被送到这里等待。
binding:Exchange和queue之间的虚拟连接,binding中可以包含routing key,binding信息被保存到exchange中的查询表中,用于message的分发依据。这个binding关系非常重要,后面讲的一些业务场景,基本就是根据binding去实现的。

2.2 RabbitMQ 的安装

# 1.将rabbitmq安装包上传到linux系统中erlang-22.0.7-1.el7.x86_64.rpmrabbitmq-server-3.7.18-1.el7.noarch.rpm# 2.安装Erlang依赖包rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm# 3.安装RabbitMQ安装包(需要联网)yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm注意:默认安装完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目录中,需要 将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config
# 4.复制配置文件cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config# 5.查看配置文件位置ls /etc/rabbitmq/rabbitmq.config# 6.修改配置文件(参见下图:)vim /etc/rabbitmq/rabbitmq.config

将上图中配置文件中红色部分去掉%%,以及最后的,逗号 修改为下图:

为了能够在web管理界面对其属性操作。不打开的话是操作不了的。

# 7.执行如下命令,启动rabbitmq中的插件管理rabbitmq-plugins enable rabbitmq_management# 8.启动RabbitMQ的服务systemctl start rabbitmq-serversystemctl restart rabbitmq-serversystemctl stop rabbitmq-server# 9.查看服务状态(见下图:)systemctl status rabbitmq-server
# 10.关闭防火墙服务systemctl disable firewalldsystemctl stop firewalld   # 11.访问web管理界面http:/虚拟机ip:15672/ 端口为15672
# 12.登录管理界面username:  guestpassword:  guest

其他的一些命令

# 1.服务启动相关systemctl start|restart|stop|status rabbitmq-server# 2.管理命令行  用来在不使用web管理界面情况下命令操作RabbitMQrabbitmqctl  help  可以查看更多命令# 3.插件管理命令行rabbitmq-plugins enable|list|disable

当我们安装后登录可以看到这个界面,类似于dubbo的admin管理界面。我们后面的一些操作都可以在这里看到效果,有些操作也可以直接在这个界面操作。


上面介绍过AMQP,为了更好的理解下文的实践,这里在仔细介绍rabbitmq的模型架构。

2.3 RabbitMQ 的模型架构


这图…勉强够看吧,确实少了点美术细胞。

RabbitMQ其实本质上就是个消费与生产者的模型,只是其内部结构与消费方式或者说消费模型并不简单,首先生产者生产消息要放到RabbitMQ容器里面去,意味着要连接RabbitMQ容器,但是一有消息就连接容器,每一个生产者都连接容器,显然这是不合理的,所以一个Connection里有多个Channel管道,相当于细分为多个子Connecttion,只需要通过Channel管道通信。

来到RabbitMQ中,其内部有多个Virtual Host虚拟主机,相当于把容器分为多个子容器,每个容器都互不相干,相互独立。这样我们在某个虚拟机里面就可以进行操作了,当消息来到某个虚拟主机,也不是直接放到队列中的。而是可以有一定规律的,好比一个厂商发货,A产品要发湖南、湖北俩个省份,B产品要发广东、云南、湖南三个省份,C产品发本地,那么分发的时候可以给A指定规则,发湖南、湖北,发B也是一样制定其相应的规则,发C比较简单,不用制定规则,就直接拿自己车送就行。而我们消息来到虚拟主机中也是类似的机制。

可以与Exchange交换机Binding绑定关系,制定特殊的key关系,来指定消息发送到哪里去,这里涉及俩个key,一个是Routing Key 一个是Binding Key,路由key是消息和Exchange交换机绑定的,而绑定key是Exchange交换机和队列的绑定关系,其性质是一样的,为了建立准确的关系即消息应该准确发往哪里。

当然不声明Exchange交换机的话,那么消息就是点对点的简单模式,没有那么多规则。就写在一个队列中,谁来消息这个消息,就意味着消息往哪里发。

以上 大致介绍了RabbitMQ模型的大致流程。其中Exchange交换机是重点,有与交换机绑定的模型机制也有不与交换机绑定的模型机制,这里大致说一下,下节再仔细介绍细节。

点对点模式也称做简单模式,一个生产者生产消息,放至队列中,一个消费者消费消息。一对一服务,至尊VIP。

工作模式也称竞争模式,一个生产者生产消息,放至队列中,多个消息者抢定消息,一对多服务,这里也有值得注意的地方,该模式,生产者默认根据消费者的数量平均分配消息,真是个公正公平的法官,但是也可以设置能者多劳的机制,比如一个生产者生产10条消息,有俩个消费者,那么原来的情况是每个消费者得到5个消息,但是某个消费者突然出情况, 才消费一个消息就嗝屁了,这是另一个消费者已经消费完了所分配的5个消息,那么采取能者多劳的机制,该消费者将继续工作,为嗝屁的消费者消费其余的消息。

前面俩种模式都是放至一个队列中,单个或者多个消费者消费不一样的消息,而随着业务的发展,比如一个消息,要被ABC消费者消费,总不能先生产消息让A消费完了,再生产同样的消息让BC再消费吧。这样显得很呆,于是就有了发布订阅模式也称广播模式, 这个模式第一次引入Exchange交换机的概念,每个消费者都有队列,每个队列绑定对应的交换机,比如ABC消费者绑定某个交换机,那么消息一来到交换机,就由交换机分发给ABC,值得注意的是,这里是共享,意味着我一个消息可以让ABC三个都消费,哪个绑定了这个交换机就有资格共享消费交换机里面的消息,很有意思的是,可以拿广播来举例,交换机就像个广播,消息放里面,但凡绑定过这个广播频道的,交换机能把消息共享到每个消费者。

那么问题又来了,原来一直采用发布订阅模式,让一个消息让ABC三个消费者共享,突然因为某种业务要求,决定不让C消费了,很简答嘛直接解绑不就行了吗,这种思考方式确实没问题,但是还不够完美。来看看RabbitMQ是怎么做的,交换机不解绑,而是通过引入routing key来精确细化Exchange对消息的发送管理。比如ABC绑定的是同一个交换机,但是路由keyAB是一样的,C不一样,那么生产者具体到交换机再具体到路由,就能准确发送给AB避开C了。可以将路由key理解为交换机的一个属性,生产者具体到某个交换机的属性进行生产消息,更颗粒化更准确。这种叫做发布订阅直连模式

通过上面的一步步演变,基本上接近完美了,但是还是存在不足的地方,比如上面讲到的AB声明了一个路由key,来与C区分,这种前提是在同一个交换机的情况下, 如果A要满足多个交换机岂不是要声明多个路由key?,而实际情况中,能用到RabbitMQ的项目说明业务并不简单,一个消费者或者生产者要满足多个路由,岂不是要声明多次,注意这个不会覆盖,A消费者声明一个路由key再声明一个路由key,意味着能消费这俩个路由key的消息。但是情况一旦复杂起来,增加路由key都要在消费者或产生者多声明一个路由key,这样也是显得很呆。所以发布订阅模式升级为主题模式,也就是动态的发布订阅模式,它通过声明路由key的时候,将这个值大致化,模糊化,来达到动态的效果,比如我声明路由key为“student.name.zhangsan”,意味着消费者接受路由key为student.name.zhangsan的消息(交换机确定的情况下),消费者这是要接受student.name.wangwu的消息,那么可以将之前的student.name.zhangsan改为student.name.*,代表student.name.开头的后面什么名字都接受,也可以用#代替,*可以代替一个词。#可以代替零个或多个单词。关于这一点官网上讲的很清楚。

这里只是把几种模式用大白话文初略翻译了一下,建议细读,对后文的代码理解相当有帮助。


3.RabbitMQ 实践

3.1 引入依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.2</version>
</dependency>

3.2 第一种模型(直连)

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
1. 开发生产者
  //创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.23.129");//地址connectionFactory.setPort(5672);//端口connectionFactory.setUsername("ems");//访问虚拟主机的用户名密码connectionFactory.setPassword("123");connectionFactory.setVirtualHost("/ems");//虚拟主机(当前操作节点)Connection connection = connectionFactory.newConnection();//建立连接后创建通道Channel channel = connection.createChannel();//创建一个通道//参数1: 是否持久化  参数2:是否独占队列 参数3:是否自动删除  参数4:其他属性channel.queueDeclare("hello",true,false,false,null);//声明队列//参数1: 交换机名称 参数2:队列名称  参数3:传递消息额外设置  参数4:消息的具体内容channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());//发布队列channel.close();//关闭通道connection.close();//关闭连接

很多技术都是相似的,相信看过我JDBC的博客很容易察觉,这些代码和JDBC很像,很多持久化框架内部封装了JDBC,而RabbitMQ也会这样做,与spring无缝整合的时候,也会将这段大部分代码封装成一个对象或者组件,但是很有必要了解最初的模样。
这里再解析一个重要的方法
queueDeclare (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
queue:队列名称,如果该队列名不存在则自动创建,如果该队列存在,且其他属性和已存在的队列不同,则会报错。
durable:是否持久化,如果为false那么该队列是零时的,当服务重启便不存在,类似于临时缓存一样。为true则会存入硬盘之中。重启还会存在。
exclusive:是否独占队列,意思是当前队列是否只能被当前这个连接能够访问
autoDelete:消费完后是否自动删除,不管是false还是true,没消费都不会自动删除,消费后才会删除或不删除。
arguments:队列的其他参数,一般为null。

2. 开发消费者
  //创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.23.129");connectionFactory.setPort(5672);connectionFactory.setUsername("ems");connectionFactory.setPassword("123");connectionFactory.setVirtualHost("/ems");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("hello", true, false, false, null);channel.basicConsume("hello",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}});

消费者代码和生产者代码几乎一样,这俩者的关系用齿轮来形容,最恰当不过。生产者的消息属性,哪个为false哪个为true必须一一对应,才能进行工作。

basicConsume(String queue, boolean autoAck, Consumer callback) throws lOException;
autoAck:是否自动确认消息被消费,
callback:重写handleDelivery接受boby消息


3.4 第二种模型(work quene)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

角色:

  • P:生产者:任务的发布者
  • C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
  • C2:消费者-2:领取任务并完成任务,假设完成速度快
1. 开发生产者

生产10条消息

channel.queueDeclare("hello", true, false, false, null);
for (int i = 0; i < 10; i++) {channel.basicPublish("", "hello", null, (i+"====>:我是消息").getBytes());
}
2.开发消费者-1

正常消费

channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: "+new String(body));}
});
3.开发消费者-2

模拟消费异常情况

channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(1000);   //处理消息比较慢 一秒处理一个消息} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2: "+new String(body));  }
});
4.测试结果

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

可以明显发现,尽管消费者2消费的很慢,生产者也是将消息平均分配给他了,是因为消息分到消费者就被当作已经消费了,也就是basicConsume中第二个属性。那要是消费者2不仅仅只是休眠一秒而是一分钟十分钟或者直接嗝屁宕机了,那么分给消费者2的消息就会很长时间再消费或者部分丢失,很显然这样是不行的,所以我们首先需要将消费者2限制一条一条的来,这条消费完了再下一条,那么就需要做出以下调整。

5.消息自动确认机制

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

channel.basicQos(1);//一次只接受一条未确认的消息
//参数2:关闭自动确认消息
channel.basicConsume("hello",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: "+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息}
});
  • 设置通道一次只能消费一个消息

  • 关闭消息的自动确认,开启手动确认消息



3.5 第三种模型(fanout)

fanout 扇出 也称为广播

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
1. 开发生产者
//声明交换机
channel.exchangeDeclare("logs","fanout");//广播 一条消息多个消费者同时消费
//发布消息
channel.basicPublish("logs","",null,"hello".getBytes());

对于声明交换机exchangeDeclare有多个重载方法,上面代码写的可以说是最简单的,第一个参数表示交换机名称可以自己命名,第二个参数表示交换机类型,常见的交换机类型有fanout、direct、topic,这个小节要介绍的是fanout。

2. 开发消费者-1
//绑定交换机
channel.exchangeDeclare("logs","fanout");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//将临时队列绑定exchange
channel.queueBind(queue,"logs","");
//处理消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: "+new String(body));}
});

当声明交换机后,就不再是queueDeclare去声明队列了,而是queueBind绑定交换机中的队列,
queueBind(String queue, String exchange, String routingKey)
queue:是交换机的队列
exchange:交换机的名字
routingKey:用来绑定队列和交换器的路由键;
这个地方默认为空,因为fanout模式还没有用到routingKey概念,后面会讲到。

3. 开发消费者-2
//绑定交换机
channel.exchangeDeclare("logs","fanout");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//将临时队列绑定exchange
channel.queueBind(queue,"logs","");
//处理消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2: "+new String(body));}
});
4.开发消费者-3
//绑定交换机
channel.exchangeDeclare("logs","fanout");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//将临时队列绑定exchange
channel.queueBind(queue,"logs","");
//处理消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者3: "+new String(body));}
});
5. 测试结果

消费者123都能够得到hello的消息。


3.6 第四种模型(Routing)

4.6.1 Routing 之订阅模型-Direct(直连)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

流程:

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
1. 开发生产者
//声明交换机  参数1:交换机名称 参数2:交换机类型 基于指令的Routing key转发
channel.exchangeDeclare("logs_direct","direct");
String key = "";
//发布消息
channel.basicPublish("logs_direct",key,null,("指定的route key"+key+"的消息").getBytes());
2.开发消费者-1
 //声明交换机
channel.exchangeDeclare("logs_direct","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定队列和交换机
channel.queueBind(queue,"logs_direct","error");
channel.queueBind(queue,"logs_direct","info");
channel.queueBind(queue,"logs_direct","warn");//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: "+new String(body));}
});
3.开发消费者-2
//声明交换机
channel.exchangeDeclare("logs_direct","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定队列和交换机
channel.queueBind(queue,"logs_direct","error");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2: "+new String(body));}
});
4.测试生产者发送Route key为error的消息时

5.测试生产者发送Route key为info的消息时


4.6.2 Routing 之订阅模型-Topic

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

# 统配符* (star) can substitute for exactly one word.    匹配不多不少恰好1个词# (hash) can substitute for zero or more words.  匹配一个或多个词
# 如:audit.#    匹配audit.irs.corporate或者 audit.irs 等audit.*   只能匹配 audit.irs
1.开发生产者
//生命交换机和交换机类型 topic 使用动态路由(通配符方式)
channel.exchangeDeclare("topics","topic");
String routekey = "user.save";//动态路由key
//发布消息
channel.basicPublish("topics",routekey,null,("这是路由中的动态订阅模型,route key: ["+routekey+"]").getBytes());
2.开发消费者-1

Routing Key中使用*通配符方式

 //声明交换机
channel.exchangeDeclare("topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定队列与交换机并设置获取交换机中动态路由
channel.queueBind(queue,"topics","user.*");//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: "+new String(body));}
});
3.开发消费者-2

Routing Key中使用#通配符方式

//声明交换机
channel.exchangeDeclare("topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定队列与交换机并设置获取交换机中动态路由
channel.queueBind(queue,"topics","user.#");//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2: "+new String(body));}
});
4.测试结果

总结

本篇博客先从认识MQ开始研究了RabbitMQ,再实践其基本操作。至此对RabbitMQ已经有了熟悉的轮廓,下篇博客会继续写RabbitMQ稍微高级一些的知识。


消息队列中间件之RabbitMQ(上)相关推荐

  1. rabbitmq实战:高效部署分布式消息队列_一文看懂消息队列中间件--AMQ及部署介绍...

    概述 最近有个小项目用到了AMQ来做消息队列,之前介绍的主要是rabbitmq,所以今天主要提一下AMQ,也简单介绍下两者的区别~ 消息队列中间件 消息队列中间件(简称消息中间件)是指利用高效可靠的消 ...

  2. ActiveMQ RabbitMQ RokcetMQ Kafka实战 消息队列中间件视频教程

    附上消息队列中间件百度网盘连接: 链接: https://pan.baidu.com/s/1FFZQ5w17e1TlLDSF7yhzmA 密码: hr63 转载于:https://www.cnblog ...

  3. 消息中间件系列(七):如何从0到1设计一个消息队列中间件

    消息队列作为系统解耦,流量控制的利器,成为分布式系统核心组件之一. 如果你对消息队列背后的实现原理关注不多,其实了解消息队列背后的实现非常重要. 不仅知其然还要知其所以然,这才是一个优秀的工程师需要具 ...

  4. MQ消息队列中间件:

    MQ消息队列中间件: 微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应. 异步通讯:就像发信息,不需要马上回复. 同步调用的优点: 时效性较强,可以立即得到结果 同步调用的问题: ...

  5. 消息队列中间件 Message Queue 简称:MQ

    一.什么是消息队列? 消息队列,一般我们会简称它为MQ(Message Queue) 队列是一种先进先出的数据结构. 现有常用的开源消息中间件有Kafka.CMQ.JBoss Messaging.JO ...

  6. 浅谈消息队列及常见的分布式消息队列中间件

    背景 分布式消息队列中间件是是大型分布式系统不可缺少的中间件,通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要等待接收此消息.所以消息队列主要解决应用耦合.异步消 ...

  7. python消息队列中间件_常见的消息队列中间件介绍

    题目 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区别,以及适合哪些场景? 什么是消息队列 在正式介绍和对比Kafka. ...

  8. 使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯

    作者 | 陈屹       责编 | 欧阳姝黎 近来工作上接收到一项任务,实现c++后台服务器程序,要求它能承载千万级别的DAU读写请求.目前实现千万级高并发海量数据请求的服务器设计在"套路 ...

  9. 基于硬件的消息队列中间件 Solace 简介之二

    小短篇介绍关于Solace https://blog.csdn.net/aqudgv83/article/details/79495489 . 前面简单介绍了Solace来自于哪家公司, 主要能做哪些 ...

最新文章

  1. Jupyter 快速入门——写python项目博客非常有用!!!
  2. MyEclipse 设置字体
  3. [原创]TimeQuest约束外设之诡异的Create Generated Clocks用法
  4. 设计模式C++实现(2)——策略模式
  5. python算法实现源码_python 实现A_算法的示例代码
  6. 认识:人工智能AI 机器学习 ML 深度学习DL
  7. H3C交换机4核心节点IRF2虚拟化下检测机制
  8. 怎么用eclipse编写python_python用eclipse开发配置
  9. 注释可以出现在c语言任何位置,在c程序中,注释语句只能位于一条语句的后面吗...
  10. ECMAScript 6 基础入门
  11. reflexil教程_【转载】教你使用 Reflexil 反编译.NET
  12. 【图像隐写】基于matlab DCT数字水印嵌入+检测+攻击(测试鲁棒性)【含Matlab源码 1133期】
  13. Java面试题全集中
  14. 【粉丝福利,免费送书】SQL编程思想
  15. 在ArcGIS中自定义符号制作时,发现无法正常使用符号单位,是怎么回事?
  16. 迄今为止最完整的DDD实践
  17. Linux motd详解
  18. 2021青岛十九中高考成绩查询,@青岛高考生 2020新高考模拟考可以查成绩了
  19. PDF编辑器哪个好用,怎么在PDF上修改文字
  20. NetworkX 算法列表

热门文章

  1. 1分钟免费开通IDaaS云服务
  2. ZIP压缩包密码加密、解密
  3. 光流文件(.flo)转成图片(.png)
  4. 航空航天与国防数字化验证解决方案 | 达索系统百世慧®
  5. 中级育婴师证怎么考,需要些什么条件
  6. kalman滤波纯纯纯纯纯纯理论推导
  7. 惯导IMU和惯导INS
  8. 大数据——corejava学习笔记
  9. 035 Rust死灵书之Vec处理零尺寸类型
  10. Kafka,ActiveMQ,RabbitMQ等消息队列使用的场景介绍