RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码
RabbitMQ有以下几种工作模式 :
1、Work queues 工作队列
2、Publish/Subscribe 发布订阅
3、Routing 路由
4、Topics 通配符
5、Header Header 转发器
6、RPC 远程调用
进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息
测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
交换机 :
参数:String exchange, String type
参数明细:
1、交换机的名称
2、交换机的类型
fanout:对应的rabbitmq的工作模式是 publish/subscribe
direct:对应的Routing 工作模式
topic:对应的Topics工作模式
headers: 对应的headers工作模式
发布订阅模式代码
生产者:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:10**/
public class Producer02_publish {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();//声明队列,如果队列在mq 中没有则要创建//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe 发布订阅模式* direct:对应的Routing 工作模式 路由* topic:对应的Topics工作模式 通配符* headers: 对应的headers工作模式 转发器*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");//发送消息//参数:String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<5;i++){//消息内容String message = "send inform message to user";channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
邮件消费者:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer02_subscribe_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing 工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}
短信消费者:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer02_subscribe_sms {//队列名称private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing 工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);}
}
RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码相关推荐
- php订阅系统,php redis pub/sub(Publish/Subscribe,发布/订阅的信息系统)之基本使用
一.场景介绍 最近的一个项目需要用到发布/订阅的信息系统,以做到最新实时消息的通知.经查找后发现了redis pub/sub(发布/订阅的信息系统)可以满足我的开发需求,而且学习成本和使用成本也比较低 ...
- RabbitMQ Tutorials 3 - Publish/Subscribe 发布/订阅
发布/订阅 分发一个消息给多个消费者(consumers).这种模式被称为"发布/订阅". 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序--第一个程序负责发送日 ...
- 译: 3. RabbitMQ Spring AMQP 之 Publish/Subscribe 发布和订阅
在第一篇教程中,我们展示了如何使用start.spring.io来利用Spring Initializr创建一个具有RabbitMQ starter dependency的项目来创建spring-am ...
- Publish/Subscribe 发布与订阅模式
Publish/Subscribe 发布与订阅: 通过交换机来实现,一个生产者可以让不同队列的消费者同时得到消息 生产者: package Fanout; import com.rabbitmq.cl ...
- RabbitMQ系列教程之三:发布\/订阅(Publish\/Subscribe)
在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅 ...
- RabbitMQ工作模式
RabbitMQ总共有六种工作模式: 1.工作队列模式 work queues 2.发布/订阅模式 Publish/Subscribe 3.路由模式 Routing 4.通配符模式 Topics 5. ...
- 3.RabbitMQ工作模式介绍
3.RabbitMQ工作模式介绍.md 文章目录 3.RabbitMQ工作模式介绍.md 1.简单模式 1.1总结 2.Work Queues 工作队列模式 2.1 模式说明 2.2 代码编写 2.3 ...
- rabbitMq工作模式特性及整合springboot
因为公司项目后面需要用到mq做数据的同步,所以学习mq并在此记录,这里的是rabbitMq mq(message queue)消息队列 官网:www.rabbitmq.com 使用消息队列的优点:1. ...
- RabbitMQ初步到精通-第四章-RabbitMQ工作模式-Routing
第四章-RabbitMQ工作模式-Routing 1.模式介绍 1.1 模式 路由模式-继续还是和Exchange打交道,上节提到的Exchange类型为Fanout,此次声明的类型为direct 与 ...
最新文章
- 语义分割常用loss介绍——及pytorch实现
- VMware Fusion:Windows程序和Mac无缝结合
- firefox浏览器 插件--【维基百科+谷歌翻译】高级应用之 带图翻译
- 阅读书源最新2020在线导入_书源篇三及6.5.0版本介绍
- 【数据结构与算法】大根堆和优先队列的Java实现
- Unity3d打开的时候,卡在loading界面白屏的解决方法
- 老板可以停,因为可能赚了钱
- (算法)从10000个数中找出最大的10个
- 踩着七彩祥云来接你的人不一定是意中人,也可能是阿里云
- 【原创】黑群晖向白群晖 DS920+ 迁移
- 对幅度谱和相位谱的理解
- c++11伪随机数生成库:random
- 数据分析基础篇---统计学基础
- Linux服务器卡顿如何排查
- 404究竟是什么意思呢?像404,200,503等数字究竟是什么东西
- 【愚公系列】2023年05月 网络安全高级班 067.WEB渗透与安全(Havij实战-SQL自动化注入)
- MAC maven安装配置
- 希尔伯特黄变换(Hilbert-Huang)原理、HHT求时频谱、边际谱,及MATLAB(2018rb)实现
- iframe、frame、frameset那些事
- power bi 度量值SUMX(FILTER和EARLIER结合