转:https://blog.csdn.net/u014373554/article/details/92686063

项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败、消息持久化、消费者失败处理方法和发送消息解决方法及手动确认的模式

先引入pom.xml

<!--rabbitmq-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application 配置文件

spring:
rabbitmq:host: IP地址port: 5672username: 用户名password: 密码RabbitConfig配置文件
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;/**Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。Queue:消息的载体,每个消息都会被投到一个或多个队列。Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.Routing Key:路由关键字,exchange根据这个关键字进行消息投递。vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。Producer:消息生产者,就是投递消息的程序.Consumer:消息消费者,就是接受消息的程序.Channel:消息通道,在客户端的每个连接里,可建立多个channel.
*/
@Configuration
@Slf4j
public class RabbitConfig {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;public static final String EXCHANGE_A = "my_mq_exchange_A";public static final String EXCHANGE_B = "my_mq_exchange_B";public static final String EXCHANGE_C = "my_mq_exchange_C";public static final String QUEUE_A="QUEUE_A";public static final String QUEUE_B="QUEUE_B";public static final String QUEUE_C="QUEUE_C";public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true); //设置发送消息失败重试connectionFactory.setChannelCacheSize(100);//解决多线程发送消息return connectionFactory;}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(){RabbitTemplate template = new RabbitTemplate(connectionFactory());template.setMandatory(true); //设置发送消息失败重试return template;}//配置使用json转递数据@Beanpublic Jackson2JsonMessageConverter producerJackson2MessageConverter() {return new Jackson2JsonMessageConverter();}/*public SimpleMessageListenerContainer messageListenerContainer(){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());adapter.setDefaultListenerMethod(new Jackson2JsonMessageConverter());return container;}*//*** 针对消费者配置* 1. 设置交换机类型* 2. 将队列绑定到交换机* FanoutExchange: 将消息分发到所有的绑定队列,无 routingkey的概念* HeadersExchange: 通过添加属性key - value匹配* DirectExchange: 按照routingkey分发到指定队列* TopicExchange : 多关键字匹配* @return*/@Beanpublic DirectExchange defaultExchange(){return new DirectExchange(EXCHANGE_A,true,false);}@Beanpublic Queue queueA(){return  new Queue(QUEUE_A,true);// 队列持久化}@Beanpublic Queue queueB(){return  new Queue(QUEUE_B,true);// 队列持久化}/*** 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind( queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);}@Beanpublic Binding bindingB(){return BindingBuilder.bind( queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);}}

生成者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;/*** 生产者*/
@Component
@Slf4j
public class ProducerMessage implements  RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{private RabbitTemplate rabbitTemplate;@Autowiredpublic ProducerMessage(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setConfirmCallback(this::confirm); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容rabbitTemplate.setReturnCallback(this::returnedMessage);rabbitTemplate.setMandatory(true);}public void  sendMsg (Object content){CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,content,correlationId);}/*** 消息发送到队列中,进行消息确认* @param correlationData* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(" 消息确认的id: " + correlationData);if(ack){log.info("消息发送成功");//发送成功 删除本地数据库存的消息}else{log.info("消息发送失败:id "+ correlationData +"消息发送失败的原因"+ cause);// 根据本地消息的状态为失败,可以用定时任务去处理数据}}/*** 消息发送失败返回监控* @param message* @param i* @param s* @param s1* @param s2*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.info("returnedMessage [消息从交换机到队列失败]  message:"+message);}
}

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;/*** 消费者*/@Slf4j
@Componentpublic class ComsumerMessage {@RabbitListener(queues = RabbitConfig.QUEUE_A)public void handleMessage(Message message,Channel channel) throws  IOException{try {String json = new String(message.getBody());JSONObject jsonObject = JSONObject.fromObject(json);log.info("消息了【】handleMessage" +  json);int i = 1/0;//业务处理。/*** 防止重复消费,可以根据传过来的唯一ID先判断缓存数据中是否有数据* 1、有数据则不消费,直接应答处理* 2、缓存没有数据,则进行消费处理数据,处理完后手动应答* 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)*///手动应答channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){log.error("消费消息失败了【】error:"+ message.getBody());log.error("OrderConsumer  handleMessage {} , error:",message,e);// 处理消息失败,将消息重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);}}}

发送消息:调用生成的方法

import com.zz.blog.BlogApplicationTests;
import com.zz.blog.mq.ProducerMessage;
import net.sf.json.JSONObject;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.UUID;
public class Message extends BlogApplicationTests {@Autowiredprivate ProducerMessage producerMessage;@Testpublic void sendMessage(){JSONObject jsonObject = new JSONObject();jsonObject.put("id", UUID.randomUUID().toString());jsonObject.put("name","TEST");jsonObject.put("desc","订单已生成");//防止发送消息失败,将发送消息存入本地。producerMessage.sendMsg(jsonObject.toString());}
}

rabbitTemplate的发送消息流程是这样的:
1 发送数据并返回(不确认rabbitmq服务器已成功接收)
2 异步的接收从rabbitmq返回的ack确认信息
3 收到ack后调用confirmCallback函数
注意:在confirmCallback中是没有原message的,所以无法在这个函数中调用重发,confirmCallback只有一个通知的作用

在这种情况下,如果在2,3步中任何时候切断连接,我们都无法确认数据是否真的已经成功发送出去,从而造成数据丢失的问题。

最完美的解决方案只有1种:
使用rabbitmq的事务机制。
但是在这种情况下,rabbitmq的效率极低,每秒钟处理的message在几百条左右。实在不可取。

基于上面的分析,我们使用一种新的方式来做到数据的不丢失。
在rabbitTemplate异步确认的基础上
1 在本地缓存已发送的message
2 通过confirmCallback或者被确认的ack,将被确认的message从本地删除
3 定时扫描本地的message,如果大于一定时间未被确认,则重发

当然了,这种解决方式也有一定的问题
想象这种场景,rabbitmq接收到了消息,在发送ack确认时,网络断了,造成客户端没有收到ack,重发消息。(相比于丢失消息,重发消息要好解决的多,我们可以在consumer端做到幂等)。

消息存入本地:在message 发消息的写数据库中。

消息应答成功,则删除本地消息,失败更改消息状态,可以使用定时任务去处理。

消息持久化:

消费者:

/*** 防止重复消费,可以根据传过来的唯一ID先判断缓存数据库中是否有数据* 1、有数据则不消费,直接应答处理* 2、缓存没有数据,则进行消费处理数据,处理完后手动应答* 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)*/

转载于:https://www.cnblogs.com/duende99/p/11597619.html

rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关相关推荐

