RabbitMq集成SpirngBoot
rabbitmq不会创建队列
rabbitmq是懒加载 必须配置监听者才能 创建队列
延迟队列
- 延迟队列配置
- 配置
- 队列配置
- 生产者
- 消费者
- 结果
- 延迟队列优化( 消息设置超时,队列不设置超时时间)
- 消息设置过期时间
- 延迟问题的解决
- 基于延迟交换机的延迟队列
- 配置
- 生产者和消费者
- 运行结果
- 配置
- 回调配置
延迟队列配置
配置
1.引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.配置yml
spring.rabbitmq.host=**.**.**.**你的主机地址
spring.rabbitmq.port=5672
队列配置
package com.cunk.blog.rabbitMq;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class TtlQueueConfig {//普通交换机public static final String DEADLATER_EXCHANEG = "X_E" ;//死信交换机public static final String NORMOLL_EXCHANEG = "Y_E" ;//普通队列1public static final String NORMOLL_Q1 = "N_Q1" ;//普通队列2public static final String NORMOLL_Q2 = "N_Q2" ;//死信队列public static final String DEADLATER_Q = "T_Q" ;@Bean(NORMOLL_EXCHANEG)public DirectExchange xExchage(){return new DirectExchange(NORMOLL_EXCHANEG) ;}//死信交换机@Bean(DEADLATER_EXCHANEG)public DirectExchange yExchage(){return new DirectExchange(DEADLATER_EXCHANEG) ;}//声明普通队列TTL 为 10 s q1@Bean(NORMOLL_Q1)public Queue queueA(){HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", DEADLATER_EXCHANEG);// 死信路由arguments.put("x-dead-letter-routing-key", "YD");// 死信路由键arguments.put("x-message-ttl", 10000); // 消息过期时间 10sreturn new Queue(NORMOLL_Q1, true, false, false, arguments);}//声明普通队列TTL 为 10 s q2@Bean(NORMOLL_Q2)public Queue queueB(){HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", DEADLATER_EXCHANEG);// 死信路由arguments.put("x-dead-letter-routing-key", "YD");// 死信路由键arguments.put("x-message-ttl", 20000); // 消息过期时间 10sreturn new Queue(NORMOLL_Q2, true, false, false, arguments);}//死信队列@Bean(DEADLATER_Q)public Queue queueD(){return QueueBuilder.durable(DEADLATER_Q).build();}/====绑定======/////q1 q2 绑定@Beanpublic Binding bindingEmail(@Qualifier(NORMOLL_Q1) Queue queue,@Qualifier(NORMOLL_EXCHANEG) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("XA").noargs();}@Beanpublic Binding q2BingExchange(@Qualifier(NORMOLL_Q2) Queue q2, @Qualifier(NORMOLL_EXCHANEG) Exchange e1){return BindingBuilder.bind(q2).to(e1).with("XB").noargs();}@Beanpublic Binding qdBingExchange(@Qualifier(DEADLATER_Q) Queue qd, @Qualifier(DEADLATER_EXCHANEG) Exchange e1){return BindingBuilder.bind(qd).to(e1).with("YD").noargs();}}
生产者
@GetMapping("/sendMsg/{msg}")public void sendMsg(@PathVariable("msg") String msg){System.out.println("发送当前时间"+new Date().toString()+msg);//发送消息rabbitTemplate.convertAndSend(NORMOLL_EXCHANEG,"XA","消息来自10sttl队列"+msg);rabbitTemplate.convertAndSend(NORMOLL_EXCHANEG,"XB","消息来自20sttl队列"+msg);}
消费者
@GetMapping("/getMsg")@RabbitListener(queues ={DEADLATER_Q})public void getMsg(Message message, Channel channel)throws Exception{String msg = new String(message.getBody());System.out.println("当前时间"+new Date().toString()+"收到的死信队列消息是"+msg);}
结果
延迟队列优化( 消息设置超时,队列不设置超时时间)
生成者根据需求对消息进行超时进入死信队列
//声明普通队列 , 不设置过期时间@Bean(NORMOLL_Q3)public Queue queueC(){HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", DEADLATER_EXCHANEG);// 死信路由arguments.put("x-dead-letter-routing-key", "YD");// 死信路由键// arguments.put("x-message-ttl", 20000); // 消息过期时间 10sreturn new Queue(NORMOLL_Q3, true, false, false, arguments);}
消息设置过期时间
rabbitTemplate.convertAndSend(NORMOLL_EXCHANEG, "XC", "消息来自自定义消息时间的队列" + msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration({**过期时间**});return message ;}});
⚠ 如果过期时间是10s 的先进队列 ,过期时间5s 的后进队列 (5s过期的消息进入阻塞,先等10s的发完)
出队顺序是 10s的先出 5s 的反而后出 (队列有先进先出的规定!!)
延迟问题的解决
下载延迟插件以及安装步骤
链接: link
基于延迟交换机的延迟队列
配置
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;//基于交换机的延迟队列
@Configuration
public class DelatedQueueConfig {//交换机public static final String DEADLATER_AUTO_EXCHANEG = "D_E" ;//延迟队列public static final String DELAYED_EXCHANGE_QUEUE ="D_Q" ;//keypublic static final String DELAYED_ROUNTING_KEY = "DelayRountKey" ;@Bean(DEADLATER_AUTO_EXCHANEG)public CustomExchange daleyExchange(){HashMap<String, Object> arguments = new HashMap<>();//设置延迟类型为直接类型arguments.put("x-delayed-type","direct");return new CustomExchange(DEADLATER_AUTO_EXCHANEG,"x-delayed-message",true,false,arguments);}@Bean(DELAYED_EXCHANGE_QUEUE)public Queue queueB(){return new Queue(DELAYED_EXCHANGE_QUEUE, true, false, false);}@Beanpublic Binding bindingEmail(@Qualifier(DELAYED_EXCHANGE_QUEUE) Queue queue,@Qualifier(DEADLATER_AUTO_EXCHANEG) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUNTING_KEY).noargs();}
}
生产者和消费者
//交换机public static final String DEADLATER_AUTO_EXCHANEG = "D_E" ;//延迟队列public static final String DELAYED_EXCHANGE_QUEUE ="D_Q" ;//keypublic static final String DELAYED_ROUNTING_KEY = "DelayRountKey" ;//生产者@GetMapping("/sendExchangeMsg/{msg}/{ttl}")public void sendDelayExchange111(@PathVariable("msg") String msg,@PathVariable("ttl") Integer ttl){System.out.println("发送当前时间"+new Date().toString()+msg);rabbitTemplate.convertAndSend(DEADLATER_AUTO_EXCHANEG, DELAYED_ROUNTING_KEY, "消息来自自定义消息时间的队列" + msg,message -> {//设置过期时间message.getMessageProperties().setDelay(ttl);return message ;});}//消费者@GetMapping("/getMsg")@RabbitListener(queues ={DELAYED_EXCHANGE_QUEUE})public void getMsg111(Message message, Channel channel)throws Exception{String msg = new String(message.getBody());System.out.println("当前时间"+new Date().toString()+"收到的基于插件的延迟消息消息是"+msg);}
运行结果
消息可靠投递
- 延迟队列配置
- 配置
- 队列配置
- 生产者
- 消费者
- 结果
- 延迟队列优化( 消息设置超时,队列不设置超时时间)
- 消息设置过期时间
- 延迟问题的解决
- 基于延迟交换机的延迟队列
- 配置
- 生产者和消费者
- 运行结果
- 配置
- 回调配置
配置
publisher-confirm-type: correlated# 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】publisher-returns: true# 消息在没有被队列接收时是否强行退回template:mandatory: true# 消费者手动确认模式,关闭自动确认,否则会消息丢失listener:simple:acknowledge-mode: manual
回调配置
package com.cunk.blog.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;//设置接口回调
@Configuration
public class RabbitMqConfig {/* 这是 FastJson 的消息转换器@Beanpublic FastJsonHttpMessageConverter messageConverter() {// 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式return new FastJsonHttpMessageConverter();}
*/@Beanpublic MessageConverter messageConverter() {// 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);//确认消息送到交换机(Exchange)回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause){System.out.println("\n确认消息送到交换机(Exchange)结果:");System.out.println("相关数据:" + correlationData);System.out.println("是否成功:" + ack);System.out.println("错误原因:" + cause);}});//消息没有被队列接受的时候会触发回调//确认消息送到队列(Queue)回调/* 这里的代码在2022.8.3 月不能使用了 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){@Overridepublic void returnedMessage(ReturnedMessage returnedMessage){System.out.println("\n确认消息送到队列(Queue)结果:");System.out.println("发生消息:" + returnedMessage.getMessage());System.out.println("回应码:" + returnedMessage.getReplyCode());System.out.println("回应信息:" + returnedMessage.getReplyText());System.out.println("交换机:" + returnedMessage.getExchange());System.out.println("路由键:" + returnedMessage.getRoutingKey());}});*///消息没有被队列接受的时候会触发回调//确认消息送到队列(Queue)回调rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){log.error("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode,replyText, exchange, routingKey);}});return rabbitTemplate;}
}
欣赏下雷神的代码~~
RabbitMq集成SpirngBoot相关推荐
- 介绍Spring Cloud Stream与RabbitMQ集成
一. 首先安装rabbitmq-management 这里用的是rabbitmq的docker镜像,我们可以在Docker Hub中搜索rabbitmq, 找到最新的版本安装 sudo docker ...
- 【自撰】RabbitMQ集成SpringBoot框架
RabbitMQ集成SpringBoot框架 导入springboot依赖 <!-- rabbitmq依赖 --> <dependency><groupId>org ...
- 使用Spring Cloud Stream与RabbitMQ集成
在我以前的文章中,我写了两个系统之间非常简单的集成场景-一个生成一个工作单元,另一个处理该工作单元,以及Spring Integration如何使这种集成非常容易. 在这里,我将演示如何使用Sprin ...
- SpringBoot和RabbitMQ集成
SpringBoot和RabbitMQ的集成: 步骤 自动配置 123456789101112131415161718192021222324252627 @Bean public CachingCo ...
- Dockerfile 定制 Rabbitmq 集成延时队列的镜像
下载 延时队列插件 官方提供的一些插件 延时队列插件 下载适合的版本 构建 Dockerfile 新建一个文件夹 newDockerImage,放入插件文件并新建文件名为Dockerfile的文件 D ...
- Rabbitmq集成与使用
Springboot集成rabbitmq pom.xml 依赖 <dependency><groupId>org.springframework.boot</groupI ...
- SpringBoot RabbitMQ 集成 七 延迟队列
为什么80%的码农都做不了架构师?>>> 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 延迟 ...
- springboot 与rabbitmq集成+生产者投递确认+消费者手动确认+TTL+死信队列+延时队列
1.生产者的消息可靠性投递机制 1)springboot yml文件配置 spring:rabbitmq:host: 10.0.23.83username: lifwepassword: 123456 ...
- Spring Boot系列——7步集成RabbitMQ
RabbitMQ是一种我们经常使用的消息中间件,通过RabbitMQ可以帮助我们实现异步.削峰的目的. 今天这篇,我们来看看Spring Boot是如何集成RabbitMQ,发送消息和消费消息的.同时 ...
最新文章
- 7天搞定图神经网络,实战助力新冠疫情防控!
- CSS 样式书写规范
- 《Flex 3权威指南》——Adobe官方培训教材
- DPDK vhost-user之前后端通知机制场景分析(十)
- 位掩码(BitMask)
- bind-html自动换行,如何实现textarea placeholder自动换行?
- 95-136-043-源码-Operator-CoProcessOperator
- servelt笔记一
- SQL区分大小写——转载
- 93. php 命名空间(3)
- 移动数据通信网络工作原理(SGSNGGSN)
- 2048游戏最多能玩到多大的数字?最多能玩多少分?
- 2019第十届互联网牛耳人人盛典圆满成功,罗超频道入选年度专栏作者
- 自动化 夏令营 保研
- VS导入easyx图形库
- 计算机音乐算法冯,计算机辅助算法作曲方法研究与软件设计
- 用友U9 SOA引领企业IT架构全面升级
- 赵小楼《天道》《遥远的救世主》深度解析(116)论天国的女人
- ubuntu10.10+双显卡I卡N卡+bumbleb…
- iOS开发——网络请求案例汇总