一、简介

生产端的可靠性投递:

  • 保障消息的成功发出;
  • 保障MQ节点的成功接收;
  • 发送端收到MQ节点(Broker) 确认应答;
  • 完善的消息补偿机制;

在实际项目中某些场景下,必须保证消息不会出现丢失情况,这时候就需要我们对消息可靠性传输解决方法有所认识,才能在各种特殊情况下不会出现消息丢失、消息多发现象等。生产者可靠性传输有两种方案:

  • 消息落库,对消息状态进行打标
  • 消息的延迟投递,做二次确认,回调检查

本文将使用"消息落库,对消息状态进行打标"方式保证消息可靠性传输。

二、原理图

思路:

【1】业务数据以及MQ消息入库处理:首先将业务数据保存到数据库中,然后生成一条消息,也保存到消息记录表中,同时指定消息初始化状态为0(发送中)。

【2】生产者发送消息到MQ服务器,生产者端监听ConfirmCallback回调。

【3】生产者接收到Broker发送的确认应答信号,判断该条消息是否发送成功,如果成功,那么更新该条消息的状态为1(发送成功),如果发送失败的话,则进行重发尝试。

【4】假设在消息确认的时候由于网络闪断,MQ Broker端异常等原因导致 回送消息失败或者异常,导致生产者未能成功收到确认消息,针对这种情况,我们可以使用定时任务,去定时查询数据库中距离消息创建时间超过5分钟的且状态为0的消息,然后对这些消息进行重试尝试。

【5】当然有些消息可能就是由于一些实际的问题无法路由到Broker,比如routingKey设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重试次数做限制,比如限制3次,如果投递次数大于三次,那么就将消息状态更新为2(发送失败),表示这个消息最终投递失败,可以通过人工去处理这些失败消息。(或者把消息转储到失败消息记录表中)。

下面我们使用springboot + mybatis实现一个消息可靠性传输的案例,具体代码如下:

三、案例

【1】数据库脚本:

/*
SQLyog Ultimate v11.24 (32 bit)
MySQL - 5.5.44 : Database - rabbitmq_reliability
*********************************************************************
*//*!40101 SET NAMES utf8 */;/*!40101 SET SQL_MODE=''*/;/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`rabbitmq_reliability` /*!40100 DEFAULT CHARACTER SET utf8 */;USE `rabbitmq_reliability`;/*Table structure for table `broker_message_log` */DROP TABLE IF EXISTS `broker_message_log`;CREATE TABLE `broker_message_log` (`message_id` varchar(128) NOT NULL,`message` varchar(4000) DEFAULT NULL,`try_count` int(4) DEFAULT '0',`status` varchar(10) DEFAULT '',`next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',`create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',`update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;/*Table structure for table `t_order` */DROP TABLE IF EXISTS `t_order`;CREATE TABLE `t_order` (`id` varchar(128) NOT NULL,`name` varchar(128) DEFAULT NULL,`message_id` varchar(128) NOT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

【2】pom.xml依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.21.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.wsh.springboot</groupId><artifactId>springboot_rabbotmq_topic_exchange</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot_rabbotmq_topic_exchange</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.3.4</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.49</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.3.2</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

【3】application.yml:主要包括rabbitmq、数据源、mybatis配置等

server:port: 4444
spring:application:name: rabbitmq-message-abilitity-demorabbitmq:host: 127.0.0.1virtual-host: /vhostusername: wshpassword: wshport: 5672publisher-confirms: truepublisher-returns: truetemplate:mandatory: truelistener:simple:acknowledge-mode: manualdatasource:username: rootpassword: wsh0905driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/rabbitmq_reliability?characterEncoding=utf8http:encoding:charset: utf-8jackson:time-zone: GMT+8date-format: yyyy-MM-dd HH:mm:ssdefault-property-inclusion: non_null
mybatis:mapper-locations: classpath:mapping/*Mapper.xmltype-aliases-package: com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity

【4】实体类OrderInfo:

import java.io.Serializable;/*** @Description: 订单实体类* @Author: weixiaohuai* @Date: 2019/7/28* @Time: 8:59*/
public class OrderInfo implements Serializable {/*** 订单ID*/private String id;/*** 订单名称*/private String name;/*** 消息ID*/private String messageId;public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}
}

【5】实体类BrokerMessageLog:

