MQ全称为Message Queue, 即消息队列,RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列应用场景

1,任务异步处理

将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理,提高了应用程序的响应时间

2,应用程序解耦合

MQ相当于一个中介,生产通过MQ与消费方交互,它将应用程序进行解耦合。

市场上还有哪些消息队列?

AactiveMQ,RabbitMQ, ZeroMQ,kafka,MetaMQ,RocketMQ,Redis

为什么使用RabbitMq呐?

1, 使用简单,功能强大

2,基于AMQP协议

3,社区活跃,文档完善

4,高并发性能,这主要得益于Erlang语言

5,Spring Boot 默认已集成RabbitMQ

RabbitMQ 的工作原理

5, 消费者接收到消息

otp_win64_20.3.exe已经上传百度网盘

rabbitmq-server-3.7.3.exe 已上传百度网盘

安装完rabbitMq-server-3.7.3.exe 后,RabbitMQ 服务就已经启动了,可以在win服务中查看,

安装完rabbitMq,电脑开始页面会有rabbitMq 的管理菜单

如果没有的话,可以使用黑窗口命令:

使用管理员权限执行上面的cmd命令,

启动完服务后,还需要执行下安装插件命令,才可在浏览器端查看管理rabbitMq

执行安装插件命令时,第一次执行,提示重启rabbitMQ 服务,重启后,再次执行命令方可成功

成功后执行 localhost:15672,

用户名秘密皆为guest

2.2 Hello World

练习rabbitmq 使用

