相关文章

RabbitMQ系列汇总:RabbitMQ系列


前言

  • 开始消息应答之前先思考几个问题
  • 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况?
  • RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。
  • 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是
    • 消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

一、自动应答

  • 消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了。
  • 当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死。
  • 所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。

①、生产者

  • /*** 这是一个测试的生产者*@author DingYongJun*@date 2021/8/1*/
    public class DyProducerTest_xiaoxiyingda {/*** 这里为了方便,我们使用main函数来测试* 纯属看你个人选择* @param args*/public static void main(String[] args) throws Exception{//使用工具类来创建通道Channel channel = RabbitMqUtils.getChannel();/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,false,false,false,null);/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/for (int i=0;i<6;i++){String message="我是生产者,我告诉你一个好消息!"+i;Thread.sleep( 1000 );channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes());System.out.println("消息发送完毕");}}}
    

②、消费者

  • /*** 这是一个测试的消费者*@author DingYongJun*@date 2021/8/1*/
    public class DyConsumerTest_xiaoxiyingda01 {public static void main(String[] args) throws Exception{//使用工具类来创建通道Channel channel = RabbitMqUtils.getChannel();System.out.println("我是消费者A,我在等待接收消息!");DeliverCallback deliverCallback = (String var1, Delivery var2)->{String message= new String(var2.getBody());System.out.println(message);};CancelCallback cancelCallback = (String var1)->{System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/Thread.sleep(1000);channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,true,deliverCallback,cancelCallback);}
    }
    

③、测试

  • 执行结果

  • 结论

    •     public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {return this.basicConsume(queue, autoAck, "", this.consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));}
      
    • 当autoAck为true时,为自动应答

    • 当autoAck为false时,为手动应答

  • 不建议使用自动应答,实际业务场景中,一般我们使用手动应答。

二、手动应答

  • 生产者

    • /*** 这是一个测试的生产者*@author DingYongJun*@date 2021/8/1*/
      public class DyProducerTest_xiaoxiyingda {/*** 这里为了方便,我们使用main函数来测试* 纯属看你个人选择* @param args*/public static void main(String[] args) throws Exception{//使用工具类来创建通道Channel channel = RabbitMqUtils.getChannel();/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,false,false,false,null);/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/Scanner sc = new Scanner(System.in);System.out.println("请输入信息");while (sc.hasNext()) {String message = sc.nextLine();channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes());System.out.println("生产者发出消息" + message);}}}
      
  • 消费者A

    • /*** 这是一个测试的消费者*@author DingYongJun*@date 2021/8/1*/
      public class DyConsumerTest_xiaoxiyingda01 {public static void main(String[] args) throws Exception{//使用工具类来创建通道Channel channel = RabbitMqUtils.getChannel();System.out.println("我是消费者A,我在等待接收消息!");DeliverCallback deliverCallback = (String var1, Delivery var2)->{String message= new String(var2.getBody());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(message);//true 代表批量应答 channel 上未应答的消息  false 单条应答boolean multiple = false;channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);};CancelCallback cancelCallback = (String var1)->{System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback);}
      }
      
    • 设置手动应答,设置休眠时间较短,表示处理业务非常快。

  • 消费者B

    • /*** 这是一个测试的消费者*@author DingYongJun*@date 2021/8/1*/
      public class DyConsumerTest_xiaoxiyingda02 {public static void main(String[] args) throws Exception{//使用工具类来创建通道Channel channel = RabbitMqUtils.getChannel();System.out.println("我是消费者B,我在等待接收消息!");DeliverCallback deliverCallback = (String var1, Delivery var2)->{String message= new String(var2.getBody());try {Thread.sleep(30000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(message);//true 代表批量应答 channel 上未应答的消息  false 单条应答boolean multiple = false;channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple);};CancelCallback cancelCallback = (String var1)->{System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback);}
      }
      
    • 设置手动应答,设置休眠时间较长,表示处理业务较慢。

  • 执行顺序

    • 1、控制台分别发送两条消息
    • 2、理论上AB各一条消息,如果这个时候B还未处理完就断开连接了,那么消息怎么办?
  • 执行结果

三、总结

  • multiple 的 true 和 false 代表不同意思

    • ture 表示批量应答,是以通道为单位的,比如你一个完成了,我就会应答全部完成。效率高,但是不安全。
    • false表示单个应答,完成一个我应答一个。较为安全靠谱。但是效率较低。
  • 消息自动重新入队

    • 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。
    • 如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息。

路漫漫其修远兮,吾必将上下求索~

如果你认为i博主写的不错!写作不易,请点赞、关注、评论给博主一个鼓励吧~hahah

MQ消息的自动应答和手动应答| RabbitMQ系列(三)相关推荐

  1. RabbitMQ之消息的自动应答、手动应答和消息持久化(Java开发)

    1.消息的自动和手动应答 boolean autoAck = true;//消息自动应答 channel.basicConsume(WQ_QUEUE,autoAck,consumer); 默认情况下, ...

  2. 各个MQ消息队列介绍以及区别比较(RabbitMq ActiveMQ、ZeroMQ、Kafka)

    首先,MQ其实就是消息队列,队列我们可以理解为管道,以管道的方式做消息传递. 在本篇博客中,我们先来简单学习一下几种MQ,之后对他们进行对比. ActiveMQ.RabbitMQ.kafka.Rock ...

  3. php监听mq消息,客户端监听服务端获取rabbitmq消息队列,rabbitmq有消息的时候客户端刷新页面才能获取到消息,监听没起到作用,请求各位大神指点迷津...

    header("Content-Type:text/html;charset=utf-8"); use Workerman\Worker; require_once __DIR__ ...

  4. RabbitMQ消息应答实战(针对自动|手动应答常见问题进行模拟)

    消息应答概念 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况.RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除 ...

  5. Rabbitmq简单模式和消息的手动应答以及Rabbitmq持久化

    Hello模式 在idea中新建一个空工程 设置项目 添加模块 选择模块类型 设置模块 在pom文件中导入jar包依赖 书写生产者代码: public class HelloProduct {// 创 ...

  6. SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)

    目录 1.环境搭建 2.队列模式 3.发布订阅模式 4.路由模式 5.主题模式 6.消息手动应答机制 7.回调函数-确认机制(发布确认模式) 1.环境搭建 引入pom: <!-- rabbitM ...

  7. [RabbitMQ]消息应答概念_消息手动应答代码

    消息应答 概念 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况.RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为 ...

  8. RabbitMQ之手动应答消息(消息不丢失)

    RabbitMQ之手动应答消息 1.为什么需要手动应答 当消费者完成一个任务需要一段时间,如果其中一个消费者处理一个长的任务并且只处理了部分突然他挂掉了,会发生什么情况.RabbitMQ一旦向消费者传 ...

  9. RabbitMQ消息手动应答生产者

    /** 消息在手动应答时是不丢失.放回队列中重新消费* */public class Task2 {// 队列名称public static final String TASK_QUEUE_NAME ...

最新文章

  1. 网站抓取频率对排名优化有着怎样的意义?
  2. linux暂停线程和恢复,是否有可能在Linux [暂停]中检测到线程已进行上下文切换?...
  3. JMeter:报错(Content type 'text/plain;charset=UTF-8' not supported)
  4. C#的foreach
  5. 自动化测试工具selenium python_Selenium自动化测试工具使用方法汇总
  6. Hive的数据模型-分区表
  7. Covariance and Contravariance in C#, Part One
  8. 一个简单的PHP Web论坛
  9. 为什么说黄桃罐头是东北的人参果
  10. 【读书笔记】【独立思考】2018-03-14
  11. 自动档汽车正确的操作方法和习惯---请教贴
  12. ZOJ 3761 Easy billiards 月赛E DFS
  13. xp-80c打印机无法打印_6种方法解决打印机无法打印问题
  14. Quartus与modelsim的初级使用教程
  15. Python黑帽子——通过Paramiko使用SSH
  16. Hive 官网函数全列表(聚合函数/日期函数/字符串函数...)
  17. 网络攻击与防御-常用网络命令的使用
  18. 为什么阿里那么难进,原来精髓在这
  19. 2.从键盘输入自己的姓名拼音(用小写输入),将首字母变为大写后在屏幕上输出。有没有汇编方面的大神教教我,我现在还是个新手
  20. 深度学习AI美颜系列——人像静态/动态贴纸特效算法实现 | CSDN博文精选

热门文章

  1. 人生中第一个自制游戏
  2. 设计模式之美笔记11
  3. 什么是end-to-end神经网络?
  4. 一天一道算法题——799. 香槟塔
  5. title啥意思?网址title题目书写
  6. MSTSC连接不到的问题
  7. 我的世界java版区块显示_我的世界手游区块显示指令分享:区块玩法操作详解[多图]...
  8. 远控NanoCore RAT样本分析
  9. 【dotnet】dotnet命令创建一个console
  10. 酷开系统8无界空间,打动用户的正确姿势