文章目录

  • 第三章 `RabbitMQ`高级特性
    • 3.1-消息如何保障100%的投递成功方案
    • 3.2-幂等性概念及业界主流解决方案
    • 3.3-`Confirm`确认消息详解
    • 3.4-`Return`消息机制
    • 3.5-消费端自定义监听
    • 3.6-消费端的限流策略
    • 3.7-消费端`ACK`与重回队列机制
    • 3.8-`TTL`消息详解
    • 3.9-死信队列`DLX`

第三章 RabbitMQ高级特性

3.1-消息如何保障100%的投递成功方案

  1. 生产端的可靠性投递

    • 保障消息的成功发出
    • 保障MQ节点的成功接收
    • 发送端收到MQ节点(Broker)确认应答
    • 完善的消息进行补偿机制
  2. BAT/TMD大厂解决方案

    • 消息落库,对消息状态进行打标(缺点,在高并发场景下对数据库压力过大)

      1. 生产者将业务数据和消息入库,status默认为0
      2. 生产者发送消息到MQ
      3. 消费者进行消息确认
      4. 正常消费情况下,update消息状态为1,表示正常消费,如果出现网络闪断等问题,则消息状态为0不变
      5. 定时任务去数据库查询状态为0的消息
      6. 针对此类数据,进行数据重发,重复2-3-4-5-6步骤
      7. 统计重复次数3次以上的消息,不再进行重发,update消息状态为2,等待下一步处理

    • 消息的延迟投递,做二次确认,异步回调检查

 1. 生产者(上游服务)先将业务数据,比如订单数据入库,之后再发送消息到MQ中,而且不加事务,因为事务在这影响性能(绿色线)2. 定时在5分钟后,发送延迟检查消息(红色线)3. 消费者(下游服务)监听指定队列,消费消息(绿色线)4. 如果消费者成功接收消息后,将主动发送一条确认消息到MQ(蓝色线)5. 回调服务监听指定队列,该队列监听消费者发送的确认消息,如果成功收到确认消息,表示消息正常消费,则进行入库处理(黑色线)6. 回调服务监听指定队列,该队列监听生产者发送的延迟检查消息,5分钟后接收到消息,回调服务在数据库中检查这条消息是否被正常消费,如果一切正常,则不返回消息(红色线)7. 如果这条消息未正常消费,则回调服务调用RPC服务(rpc-远程服务调用),通知上游服务(生产者)进行重发消息(紫色线)

3.2-幂等性概念及业界主流解决方案

  1. 引入

    • 借鉴数据库乐观锁机制:执行一条更新库存SQL

      //高并发情况下,同时两条数据更新语句,通过version来预防超卖
      update T_goods set count = count -1 ,version = version +1 where version = 1
      
    • 消费端实现幂等,就表示,即使消费端收到多条一样的消息,永远只会消费一次

  2. 消费端幂等性保障

    • 场景:在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

      1. 消费端实现幂等,就表示,即使消费端收到多条一样的消息,永远只会消费一次
    • 业务主流幂等性操作

      • 唯一ID + 指纹码 机制,利用数据库主键去重

        //指纹码,可以是时间戳,可以是区别码,id为主键唯一
        select count(1) from t_order where id = 唯一id+指纹码
        //查询这个主键是否有值,无则插入数据,有则说明消息已消费入库
        //好处:实现简单
        //缺点:高并发下有数据写入的性能瓶颈
        //解决方案:根据ID进行分库分表进行算法路由,进行分压分流
        
      • 利用Redis的原子性去实现

        使用redis进行幂等,
        利用redis set 之后,判断isexist()判断是否存在
        另外需要考虑问题
        1.我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存之间如何做到原子性?数据一致性
        2.如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
        

3.3-Confirm确认消息详解

  1. 理解Confirm消息确认机制

    • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答

    • 生产者接收应答,用来确认这条消息是否正常发送到Broker,这种方式也是RabbitMQ消息的可靠性投递的核心保障

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OXi7Wl6e-1645578206551)(img\confirm消息确认.png)]

  2. 实现Confirm确认消息

    1. channel上开启确认模式,channel.confirmSelect()
    2. channel上添加监听,addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理
  3. 代码演示

    // 生产者端
    public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("118.126.65.50");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect();String exchangeName = "test_confirm_exchange";String routingKey   = "test_routing_key";String exchangeType = "direct";String queueName    = "test_confirm_queue";String msg          = "confirm message";channel.exchangeDeclare(exchangeName,exchangeType,true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());channel.addConfirmListener(new ConfirmListener() {// broker (消息代理,就是MQ) 当消息投递到了所匹配的队列之后,broker就会发送一个确认信号,给到生产者@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println("======有ack=========");}@Overridepublic void handleNack(long l, boolean b) throws IOException {// 没有ack情况,磁盘占满、队列满等System.out.println("======没有ack=========");}});}// 消费者端
    public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("118.126.65.50");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_confirm_exchange";String routingKey   = "test_routing_key";String queueName    = "test_confirm_queue";DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收消息-" +new String(body));}};channel.basicConsume(queueName,true,consumer);}
    