按照官方教程(http://www.rabbitmq.com/getstarted.html)测试helloworld:

首先 在springBoot 上创建 两个模块

test-rabbitmq-producer

test-rabbitmq-consumer

创建完成后,引用rabbitmq 的包,

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>xc-framework-parent</artifactId><groupId>com.xuecheng</groupId><version>1.0-SNAPSHOT</version><relativePath>../xc-framework-parent/pom.xml</relativePath></parent><modelVersion>4.0.0</modelVersion><artifactId>test-rabbitmq-producer</artifactId><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp‐client</artifactId><version>4.0.3</version><!-- 此版本与 springboot  1.5.9版本匹配 --></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐logging</artifactId></dependency></dependencies></project>

引用的就是以下代码:

 <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp‐client</artifactId><version>4.0.3</version><!-- 此版本与 springboot  1.5.9版本匹配 --></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>

我是从pdf 上复制粘贴的代码,字体格式编码有问题,所以出现无法注入maven 依赖的问题,找不到maven 依赖,重新手打印一下即可。

生产者代码如下:

package com.test.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.text.DateFormat;
import java.time.LocalDate;
import java.util.concurrent.TimeoutException;public class Producer01 {//队列名称private static final String QUEUE = "helloWord";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");try {//建立新连接Connection connection = null;connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();//声明队列,如果队列在mq中没有则要创建// 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments/*** 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE,true,false,false,null);LocalDate date = LocalDate.now();String message = "zjk "+date;/*** 发送消息方法* 参数:String exchange, String routingKey, BasicProperties props, byte[] body* exchange:Exchange的名称,如果没有指定,则使用Default Exchange* routingKey:消息的路由key,是用于Exchange(交换机)将消息转发到指定的消息队列* props:消息包含的属性* body:消息体*//*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或者解除绑定默认的交换机* routingKey 等于队列名称**/channel.basicPublish("",QUEUE,null,message.getBytes());System.out.println("SEND MESSAGE IS:"+message);} catch (Exception e) {e.printStackTrace();} finally {}}
}

此代码调用了rabbitmq 方法,即可发送消息,通过安装的rabbitmq服务管理网站,可以查看到我们发送的消息

消费者


public class Consumer01 {//队列名称private static final String QUEUE = "helloWord";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();//声明队列,如果队列在mq中没有则要创建// 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments/*** 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE,true,false,false,null);//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume() 去指定* @param envelope  消息包的内容,可从中消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body, "utf-8");System.out.println("JIE MESSAGE IS:"+msg);super.handleDelivery(consumerTag, envelope, properties, body);}};//监听队列/*** 监听队列 String queue,boolean autoAck,Consumer callback* 参数明细* param1,队列名称* param2,是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置*    为false 则需要手动回复* param3,消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE,true,consumer);} catch (Exception e) {e.printStackTrace();} finally {}}
}

4,工作模式

RabbitMQ 有一下几种工作模式

1,Work queues

2,Publish/Subscribe

3,Routing

4,Topics

5,Header

6,RPC

4.1 Work queues (工作队列模式)

work queues 与入门程序相比,多个一个消费端,两个消费端共同消费同一个队列中的消息。

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

这个和入门程序的不同是,多了消费者,这里的消费者是两个,我们可以根据显示中的场景,部署多个消费者,每个消费者将会轮询接收消息,

测试:

1,使用入门程序,启动多个消费者

2,生产者发送多个消息

结果;

1,一条消息只会被一个消费者接收

2,rabbit采用轮询的方式将消息是平均的发送给消费者

3,消费者在处理完某条消息后,才会收到下一个消息

4.2 Publish/subscribe(发布订阅模式)

发布订阅模式:

1,每个消费者监听自己的队列

2,生产者将消息发给broker,由交换机将消息转发绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

3,与交换机绑定的有多个队列,每个消费者监听自己的队列

4,生产者将消息发送给交换机,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

5,如果消息发给没有绑定队列的交换机上消息将消失。

Publish/Subscribe 与work queues 有什么区别?

1,publish/subscribe 可以定义一个交换机绑定多个队列,一个消息可以发送给多个队列

2,work queues 无需定义交换机,一个消息一次只能发送给一个队列

3,publish/subscribe 与work queues 的功能更强大,publish/subscribe 也可以将多个消费者监听同一个队列实现work queues 的功能。

4.2.2 代码

案例:

用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信,邮件多种方法。

发布订阅模式代码

生产者:

package com.test.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.concurrent.TimeoutException;public class Producer02_Publish {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";//交换机名称private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();/*** 声明交换机* 参数明细:String exchange,BuiltinExchangeType type* 1,交换机名称* 2,交换机类型,fanout,topic,direct,headers*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);/*** 声明队列,如果队列在mq中没有则要创建* 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);/*** 交换机和队列绑定* 参数明细 String queue,String exchange,String routingKey* 1,queue:队列名称* 2,exchange:交换机名称* 3,routingKey:路由key*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");/*** 发送消息方法* 参数:String exchange, String routingKey, BasicProperties props, byte[] body* exchange:Exchange的名称,如果没有指定,则使用Default Exchange* routingKey:消息的路由key,是用于Exchange(交换机)将消息转发到指定的消息队列* props:消息包含的属性* body:消息体*//*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或者解除绑定默认的交换机* routingKey 等于队列名称**/String message = "郑继坤帅哥 "+ LocalDateTime.now();channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("SEND MESSAGE IS:"+message);} catch (Exception e) {e.printStackTrace();} finally {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

消费者

package com.test.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02_Publish_Email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";//交换机名称private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();/*** 声明交换机* param1 exchange:交换机名称* param2 type:交换机名称,fanout,topic,direct,headers*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);/*** 声明队列,如果队列在mq中没有则要创建* 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);/*** 队列与交换机绑定* 参数;String queue,String exchange,String routingKey* 参数明细:* 1,queue:队列名称* 2,exchange:交换机名称* 3,routingKey:路由key*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume() 去指定* @param envelope  消息包的内容,可从中消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body, "utf-8");System.out.println("accept MESSAGE IS:"+msg);super.handleDelivery(consumerTag, envelope, properties, body);}};/*** 监听队列* 监听队列 String queue,boolean autoAck,Consumer callback* 参数明细* param1,队列名称* param2,是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置*    为false 则需要手动回复* param3,消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);} catch (Exception e) {e.printStackTrace();} finally {}}
}
package com.test.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02_Publish_SMS {//队列名称private static final String QUEUE_INFORM_SMS = "queue_inform_sms";//交换机名称private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();/*** 声明交换机* param1 exchange:交换机名称* param2 type:交换机名称,fanout,topic,direct,headers*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);/*** 声明队列,如果队列在mq中没有则要创建* 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);/*** 队列与交换机绑定* 参数;String queue,String exchange,String routingKey* 参数明细:* 1,queue:队列名称* 2,exchange:交换机名称* 3,routingKey:路由key*/channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume() 去指定* @param envelope  消息包的内容,可从中消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body, "utf-8");System.out.println("accept MESSAGE IS:"+msg);super.handleDelivery(consumerTag, envelope, properties, body);}};/*** 监听队列* 监听队列 String queue,boolean autoAck,Consumer callback* 参数明细* param1,队列名称* param2,是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置*    为false 则需要手动回复* param3,消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);} catch (Exception e) {e.printStackTrace();} finally {}}
}

4.3 Routing(路由模式)

4.3.1 工作模式

路由模式:

1,每个消费者监听自己的队列,并且设置routingkey

2,生产者将消息发给交换机,由交换机根据routingKey来转发消息到指定的队列。

在发送消息前,会设置路由key类型,当队列有这个类型时,才会将消息发送给指定路由key 的队列。

