3.RabbitMQ工作模式介绍
3.RabbitMQ工作模式介绍.md
文章目录
- 3.RabbitMQ工作模式介绍.md
- 1.简单模式
- 1.1总结
- 2.Work Queues 工作队列模式
- 2.1 模式说明
- 2.2 代码编写
- 2.3 总结
- 3.Pub/Sub 订阅模式
- 3.1 模式说明
- 3.2 使用场景
- 3.3 代码实现
- 3.4 总结:
- 4.Routing 路由模式
- 4.1 模式说明
- 4.2 代码编写
- 4.3 总结:
- 5. Topics 通配符模式
- 5.1 模式说明
- 6、工作模式总结
RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、 Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。 官网对应模式介绍:https://www.rabbitmq.com/getstarted.htm
1.简单模式
1.1案例实现
需求:使用简单模式完成消息传递
maven依赖如下
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version></dependency>
生产者如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机信息connectionFactory.setHost("81.71.14.7");connectionFactory.setPort(5672);connectionFactory.setUsername("user");connectionFactory.setPassword("password");connectionFactory.setVirtualHost("/vhost");//获取TCP长连接Connection connection = connectionFactory.newConnection();//创建通信“通道” 相当于TCP的虚拟连接Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用//第四个参数:是否自动删除,false 代表连接停掉后不自动删除这个队列//其他额外参数,nullchannel.queueDeclare("helloWord",false,false,false,null);String message = "你好RabbitMQ";//第一个参数 exchange 交换机,暂时不用,进行发布订阅的时候才用//第二个参数:队列名称//第三个参数:额外设置属性//第四个参数:消息字节数组channel.basicPublish("","helloWord",null,message.getBytes());channel.close();connection.close();System.out.println("===发送成功===");}
}
消费者如下:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机信息connectionFactory.setHost("81.71.14.7");connectionFactory.setPort(5672);connectionFactory.setUsername("user");connectionFactory.setPassword("password");connectionFactory.setVirtualHost("/vhost");//获取TCP长连接Connection connection = connectionFactory.newConnection();//创建通信“通道” 相当于TCP的虚拟连接final Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用//第四个参数:是否自动删除,false 代表连接停掉后不自动删除这个队列//其他额外参数,nullchannel.queueDeclare("helloWord",false,false,false,null);//第一个参数:队列名//第二个参数:参数代表是否自动确认收到消息,false 代表手动确认消息,是MQ推荐做法//第三个参数:传入DefaultConsumer 的实现类。channel.basicConsume("helloWord",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String (body);System.out.println("消费者接收到的消息:"+message);long tagId = envelope.getDeliveryTag();//第二个参数:只确认签收当前消息,设置true 代表签收该消息者所有未签收的消息channel.basicAck(tagId,false);}});}
}
1.1总结
上面案例使用的是简单模式如下图
上图概念如下:
- P:生产者,也就是要发送消息的程序
- C:消费者,消息的接受者,会一直等待消息到来
- Queue:消息队列,图中红色部分;类似一个邮箱,可以缓存消息,生产者向其中投递消息,消费者从其中取出消息。
2.Work Queues 工作队列模式
2.1 模式说明
- Work Queues:与简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
- 应用场景:对于任务过重或任务较多情况使用工作队列可以提升任务处理的速度。
2.2 代码编写
Work Queues与简单模式的代码几乎是一样的。可以复制,并多复制一个消费者进行多个消费者同时对消息消费的测试。
连接工具类:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtil {public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机信息connectionFactory.setHost("81.71.14.7");connectionFactory.setPort(5672);connectionFactory.setUsername("user");connectionFactory.setPassword("password");connectionFactory.setVirtualHost("/vhost");//获取TCP长连接return connectionFactory.newConnection();}
}
消息发生产类:
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbitmq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class OrderSystem {public static void main(String[] args) throws IOException, TimeoutException {String queueStr = "sm";Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(queueStr,false,false,false,null);for(int i=0;i<100;i++){SMS sms = new SMS("乘客"+i,"15600000000","你的车票已经预定成功");String jsonSms = new Gson().toJson(sms);channel.basicPublish("",queueStr,null,jsonSms.getBytes());}System.out.println("====发送数据成功===");channel.close();connection.close();}// 短信封装类static class SMS{private String name;private String mobile;private String content;public SMS(String name,String mobile,String content){this.name = name;this.mobile = mobile;this.content = content;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMobile() {return mobile;}public void setMobile(String mobile) {this.mobile = mobile;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}}
}
消息消费类:
import com.rabbitmq.client.*;
import rabbitmq.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class SMSSender1 {public static void main(String[] args) throws IOException, TimeoutException {String queueStr = "sm";Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(queueStr,false,false,false,null);//如果不写basicQos(1) 则MQ自动将所有请求平均发送给所有的消费者//basicQos(1) ,MQ 不在对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个。channel.basicConsume(queueStr,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String jsonSMS = new String(body);System.out.println("SMSSender1===短信发送成功:"+jsonSMS);channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
2.3 总结
- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
- Work Queue 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
- 消费是注意channel.basicQos(1); 处理完一个取一个。
3.Pub/Sub 订阅模式
3.1 模式说明
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接收者,会一直等待消息到来 Queue:消息队列,接收消息、缓存消息
- Exchange:交换机(X)。一方面,**接收生产者发送的消息。另一方面,知道如何处理消息,**例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange绑定,或者没有 符合路由规则的队列,那么消息会丢失!
3.2 使用场景
- 发布订阅模式因为所有的消费者获得相同的消息,所以特别适合“数据提供与应用”
- 例如:天气推送、公众号订阅、微博/抖音/快手的关注等
3.3 代码实现
订阅模式和之前的Work Queues工作队列模式相比多了一个Exchange交换机的新的概念,之前生产者直接发送到队列,现在直接发送到交换机。消费者还是直接从队列中获取消息,但是需要消费者创建队列并且把队列和交换机绑定。
注意:代码中注释的地方,一般是之前没出现过的,或者使用新的参数了,Exchange是需要通过管理界面创建的类型为Fanout 。
生产者代码:
获取Connection工具类省略了。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbitmq.ConnectionUtil;import java.util.Scanner;public class WeatherBureau {public static void main(String[] args) throws Exception{String exchangeStr = "exchange-weather";Connection connection = ConnectionUtil.getConnection();//从控制台输入发布内容String input = new Scanner(System.in).next();Channel channel = connection.createChannel();//第一个参数:交换机名字channel.basicPublish(exchangeStr,"",null,input.getBytes());channel.close();connection.close();}
}
消费者代码:
import com.rabbitmq.client.*;
import rabbitmq.ConnectionUtil;
import java.io.IOException;public class BaiduConsumer {public static void main(String[] args) throws Exception {String exchangeStr = "exchange-weather";String baiduQueue = "baidu-queue";Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(baiduQueue,false,false,false,null);//queueBing 用于将队列和交换机绑定//参数1:队列名,参数2:交换机名,参数三:路由key 在路由模式用的channel.queueBind(baiduQueue,exchangeStr,"");channel.basicQos(1);channel.basicConsume(baiduQueue,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String weatherStr = new String(body);System.out.println("SMSSender1===短信发送成功:"+weatherStr);channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
3.4 总结:
发布订阅模式比WorkQueue 工作模式多了一个交换机的概念,并且生产者发布消息不是直接到队列Queue,而是发给交换机,消费者需要创建队列,在通过Bing把交换机和队列绑定。
4.Routing 路由模式
4.1 模式说明
- 队列于交换机绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由Key)
- 消费的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息。
上图解释如下:
- P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
- C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
4.2 代码编写
注意:获取Connection工具类省略了,Exchange需要在管理界面创建且类型为Direct
生产者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbitmq.ConnectionUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;public class WeatherBureau {public static void main(String[] args) throws Exception{String exchangeStr = "exchange_weather_routing";Map<String,String> area = new HashMap();area.put("china.beijing.20221128","北京20221128号天气晴朗!");area.put("china.zhengzhou.20221128","郑州20221128号天气小雪!");area.put("us.NewYork.20221129","纽约20221129号天气晴朗!");area.put("us.Washington.20221129","华盛顿20221129号天气小雪!");Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()){Map.Entry<String, String> m = itr.next();//第一个参数:交换机名字,第二个参数:消息的Routing keychannel.basicPublish(exchangeStr,m.getKey(),null,m.getValue().getBytes());}channel.close();connection.close();}
}
消费者代码:
import com.rabbitmq.client.*;
import rabbitmq.ConnectionUtil;
import java.io.IOException;public class BaiduConsumer {public static void main(String[] args) throws Exception {String exchangeStr = "exchange_weather_routing";String baiduQueue = "baidu-queue";Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(baiduQueue,false,false,false,null);//queueBing 用于将队列和交换机绑定//参数1:队列名,参数2:交换机名,参数三:路由key 在路由模式用的channel.queueBind(baiduQueue,exchangeStr,"china.beijing.20221128");channel.queueBind(baiduQueue,exchangeStr,"china.zhengzhou.20221128");channel.queueBind(baiduQueue,exchangeStr,"us.NewYork.20221129");channel.basicQos(1);channel.basicConsume(baiduQueue,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String weatherStr = new String(body);System.out.println("SMSSender1===短信发送成功:"+weatherStr);channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
注意:生产者发送了4个路由ke,但消费者只接受了3个,剩余的一个会退回生产者,因为没有队列存储。
4.3 总结:
Routing 模式要求队列在绑定交换机时要指定routing key ,消息转发到符合routing key的队列。不符合的不转发,此模式比较麻烦,使用比较少。
5. Topics 通配符模式
5.1 模式说明
- Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
- Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert
- 通配符规则:#匹配一个或多个词,*匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item. 只能匹配 item.insert
代码就不写和路由模式是一样的,消费者代码不需要每一个绑定路由,只需要写一个通配符就可以。
6、工作模式总结
- 简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。 - 工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。 - 发布订阅模式 Publish/subscribe
需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。 - 路由模式 Routing
需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。 - 通配符模式 Topic
需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消
息到交换机后,交换机会根据 routing key 将消息发送到对应的队列.
3.RabbitMQ工作模式介绍相关推荐
- 消息中间件的应用场景与 RabbitMQ的六种工作模式介绍
消息中间件的应用场景与 RabbitMQ的六种工作模式介绍 消息中间件应用场景 异步处理 应用解耦 流量削峰 RabbitMQ的六种工作模式 简单模式 工作模式 发布订阅模式 路由模式 主题模式 PR ...
- RabbitMQ初步到精通-第四章-RabbitMQ工作模式-Routing
第四章-RabbitMQ工作模式-Routing 1.模式介绍 1.1 模式 路由模式-继续还是和Exchange打交道,上节提到的Exchange类型为Fanout,此次声明的类型为direct 与 ...
- LVS三种工作模式介绍对比和十种调度算法介绍
2019独角兽企业重金招聘Python工程师标准>>> 工作模式介绍: 1.Virtual server via NAT(VS-NAT) 优点:集群中的物理服务器可以使用任何支持TC ...
- rsync的介绍及参数详解,配置步骤,工作模式介绍
rsync的介绍及参数详解,配置步骤,工作模式介绍 rsync是类unix系统下的数据镜像备份工具.它是快速增量备份.全量备份工具. Sync可以远程同步,支持本地复制,或者与其他SSH.rsync主 ...
- SHT30使用的学习过程1SHT30工作模式介绍
SHT30使用的学习过程1SHT30工作模式介绍 代码篇请点击这里 本人是新人小白,欢迎各位大佬指正,本文介绍的是SHT30两种工作模式[手动比心 by zwx lvmm] 看到网上对于sht30的博 ...
- 时间同步装置(时钟系统)工作模式介绍
时间同步装置(时钟系统)工作模式介绍 时间同步装置(时钟系统)工作模式介绍 PCS7系统基于TIA构建方式,在整个系统下包含了AS 自动化系统,OS 服务器/客户端,单站和各类远程站点等多种组件.这些 ...
- RabbitMq的工作模式 介绍+测试代码,以及三种Exchange模式介绍.
RabbitMq的提供了六种模式分别是:简单模式,工作模式,发布\订阅模式,路由模式,通配符模式,RPC远程调用模式 下面将详细介绍常用的前五种模式,附上测试代码. 公共的代码---连接工具类: pu ...
- 学成在线--13.RabbitMQ工作模式
文章目录 一.Work queues 二.Publish/subscribe 1.工作模式 2.代码 1)生产者 2)消费者 3.测试 4.思考 三.Routing 1.工作模式 2.代码 1)生产者 ...
- RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码
RabbitMQ有以下几种工作模式 : 1.Work queues 工作队列 2.Publish/Subscribe 发布订阅 3.Routing 路由 4.Topics 通 ...
最新文章
- 一切尽显眼前:如何预防在虚拟化环境中丢失可视性和安全性
- (七)渐变 矩形渐变 放射渐变
- ajax传formdata类型的数据_JQuery.Ajax()的data参数类型
- java商城_商城系统常见开发语言及特点分享
- 如何在 SAP CRM WebClient UI 里在 Context node 上下文里访问其他 Context Node 的数据
- 定时任务重启后执行策略_C语言操作时间函数time.ctime,实现定时执行某个任务小例子...
- selenium 基本的键盘方法
- 隐式类型转换中显式申明的非必要性
- 九度OJ 题目1011:最大连续子序列
- C/C++端口复用SO_REUSEADDR(setsockopt参数)
- 清涟基因--顾大夫工作室相关网站
- python局域网大文件_利用Python+pyftpdlib实现在局域网中互传文件
- 2020年5G通信工程类项目一览,哪些企业成功抢滩?
- 通达信指标转python_通达信转python
- LOX-8 Grease Paste tufoil fluoramics
- 基于matlab的指纹识别程序
- GeoServer发布osm数据地图服务
- 阿里技术专家:技术人员如何快速成长,实现职场跃迁?14页ppt干货分享
- 【智能材料】用人工智能发现新型材料,金属玻璃可替代钢材
- 模拟LED屏幕文字滚动开发和悬浮