3.4-Return消息机制

  1. 机制理解

    • Return Listener用于处理一些不可路由的消息

    • 生产者通过指定一个Exchange和RoutingKey,把消息送到某个队列中,然后消费者监听队列,进行消费处理

    • 在某些情况下,如果生产者在发送消息时,当前的Exchange不存在或者指定的RoutingKey路由不到指定队列,这个时候要监听这种不可达消息,就要使用Return Listener

  2. 基础API配置

    • Mandatory:如果是true,则监听器会接收到路由不可达的消息,然后进行后续处理;如果是false,那么broker会自动删除该消息 (翻译:adj强制的,n代理人)

    • 参数设置 public void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)

      channel.addReturnListener(new ReturnListener() {//replyCode-响应码//replyText-响应文本public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {System.out.println(new String(bytes));}});
      //如果消息正常发出,或者设置成false,则return listener不会监听,只有设置成true时才会监听错误返回
      channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,true,properties,msg.getBytes());replyCode:312
      replyText:NO_ROUTE
      exchange:exchange-01
      routingKey:routingKey-02
      body:hello RabbitMq return
      

3.5-消费端自定义监听

//4-创建消费者DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("msg-"+new String(body));}};//5-消费数据-队列名称,设置是否自动签收,消费者channel.basicConsume(QUEUE_NAME,true,consumer);

3.6-消费端的限流策略

  1. 场景引入

    • RabbitMQ服务器有1万条未处理的消息,此时打开消费端服务器,则会出现巨量消息瞬时全部推送,单个消费者端无法同时处理
  2. 解决方案

    • RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数量的消息未被确认前(通过基于consumer或者channel设置Qos的值),不进行消费新的数据

    • 消费端一定不要设置成自动签收,而是手动签收

      public void basicQos(int prefetchSize, int prefetchCount, boolean global)
      //prefetchSize:单条消息大小限制,一般设置0表示不限制
      //prefetchCount:一般是1,告诉RabbitMQ不要同时给一个消费者推送多余N个消息,如果N个消息返回了ack,则继续发送消息,如果没有则不要继续发送消息
      //global:一般false。true表示应用到channel级别,false表示应用到consumer级别,
      
  3. 代码示例

    // 生产端不变
    // 消费者 要改变
    DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("msg-"+new String(body));//public void basicAck(long deliveryTag, boolean multiple)channel.basicAck(envelope.getDeliveryTag(), false);}
    };
    channel.basicQos(0, 1, false);
    //限流方法,一定要将autoAck设置成false,表示手动签收
    channel.basicConsume(QUEUE_NAME,false,consumer);
    