4.3.2 代码

1,生产者

声明exchange_routing_inform 交换机

声明两个队列并且绑定到此交换机,绑定时需要指定routingkey

发送消息时需要指定routingkey

生产者代码:

package com.test.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.concurrent.TimeoutException;public class Producer02_routing {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";//交换机名称private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";//路由keyprivate static final String INFORM_EMAIL = "inform_email";private static final String INFORM_SMS = "inform_sms";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();/*** 声明交换机* 参数明细:String exchange,BuiltinExchangeType type* 1,交换机名称* 2,交换机类型,fanout,topic,direct,headers*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);/*** 声明队列,如果队列在mq中没有则要创建* 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);/*** 交换机和队列绑定* 参数明细 String queue,String exchange,String routingKey* 1,queue:队列名称* 2,exchange:交换机名称* 3,routingKey:路由key*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,INFORM_EMAIL);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,INFORM_SMS);/*** 发送消息方法* 参数:String exchange, String routingKey, BasicProperties props, byte[] body* exchange:Exchange的名称,如果没有指定,则使用Default Exchange* routingKey:消息的路由key,是用于Exchange(交换机)将消息转发到指定的消息队列* props:消息包含的属性* body:消息体*//*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或者解除绑定默认的交换机* routingKey 等于队列名称**/String message = "郑继坤帅哥EMAIL "+ LocalDateTime.now();String msgSms = "郑继坤帅哥SMS "+ LocalDateTime.now();channel.basicPublish(EXCHANGE_ROUTING_INFORM,INFORM_EMAIL,null,message.getBytes());channel.basicPublish(EXCHANGE_ROUTING_INFORM,INFORM_SMS,null,msgSms.getBytes());System.out.println("SEND MESSAGE IS:"+message);} catch (Exception e) {e.printStackTrace();} finally {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

消费者代码:

package com.test.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02_ROUTING_SMS {//队列名称private static final String QUEUE_INFORM_SMS = "queue_inform_sms";//交换机名称private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";//路由keyprivate static final String INFORM_SMS = "inform_sms";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();/*** 声明交换机* param1 exchange:交换机名称* param2 type:交换机名称,fanout,topic,direct,headers*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);/*** 声明队列,如果队列在mq中没有则要创建* 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);/*** 队列与交换机绑定* 参数;String queue,String exchange,String routingKey* 参数明细:* 1,queue:队列名称* 2,exchange:交换机名称* 3,routingKey:路由key*/channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,INFORM_SMS);//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume() 去指定* @param envelope  消息包的内容,可从中消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body, "utf-8");System.out.println("accept MESSAGE IS:"+msg);super.handleDelivery(consumerTag, envelope, properties, body);}};/*** 监听队列* 监听队列 String queue,boolean autoAck,Consumer callback* 参数明细* param1,队列名称* param2,是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置*    为false 则需要手动回复* param3,消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);} catch (Exception e) {e.printStackTrace();} finally {}}
}
package com.test.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02_Routing_Email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";//交换机名称private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";//路由keyprivate static final String INFORM_EMAIL = "inform_email";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();/*** 声明交换机* param1 exchange:交换机名称* param2 type:交换机名称,fanout,topic,direct,headers*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);/*** 声明队列,如果队列在mq中没有则要创建* 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);/*** 队列与交换机绑定* 参数;String queue,String exchange,String routingKey* 参数明细:* 1,queue:队列名称* 2,exchange:交换机名称* 3,routingKey:路由key*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,INFORM_EMAIL);//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume() 去指定* @param envelope  消息包的内容,可从中消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body, "utf-8");System.out.println("accept MESSAGE IS:"+msg);super.handleDelivery(consumerTag, envelope, properties, body);}};/*** 监听队列* 监听队列 String queue,boolean autoAck,Consumer callback* 参数明细* param1,队列名称* param2,是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置*    为false 则需要手动回复* param3,消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);} catch (Exception e) {e.printStackTrace();} finally {}}
}

4.4 Topics

工作模式

路由模式:

1,一个交换机可以绑定多个队列,每个队列可以设置一个或者多个带通配符的routingKey

2,生产者将消息发给交换机,交换机根据routingKey的值来匹配队列,匹配时采用通配符方式,匹配成功的将消息转发给指定的队列

案例:

根据用户的通知设置去通知用户,设置接收Email 的用户只接收Email,设置接收sms 的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。

规则,通过生产者发送消息,生产者设置路由key

消费者根据通配符设置路由,如果匹配,生产者就可以发送消息给消费者

通配符规则:

中间以"." 分隔。

符号# 可以匹配多个词,符号* 可以匹配一个词语

生产者:

