RabbitMQ消息中间件技术精讲(三)
文章目录
- 第三章 `RabbitMQ`高级特性
- 3.1-消息如何保障100%的投递成功方案
- 3.2-幂等性概念及业界主流解决方案
- 3.3-`Confirm`确认消息详解
- 3.4-`Return`消息机制
- 3.5-消费端自定义监听
- 3.6-消费端的限流策略
- 3.7-消费端`ACK`与重回队列机制
- 3.8-`TTL`消息详解
- 3.9-死信队列`DLX`
第三章 RabbitMQ
高级特性
3.1-消息如何保障100%的投递成功方案
生产端的可靠性投递
- 保障消息的成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Broker)确认应答
- 完善的消息进行补偿机制
BAT/TMD
大厂解决方案消息落库,对消息状态进行打标(缺点,在高并发场景下对数据库压力过大)
- 生产者将业务数据和消息入库,status默认为0
- 生产者发送消息到MQ
- 消费者进行消息确认
- 正常消费情况下,update消息状态为1,表示正常消费,如果出现网络闪断等问题,则消息状态为0不变
- 定时任务去数据库查询状态为0的消息
- 针对此类数据,进行数据重发,重复2-3-4-5-6步骤
- 统计重复次数3次以上的消息,不再进行重发,update消息状态为2,等待下一步处理
消息的延迟投递,做二次确认,异步回调检查
1. 生产者(上游服务)先将业务数据,比如订单数据入库,之后再发送消息到MQ中,而且不加事务,因为事务在这影响性能(绿色线)2. 定时在5分钟后,发送延迟检查消息(红色线)3. 消费者(下游服务)监听指定队列,消费消息(绿色线)4. 如果消费者成功接收消息后,将主动发送一条确认消息到MQ(蓝色线)5. 回调服务监听指定队列,该队列监听消费者发送的确认消息,如果成功收到确认消息,表示消息正常消费,则进行入库处理(黑色线)6. 回调服务监听指定队列,该队列监听生产者发送的延迟检查消息,5分钟后接收到消息,回调服务在数据库中检查这条消息是否被正常消费,如果一切正常,则不返回消息(红色线)7. 如果这条消息未正常消费,则回调服务调用RPC服务(rpc-远程服务调用),通知上游服务(生产者)进行重发消息(紫色线)
3.2-幂等性概念及业界主流解决方案
引入
借鉴数据库乐观锁机制:执行一条更新库存SQL
//高并发情况下,同时两条数据更新语句,通过version来预防超卖 update T_goods set count = count -1 ,version = version +1 where version = 1
消费端实现幂等,就表示,即使消费端收到多条一样的消息,永远只会消费一次
消费端幂等性保障
场景:在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
- 消费端实现幂等,就表示,即使消费端收到多条一样的消息,永远只会消费一次
业务主流幂等性操作
唯一ID + 指纹码 机制,利用数据库主键去重
//指纹码,可以是时间戳,可以是区别码,id为主键唯一 select count(1) from t_order where id = 唯一id+指纹码 //查询这个主键是否有值,无则插入数据,有则说明消息已消费入库 //好处:实现简单 //缺点:高并发下有数据写入的性能瓶颈 //解决方案:根据ID进行分库分表进行算法路由,进行分压分流
利用Redis的原子性去实现
使用redis进行幂等, 利用redis set 之后,判断isexist()判断是否存在 另外需要考虑问题 1.我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存之间如何做到原子性?数据一致性 2.如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
3.3-Confirm
确认消息详解
理解
Confirm
消息确认机制消息的确认,是指生产者投递消息后,如果
Broker
收到消息,则会给生产者一个应答生产者接收应答,用来确认这条消息是否正常发送到
Broker
,这种方式也是RabbitMQ消息
的可靠性投递的核心保障[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OXi7Wl6e-1645578206551)(img\confirm消息确认.png)]
实现
Confirm
确认消息- 在
channel
上开启确认模式,channel.confirmSelect()
- 在
channel
上添加监听,addConfirmListener
,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理
- 在
代码演示
// 生产者端 public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("118.126.65.50");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect();String exchangeName = "test_confirm_exchange";String routingKey = "test_routing_key";String exchangeType = "direct";String queueName = "test_confirm_queue";String msg = "confirm message";channel.exchangeDeclare(exchangeName,exchangeType,true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());channel.addConfirmListener(new ConfirmListener() {// broker (消息代理,就是MQ) 当消息投递到了所匹配的队列之后,broker就会发送一个确认信号,给到生产者@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println("======有ack=========");}@Overridepublic void handleNack(long l, boolean b) throws IOException {// 没有ack情况,磁盘占满、队列满等System.out.println("======没有ack=========");}});}// 消费者端 public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("118.126.65.50");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_confirm_exchange";String routingKey = "test_routing_key";String queueName = "test_confirm_queue";DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收消息-" +new String(body));}};channel.basicConsume(queueName,true,consumer);}
3.4-Return
消息机制
机制理解
Return Listener
用于处理一些不可路由的消息生产者通过指定一个
Exchange和RoutingKey
,把消息送到某个队列中,然后消费者监听队列,进行消费处理在某些情况下,如果生产者在发送消息时,当前的Exchange不存在或者指定的RoutingKey路由不到指定队列,这个时候要监听这种不可达消息,就要使用
Return Listener
基础
API
配置Mandatory
:如果是true
,则监听器会接收到路由不可达的消息,然后进行后续处理;如果是false
,那么broker
会自动删除该消息 (翻译:adj强制的,n代理人)参数设置
public void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
channel.addReturnListener(new ReturnListener() {//replyCode-响应码//replyText-响应文本public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {System.out.println(new String(bytes));}}); //如果消息正常发出,或者设置成false,则return listener不会监听,只有设置成true时才会监听错误返回 channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,true,properties,msg.getBytes());replyCode:312 replyText:NO_ROUTE exchange:exchange-01 routingKey:routingKey-02 body:hello RabbitMq return
3.5-消费端自定义监听
//4-创建消费者DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("msg-"+new String(body));}};//5-消费数据-队列名称,设置是否自动签收,消费者channel.basicConsume(QUEUE_NAME,true,consumer);
3.6-消费端的限流策略
场景引入
RabbitMQ
服务器有1万条未处理的消息,此时打开消费端服务器,则会出现巨量消息瞬时全部推送,单个消费者端无法同时处理
解决方案
RabbitMQ
提供了一种qos
(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数量的消息未被确认前(通过基于consumer
或者channel
设置Qos
的值),不进行消费新的数据消费端一定不要设置成自动签收,而是手动签收
public void basicQos(int prefetchSize, int prefetchCount, boolean global) //prefetchSize:单条消息大小限制,一般设置0表示不限制 //prefetchCount:一般是1,告诉RabbitMQ不要同时给一个消费者推送多余N个消息,如果N个消息返回了ack,则继续发送消息,如果没有则不要继续发送消息 //global:一般false。true表示应用到channel级别,false表示应用到consumer级别,
代码示例
// 生产端不变 // 消费者 要改变 DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("msg-"+new String(body));//public void basicAck(long deliveryTag, boolean multiple)channel.basicAck(envelope.getDeliveryTag(), false);} }; channel.basicQos(0, 1, false); //限流方法,一定要将autoAck设置成false,表示手动签收 channel.basicConsume(QUEUE_NAME,false,consumer);
3.7-消费端ACK
与重回队列机制
消费端手工
ACK
和NACK
(返回ack
表示成功消费,nack
表示没有正常消费)- 消费端进行消费,如果超过
count
次数后,一直返回nack
,则可能是由于业务逻辑异常导致,此时将消息添加到日志中,然后进行补偿发送 - 如果由于服务器宕机等严重问题,那就需要手工进行
ACK
保障消费端消费成功
- 消费端进行消费,如果超过
消费端重回队列
消费端重回队列,是为了没有处理成功的消息,把消息重新传递给
Broker
,将该条消息放入此队列尾端,重新发送一般实际使用中,都会关闭重回队列,设置成
false
//public void basicNack(long deliveryTag, boolean multiple, boolean requeue)// 消息标签,是否批量,是否重回队列 channel.basicNack(envelope.getDeliveryTag(), false, false);
代码演示
// 生产端代码 for (int i = 0; i < 5; i++) {String msg = "hello RabbitMq " + i;Map<String, Object> header = new HashMap<>();header.put("num",i);// 设置自定义参数AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").headers(header).build();// 发布消息channel.basicPublish("exchange-01","routingKey-01",true,properties,msg.getBytes(StandardCharsets.UTF_8));}// 消费端代码 // 3.创建信道 Channel channel = connection.createChannel(); // 4.创建消费者 DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 进行模拟判断,如果自定义属性的header头中,num参数为0,则返回重回队列中,不进行消费if((Integer) properties.getHeaders().get("num") == 0){// 消息唯一标签,是否批量处理,是否返回重回队列// 观察管控台和控制台,会发现一直打印第0个消息,因为一直在重回队列中不停的发送,一直没有消费,管控台上unacked一直有该条数据channel.basicNack(envelope.getDeliveryTag(),false,true);}else{channel.basicAck(envelope.getDeliveryTag(),false);}System.out.println("消息-"+new String(body));} }; // 5. 消费数据,将第二个参数autoACK设置为false,表示不进行自动签收,而是手动签收 channel.basicConsume("queue-01",false,consumer);
3.8-TTL
消息详解
TTL
(time to live
)生存时间RabbitMQ
支持消息的过期时间,在消息发送时可以进行指定RabbitMQ
支持队列中消息过期时间,从消息进入队列开始计算,只要超过了队列的超时时间配置,则消息自动清除
3.9-死信队列DLX
死信队列 (
DLX dead-letter-exchange
)- 利用
DLX
,当消息在一个队列中变成死信(dead message
)之后,它能被重新publish到另一个Exchange
,这个Exchange
就是DLX
DLX
也是一个正常的Exchange
,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置了某个队列的属性- 当这个队列中有死信时,
RabbitMQ
就会自动的将这个消息重新发布到设置的DLX
死信队列上,进而被路由到另一个队列 - 可以监听这个队列中消息进行相应的处理,这个特性可以弥补
RabbitMQ3.0
之前支持的immediate
参数功能
- 利用
消息变成死信的情况
- 消息被拒绝(
basic.reject/basic.nack
)并且requeue = false
(上面重回队列的设置参数,false
表示不再重回队列) TTL
过期- 队列达到最大长度
- 消息被拒绝(
死信队列设置
首先需要设置死信队列的
Exchange
和Queue
,然后进行绑定Exchange
比如dlx.exchange
Queue
比如dlx.queue
RoutingKey
#
然后正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可
arguments.put("x-dead-letter-exchange","dlx.exchange")
代码演示
- 消费端,创建正常的交换机、队列、并绑定,再创建死信交换机、死信队列、绑定,同时正常队列要设置参数绑定
- 启动消费端,通过管控台,查看是否正常创建队列和交换机
- 关闭消费端,模拟死信队列消息超时的情况
- 生产端,发布消息,但设置消息过期时间为10s
- 启动生产端,会发现5条消息一直堆积在
commonQueue
中,当消息过期后,5条消息自动转移到dlx.queue
中,模拟成功,代码如下:
// 消费端 public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.创建信道Channel channel = connection.createChannel();// 4.创建交换机String commonExchangeName = "commonExchange";String commonExchangeType = "topic";String commonQueueName = "commonQueue";String commonRoutingKey = "common.#";// 交换机,交换机类型,是否持久化,是否自动删除,自定义参数channel.exchangeDeclare(commonExchangeName,commonExchangeType,true,false,null);// 5.创建队列// 队列,是否持久化,是否排他性,是否自动删除,自定义参数HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange","dlx.exchange");channel.queueDeclare(commonQueueName,true,false,false,arguments);// 6.交换机与队列绑定channel.queueBind(commonQueueName,commonExchangeName,commonRoutingKey);// 7.声明死信队列String dlxExchangeName = "dlx.exchange";String dlxExchangeType = "topic";String dlxQueueName = "dlx.queue";String dlxRoutingKey = "#";channel.exchangeDeclare(dlxExchangeName,dlxExchangeType,true,false,null);channel.queueDeclare(dlxQueueName,true,false,false,null);channel.queueBind(dlxQueueName,dlxExchangeName,dlxRoutingKey);// 4.创建消费者DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息-"+new String(body));}};// 5. 消费数据 队列名,是否自动签收,消费者对象channel.basicConsume(commonQueueName,false,consumer);// 8.关闭连接//channel.close();//connection.close();} }// 生产端 public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.创建信道Channel channel = connection.createChannel();String commonExchangeName = "commonExchange";String commonRoutingKey = "common.#";// 4.发送数据 (String exchange, String routingKey, BasicProperties props, byte[] body)for (int i = 0; i < 5; i++) {String msg = "hello RabbitMq DLX" + i;AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").expiration("10000").build();channel.basicPublish(commonExchangeName,commonRoutingKey,true,properties,msg.getBytes(StandardCharsets.UTF_8));}// 5.关闭连接//channel.close();//connection.close();} }
RabbitMQ消息中间件技术精讲(三)相关推荐
- RabbitMQ消息中间件技术精讲全集
RabbitMQ消息中间件技术精讲 导航: RabbitMQ消息中间件技术精讲 一. 主流消息中间件介绍 1.1 ActiveMQ 1.2 Kafka 1.3 RocketMQ 1.4 RabbitM ...
- 老的消息中间件投递失败的类型值_RabbitMQ消息中间件技术精讲11 高级篇四 confirm 确认消息...
RabbitMQ消息中间件技术精讲11 高级篇四 confirm 确认消息 理解Confirm消息确认机制: 消息的确认,是指生产者投递消息后,如果broker收到消息,则会给生产者一个应答: 生产者 ...
- Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2
七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...
- [小森数据结构]看电影-顺序表技术精讲
故事前研(言) 小森去看电影<<流浪地球2>>的时候买电影票的场景. 突然一个叫高启强的人买电影票 拉了两个Hei友,来看电影 <<流浪地球2>> 当场 ...
- 视频教程-华为路由交换精讲系列20:OSPF技术精讲 [肖哥]视频课程-华为认证
华为路由交换精讲系列20:OSPF技术精讲 [肖哥]视频课程 肖老师(肖哥),思科认证讲师讲师(CCIE#27529),RedHat Linux认证讲师讲师,Juniper 认证讲师讲师,微软认证讲师 ...
- 技术精讲丨多线程环境下时间轮-海量定时任务的定时器设计
多线程环境下海量定时任务处理-定时器设计 1. 定时器设计 2. 红黑树.最小堆以及跳表的实现对比 3. 时间轮的实现 视频讲解如下,点击观看: 技术精讲丨多线程环境下时间轮-海量定时任务的定时 ...
- Hadoop和大数据技术精讲班
尊敬的先生/女士: 思数于计算和大数据服务中心http://www.bihadoop.com,简称思数于(隶属亍北京思数科技有限公司),是国内与业大数据分析培训.咨询机构.中国于计算大数据处理委员会. ...
- 【韭菜必学】每天学点技术—非常非常重要的BOLL指标精讲
前面我们讲解了MACD RSI KDJ等技术指标,今天讲解一个更加重要的指标-BOLL(布林均线),可能需要花几节课来讲,希望大家认真学习,争取能自己灵活应用,不再做被割的韭菜. 首先我们来学习BOL ...
- MySQL精讲(一) |DQL数据查询语句全解析
MySQL精讲系列文章(更新完毕) <MySQL入门必看知识> <MySQL精讲(一) | DQL数据查询语句全解析> <MySQL精讲(二) | DML数据操作语句全解 ...
最新文章
- python画图颜色表示大小变化_python画图(线条颜色、大小、类型:点、虚线等)(图文详细入门教程四)...
- TortoiseSvn
- mysql fetch next from_MySql 存储过程 动态sql
- MySQL InnoDB Memcached Plugin在Oray公司的实践
- 【NOI2001】炮兵阵地
- 硬件电路基础知识(30)---RS232、RS485、RS422、RJ45接口的区别
- 使用 Pig 进行数据分析
- 修改Maven默认编译级别
- 周界防护在安防行业重要性与日俱增
- 【图像去噪】基于马尔可夫随机场实现图像去噪附matlab代码
- SysML建模工具学习笔记总结一
- mysql数据库外键的作用
- 玩觅伊的女孩,都是一些什么样的人?
- Golang面试问题汇总
- 双光耦开关电源电路图_开关电源中的光耦经典电路设计分析
- 一个HTTP打趴80%面试者
- table thead tr设置表头背景色未完全覆盖的问题
- 解决RequestMapping写在类上页面跳转失败
- 【观察】华为云连续四年蝉联政务云第一,背后是“长跑选手”实力的印证
- 4.MyBatis源码解析-MyBatis扩展点--阿呆中二