import java.util.Date;/*** @Description: 消息记录实体类* @Author: weixiaohuai* @Date: 2019/7/28* @Time: 9:00*/
public class BrokerMessageLog {/*** 消息ID*/private String messageId;/*** 消息内容*/private String message;/*** 重试次数*/private Integer tryCount;/*** 消息的状态*/private String status;/*** 下一次重试时间*/private Date nextRetry;/*** 创建时间*/private Date createTime;/*** 更新时间*/private Date updateTime;public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}public Integer getTryCount() {return tryCount;}public void setTryCount(Integer tryCount) {this.tryCount = tryCount;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}public Date getNextRetry() {return nextRetry;}public void setNextRetry(Date nextRetry) {this.nextRetry = nextRetry;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public Date getUpdateTime() {return updateTime;}public void setUpdateTime(Date updateTime) {this.updateTime = updateTime;}
}

【6】RabbitMQ配置类:声明队列、交换机、绑定关系等

/*** @Description: RabbitMQ配置类* @author: weishihuai* @Date: 2019/7/28 10:34*/
@Configuration
public class RabbitMQConfig {@Beanpublic Queue queue() {return new Queue(Constants.QUEUE_NAME);}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(Constants.EXCHANGE_NAME);}@Beanpublic Binding bindQueue() {return BindingBuilder.bind(queue()).to(topicExchange()).with(Constants.ROUTE_KEY);}}

【7】定时任务配置类