消费者:

1,生产者

声明交换机,指定topic 类型:

package com.test.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.concurrent.TimeoutException;public class Producer02_Topic {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";//交换机名称private static final String EXCHANGE_TOPIC_INFORM = "exchange_topic_inform";//路由keyprivate static final String INFORM_EMAIL_SMS = "inform_email_sms";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();/*** 声明交换机* 参数明细:String exchange,BuiltinExchangeType type* 1,交换机名称* 2,交换机类型,fanout,topic,direct,headers*/channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);/*** 声明队列,如果队列在mq中没有则要创建* 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);/*** 交换机和队列绑定* 参数明细 String queue,String exchange,String routingKey* 1,queue:队列名称* 2,exchange:交换机名称* 3,routingKey:路由key*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPIC_INFORM,INFORM_EMAIL_SMS);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPIC_INFORM,INFORM_EMAIL_SMS);/*** 发送消息方法* 参数:String exchange, String routingKey, BasicProperties props, byte[] body* exchange:Exchange的名称,如果没有指定,则使用Default Exchange* routingKey:消息的路由key,是用于Exchange(交换机)将消息转发到指定的消息队列* props:消息包含的属性* body:消息体*//*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或者解除绑定默认的交换机* routingKey 等于队列名称**/String message = "郑继坤帅哥EMAIL "+ LocalDateTime.now();String msgSms = "郑继坤帅哥SMS "+ LocalDateTime.now();channel.basicPublish(EXCHANGE_TOPIC_INFORM,INFORM_EMAIL_SMS,null,message.getBytes());//channel.basicPublish(EXCHANGE_TOPIC_INFORM,INFORM_SMS,null,msgSms.getBytes());System.out.println("SEND MESSAGE IS:"+message);} catch (Exception e) {e.printStackTrace();} finally {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

消费者

package com.test.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02_Topic_SMS {//队列名称private static final String QUEUE_INFORM_SMS = "queue_inform_sms";//交换机名称private static final String EXCHANGE_TOPIC_INFORM = "exchange_topic_inform";//路由keyprivate static final String INFORM_SMS = "inform.#.sms.#";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();/*** 声明交换机* param1 exchange:交换机名称* param2 type:交换机名称,fanout,topic,direct,headers*/channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);/*** 声明队列,如果队列在mq中没有则要创建* 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);/*** 队列与交换机绑定* 参数;String queue,String exchange,String routingKey* 参数明细:* 1,queue:队列名称* 2,exchange:交换机名称* 3,routingKey:路由key*/channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPIC_INFORM,INFORM_SMS);//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume() 去指定* @param envelope  消息包的内容,可从中消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body, "utf-8");System.out.println("accept MESSAGE IS:"+msg);super.handleDelivery(consumerTag, envelope, properties, body);}};/*** 监听队列* 监听队列 String queue,boolean autoAck,Consumer callback* 参数明细* param1,队列名称* param2,是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置*    为false 则需要手动回复* param3,消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);} catch (Exception e) {e.printStackTrace();} finally {}}
}
package com.test.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02_Topic_Email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";//交换机名称private static final String EXCHANGE_TOPIC_INFORM = "exchange_topic_inform";//路由keyprivate static final String INFORM_EMAIL = "inform.#.email.#";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();/*** 声明交换机* param1 exchange:交换机名称* param2 type:交换机名称,fanout,topic,direct,headers*/channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);/*** 声明队列,如果队列在mq中没有则要创建* 参数: String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* 参数明细:* 1,queue 队列名称* 2,durable 是否持久化,如果持久化,mq重启后队列还在* 3,exclusive队列是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置true可用于临时队列的创建* 4,autoDelete 队列不再使用时是否自动删除此队列* 5,arguments 参数,可以设置一个队列的拓展参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);/*** 队列与交换机绑定* 参数;String queue,String exchange,String routingKey* 参数明细:* 1,queue:队列名称* 2,exchange:交换机名称* 3,routingKey:路由key*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPIC_INFORM,INFORM_EMAIL);//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel){/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume() 去指定* @param envelope  消息包的内容,可从中消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body, "utf-8");System.out.println("accept MESSAGE IS:"+msg);super.handleDelivery(consumerTag, envelope, properties, body);}};/*** 监听队列* 监听队列 String queue,boolean autoAck,Consumer callback* 参数明细* param1,队列名称* param2,是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置*    为false 则需要手动回复* param3,消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);} catch (Exception e) {e.printStackTrace();} finally {}}
}

topic 模式更多更强大,他可以实现routing,publish/subscrirbe 模式的功能