3.7-消费端ACK与重回队列机制

  1. 消费端手工ACKNACK(返回ack表示成功消费,nack表示没有正常消费)

    • 消费端进行消费,如果超过count次数后,一直返回nack,则可能是由于业务逻辑异常导致,此时将消息添加到日志中,然后进行补偿发送
    • 如果由于服务器宕机等严重问题,那就需要手工进行ACK保障消费端消费成功
  2. 消费端重回队列

    • 消费端重回队列,是为了没有处理成功的消息,把消息重新传递给Broker,将该条消息放入此队列尾端,重新发送

    • 一般实际使用中,都会关闭重回队列,设置成false

      //public void basicNack(long deliveryTag, boolean multiple, boolean requeue)//                       消息标签,是否批量,是否重回队列
      channel.basicNack(envelope.getDeliveryTag(), false, false);
      
  3. 代码演示

    // 生产端代码
    for (int i = 0; i < 5; i++) {String msg = "hello RabbitMq " + i;Map<String, Object> header = new HashMap<>();header.put("num",i);// 设置自定义参数AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").headers(header).build();// 发布消息channel.basicPublish("exchange-01","routingKey-01",true,properties,msg.getBytes(StandardCharsets.UTF_8));}// 消费端代码
    // 3.创建信道
    Channel channel = connection.createChannel();
    // 4.创建消费者
    DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 进行模拟判断,如果自定义属性的header头中,num参数为0,则返回重回队列中,不进行消费if((Integer) properties.getHeaders().get("num") == 0){// 消息唯一标签,是否批量处理,是否返回重回队列// 观察管控台和控制台,会发现一直打印第0个消息,因为一直在重回队列中不停的发送,一直没有消费,管控台上unacked一直有该条数据channel.basicNack(envelope.getDeliveryTag(),false,true);}else{channel.basicAck(envelope.getDeliveryTag(),false);}System.out.println("消息-"+new String(body));}
    };
    // 5. 消费数据,将第二个参数autoACK设置为false,表示不进行自动签收,而是手动签收
    channel.basicConsume("queue-01",false,consumer);
    

3.8-TTL消息详解

  1. TTLtime to live生存时间

    • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
    • RabbitMQ支持队列中消息过期时间,从消息进入队列开始计算,只要超过了队列的超时时间配置,则消息自动清除

3.9-死信队列DLX

  1. 死信队列 (DLX dead-letter-exchange

    • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
    • DLX也是一个正常的Exchange,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置了某个队列的属性
    • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的DLX死信队列上,进而被路由到另一个队列
    • 可以监听这个队列中消息进行相应的处理,这个特性可以弥补RabbitMQ3.0之前支持的immediate参数功能
  2. 消息变成死信的情况

    • 消息被拒绝(basic.reject/basic.nack)并且requeue = false(上面重回队列的设置参数,false表示不再重回队列)
    • TTL过期
    • 队列达到最大长度
  3. 死信队列设置

    • 首先需要设置死信队列的ExchangeQueue,然后进行绑定

      • Exchange 比如 dlx.exchange
      • Queue 比如 dlx.queue
      • RoutingKey #
    • 然后正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可

      arguments.put("x-dead-letter-exchange","dlx.exchange")

  4. 代码演示

    1. 消费端,创建正常的交换机、队列、并绑定,再创建死信交换机、死信队列、绑定,同时正常队列要设置参数绑定
    2. 启动消费端,通过管控台,查看是否正常创建队列和交换机
    3. 关闭消费端,模拟死信队列消息超时的情况
    4. 生产端,发布消息,但设置消息过期时间为10s
    5. 启动生产端,会发现5条消息一直堆积在commonQueue中,当消息过期后,5条消息自动转移到dlx.queue中,模拟成功,代码如下:
    // 消费端
    public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.创建信道Channel channel = connection.createChannel();// 4.创建交换机String commonExchangeName = "commonExchange";String commonExchangeType = "topic";String commonQueueName    = "commonQueue";String commonRoutingKey   = "common.#";// 交换机,交换机类型,是否持久化,是否自动删除,自定义参数channel.exchangeDeclare(commonExchangeName,commonExchangeType,true,false,null);// 5.创建队列// 队列,是否持久化,是否排他性,是否自动删除,自定义参数HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange","dlx.exchange");channel.queueDeclare(commonQueueName,true,false,false,arguments);// 6.交换机与队列绑定channel.queueBind(commonQueueName,commonExchangeName,commonRoutingKey);// 7.声明死信队列String dlxExchangeName = "dlx.exchange";String dlxExchangeType = "topic";String dlxQueueName    = "dlx.queue";String dlxRoutingKey   = "#";channel.exchangeDeclare(dlxExchangeName,dlxExchangeType,true,false,null);channel.queueDeclare(dlxQueueName,true,false,false,null);channel.queueBind(dlxQueueName,dlxExchangeName,dlxRoutingKey);// 4.创建消费者DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息-"+new String(body));}};// 5. 消费数据  队列名,是否自动签收,消费者对象channel.basicConsume(commonQueueName,false,consumer);// 8.关闭连接//channel.close();//connection.close();}
    }// 生产端
    public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.创建信道Channel channel = connection.createChannel();String commonExchangeName = "commonExchange";String commonRoutingKey   = "common.#";// 4.发送数据 (String exchange, String routingKey, BasicProperties props, byte[] body)for (int i = 0; i < 5; i++) {String msg = "hello RabbitMq  DLX" + i;AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").expiration("10000").build();channel.basicPublish(commonExchangeName,commonRoutingKey,true,properties,msg.getBytes(StandardCharsets.UTF_8));}// 5.关闭连接//channel.close();//connection.close();}
    }
    

