rabbitmq不会创建队列
rabbitmq是懒加载 必须配置监听者才能 创建队列

延迟队列

  • 延迟队列配置
    • 配置
    • 队列配置
    • 生产者
    • 消费者
    • 结果
  • 延迟队列优化( 消息设置超时,队列不设置超时时间)
    • 消息设置过期时间
    • 延迟问题的解决
  • 基于延迟交换机的延迟队列
    • 配置
    • 生产者和消费者
    • 运行结果
    • 配置
    • 回调配置

延迟队列配置

配置

1.引入依赖

       <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.配置yml

spring.rabbitmq.host=**.**.**.**你的主机地址
spring.rabbitmq.port=5672

队列配置

package com.cunk.blog.rabbitMq;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class TtlQueueConfig  {//普通交换机public static final String DEADLATER_EXCHANEG = "X_E" ;//死信交换机public static final String NORMOLL_EXCHANEG = "Y_E" ;//普通队列1public static final String NORMOLL_Q1 = "N_Q1" ;//普通队列2public static final String NORMOLL_Q2 = "N_Q2" ;//死信队列public static final String DEADLATER_Q = "T_Q" ;@Bean(NORMOLL_EXCHANEG)public DirectExchange xExchage(){return new DirectExchange(NORMOLL_EXCHANEG) ;}//死信交换机@Bean(DEADLATER_EXCHANEG)public DirectExchange yExchage(){return new DirectExchange(DEADLATER_EXCHANEG) ;}//声明普通队列TTL 为 10 s q1@Bean(NORMOLL_Q1)public Queue queueA(){HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", DEADLATER_EXCHANEG);// 死信路由arguments.put("x-dead-letter-routing-key", "YD");// 死信路由键arguments.put("x-message-ttl", 10000); // 消息过期时间 10sreturn new Queue(NORMOLL_Q1, true, false, false, arguments);}//声明普通队列TTL 为 10 s q2@Bean(NORMOLL_Q2)public Queue queueB(){HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", DEADLATER_EXCHANEG);// 死信路由arguments.put("x-dead-letter-routing-key", "YD");// 死信路由键arguments.put("x-message-ttl", 20000); // 消息过期时间 10sreturn new Queue(NORMOLL_Q2, true, false, false, arguments);}//死信队列@Bean(DEADLATER_Q)public Queue queueD(){return QueueBuilder.durable(DEADLATER_Q).build();}/====绑定======/////q1 q2 绑定@Beanpublic Binding bindingEmail(@Qualifier(NORMOLL_Q1) Queue queue,@Qualifier(NORMOLL_EXCHANEG) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("XA").noargs();}@Beanpublic Binding q2BingExchange(@Qualifier(NORMOLL_Q2) Queue q2, @Qualifier(NORMOLL_EXCHANEG) Exchange e1){return BindingBuilder.bind(q2).to(e1).with("XB").noargs();}@Beanpublic Binding qdBingExchange(@Qualifier(DEADLATER_Q) Queue qd, @Qualifier(DEADLATER_EXCHANEG) Exchange e1){return BindingBuilder.bind(qd).to(e1).with("YD").noargs();}}

生产者

   @GetMapping("/sendMsg/{msg}")public void sendMsg(@PathVariable("msg") String msg){System.out.println("发送当前时间"+new Date().toString()+msg);//发送消息rabbitTemplate.convertAndSend(NORMOLL_EXCHANEG,"XA","消息来自10sttl队列"+msg);rabbitTemplate.convertAndSend(NORMOLL_EXCHANEG,"XB","消息来自20sttl队列"+msg);}

消费者

    @GetMapping("/getMsg")@RabbitListener(queues ={DEADLATER_Q})public void  getMsg(Message message, Channel channel)throws Exception{String msg = new String(message.getBody());System.out.println("当前时间"+new Date().toString()+"收到的死信队列消息是"+msg);}

结果

延迟队列优化( 消息设置超时,队列不设置超时时间)

生成者根据需求对消息进行超时进入死信队列

 //声明普通队列 , 不设置过期时间@Bean(NORMOLL_Q3)public Queue queueC(){HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", DEADLATER_EXCHANEG);// 死信路由arguments.put("x-dead-letter-routing-key", "YD");// 死信路由键//   arguments.put("x-message-ttl", 20000); // 消息过期时间 10sreturn new Queue(NORMOLL_Q3, true, false, false, arguments);}

消息设置过期时间

        rabbitTemplate.convertAndSend(NORMOLL_EXCHANEG, "XC", "消息来自自定义消息时间的队列" + msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration({**过期时间**});return message ;}});