消息中间件RabbitMQ相关推荐

  1. 面试必过之消息中间件RabbitMQ面试总结大全!

    本篇基于海量的学员大厂面试,面试中关于消息中间件RabbitMQ的问题专题整理,希望对即将入行的同学有所帮助,祝一切顺利! 1.使用RabbitMQ有什么好处? 抢购活动,削峰填谷,防止系统崩塌. 延 ...

  2. 分布式系统消息中间件——RabbitMQ的使用基础篇

    分布式系统消息中间件--RabbitMQ的使用基础篇 转载于:https://www.cnblogs.com/zhehan54/p/9679101.html

  3. 基于消息中间件RabbitMQ实现简单的RPC服务

    转载自  基于消息中间件RabbitMQ实现简单的RPC服务 RPC(Remote Procedure Call,远程过程调用),是一种计算机通信协议.对于两台机器而言,就是A服务器上的应用程序调用B ...

  4. 分布式日志sleuth+分布式追踪系统zipkin+消息中间件rabbitMQ+MySQL存储跟踪数据

    一.了解分布式架构下系统的监控问题 接口监控问题 监测性能瓶颈 解决方案:Sleuth 日志监控问题 日志分散 解决方案:ELK+Kafka 二.使用Sleuth实现大觅网微服务跟踪 1.打开一个分布 ...

  5. 分布式系统消息中间件-RabbitMQ介绍及其应用

    分布式系统消息中间件-RabbitMQ 一.消息中间件 1.1 中间件 1.1.1 什么是中间件? 中间件(Middleware)是处于操作系统和应用程序之间的软件.人们在使用中间件时,往往是一组中间 ...

  6. 消息中间件Rabbitmq核心概念讲解

    概述 Rabbitmq是消息中间件的一种落地开源实现,使用Erlang语言编写,基于AMQP消息协议. 核心概念 Message:消息是不具名的,由消息头和消息体组成,消息体是不透明的,也就是可以设置 ...

  7. 【MQ】MQ消息中间件RabbitMQ

    第一部分:RabbitMQ 一.MQ 概念 MQ,Message Queue,消息队列.本质是队列,遵循FIFO先进先出原则.只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于 ...

  8. springboot(十)SpringBoot消息中间件RabbitMQ

    github地址:https://github.com/showkawa/springBoot_2017/tree/master/spb-demo/spb-brian-query-service 1. ...

  9. 又拍网架构 -- 前端PHP后台Python +消息中间件 RabbitMQ + 分库步骤

     又拍网架构 (http://www.bopor.com/?p=652), 很有价值. 又拍网的服务器端开发语言主要是PHP和Python,其中PHP用于编写Web逻辑(通过HTTP和用户直接打交道) ...

  10. 消息中间件---RabbitMQ

    rabbitmq和spring是同一个公司,支持性最好 1.什么是中间件 中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分.人们在使用中间件时, ...

最新文章

  1. Mysql进阶(4)——基于MHA的MySQL高可用架构
  2. java 1000个线程_关于Java多线程的一个问题
  3. php读取数组修改内容,php 数组如何修改值
  4. 【笔记】mybatis的sqlSession和Mapper详解
  5. Linq distinct去重方法之一
  6. c++获取数组长度_灵魂拷问:Java如何获取数组和字符串的长度?length还是length()?...
  7. CentOS安装cheat和tldr
  8. window+Apache 配置虚拟主机(2)
  9. C++ 连接数据库的入口和获取列数、数据
  10. 王超:奇虎360 MongoDB
  11. Mysql学习总结(79)——MySQL常用函数总结
  12. 03-17 APP自动遍历测试技术
  13. UML类图各符号含义
  14. 【LwM2M】LwM2M相关的开源项目
  15. android百度经纬度转gps坐标,GPS经纬度怎么转百度经纬度
  16. 线性分类器——Fisher线性判别
  17. win10在设备管理器里找不到蓝牙的三种解决办法
  18. oracle按照in的顺序进行排序
  19. C# 操作Word书签(一)插入、删除书签
  20. ksoftirqd内核线程-处理软中断

热门文章

  1. springboot房屋租赁合同报修系统java-ssm
  2. python统计字符串中大写英文、小写英文、汉字、数字等个数
  3. 网站安全测试-安全性缺陷
  4. 写SQL的21个好习惯
  5. 基于Java Swing的小游戏-连连看
  6. Unity3d用脚本实现Button图片的更改
  7. Linux内核同步原语之信号量(Semaphore)
  8. 边境的悍匪—机器学习实战:第十四章 使用卷积神经网络的深度计算机视觉
  9. ICC II setupfloorplan
  10. mysql用户名密码忘了怎么办_mysql用户密码忘了怎么办