/*** @Description: 定时任务配置类* @Author: weixiaohuai* @Date: 2019/7/28* @Time: 9:06*/
@Configuration
@EnableScheduling
public class SchedulerConfiguration implements SchedulingConfigurer {@Overridepublic void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {scheduledTaskRegistrar.setScheduler(taskScheduler());}@Beanpublic Executor taskScheduler() {//创建一个定长的线程池return Executors.newScheduledThreadPool(10);}}

【8】自定义消息发送确认回调配置类:  implements RabbitTemplate.ConfirmCallback

import com.wsh.springboot.springboot_rabbotmq_topic_exchange.constant.Constants;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.mapper.BrokerMessageLogMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Date;/*** @Description 自定义消息发送确认的回调* @Author weishihuai* @Date 2019/7/28 10:42*/
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback {private static final Logger logger = LoggerFactory.getLogger(CustomConfirmAndReturnCallback.class);@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate BrokerMessageLogMapper brokerMessageLogMapper;/*** PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.*/@PostConstructpublic void init() {//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);}/*** 如果消息没有到达交换机,则该方法中isSendSuccess = false,error为错误信息;* 如果消息正确到达交换机,则该方法中isSendSuccess = true;*/@Overridepublic void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {String messageId = correlationData.getId();if (isSendSuccess) {//如果消息到达MQ Broker,更新消息brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());} else {logger.error("消息发送异常...");}}}

【9】全局常量类:

/*** @Description: 常量类* @Author: weixiaohuai* @Date: 2019/7/28* @Time: 9:09*/
public class Constants {/*** 队列名称*/public static final String QUEUE_NAME = "order-queue";/*** 交换机名称*/public static final String EXCHANGE_NAME = "order-exchange";/*** 错误交换机名称*/public static final String ERROR_EXCHANGE_NAME = "error-order-exchange";/*** 路由键*/public static final String ROUTE_KEY = "order.#";/*** 消息状态(发送中)*/public static final String ORDER_SENDING = "0";/*** 消息状态(成功)*/public static final String ORDER_SEND_SUCCESS = "1";/*** 消息状态(失败)*/public static final String ORDER_SEND_FAILURE = "2";/*** 消息超时时间(一分钟)*/public static final int ORDER_TIMEOUT = 1;}

【10】订单Mapper持久层接口:

import com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity.OrderInfo;
import org.springframework.stereotype.Repository;/*** @Description: 订单Mapper* @Author: weixiaohuai* @Date: 2019/7/28* @Time: 15:47*/
@Repository
public interface OrderMapper {/*** 保存订单信息** @param orderInfo 订单信息*/void saveOrder(OrderInfo orderInfo);}

【11】消息记录Mapper持久层接口:

/*** @Description: 消息发送记录Mapper* @Author: weixiaohuai* @Date: 2019/7/28* @Time: 15:34*/
@Repository
public interface BrokerMessageLogMapper {/*** 查询消息状态为0(发送中) 且已经超时的消息集合** @return 消息集合*/List<BrokerMessageLog> queryTimeoutBrokerMessageLog();/*** 更新消息重试发送次数** @param messageId  消息ID* @param updateTime 更新时间*/void updateBrokerMessageLogRetryCount(@Param("messageId") String messageId, @Param("updateTime") Date updateTime);/*** 更新消息发送状态** @param messageId  消息ID* @param status     消息的状态* @param updateTime 更新时间*/void changeBrokerMessageLogStatus(@Param("messageId") String messageId, @Param("status") String status, @Param("updateTime") Date updateTime);/*** 保存消息发送记录消息** @param brokerMessageLog 消息发送记录*/void saveBrokerMessageLog(BrokerMessageLog brokerMessageLog);
}

【12】订单Mapper.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.wsh.springboot.springboot_rabbotmq_topic_exchange.mapper.OrderMapper"><!--保存订单信息--><insert id="saveOrder"parameterType="com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity.OrderInfo">insert into t_order(id,name,message_id) values(#{id},#{name},#{messageId})</insert></mapper>

【13】消息记录Mapper.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.wsh.springboot.springboot_rabbotmq_topic_exchange.mapper.BrokerMessageLogMapper"><!--查询消息状态为0(发送中) 且已经超时的消息集合--><select id="queryTimeoutBrokerMessageLog"resultType="com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity.BrokerMessageLog">SELECT t.`create_time` AS createTime,t.`message` AS message,t.`message_id` AS messageId,t.`next_retry` AS nextRetry, t.`status` AS STATUS,t.`try_count` AS tryCount,t.`update_time` AS updateTimefrom broker_message_log twhere status = '0'and next_retry <![CDATA[ <= ]]> sysdate()</select><!--更新消息重试发送次数--><update id="updateBrokerMessageLogRetryCount">update broker_message_log bmlset bml.try_count = bml.try_count + 1,bml.update_time = #{updateTime, jdbcType=TIMESTAMP}where bml.message_id = #{messageId,jdbcType=VARCHAR}</update><!--更新消息发送状态--><update id="changeBrokerMessageLogStatus">update broker_message_log bmlset bml.status = #{status,jdbcType=VARCHAR},bml.update_time = #{updateTime, jdbcType=TIMESTAMP}where bml.message_id = #{messageId,jdbcType=VARCHAR}</update><!--保存消息发送记录消息--><insert id="saveBrokerMessageLog"parameterType="com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity.BrokerMessageLog">INSERT INTO `broker_message_log`(`message_id`,`message`,`try_count`,`status`,`next_retry`,`create_time`,`update_time`)VALUES (#{messageId},#{message},#{tryCount},#{status},#{nextRetry},#{createTime},#{updateTime})</insert></mapper>

【14】订单服务层接口OrderService:

import com.alibaba.fastjson.JSONObject;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.constant.Constants;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity.BrokerMessageLog;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity.OrderInfo;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.mapper.BrokerMessageLogMapper;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.mapper.OrderMapper;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.producer.Producer;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;/*** @Description: 订单服务层接口* @Author: weixiaohuai* @Date: 2019/7/28* @Time: 15:36*/
@Service
public class OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate BrokerMessageLogMapper brokerMessageLogMapper;@Autowiredprivate Producer producer;public void saveOrderInfo(OrderInfo order) throws Exception {Date nowDate = new Date();// 1.保存订单信息orderMapper.saveOrder(order);// 2.保存消息记录信息BrokerMessageLog brokerMessageLog = new BrokerMessageLog();brokerMessageLog.setMessageId(order.getMessageId());brokerMessageLog.setMessage(JSONObject.toJSONString(order));// 初始化消息状态为0brokerMessageLog.setStatus(Constants.ORDER_SENDING);// 初始化已重试次数为0brokerMessageLog.setTryCount(0);// 设置消息超时时间为一分钟brokerMessageLog.setNextRetry(DateUtils.addMinutes(nowDate, Constants.ORDER_TIMEOUT));brokerMessageLog.setCreateTime(nowDate);brokerMessageLog.setUpdateTime(nowDate);brokerMessageLogMapper.saveBrokerMessageLog(brokerMessageLog);// 3.消息发送者发送消息到MQproducer.sendOrder(order);}}

【15】消息重发定时器:容器启动后,延迟5秒后执行定时器,以后每10秒再执行一次该定时器

import com.alibaba.fastjson.JSONObject;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.constant.Constants;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity.BrokerMessageLog;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity.OrderInfo;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.mapper.BrokerMessageLogMapper;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.List;/*** @Description: 消息重发定时器* @Author: weixiaohuai* @Date: 2019/7/28* @Time: 15:50*/
@Component
public class MessageReSendScheduler {private static final Logger logger = LoggerFactory.getLogger(MessageReSendScheduler.class);@Autowiredprivate Producer producer;@Autowiredprivate BrokerMessageLogMapper brokerMessageLogMapper;/*** 容器启动后,延迟5秒后执行定时器,以后每10秒再执行一次该定时器*/@Scheduled(initialDelay = 5000, fixedDelay = 10000)public void reSend() {//查询出状态为0以及超时的消息List<BrokerMessageLog> list = brokerMessageLogMapper.queryTimeoutBrokerMessageLog();if (null != list && list.size() > 0) {for (BrokerMessageLog brokerMessageLog : list) {if (null != brokerMessageLog) {if (brokerMessageLog.getTryCount() >= 3) {logger.info("消息【{}】重试三次之后仍失败..", brokerMessageLog.getMessageId());//重试失败三次,更新消息状态为发送失败brokerMessageLogMapper.changeBrokerMessageLogStatus(brokerMessageLog.getMessageId(), Constants.ORDER_SEND_FAILURE, new Date());} else {// 如果重试次数小于三次,那么进行重发brokerMessageLogMapper.updateBrokerMessageLogRetryCount(brokerMessageLog.getMessageId(), new Date());OrderInfo orderInfo = JSONObject.parseObject(brokerMessageLog.getMessage(), OrderInfo.class);try {logger.info("消息【{}】即将进行重发尝试..", brokerMessageLog.getMessageId());producer.sendOrder(orderInfo);} catch (Exception e) {e.printStackTrace();logger.error("消息处理异常..");}}}}}}
}

【16】消息发送者Producer:

import com.wsh.springboot.springboot_rabbotmq_topic_exchange.constant.Constants;
import com.wsh.springboot.springboot_rabbotmq_topic_exchange.entity.OrderInfo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Description: 消息发送者* @author: weishihuai* @Date: 2019/7/28 10:27*/
@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendOrder(OrderInfo order) throws Exception {//消息唯一IDCorrelationData correlationData = new CorrelationData(order.getMessageId());//正常发送消息rabbitTemplate.convertAndSend(Constants.EXCHANGE_NAME, "order.save", order, correlationData);//指定一个不存在的交换机,这样触发confirmCallback失败回调,进行重发尝试
//        rabbitTemplate.convertAndSend(Constants.ERROR_EXCHANGE_NAME, "order.save", order, correlationData);}
}

【17】消息消费者Consumer:

import com.wsh.springboot.springboot_rabbotmq_topic_exchange.constant.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Description: 消息消费者* @author: weishihuai* @Date: 2019/7/28 11:30*/
@Component
public class Consumer {private static final Logger logger = LoggerFactory.getLogger(Consumer.class);@RabbitListener(queues = {Constants.QUEUE_NAME})public void receiveMessage(Message message) {logger.info("【消费者接收到消息:{}】", message);}}

【18】应用启动类:

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@MapperScan("com.wsh.springboot.springboot_rabbotmq_topic_exchange.mapper")
public class SpringbootRabbotmqTopicExchangeApplication {public static void main(String[] args) {SpringApplication.run(SpringbootRabbotmqTopicExchangeApplication.class, args);}}

【19】测试发送订单:验证订单业务数据以及消息记录是否正确进行入库。

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbotmqTopicExchangeApplicationTests {@Autowiredprivate Producer producer;@Autowiredprivate OrderService orderService;@Testpublic void testCreateOrder() throws Exception {OrderInfo order = new OrderInfo();order.setId(UUID.randomUUID().toString());order.setName("图书订单");order.setMessageId(UUID.randomUUID().toString());orderService.saveOrderInfo(order);}}

启动项目,并且运行上面的单元测试,通过sqlyong查询两个表数据:

订单表:

消息记录表:

可见,由于指定的routeKey正确路由,订单数据以及消息记录都成功入库,并且消息记录表中的消息状态为1(发送成功)。

【20】接着,修改一下消息发送的目标交换机,随便指定一个不存在的交换机名称,这时候消息会发生丢失,从而触发confirmCallback的失败回调,所以该条消息的状态为0(发送中)。由于此时存在定时器去定时查询状态为0并且消息已经过时的消息,进行重发尝试,如果重试三次还是未能发送成功,将更新消息状态为2,表示投递失败。

继续运行上面的测试用例,查看结果:

订单表:

消息记录表:

控制台输出:可见,此条消息重复尝试了三次之后依然失败,这时候消息记录表中的消息状态被更新为2(发送失败)。

四、总结

本文通过将消息标识状态方式进行包装消息可靠性传输,该种方式适合于数据量不会特别大的场合,在高并发下,由于需要进行两次数据入库操作,对数据库压力蛮大的,所以还有一种方式解决消息可靠性传输,就是 "消息的延迟投递,做二次确认,回调检查",有空会去研究一下。本文是在笔者看到https://www.imooc.com/article/49814大神这篇博客之后启发自己写的一个示例,也加深了自己对可靠性传输解决方案的认识。

RabbitMQ消息可靠性传输示例相关推荐

  1. RabbitMQ消息可靠性分析和应用

    2019独角兽企业重金招聘Python工程师标准>>> RabbitMQ流程简介(带Exchange) RabbitMQ使用一些机制来保证可靠性,如持久化.消费确认及发布确认等. 先 ...

  2. RabbitMQ消息可靠性投递及分布式事务最终一致性实现

    RabbitMQ消息可靠性投递就是保证消息生产者能够将消息百分百投递到RabbitMQ服务器,并在传递过程中不丢失.然而在生产环境中由于网络中断.网络不稳定等原因导致消息在投递过程中丢失,这或许会造成 ...

  3. 【MQ】如何确保RabbitMQ消息可靠性传递?

    文章内容 1. Producer-to-Exchange 2. Exchange-to-Queue 3. Queue Sotrage 4. Queue-to-Consumer 5. Others 正在 ...

  4. 【内部技术分享PPT】漫谈 RabbitMQ 消息可靠性

  5. Rabbitmq消息保存机制应用案例分析消息可靠性保证

    Rabbitmq 消息保存机制 mandatory参数和immediate参数作用 mandatory:当参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,Rabbitmq ...

  6. rabbitmq可靠性投递_解决RabbitMQ消息丢失问题和保证消息可靠性(一)

    工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,什么情况下消息就不见了?如何防止消息丢失?下面通过这篇文章,我们就聊聊RabbitMQ 消 ...

  7. 详解SpringCloud中RabbitMQ消息队列原理及配置,一篇就够!

    作者:kosamino cnblogs.com/jing99/p/11679426.html 一.MQ用途 1.同步变异步消息 场景:用户下单完成后,发送邮件和短信通知. 运用消息队列之后,用户下单完 ...

  8. 消息队列面试 - 如何保证消息的可靠性传输?

    消息队列面试 - 如何保证消息的可靠性传输? 面试题 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条 ...

  9. 2.RabbitMQ 的可靠性消息的发送

      本篇包含 1. RabbitMQ 的可靠性消息的发送 2. RabbitMQ 集群的原理与高可用架构的搭建 3. RabbitMQ 的实践经验   上篇包含 1.MQ 的本质,MQ 的作用 2.R ...

  10. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

最新文章

  1. MacApp开发Error Domain=NSURLErrorDomain Code=-1003 A server with the specified hostname could not be
  2. c++的uint8不赋值_2021国考 | 用对方法后,赋值法竟然变得如此简单!
  3. 数字时代职业生涯规划
  4. python opencv 如何获取图像的尺寸(宽高)(分辨率)(大小)img.shape
  5. 容器学习 之 base镜像(四)
  6. MariaDB 双主复制的配置
  7. [Hei-Ocelot-Gateway ].Net Core Api网关Ocelot的开箱即用版本
  8. android评论嵌套,android 嵌套的listview示例(可参照实现朋友圈评论)
  9. 基于昇腾处理器的目标检测应用(ACL)
  10. c语言增加动态分配的存储空间吗,C语言 关于内存动态分配问题
  11. 调整Word中英文与汉字之间的空隙
  12. 给出年、月、日,计算该日是该年的第几天
  13. CPU-AMD处理器的驱动下载地址
  14. 10寸 nuc972_新唐ARM9之NUC972学习历程之系统的搭建和BSP包的使用
  15. LibreELEC(kodi)安装 IPTV
  16. 纬衡、金蝶、腾讯、迅雷获深圳软件明星企业称号
  17. 1.8版道士装备取向与PK方法
  18. 那些年我不知道为啥的电脑“灵异事件”
  19. CAD打断曲线(网页版)
  20. π120M31 10Mbps双通道数字隔离器兼容代替ADuM1200BR

热门文章

  1. html响应式布局media,JS中使用media实现响应式布局_飛雲_前端开发者
  2. GCN图卷积网络简单实现
  3. 矩阵计算 动手学深度学习 pytorch
  4. 机器学习 Machine Learning- 吴恩达Andrew Ng 第21~25课总结
  5. 17.20. 连续中值 堆
  6. 203.移除链表元素
  7. 虚继承中的构造函数的调用
  8. Neurons and the brains
  9. ServletContext的用法
  10. php 老是报错没有定义,php中的错误处理与异常处理机制介绍