前言:

在开发工作中,有很多时候会遇到要把数据同步给其他部门或三方的场景,这个时候光写一个同步接口是不太稳定的,因为有很多因素会导致同步接口运行失败或未运行,比如调接口之前的代码出现了bug,异常被throws或被catch,没有往下走。再比如对方接收代码出现问题,或者网络问题,接口没通,同步失败。

遇到上面同步失败的情况,就会影响到业务的正常使用了,本文只讨论第二种调用失败的情况(第一种情况可以把同步代码封装起来,提供一个接口出来用于手动调用hhhh,很笨但是很救命的办法),所以必须要加入重发机制,来让程序更加的健壮。

因为项目中也用到了rabbitmq,所以第一时间就想到了死信队列,之前的一种实现方式是使用死信队列的超时时间特性,第一次失败后,把参数放入死信队列,但是参数的bean中要有一个记录次数的值,第一次放的时候set为1。在消息超时后放入死信队列,被监听到时,再去调用接口,如果失败了,就按照次数去计算下一次执行的时间,然后重新放入到参数的bean中,再把bean重新放入到正常消费队列中,直到下一次消息超时被死信队列接收。不过这样的是要在代码中设置一个最大循环次数的,否则调用不通的情况下,会一直循环。如果成功了,那么我们就手动调用下 channel.basicAck 去手动签收一下(这里是要在配置中把自动签收改成手动签收)。

spring.rabbitmq.listener.simple.acknowledge-mode=manual 手动签收

这种方式虽然实现了功能,但是确增大了代码量,尤其是需要增添2个队列(一个正常消费队列,一个消费队列绑定的死信队列),不太方便扩展,而且也增加了复杂度,所以不太推荐这样的写法(代码就不贴了,如果有人感兴趣,可以给我留言,到时候我再更新)。

为了优化掉现有代码,于是我就又重新研究了一下,发现不就是重发么,rabbitmq早就给我们准备好了,-_-|| 自己这又是死信又是手动签收的,,,一顿操作,确实浪费了不少功夫。


实现过程:

那下面我们来看一下是如何实现的。

首先写贴下配置

##rabbit地址
spring.rabbitmq.addresses=amqp://guest:guest@localhost:5672
# 开启重发
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重发次数
spring.rabbitmq.listener.simple.retry.max-attempts=10
# 重试间隔时间 单位毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms
# 重试最大间隔时间 单位毫秒
spring.rabbitmq.listener.simple.retry.max-interval=86400000ms
# 重发间隔因子 间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
spring.rabbitmq.listener.simple.retry.multiplier=2

开启rabbitmq的重发机制,并且设置好重试间隔时间(这个间隔时间应该是第一次到第二次的间隔时间,往后的间隔时间是通过间隔因子算的),以及最大间隔时间(避免出现无限重试的问题),还有重要的间隔因子,这样保证了每次的间隔时间是成比例增长的。

配置好后,接下来就要声明我们所用到的队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** rabbitmq重发配置*/
@Configuration
public class RepeatSendRabbitmqConfig {/*** 正常队列*/public final static String REPEAT_QUEUE = "repeat_queue";/*** 交换机*/public final static String REPEAT_EXCHANGE = "repeat_exchange";/*** 路由键*/public final static String REPEAT_ROUTING_KEY = "repeat_routing_key";/*** 声明队列*/@BeanQueue repeatQueue() {return QueueBuilder.durable(REPEAT_QUEUE).build();}/*** 声明交换机* @return*/@BeanDirectExchange repeatExchange() {return new DirectExchange(REPEAT_EXCHANGE);}/*** 将队列和交换机进行绑定* @param repeatQueue* @param repeatExchange* @return*/@BeanBinding dlxBinding(Queue repeatQueue, DirectExchange repeatExchange) {return BindingBuilder.bind(repeatQueue).to(repeatExchange).with(REPEAT_ROUTING_KEY);}}

我们把队列,交换机,路由键都声明好后,下一步就要写接收代码了。

首先我们要写好生产者,也就是把消息放入到消息队列中的那步。


import com.google.gson.Gson;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;/*** 生产者,将消息放入队列*/
@Component
public class MqSender {Gson gson = new Gson();@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 将消息放入队列* @param request*/public void repeatSend(RequestBean request){log.err("repeat request " + gson.toJson(request),"接收时间:" + LocalDateTime.now());rabbitTemplate.convertAndSend(RepeatSendRabbitmqConfig.REPEAT_EXCHANGE,RepeatSendRabbitmqConfig.REPEAT_ROUTING_KEY,request);}
}

注意了,repeatSend这个方法,只运行一次,这里并没有指定超时时间,仅仅是传入了exchange和routing key,通过之前的绑定,就能定位到是哪个队列,然后把参数放到队列中。

(这里有一个小坑,之前是使用的 AmqpTemplate amqpTemplate 这个接口来做数据存放,但是用了其中的方法,在测试的时候偶尔不会消费,,不知道什么原因,为了赶工,就改成使用 RabbitTemplate rabbitTemplate 了,有知道的小伙伴可以留言讨论一下)。

前期准备工作好后,最后一步就是拿来消费了。