RabbitMQ消息中间件技术精讲(三)相关推荐

  1. RabbitMQ消息中间件技术精讲全集

    RabbitMQ消息中间件技术精讲 导航: RabbitMQ消息中间件技术精讲 一. 主流消息中间件介绍 1.1 ActiveMQ 1.2 Kafka 1.3 RocketMQ 1.4 RabbitM ...

  2. 老的消息中间件投递失败的类型值_RabbitMQ消息中间件技术精讲11 高级篇四 confirm 确认消息...

    RabbitMQ消息中间件技术精讲11 高级篇四 confirm 确认消息 理解Confirm消息确认机制: 消息的确认,是指生产者投递消息后,如果broker收到消息,则会给生产者一个应答: 生产者 ...

  3. Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 2

    七.Flink 时间语义与 Watermark 7.1 Flink 中的时间语义 7.2 设置 Event Time 7.3 水位线 - Watermark 7.3.1 基本概念 7.3.2 Wate ...

  4. [小森数据结构]看电影-顺序表技术精讲

    故事前研(言) 小森去看电影<<流浪地球2>>的时候买电影票的场景. 突然一个叫高启强的人买电影票 拉了两个Hei友,来看电影 <<流浪地球2>> 当场 ...

  5. 视频教程-华为路由交换精讲系列20:OSPF技术精讲 [肖哥]视频课程-华为认证

    华为路由交换精讲系列20:OSPF技术精讲 [肖哥]视频课程 肖老师(肖哥),思科认证讲师讲师(CCIE#27529),RedHat Linux认证讲师讲师,Juniper 认证讲师讲师,微软认证讲师 ...

  6. 技术精讲丨多线程环境下时间轮-海量定时任务的定时器设计

    多线程环境下海量定时任务处理-定时器设计 1.  定时器设计 2.  红黑树.最小堆以及跳表的实现对比 3.  时间轮的实现 视频讲解如下,点击观看: 技术精讲丨多线程环境下时间轮-海量定时任务的定时 ...

  7. Hadoop和大数据技术精讲班

    尊敬的先生/女士: 思数于计算和大数据服务中心http://www.bihadoop.com,简称思数于(隶属亍北京思数科技有限公司),是国内与业大数据分析培训.咨询机构.中国于计算大数据处理委员会. ...

  8. 【韭菜必学】每天学点技术—非常非常重要的BOLL指标精讲

    前面我们讲解了MACD RSI KDJ等技术指标,今天讲解一个更加重要的指标-BOLL(布林均线),可能需要花几节课来讲,希望大家认真学习,争取能自己灵活应用,不再做被割的韭菜. 首先我们来学习BOL ...

  9. MySQL精讲(一) |DQL数据查询语句全解析

    MySQL精讲系列文章(更新完毕) <MySQL入门必看知识> <MySQL精讲(一) | DQL数据查询语句全解析> <MySQL精讲(二) | DML数据操作语句全解 ...

最新文章

  1. python画图颜色表示大小变化_python画图(线条颜色、大小、类型:点、虚线等)(图文详细入门教程四)...
  2. TortoiseSvn
  3. mysql fetch next from_MySql 存储过程 动态sql
  4. MySQL InnoDB Memcached Plugin在Oray公司的实践
  5. 【NOI2001】炮兵阵地
  6. 硬件电路基础知识(30)---RS232、RS485、RS422、RJ45接口的区别
  7. 使用 Pig 进行数据分析
  8. 修改Maven默认编译级别
  9. 周界防护在安防行业重要性与日俱增
  10. 【图像去噪】基于马尔可夫随机场实现图像去噪附matlab代码
  11. SysML建模工具学习笔记总结一
  12. mysql数据库外键的作用
  13. 玩觅伊的女孩,都是一些什么样的人?
  14. Golang面试问题汇总
  15. 双光耦开关电源电路图_开关电源中的光耦经典电路设计分析
  16. 一个HTTP打趴80%面试者
  17. table thead tr设置表头背景色未完全覆盖的问题
  18. 解决RequestMapping写在类上页面跳转失败
  19. 【观察】华为云连续四年蝉联政务云第一,背后是“长跑选手”实力的印证
  20. 4.MyBatis源码解析-MyBatis扩展点--阿呆中二

热门文章

  1. 《惢客创业日记》2020.07.24(周五)向论语大师请教什么?
  2. 信号源提供不出电流--使用电压串联负反馈(同相比例运算电路)
  3. Jetson nano开机
  4. python爬虫之Scrapy爬取股票信息的示例
  5. Android自定义通知栏显示
  6. 7.3 从被动到主动
  7. 计算机专业在澳洲可以移民,澳洲IT专业好移民吗
  8. 善始者实繁 克终者盖寡
  9. 工业视觉中如何定量分析镜头光学性能
  10. html 手机端 应用程序,移动端Web开发