  1. RabbitMQ消息队列,发送消息失败、消息持久化、消费者失败处理方法和发送消息

    项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败.消息持久化.消费者失败处理方法和发送消息解决方法及手动确认的模式 先引入pom.xml <!--rabbitmq- ...

  2. 消息队列、RabbitMQ原理、消息队列保证幂等性,消息丢失,消息顺序性,以及处理消息队列消息积压问题

    消息队列 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已 常见的消息队列 RabbitMq ActiveM ...

  3. rabbitmq 手动提交_RabbitMQ系列(四)RabbitMQ事务和Confirm发送方消息确认——深入解读 - 王磊的博客 - 博客园...

    RabbitMQ事务和Confirm发送方消息确认--深入解读 RabbitMQ系列文章 引言 根据前面的知识( 深入了解RabbitMQ工作原理及简单使用 . Rabbit的几种工作模式介绍与实践 ...

  4. 消息队列(定义、结构、如何创建、消息队列的发送与接收、发送与接收实例)

    一.定义 1.消息队列是一种先进先出的队列型数据结构,实际上是系统内核中的一个内部链表.消息被顺序插入队列中,其中发送进程将消息添加到队列末尾,接受进程从队列头读取消息. 2.多个进程可同时向一个消息 ...

  5. 消息队列面试 - 如何保证消息的可靠性传输?

    消息队列面试 - 如何保证消息的可靠性传输? 面试题 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条 ...

  6. 何为消息队列,为何使用消息队列,有什么消息队列插件

    一.什么叫消息队列 MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息.因为消息的生产和消费都是异步 ...

  7. 消息队列面试 - 为什么使用消息队列,消息队列有什么优点和缺点?

    消息队列面试 - 为什么使用消息队列,消息队列有什么优点和缺点? 面试题 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区 ...

  8. 消息队列-简单介绍Java消息队列,什么是消息队列,作用以及常见消息队列

    天天说队列, 项目请求数据不能及时处理时,就一言不合通过队列啊, 心中那个是妈卖批,那么到底什么队列呢,队列有到底运用于哪些运用场景呢; 先说说应用场景吧, 不知道有啥作用,看多了含义,原理什么的还是 ...

  9. 消息队列面试 - 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

    消息队列面试 - 如何保证消息不被重复消费? 面试题 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性? 面试官心理分析 其实这是很常见的一个问题,这俩问题基本可以连起来问.既然是消费消息, ...

  10. 消息队列面试 - 如何保证消息的顺序性?

    消息队列面试 - 如何保证消息的顺序性? 面试题 如何保证消息的顺序性? 面试官心理分析 其实这个也是用 MQ 的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的 ...

最新文章

  1. selenium+Edge浏览器实现web端自动化测试
  2. 第十六届智能车竞赛广东线上比赛 - 哈尔滨工业大学(深圳)比赛筹备
  3. 海量分布式爬取抖音视频,几行代码搞定
  4. 基于 HTML5 的 WebGL 自定义 3D 摄像头监控模型
  5. Day 5: GruntJS——重复乏味的工作总会有人做(反正我不做)
  6. 42.移动构造函数的合成规则
  7. 1814: 一元三次方程求解
  8. 德国巴伐利亚山谷积雪遍地 汽车被大雪掩埋
  9. python 横坐标旋转,python 横坐标旋转
  10. php pc_base,phpcms二次开发之base.php的桥梁作用_PHP教程
  11. C++判断一个序列是否为堆(最大堆、最小堆)
  12. css grid 自动高度_前端面试题:关于CSS布局
  13. access 导入 txt sql语句_从零开始学 MySQL - 数据库的导入导出和备份
  14. PostgreSQL 命令行客户端 psql 使用指南
  15. Zemax仿真中像质评价及方法
  16. ubuntu中firfox插件xmarks的同步问题
  17. 锐目对讲机的使用方法详解
  18. NAACL2022-Prompt相关论文对Prompt的看法
  19. 斯蒂夫·乔布斯 与苹果公司
  20. clickhouse连接Tableau

热门文章

  1. 洛谷 U3357 C2-走楼梯
  2. Linux tar命令高级用法——备份数据
  3. 自定义View控件(2—手写实例代码)
  4. EDM的九大用途盘点
  5. 昨天7月21号,笑笑又生病了
  6. valgrind检测libevent内存泄露
  7. 5.19 - Stacks and Queues
  8. 《MySQL排错指南》——1.9 许可问题
  9. Android App 优化之 ANR 详解
  10. Linux直接与编译安装Vsftpd服务器