RabbitMQ有以下几种工作模式 :

1、Work queues  工作队列

2、Publish/Subscribe 发布订阅

3、Routing      路由

4、Topics        通配符

5、Header      Header 转发器

6、RPC     远程调用

进入浏览器,输入:http://localhost:15672

初始账号和密码:guest/guest

发布订阅模式:

1、每个消费者监听自己的队列。

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收   到消息

测试:

1、使用入门程序,启动多个消费者。

2、生产者发送多个消息。结果:

1、一条消息只会被一个消费者接收;

2、rabbit采用轮询的方式将消息是平均发送给消费者的;

3、消费者在处理完某条消息后,才会收到下一条消息。

交换机 :
      参数:String exchange, String type
            参数明细:
                 1、交换机的名称
                 2、交换机的类型
               fanout:对应的rabbitmq的工作模式是 publish/subscribe
               direct:对应的Routing    工作模式
               topic:对应的Topics工作模式
               headers: 对应的headers工作模式

发布订阅模式代码

生产者:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:10**/
public class Producer02_publish {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();//声明队列,如果队列在mq 中没有则要创建//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe 发布订阅模式* direct:对应的Routing 工作模式  路由* topic:对应的Topics工作模式  通配符* headers: 对应的headers工作模式  转发器*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");//发送消息//参数:String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<5;i++){//消息内容String message = "send inform message to user";channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

邮件消费者:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer02_subscribe_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

短信消费者:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer02_subscribe_sms {//队列名称private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);}
}

RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码相关推荐

  1. php订阅系统,php redis pub/sub(Publish/Subscribe,发布/订阅的信息系统)之基本使用

    一.场景介绍 最近的一个项目需要用到发布/订阅的信息系统,以做到最新实时消息的通知.经查找后发现了redis pub/sub(发布/订阅的信息系统)可以满足我的开发需求,而且学习成本和使用成本也比较低 ...

  2. RabbitMQ Tutorials 3 - Publish/Subscribe 发布/订阅

    发布/订阅 分发一个消息给多个消费者(consumers).这种模式被称为"发布/订阅". 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序--第一个程序负责发送日 ...

  3. 译: 3. RabbitMQ Spring AMQP 之 Publish/Subscribe 发布和订阅

    在第一篇教程中,我们展示了如何使用start.spring.io来利用Spring Initializr创建一个具有RabbitMQ starter dependency的项目来创建spring-am ...

  4. Publish/Subscribe 发布与订阅模式

    Publish/Subscribe 发布与订阅: 通过交换机来实现,一个生产者可以让不同队列的消费者同时得到消息 生产者: package Fanout; import com.rabbitmq.cl ...

  5. RabbitMQ系列教程之三:发布\/订阅(Publish\/Subscribe)

    在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅 ...

  6. RabbitMQ工作模式

    RabbitMQ总共有六种工作模式: 1.工作队列模式 work queues 2.发布/订阅模式 Publish/Subscribe 3.路由模式 Routing 4.通配符模式 Topics 5. ...

  7. 3.RabbitMQ工作模式介绍

    3.RabbitMQ工作模式介绍.md 文章目录 3.RabbitMQ工作模式介绍.md 1.简单模式 1.1总结 2.Work Queues 工作队列模式 2.1 模式说明 2.2 代码编写 2.3 ...

  8. rabbitMq工作模式特性及整合springboot

    因为公司项目后面需要用到mq做数据的同步,所以学习mq并在此记录,这里的是rabbitMq mq(message queue)消息队列 官网:www.rabbitmq.com 使用消息队列的优点:1. ...

  9. RabbitMQ初步到精通-第四章-RabbitMQ工作模式-Routing

    第四章-RabbitMQ工作模式-Routing 1.模式介绍 1.1 模式 路由模式-继续还是和Exchange打交道,上节提到的Exchange类型为Fanout,此次声明的类型为direct 与 ...

最新文章

  1. 语义分割常用loss介绍——及pytorch实现
  2. VMware Fusion:Windows程序和Mac无缝结合
  3. firefox浏览器 插件--【维基百科+谷歌翻译】高级应用之 带图翻译
  4. 阅读书源最新2020在线导入_书源篇三及6.5.0版本介绍
  5. 【数据结构与算法】大根堆和优先队列的Java实现
  6. Unity3d打开的时候,卡在loading界面白屏的解决方法
  7. 老板可以停,因为可能赚了钱
  8. (算法)从10000个数中找出最大的10个
  9. 踩着七彩祥云来接你的人不一定是意中人,也可能是阿里云
  10. 【原创】黑群晖向白群晖 DS920+ 迁移
  11. 对幅度谱和相位谱的理解
  12. c++11伪随机数生成库:random
  13. 数据分析基础篇---统计学基础
  14. Linux服务器卡顿如何排查
  15. 404究竟是什么意思呢?像404,200,503等数字究竟是什么东西
  16. 【愚公系列】2023年05月 网络安全高级班 067.WEB渗透与安全(Havij实战-SQL自动化注入)
  17. MAC maven安装配置
  18. 希尔伯特黄变换(Hilbert-Huang)原理、HHT求时频谱、边际谱,及MATLAB(2018rb)实现
  19. iframe、frame、frameset那些事
  20. power bi 度量值SUMX(FILTER和EARLIER结合

热门文章

  1. 抓虫系列(三) 不要轻视web程序中常用的三个池 之数据库连接池
  2. gitlab用户,组,项目权限管控
  3. 获取winform 运行时debug路径
  4. 源码实现 -- strdel
  5. 基类显式继承接口,类继承基类时又继承同一接口,引发接口方法混乱(显式继承接口的弊端)...
  6. EF Code First建库 增删改查
  7. Microsoft Visual Studio 2005 怎么更改安装路径?
  8. linux 间隔时间中断测试
  9. SHELL编程Nginx源码多版本脚本
  10. 官方文档---ubuntu 安装OpenStack