RabbitMQ第二话 -- Springboot基于四种Exchange(Direct、Fanout、Topic、Heders、延时队列)的实现和多虚拟主机下的生产消费者实现
本文主要分享RabbitMQ exchange类型的功能和使用、RabbitMQ延时队列、一个springboot服务发送消息到多虚拟主机
1.RabbitMQ exchange
exchange交换机,负责分发消息,为解决消息不同的业务场景,也提供了不同的交换机类型。
- 基于springboot2.5.6
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- yaml配置
spring:rabbitmq:addresses: 192.168.0.221:5672username: guestpassword: guestvirtual-host: /listener:simple:#手动确认 当有自动确认机制 又手动ACK会报406错误acknowledge-mode: manual
2.DirectExchange
默认交换机,一对一,默认下根据队列名下发,有routerKey时根据key下发
2.1 java中实现
- 创建队列与绑定
@Bean
public boolean createDirectQueue(@Autowired ConnectionFactory connectionFactory) {String exchange = "direct_exchange";String queue = "direct_queue";RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.declareExchange(new DirectExchange(exchange));/*** Queue可传递参数说明* @params1 :队列名称* @params2 :队列是否持久化(如果是,则重启服务不会丢失)* @params3 :是否是独占队列(如果是,则仅限于此连接)* @params4 :是否自动删除(最后一条消息消费完毕,队列是否自动删除)*/rabbitAdmin.declareQueue(new Queue(queue));rabbitAdmin.declareBinding(new Binding(queue, Binding.DestinationType.QUEUE,exchange, "", null));return true;
}@GetMapping("/send")
public String send() {JSONObject object = new JSONObject();object.put("hello", "22222");//如有设置routingKey则需要指定keyrabbitTemplate.convertAndSend("direct_exchange", "", object.toJSONString());return "success";
}
- 消费者
@RabbitHandler
@RabbitListener(queues = {"direct_queue"})
public void onMessage(Message message, Channel channel) throws Exception {String msgBodyString = new String(message.getBody());JSONObject json = JSONObject.parseObject(msgBodyString);log.info("消费消息:{},{}", json, message.getMessageProperties().getConsumerQueue());//进行消费channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
2.2 测试日志
调用接口发送消息到RabbitMQ,查看控制台日志
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},direct_queue
消费确认后就会被删除,只能消费一次
3.FanoutExchange
广播消息,所有队列都发,所有队列消费完毕后删除,如无消费者会保存在队列(开启持久化)等待一个消费者消费后删除
3.1 java中实现
- 创建队列与绑定、发送消息
@Bean
public boolean createFanoutQueue(@Autowired ConnectionFactory connectionFactory) {String exchange = "fanout_exchange";String queue = "fanout_queue";RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.declareExchange(new FanoutExchange(exchange));//该交换机为广播消息创建两个不同的队列for (int i = 0; i < 2; i++) {rabbitAdmin.declareQueue(new Queue(queue + i));rabbitAdmin.declareBinding(new Binding(queue + i, Binding.DestinationType.QUEUE,exchange, "", null));}return true;
}@GetMapping("/send")
public String send() {JSONObject object = new JSONObject();object.put("hello", "22222");//如有设置routingKey则需要指定key//fanoutrabbitTemplate.convertAndSend("fanout_exchange", "", object.toJSONString());return "success";
}
- 消费者
@RabbitHandler
@RabbitListener(queues = {"fanout_queue0", "fanout_queue1"})
public void fonoutOnMessage(Message message, Channel channel) throws Exception {String msgBodyString = new String(message.getBody());JSONObject json = JSONObject.parseObject(msgBodyString);log.info("消费消息:{},{}", json, message.getMessageProperties().getConsumerQueue());//进行消费channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
3.2 测试日志
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},fanout_queue1
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},fanout_queue0
一条消息会广播到两个队列
4.TopicExchange
通配符模糊匹配routerKey,满足条件就发送
4.1 java中实现
- 创建队列与绑定、发送
@Bean
public boolean createTopicQueue(@Autowired ConnectionFactory connectionFactory) {String exchange = "topic_exchange";String queue = "topic_queue";RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.declareExchange(new TopicExchange(exchange));//匹配前缀的队列rabbitAdmin.declareQueue(new Queue(queue + "_before"));rabbitAdmin.declareBinding(new Binding(queue + "_before", Binding.DestinationType.QUEUE,exchange, "*." + queue, null));//匹配后缀的队列rabbitAdmin.declareQueue(new Queue(queue + "_after"));rabbitAdmin.declareBinding(new Binding(queue + "_after", Binding.DestinationType.QUEUE,exchange, queue + ".*", null));return true;
}//topic 发送代码 同一个exchange 不同的routingKey
rabbitTemplate.convertAndSend("topic_exchange", "video.topic_queue", object.toJSONString());
rabbitTemplate.convertAndSend("topic_exchange", "topic_queue.music", object.toJSONString());
4.2 测试日志
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},topic_queue_before
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},topic_queue_after
经测试,均能正常消费
5.HeadersExchange
可以配置多个key-value形式的密钥,生产者在头部配置任意一个key-value即可发送到对应的队列
5.1 java中实现
- 创建队列与绑定、发送
@Bean
public boolean createHeaderQueue(@Autowired ConnectionFactory connectionFactory) {String exchange = "header_exchange";String queue = "header_queue";RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.declareExchange(new HeadersExchange(exchange));//创建两组密钥Map<String, Object> map = new HashMap<>();map.put("header_key1", "12345");map.put("header_key2", "123456");rabbitAdmin.declareQueue(new Queue(queue));rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(queue)).to(new HeadersExchange(exchange))//任意匹配一对 whereAny还提供了单个key 多个key匹配的方法.whereAny(map).match());return true;
}//发送 header 头部的key-value必须是完全匹配的
object.put("hello", "header_key1");
rabbitTemplate.convertAndSend("header_exchange", null,MessageBuilder.withBody(object.toJSONString().getBytes()).setHeader("header_key1", "12345").build());
object.put("hello", "header_key2");
rabbitTemplate.convertAndSend("header_exchange", null,MessageBuilder.withBody(object.toJSONString().getBytes()).setHeader("header_key2", "123456").build());
5.2 测试日志
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"header_key1"},header_queue
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"header_key2"},header_queue
能正常消费到发送的内容
6.RabbitMQ延时队列
6.1 延时队列原理说明
RabbitMQ提供消息过期队列,一条消息在指定时候后没有被消费掉则会被定义为过期,过期的消息可以通过配置转到其他的交换机去,如不配置则直接抛掉。
那么开发实现过程为:
新建
delayed_exchange_ttl
和delayed_queue_ttl
队列,通过配置设置消息存活时间和过期以后存放的死信队列。发送需要延迟的消息到该交换机,该交换机下的队列无消费者。成为死信一般有以下几种情况:
消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
消息的TTL-存活时间已经过期
队列长度限制被超越(队列满)新建
delayed_exchange
和delayed_queue
队列,存放已过期消息的死信队列,延时消息的消费者监听该队列
6.2 Java中实现
- 创建与绑定、发送
@Bean
public boolean createDelayedQueue(@Autowired ConnectionFactory connectionFactory) {String exchange = "delayed_exchange";String queue = "delayed_queue";RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);//创建一个有定时过期时间的队列rabbitAdmin.declareExchange(new DirectExchange(exchange + "_ttl"));//队列的配置Map<String, Object> map = new HashMap<>();//过期时间map.put("x-message-ttl", 30000);//过期消息转达的死信交换机map.put("x-dead-letter-exchange", "delayed_exchange");//死信队列的routingKeymap.put("x-dead-letter-routing-key", "delayed_exchange_key");rabbitAdmin.declareQueue(new Queue(queue + "_ttl", true, false, true, map));rabbitAdmin.declareBinding(new Binding(queue + "_ttl", Binding.DestinationType.QUEUE,exchange + "_ttl", "", null));//创建真正的消费者队列rabbitAdmin.declareExchange(new DirectExchange(exchange));rabbitAdmin.declareQueue(new Queue(queue));rabbitAdmin.declareBinding(new Binding(queue, Binding.DestinationType.QUEUE,exchange, "delayed_exchange_key", null));return true;
}//发送代码
log.info("发送消息:{}",new Date().getTime());rabbitTemplate.convertAndSend("delayed_exchange_ttl", null, object.toJSONString());
- 最终队列的图如下,ttl队列中的消息达到指定时间后会转存到
delayed_queue
队列去
- 消费者代码
@RabbitHandler
@RabbitListener(queues = {"delayed_queue"})
public void onMessage(Message message, Channel channel) throws Exception {String msgBodyString = new String(message.getBody());JSONObject json = JSONObject.parseObject(msgBodyString);log.info("消费消息:{},{},{}", json,message.getMessageProperties().getConsumerQueue(),new Date().getTime());//进行消费channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
6.3 测试日志
c.e.rabbitmq.controller.Controller : 发送消息:1655967316743
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"1234567"},delayed_queue,1655967344564
总体来说是实现了延时功能,但是延时的时间有误差,只适合不是很重要的场景使用
需要注意的一个点:
队列一经创建,不可再次修改。
至于延时时间动态的,可以先注解@Autowired ConnectionFactory connectionFactory
,然后再通过生产者声明创建指定时间的队列。
7.多虚拟主机实现
使用场景:业务服务需要发送消息到用户和订单服务,但是这两个服务又是不一样的虚拟主机下。这种情况就只能通过代码进行连接了。
7.1 生产者
配置我就省略了,反正不用自动装配就行
@Bean("orderPushRabbit")
public RabbitTemplate orderPushRabbit() {//Mq连接信息CachingConnectionFactory rabbitFactory = new CachingConnectionFactory();rabbitFactory.setAddresses(amqpAddress);rabbitFactory.setChannelCacheSize(Runtime.getRuntime().availableProcessors() * 2);rabbitFactory.setUsername(amqpUserName);rabbitFactory.setPassword(amqpPassword);rabbitFactory.setVirtualHost(vhost);// 初始化RabbitTemplateRabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitFactory);MessageConverter serializerMessageConverter = new SerializerMessageConverter();rabbitTemplate.setMessageConverter(serializerMessageConverter);//设置exchange信息和routingKeyrabbitTemplate.setExchange(exchange);rabbitTemplate.setRoutingKey(routingKey); return rabbitTemplate;
}
如果有多个虚拟主机的复制上述配置即可,需要注意配置的不同即可。
生产者代码中使用
@Autowired
@Qualifier("orderPushRabbit")
RabbitTemplate PushRabbit;@Autowired
@Qualifier("userPushRabbit")
RabbitTemplate userPushRabbit;
7.2 消费者
根据虚拟主机声明不同的连接工厂
@Bean(name = "orderConnectionFactory")
public ConnectionFactory orderConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(amqpAddress);connectionFactory.setUsername(amqpUserName);connectionFactory.setPassword(amqpPassword);connectionFactory.setVirtualHost(vhost); // /userreturn connectionFactory;
}@Bean("orderListenerContainer")
@Primary
public RabbitListenerContainerFactory orderListenerContainer(@Qualifier("orderConnectionFactory") ConnectionFactory orderConnectionFactory) {SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();//设置对应虚拟主机的连接工厂container.setConnectionFactory(orderConnectionFactory);container.setMaxConcurrentConsumers(1);container.setConcurrentConsumers(1);container.setPrefetchCount(prefetch);//手动确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setMessageConverter(new SimpleMessageConverter());return container;
}
以此类推,如果有多个虚拟主机,则有多份上述的配置。
@Primary
注解标识在没有声明监听工厂时默认使用的
消费者代码
//containerFactory 指定队列的监听工厂
@RabbitListener(queues = {"order_exchange"}, containerFactory = "orderListenerContainer")
public void onMessage(Message message, Channel channel) throws Exception {String msgBodyString = new String(message.getBody());JSONObject json = JSONObject.parseObject(msgBodyString);log.info("消费消息:{}", json);//进行消费channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
8.总结
上面有说明死信的条件,如没有死信队列,可能会导致死循环。
消息确认不通过或者异常会重新进入到队列头部,接下来消费者又会消费此条消息由此造成死循环。
因为消费者最好遵循以下几点:
1.捕获消息处理过程中的异常,一定要自己手动确认还是不确认(由其他消费者消费)
2.重试次数记录,如果规范消息协议应该是有次数字段的,消费者根据重试次数来做异常消息日志等操作
3.根据配置自动丢到死信队列,前提是创建队列时需要指定死信队列和key
listener:simple:concurrency: 5prefetch: 10retry:enabled: true # 允许消息消费失败的重试max-attempts: 3 # 消息最多消费次数3次
以上就是本章的全部内容了。
上一篇:RabbitMQ第一话 – docker安装RabbitMQ以及Springboot集成RabbitMQ
下一篇:RabbitMQ第三话 – RabbitMQ高可用集群搭建
贵有恒何必三更眠五更起,最无益只怕一日曝十日寒
RabbitMQ第二话 -- Springboot基于四种Exchange(Direct、Fanout、Topic、Heders、延时队列)的实现和多虚拟主机下的生产消费者实现相关推荐
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较(转)
RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout,direct, ...
- RabbitMQ 四种Exchange
AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchang ...
- SpringBoot的四种异步处理,写这篇文章,我自己先学到了
最近更新了一系列关于异步和回调的文章,比如<一篇文章,搞明白异步和多线程的区别>.<两个经典例子让你彻底理解java回调机制>.<异步请求和异步调用有区别?>,大家 ...
- ML之DTRFRExtraTRGBR:基于四种算法(DT、RFR、ExtraTR、GBR)对Boston(波士顿房价)数据集(506,13+1)进行价格回归预测并对比各自性能
ML之DT&RFR&ExtraTR&GBR:基于四种算法(DT.RFR.ExtraTR.GBR)对Boston(波士顿房价)数据集(506,13+1)进行价格回归预测并对比各自 ...
- SpringBoot:四种读取properties文件的方式
前言 在项目开发中经常会用到配置文件,配置文件的存在解决了很大一份重复的工作.今天就分享四种在Springboot中获取配置文件的方式. 注:前三种测试配置文件为springboot默认的applic ...
- headers java_RabbitMQ四种Exchange类型之Headers(Java)
版权声明:本文为博主原创文章,如果转载请给出原文链接:http://doofuu.com/article/4156157.html Headers 类型的Exchanges是不处理路由键的,而是根据发 ...
- RabbitMQ下的生产消费者模式与订阅发布模式
所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 假 ...
- rabbitMq第四种模型--direct
在Fanout模式中,一条消息,会被所有订阅的队列都消费. 但是,在某些场景下,我们希望不同的消息被不同的队列消费. 这时就要用到Direct类型的Exchange. 在Direct模型下:队列与交换 ...
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
一.Direct Exchange 任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue. 1.一般情况可以使用rabbitMQ自带的Exchange:&quo ...
最新文章
- “智源 — INSPEC 工业大数据质量预测赛” 上线,为硬核工业制造炼就 AI 之心...
- python编程小游戏-python编程游戏有哪些
- update 两个表关联_你真的了解全量表,增量表及拉链表吗?
- 【转】Ubuntu VI基本用法
- DVWA Brute Force(low)
- jemeter python接口自动化测试平台_ant+jmeter+Jenkins接口自动化测试实战(1)
- ISP运营商实验室测试机架拓扑搭建经验分享
- 米莱迪机器人加物理攻击_王者荣耀:“不死流”白起崛起,秒回8500血完克米莱迪...
- jquery背景动画插件使用
- mysql5.5默认引擎_InnoDB 作为默认存储引擎(从mysql-5.5.5开始)
- 贴吧粉丝怎么全部移除_亚马逊FBA怎么发货?怎么把货发到FBA仓库?
- 32 位和 64 位版本的 Office 2010 之间的兼容性
- 软件工程 实践者的研究方法 第三章答案
- 关于SCI论文发表的五不准原则是
- Linux基础命令之tar解压缩详解
- CUDA C 编程权威指南 Grossman 第4章 全局内存
- html中鱼眼效果,鱼眼镜头使用入门指南:鱼眼镜头应该怎么用(附后期鱼眼效果)...
- linux运维实验,自制Linux系统实验
- 使用Scintilla编写语法高亮文本编辑器
- 2020年终总结,可能是我人生收获最多的一年