1. yml配置

    spring:rabbitmq:username: adminpassword: adminhost: localhostport: 5672virtual-host: /publisher-confirm: true #发布确认 开启confirms回调 Producer -> Exchangepublisher-returns: true #发布返回 开启returnedMessage回调 Exchange -> Queuelistener:type: simplesimple:acknowledge-mode: manual #消费端收到消息后的确认方式 manual手动确认  none自动确认prefetch: 1 #消费者预取1条数据到内存default-requeue-rejected: false  #决定被拒绝的消息是否重新入队。默认值为true,需要手动basicNack时这些参数谅失效了retry:enabled: true  #开启消费者 程序异常情况下会进行重试max-attempts: 3 #重试次数initial-interval: 2000 #消费者重试间隔次数 2s
    
  2. RabbitConfig

    @Autowired
    private CachingConnectionFactory connectionFactory;/*** rabbitTemplate* @return*/
    @Bean
    public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(converter());//消息是否成功发送到ExchangerabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息成功发送至Exchange");} else {log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);}});//触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调rabbitTemplate.setMandatory(true);//失败回调 消息是否从Exchange路由到Queue (只有消息从Exchange路由到Queue失败才会回调这个方法)rabbitTemplate.setReturnsCallback(returned ->log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText(), returned.getMessage()));return rabbitTemplate;
    }@Bean
    public Jackson2JsonMessageConverter converter() {return new Jackson2JsonMessageConverter();
    }/*** 普通队列交换机*/
    @Bean
    public FanoutExchange testFanoutExchange() {return new FanoutExchange(RabbitConstants.TEST_FANOUT_EXCHANGE, true, false);
    }
    /*** 普通队列* 基于消息事务的处理方式,当失败消息进行重试,有时间间隔,当达到超时时,就发送到死信队列,等待人工处理* @return*/
    @Bean
    public Queue testFanoutQueue() {//超时2s就放入死信队列return QueueBuilder.durable(RabbitConstants.TEST_FANOUT_QUEUE_A).deadLetterExchange(RabbitConstants.TEST_DDL_EXCHANGE).ttl(2000).build();
    }/*** 普通队列绑定到交换机* @return*/
    @Bean
    public Binding testFanoutBinding() {return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange());
    }
    /*** 死信交换机* @return*/
    @Bean
    public FanoutExchange testFanoutDdlExchange() {return new FanoutExchange(RabbitConstants.TEST_DDL_EXCHANGE, true, false);
    }
    /*** 死信队列* @return*/
    @Bean
    public Queue testFanoutDdlQueue() {return new Queue(RabbitConstants.TEST_DDL_QUEUE_A, true, false, false);
    }
    /*** 死信队列绑定至交换机* @return*/
    @Bean
    public Binding testFanoutDdlBinding() {return BindingBuilder.bind(testFanoutDdlQueue()).to(testFanoutDdlExchange());
    }
    
  3. 消息实体

    @Data
    @EqualsAndHashCode(callSuper = false)
    @Accessors(chain = true)
    @ApiModel(value="MsgLog对象", description="消息投递日志")
    public class MsgLog implements Serializable {private static final long serialVersionUID = 1L;@ApiModelProperty(value = "消息唯一标识")private String msgId;@ApiModelProperty(value = "消息体, json格式化")private String msg;@ApiModelProperty(value = "交换机")private String exchange;@ApiModelProperty(value = "路由键")private String routingKey;@ApiModelProperty(value = "状态: 0投递中 1投递成功 2投递失败 3已消费")private Integer status;@ApiModelProperty(value = "重试次数")private Integer tryCount;@ApiModelProperty(value = "下一次重试时间")private Date nextTryTime;@ApiModelProperty(value = "创建时间")private Date createTime;@ApiModelProperty(value = "更新时间")private Date updateTime;}
    
  4. producer

    /*** @author : lixuan* @date : 2021/04/07/10:55* @description: 生产者*/
    public interface ProducerService {/*** 发送消息** @param msg          某些你需要的参数* @param exchangeName 队列名称* @param routingKey   路由key* @return*/void sendMsg(Msg msg, String exchangeName, String routingKey);
    }/*** @author : lixuan* @date : 2021/04/07/10:57* @description: 消息生产者*/
    @Service
    public class ProducerServiceImpl implements ProducerService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendMsg(Msg msg, String exchangeName, String routingKey) {if (StringUtils.isBlank(msg.getMsgId())) {String msgId = UUID.randomUUID().toString().replaceAll("-", "");msg.setMsgId(msgId);}CorrelationData correlationData = new CorrelationData(msg.getMsgId());//发送消息到rabbitMQrabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationData);}}
    
  5. consumer

    /*** @author : lixuan* @date : 2021/04/07/11:23* @description: 消息接收者*/
    @Component
    @Slf4j
    public class MessageReceiver {@Autowiredprivate MsgService msgService;@Autowiredprivate MsgLogService msgLogService;@RabbitListener(queues = RabbitConstants.TEST_FANOUT_QUEUE_A)public void testMsgReceiver(Message message, Channel channel) throws IOException, InterruptedException {long deliveryTag = message.getMessageProperties().getDeliveryTag();Msg msg = JSON.parseObject(message.getBody(), Msg.class);try {int a = 1 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {Map<String, Object> headers = message.getMessageProperties().getHeaders();//重试次数Integer retryCount;String mapKey = "retry-count";if (!headers.containsKey(mapKey)) {retryCount = 0;} else {retryCount = (Integer) headers.get(mapKey);}if (retryCount++ < RETRY) {log.info("已经重试 " + retryCount + " 次");headers.put("retry-count", retryCount);//当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部。//这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行//而比较理想的方式是,出现异常时,消息到达消息队列尾部,这样既保证消息不回丢失,又保证了正常业务的进行。//因此我们采取的解决方案是,将消息进行应答。//这时消息队列会删除该消息,同时我们再次发送该消息 到消息队列,这时就实现了错误消息进行消息队列尾部的方案//1.应答channel.basicAck(deliveryTag, false);//2.重新发送到MQ中AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("application/json").headers(headers).build();channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), basicProperties,message.getBody());} else {log.info("现在重试次数为:" + retryCount);/*** 重要的操作 存盘* 手动ack* channel.basicAck(deliveryTag,false);* 通知人工处理* log.error("重试三次异常,快来人工处理");*///消息存盘MsgLog msgLog = new MsgLog();msgLog.setMsgId(msg.getMsgId());msgLog.setMsg(new String(message.getBody(),"utf-8"));msgLog.setExchange(message.getMessageProperties().getReceivedExchange());msgLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());msgLog.setTryCount(retryCount);msgLog.setStatus(MsgLogStatusEnum.FAIL.getStatus());msgLogService.save(msgLog);/*** 不重要的操作放入 死信队列* 消息异常处理:消费出现异常后,延时几秒,然后从新入队列消费,直到达到ttl超时时间,再转到死信,证明这个信息有问题需要人工干预*///休眠2s 延迟写入队列,触发转入死信队列//Thread.sleep(2000);//channel.basicNack(deliveryTag, false, true);}}}@RabbitListener(queues = RabbitConstants.TEST_DDL_QUEUE_A)public void deadTestReceiver(Message message, Channel channel) throws IOException {log.info("消息将放入死信队列", new String(message.getBody(), "UTF-8"));String str = new String(message.getBody());//转换为消息实体Msg msg = JSON.parseObject(str, Msg.class);log.info("收到的消息为{}", msg);}
    }
    
  6. 写个controller 自己测试一下。

    /*** @author : lixuan* @date : 2021/04/07/13:38* @description: TestController*/
    @RestController
    @RequestMapping("/test")
    public class TestController {@Autowiredprivate ProducerService producerService;@RequestMapping("/ack")public void testSendMsg(@RequestParam String content, @RequestParam String title) {Msg msg = new Msg();msg.setContent(content);msg.setTitle(title);producerService.sendMsg(msg, RabbitConstants.TEST_FANOUT_EXCHANGE, "");}
    }
    
  7. postman测试一下

  8. 查看控制台

  9. 查看数据库

SpringBoot集成RabbitMQ实现消息重试机制,消息重试3次失败后写入死信队列,消息重试3次失败后入库相关推荐

  1. SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门

    1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...

  2. springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式

    springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式 说明: RabbitMQ消息的可靠投递 在使用 RabbitMQ 的时 ...

  3. springboot 集成rabbitmq 实例

    springboot 集成rabbitmq 实例 个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理 ...

  4. Springboot集成RabbitMQ一个完整案例

    springboot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,springboot 提供了 spring-boot-starter-amqp 对消息各种支持. 1.配置pom文 ...

  5. Springboot集成rabbitmq实现延时队列

    Springboot集成rabbitmq实现延时队列 什么是延时队列? 列举几个使用场景: 常见的种类有: 延时任务-实现方式: 详细信息:[https://www.cnblogs.com/JonaL ...

  6. Springboot集成rabbitMQ之mandatory和备份交换机

    Springboot集成rabbitMQ之mandatory和备份交换机 mandatory 之前编写的消息队列代码中,通过重写ConfirmCallback中的confirm方法实现了消息送达的确认 ...

  7. RabbitMQ——SpringBoot集成RabbitMQ

    文章目录: 1.创建一个SpringBoot工程--消息发送者 1.创建一个SpringBoot工程--消息接收者 3.测试结果 3.1 direct 3.2 fanout 3.3 topic 3.4 ...

  8. springboot集成rabbitmq死信队列的延时队列使用

    目录         1.自动分列延时队列 2.应答失败自动转储延时再通知机制 ------------------------------------------------------------ ...

  9. springboot集成rabbitMQ实现消息的推送

    RabbitMQ消息中间件组件,目前比较流行的消息中间件有:RabbitMQ.RocketMQ.ActiveMQ.Kafka等. 我的代码中用的是RabbitMQ,先介绍几个概念: 一:消息队列的特性 ...

  10. RabbitMQ的消费者处理消息失败后之重试3次,重试3次仍然失败发送到死信队列。

    1.为什么要重试? 如果消费者处理消息失败后不重试,然后发送应答给rabbitmq,rabbitmq就会将队列中的消息删除,从而造成消息的丢失.所以我们要在消费者处理消息失败的时候,重试一定的次数.比 ...

最新文章

  1. 企业部署Windows 7指南
  2. ps人像精修照片步骤_ps修图教程:人像精修
  3. 前端学习(1606):数据请求与json-server
  4. springmvc 中@Controller和@RestController的区别
  5. 一般算术表达式转换成后缀式
  6. 深度学习在NLP领域的发展之Transformer
  7. 随笔:谈谈考研二战如何准备
  8. 遥感高光谱分类文献阅读:Exploring Hierarchical Convolutional Features for Hyperspectral Image Classification
  9. BZOJ1085:[SCOI2005]骑士精神——题解+IDA*粗略讲解
  10. python气象绘图速成_气象数据可视化——利用Python绘制温度平流
  11. 爬虫手册03 Selenium的使用
  12. ThinkCMF 框架上的任意内容包含漏洞
  13. 关于windows10系统连接隐藏网络时,显示无法连接问题的解决方式
  14. 程序员之富爸爸穷爸爸思考
  15. 在小鸟云新春采购节买到划算的云服务器
  16. iTween基础之Move(移动)
  17. 真正的程序员就应该这样.
  18. Android Studio报错--Error: Library projects cannot set applicationId. applicationId is set to ...
  19. 【易购管理系统】导航折叠效果
  20. 【转】原码一位乘和移码一位乘

热门文章

  1. [hdu5285]wyh2000 and pupil
  2. CV之ModelScope:基于ModelScope框架的人脸人像数据集利用DCT-Net算法实现人像卡通化图文教程之详细攻略
  3. php 表情,php emoji表情处理
  4. 蚊香液加热器雷达感应,人体存在感应雷达模块,智能控制加热器启动与关闭
  5. linux 更新etc profile,讲解Linux系统中修改/etc/profile文件的方法
  6. 全新织梦DEDE CMS模板-精仿qq技术导航网站源码
  7. CQOI2016滚粗记
  8. 06.Spring Cloud OpenFeign:基于Ribbon和Hystrix的声明式服务调用
  9. state_dict详解--存放训练过程中需要学习的权重和偏执系数
  10. Unity中常用的游戏存档/读档技术