import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;import java.time.LocalDateTime;/*** 接收者,消费者*/
@Component
public class MqReceiver {Gson gson = new Gson();@Autowiredprivate RemoteService remoteService;/*** 同步接口重发队列实现* @param request*/@RabbitListener(queues = RepeatSendRabbitmqConfig.REPEAT_QUEUE)public void ListenRepeatSend(RequestBean request){log.err(" ListenRepeatSend request " + gson.toJson(request),"同步时间 " + LocalDateTime.now());BaseResp response = remoteService.send(request);// 重新同步,失败后抛异常重试if(!"SUCCESS".equals(response.getCode())){throw new RuntimeException();}}}

使用 @RabbitListener 指定监听队列,那么这个队列就会被这个消费者所监听了。把参数传入我们自己的同步方法中,如果失败了,我们就抛异常出去,不用做其他的任何操作,只需要抛出去,rabbitmq就会按照配置的时间,以及间隔,来重新执行方法了,直到不抛异常,或者超过了配置中的最大时间,就停止重复执行了。

最后,使用生产者时,直接在代码中直接调用一下就好。

至此,我们的功能就整改完成了。


用rabbitmq实现消息重发的功能相关推荐

  1. android 局域网 推送,通过RabbitMQ实现消息推送功能 可同时实现局域网推送和广域网推送...

    服务端 pom文件 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-sta ...

  2. 通过RabbitMQ实现消息推送功能 可同时实现局域网推送和广域网推送

    服务端 pom文件 <dependency><groupId>org.springframework.boot</groupId><artifactId> ...

  3. RabbitMQ消息确认机制和消息重发机制

    一.机制 首先我们要知道一条消息的传递过程. 生产者 -> 交换机 ->  队列 我们的生产者生产消息,生产完成的消息发送到交换机,由交换机去把这个消息转发到对应的队列上.这其中我们可能在 ...

  4. RabbitMQ学习笔记:消息追踪Firehose功能详解

    在使用任何消息中间件的过程中,难免会出现消息异常丢失的情况.对于RabbitMQ而言,可能是生产者与Broker断开了连接并且没有任何重试机制:也可能是消费者在处理消息时发生了异常,不过却提前进行了a ...

  5. RabbitMQ 如何确保消息的成功投递?幂等性?顺序性?

    RabbitMQ 如何确保消息的成功投递?RabbitMQ 如何保证不重复消费,保证数据不丢失?分布式系统里,如何保证数据的一致性?一串连环炮你是否顶得住? 其实这几个问题的原理大同小异,都可以在统一 ...

  6. RabbitMQ 可靠消息传输实战--云平台技术栈12

    导读:之前发布了云平台技术栈(ps:点击可查看),本文主要说一下其中的RabbitMQ! 作者:极客慧 https://my.oschina.net/jikeh/blog/2207127 可能是缓存架 ...

  7. RabbitMQ之消息确认机制(事务+Confirm)

    概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...

  8. ActiveMQ的消息重发策略和DLQ处理

    2019独角兽企业重金招聘Python工程师标准>>> ActiveMQ的消息重发策略和DLQ处理 博客分类: MQ 在以下三种情况中,ActiveMQ消息会被重发给客户端/消费者: ...

  9. Redis与RabbitMQ作为消息队列的比较

    本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息 ...

最新文章

  1. 搭建Nginx+Tomcat 负载均衡集群
  2. HTML哪些是块级元素,哪些是行内元素、
  3. MSSQL分组取后每一组的最新一条记录
  4. Web前端笔记-let n = 6 - data.length >>> 0 含义
  5. 理论 | 三天两夜,万字长文,吃透TCP/IP
  6. LINUX下载编译Which
  7. linux mysql 源码包,Linux下MySQL 5.5.15源码包编译安装
  8. unity3d+虚拟城市:技术要点
  9. 51单片机电机测速程序c语言,基于51单片机光电编码器测速.doc
  10. STLINK : Warning: Connection to device 0x413 is lost
  11. 如何让HTML字体变的更小
  12. shell脚本-字符串和变量
  13. Android最新版支付宝钱包的实例源码
  14. 智慧电厂拥抱“双碳”,空冷岛设施数字化转型
  15. XDisplay 安装教程
  16. 红米ac2100有ipv6吗_红米AC2100刷padavan
  17. 微软借“云”掀估值高涨浪潮,百度借智能云启动千亿美金估值航母?
  18. 超级详细的Lua语言的基础教程
  19. 【语音识别】基于matlab电话按键语音识别(含按键录音)【含Matlab源码 1752期】
  20. 域名投资不能被个人眼光和情怀所误导

热门文章

  1. 2021-12-06 自动化专业C语言上机作业参考答案11
  2. 基于STM32的智能鞋柜设计与实现--“臭臭没了”
  3. 首页惨遭降权 台州SEO的一次惨痛优化失误经历
  4. 视频录像软件使用方法!
  5. nlp--基于SpaCy和Networkx的依存树和最短依存路径分析
  6. okhttp-OkGo的文件下载模块
  7. 如何在阿里云上配置安全规则用于开放3CX所需的端口?
  8. 计算机专业排名211大学排名,计算机专业大学排名最新
  9. 从50项到1590项,西咸基层卫生中心这样“提档加速”
  10. 三星承认Galaxy S10和Note 10的指纹识别模组存在漏洞