⚠ 如果过期时间是10s 的先进队列 ,过期时间5s 的后进队列 (5s过期的消息进入阻塞,先等10s的发完)
出队顺序是 10s的先出 5s 的反而后出 (队列有先进先出的规定!!)

延迟问题的解决

下载延迟插件以及安装步骤
链接: link

基于延迟交换机的延迟队列

配置


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;//基于交换机的延迟队列
@Configuration
public class DelatedQueueConfig {//交换机public static final String DEADLATER_AUTO_EXCHANEG = "D_E" ;//延迟队列public static final String DELAYED_EXCHANGE_QUEUE ="D_Q" ;//keypublic static final String DELAYED_ROUNTING_KEY = "DelayRountKey" ;@Bean(DEADLATER_AUTO_EXCHANEG)public CustomExchange daleyExchange(){HashMap<String, Object> arguments = new HashMap<>();//设置延迟类型为直接类型arguments.put("x-delayed-type","direct");return new CustomExchange(DEADLATER_AUTO_EXCHANEG,"x-delayed-message",true,false,arguments);}@Bean(DELAYED_EXCHANGE_QUEUE)public Queue queueB(){return new Queue(DELAYED_EXCHANGE_QUEUE, true, false, false);}@Beanpublic Binding bindingEmail(@Qualifier(DELAYED_EXCHANGE_QUEUE) Queue queue,@Qualifier(DEADLATER_AUTO_EXCHANEG) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUNTING_KEY).noargs();}
}

生产者和消费者

   //交换机public static final String DEADLATER_AUTO_EXCHANEG = "D_E" ;//延迟队列public static final String DELAYED_EXCHANGE_QUEUE ="D_Q" ;//keypublic static final String DELAYED_ROUNTING_KEY = "DelayRountKey" ;//生产者@GetMapping("/sendExchangeMsg/{msg}/{ttl}")public void sendDelayExchange111(@PathVariable("msg") String msg,@PathVariable("ttl") Integer ttl){System.out.println("发送当前时间"+new Date().toString()+msg);rabbitTemplate.convertAndSend(DEADLATER_AUTO_EXCHANEG, DELAYED_ROUNTING_KEY, "消息来自自定义消息时间的队列" + msg,message -> {//设置过期时间message.getMessageProperties().setDelay(ttl);return message ;});}//消费者@GetMapping("/getMsg")@RabbitListener(queues ={DELAYED_EXCHANGE_QUEUE})public void  getMsg111(Message message, Channel channel)throws Exception{String msg = new String(message.getBody());System.out.println("当前时间"+new Date().toString()+"收到的基于插件的延迟消息消息是"+msg);}

运行结果

消息可靠投递

  • 延迟队列配置
    • 配置
    • 队列配置
    • 生产者
    • 消费者
    • 结果
  • 延迟队列优化( 消息设置超时,队列不设置超时时间)
    • 消息设置过期时间
    • 延迟问题的解决
  • 基于延迟交换机的延迟队列
    • 配置
    • 生产者和消费者
    • 运行结果
    • 配置
    • 回调配置

配置

    publisher-confirm-type: correlated# 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】publisher-returns: true# 消息在没有被队列接收时是否强行退回template:mandatory: true# 消费者手动确认模式,关闭自动确认,否则会消息丢失listener:simple:acknowledge-mode: manual

回调配置

package com.cunk.blog.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;//设置接口回调
@Configuration
public class RabbitMqConfig {/*  这是 FastJson 的消息转换器@Beanpublic FastJsonHttpMessageConverter  messageConverter() {// 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式return new FastJsonHttpMessageConverter();}
*/@Beanpublic MessageConverter messageConverter() {// 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);//确认消息送到交换机(Exchange)回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause){System.out.println("\n确认消息送到交换机(Exchange)结果:");System.out.println("相关数据:" + correlationData);System.out.println("是否成功:" + ack);System.out.println("错误原因:" + cause);}});//消息没有被队列接受的时候会触发回调//确认消息送到队列(Queue)回调/* 这里的代码在2022.8.3 月不能使用了      rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){@Overridepublic void returnedMessage(ReturnedMessage returnedMessage){System.out.println("\n确认消息送到队列(Queue)结果:");System.out.println("发生消息:" + returnedMessage.getMessage());System.out.println("回应码:" + returnedMessage.getReplyCode());System.out.println("回应信息:" + returnedMessage.getReplyText());System.out.println("交换机:" + returnedMessage.getExchange());System.out.println("路由键:" + returnedMessage.getRoutingKey());}});*///消息没有被队列接受的时候会触发回调//确认消息送到队列(Queue)回调rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){log.error("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode,replyText, exchange, routingKey);}});return rabbitTemplate;}
}

欣赏下雷神的代码~~


RabbitMq集成SpirngBoot相关推荐

  1. 介绍Spring Cloud Stream与RabbitMQ集成

    一. 首先安装rabbitmq-management 这里用的是rabbitmq的docker镜像,我们可以在Docker Hub中搜索rabbitmq, 找到最新的版本安装 sudo docker ...

  2. 【自撰】RabbitMQ集成SpringBoot框架

    RabbitMQ集成SpringBoot框架 导入springboot依赖 <!-- rabbitmq依赖 --> <dependency><groupId>org ...

  3. 使用Spring Cloud Stream与RabbitMQ集成

    在我以前的文章中,我写了两个系统之间非常简单的集成场景-一个生成一个工作单元,另一个处理该工作单元,以及Spring Integration如何使这种集成非常容易. 在这里,我将演示如何使用Sprin ...

  4. SpringBoot和RabbitMQ集成

    SpringBoot和RabbitMQ的集成: 步骤 自动配置 123456789101112131415161718192021222324252627 @Bean public CachingCo ...

  5. Dockerfile 定制 Rabbitmq 集成延时队列的镜像

    下载 延时队列插件 官方提供的一些插件 延时队列插件 下载适合的版本 构建 Dockerfile 新建一个文件夹 newDockerImage,放入插件文件并新建文件名为Dockerfile的文件 D ...

  6. Rabbitmq集成与使用

    Springboot集成rabbitmq pom.xml 依赖 <dependency><groupId>org.springframework.boot</groupI ...

  7. SpringBoot RabbitMQ 集成 七 延迟队列

    为什么80%的码农都做不了架构师?>>>    何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 延迟 ...

  8. springboot 与rabbitmq集成+生产者投递确认+消费者手动确认+TTL+死信队列+延时队列

    1.生产者的消息可靠性投递机制 1)springboot yml文件配置 spring:rabbitmq:host: 10.0.23.83username: lifwepassword: 123456 ...

  9. Spring Boot系列——7步集成RabbitMQ

    RabbitMQ是一种我们经常使用的消息中间件,通过RabbitMQ可以帮助我们实现异步.削峰的目的. 今天这篇,我们来看看Spring Boot是如何集成RabbitMQ,发送消息和消费消息的.同时 ...

最新文章

  1. 7天搞定图神经网络,实战助力新冠疫情防控!
  2. CSS 样式书写规范
  3. 《Flex 3权威指南》——Adobe官方培训教材
  4. DPDK vhost-user之前后端通知机制场景分析(十)
  5. 位掩码(BitMask)
  6. bind-html自动换行,如何实现textarea placeholder自动换行?
  7. 95-136-043-源码-Operator-CoProcessOperator
  8. servelt笔记一
  9. SQL区分大小写——转载
  10. 93. php 命名空间(3)
  11. 移动数据通信网络工作原理(SGSNGGSN)
  12. 2048游戏最多能玩到多大的数字?最多能玩多少分?
  13. 2019第十届互联网牛耳人人盛典圆满成功,罗超频道入选年度专栏作者
  14. 自动化 夏令营 保研
  15. VS导入easyx图形库
  16. 计算机音乐算法冯,计算机辅助算法作曲方法研究与软件设计
  17. 用友U9 SOA引领企业IT架构全面升级
  18. 赵小楼《天道》《遥远的救世主》深度解析(116)论天国的女人
  19. ubuntu10.10+双显卡I卡N卡+bumbleb…
  20. iOS开发——网络请求案例汇总

热门文章

  1. 【微信小程序】--注册小程序账号安装开发者工具(一)
  2. 怎样用python中matplotlib模块直观的将股票数据展现出来
  3. 破解Kotlin协程创建调用的那些事
  4. 什么才是状态机?什么又是状态?
  5. 手机软件的测试主要有哪些方面的测试,性能测试用什么去测试好
  6. win7系统如何关闭广告弹窗操作方法教学
  7. win10插拔U盘、鼠标、键盘等外设的一瞬间,屏幕会闪黑一下
  8. ps批量处理图片大小
  9. CAJ与PDF与WORD转化方法
  10. ImageNet Large Scale Visual Recognition Competition (ILSVRC)-ImageNet数据集标签名称中英文对照