Rabbitmq简单模式和消息的手动应答以及Rabbitmq持久化
Hello模式
在idea中新建一个空工程
设置项目
添加模块
选择模块类型
设置模块
在pom文件中导入jar包依赖
书写生产者代码:
public class HelloProduct {// 创建队列名称public static final String queue_name = "hello";//发消息public static void main(String[] args) throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//设置工厂ip 连接rabbitmq队列factory.setHost("192.168.14.131");//连接用户名factory.setUsername("admin");//密码factory.setPassword("admin");//创建连接Connection connection = factory.newConnection();//获取通道Channel channel = connection.createChannel();/** 生成一个队列* 1 队列名称* 2 队列里的消息是否持久化(存储到磁盘中) 默认情况下消息存储在内存中* 3 该队列是否只供一个消费者进行消费 是否进行消息共享 true:可以多个消费者共享 false:不可以多个消费者共享* 4 是否自动删除 最后一个消费者断开连接后 该队列是否自动删除 true自动删除 false不自动删除* 5 其他参数* */channel.queueDeclare(queue_name,false,false,false,null);//发消息String message = "hello world";//初次使用/** 发送一个消费* 1 发送到哪个交换机* 2 路由的key值是哪个 本次是队列的名称* 3 其他参数信息* 4 发送消息的消息体* */channel.basicPublish("",queue_name,null,message.getBytes());System.out.println("消息发送完毕");}}
启动运行:
在web端找到刚才发送的队列:
书写消费者:
public static void main(String[] args) throws Exception{//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置ipfactory.setHost("192.168.14.131");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("消费者开始消费消息");//声明 接收消息DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println(new String(message.getBody()));};//取消消息时的回调CancelCallback cancelCallback = consumerTag ->{System.out.println("消息消费被中断");};/** 消费者消费* 1 消费哪个队列* 2 消费成功后是否要自动应答 true: 自动应答 false: 手动应答* 3 消费者未成功的消费得回调* 4 消费者取消消费的回调* */channel.basicConsume(queue_name,true,deliverCallback,cancelCallback);}
}
运行消费者开始消费生产者生成的消息
由于在生产者和消费者中都需要建立连接工厂,代码重复,所以在这里可以将建立连接工厂的代码提取出来当做一个工具类来使用:
消息的轮询分配
创建工作线程队列:
public class Work1 {private static final String queue_name= "helllo";public static void main(String[] args) throws Exception{Channel channel = Rabbitmqutil.getChannel();//消息的接收DeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("接收到的消息:"+new String(message.getBody()));};//消息接收被取消时 执行下面的内容CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag +"消费者取消消费接口回调逻辑");};System.out.println("C1接收消息");channel.basicConsume(queue_name,true,deliverCallback,cancelCallback);}
}
运行main方法
在这里插入图片描述
设置允许多线程启动
再次启动main方法
多线程启动成功
书写生产者代码:
public class WorkProduct {private static final String queue_name= "hello";public static void main(String[] args) throws Exception{Channel channel = Rabbitmqutil.getChannel();channel.queueDeclare(queue_name,false,false,false,null);System.out.println("请输入发送的消息:");//从控制台接收信息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){System.out.println("请输入发送的消息:");/** 发送一个消息* 1 发送到哪交换机* 2 路由的key值是哪一个 本次是队列的名称* 3 其他参数信息* 4 发送消息的消息体 消息以二进制传输 这里获取到消息的二进制文件* */String message = scanner.next();channel.basicPublish("",queue_name,null,message.getBytes());}}
}
启动生产者,并在控制台中输入发送的4条消息:
A1,A2,A3,A4
根据消息的轮询消费,应该是C1消费A1和A3,C2消费A2和A4,查看结果是否符合预期:
符合预期,说明消息消费轮询验证成功
消息应答
Message acknowledgment(消息应答)
执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
消息应答是默认打开的。我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
创建生产者:
* 消息在手动应答时是不丢失 放回队列中会重新消费**/public class Task {//队列名称public static final String task_queue_name = "ack_queue";public static void main(String[] args) throws Exception{//创建通道Channel channel = Rabbitmqutil.getChannel();//声明队列channel.queueDeclare(task_queue_name,false,false,false,null);//从控制台中输入信息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",task_queue_name,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}}
}
创建一个睡眠工具类,以便于直接对不同的消费者进行不同时间的睡眠执行:
public class SleepUtil {public static void sleep(int second){try{Thread.sleep(1000*second);}catch (InterruptedException e){Thread.currentThread().interrupt();}}
}
创建消费者work1,沉睡时间设置为1s:
public class Work1 {//队列名称public static final String task_queue_name = "ack_queue";//接收消息public static void main(String[] args) throws Exception{//创建通道Channel channel = Rabbitmqutil.getChannel();System.out.println("C1等待接收消息,处理时间短");DeliverCallback deliverCallback = (consumerTag,message)->{//沉睡1sSleepUtil.sleep(1);System.out.println("接收到的消息"+new String(message.getBody(),"UTF-8"));//手动应答/** 1 消息的标记 tag* 2 是否批量应答 false :不批量应答通道中的消息 true:批量* */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck = false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag,message)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");});}
}
创建消费者2,沉睡时间设置为30s:
public class Work2 {//队列名称public static final String task_queue_name = "ack_queue";//接收消息public static void main(String[] args) throws Exception{//创建通道Channel channel = Rabbitmqutil.getChannel();System.out.println("C2等待接收消息,处理时间长");DeliverCallback deliverCallback = (consumerTag,message)->{//沉睡1sSleepUtil.sleep(30);System.out.println("接收到的消息"+new String(message.getBody(),"UTF-8"));//手动应答/** 1 消息的标记 tag* 2 是否批量应答 false :不批量应答通道中的消息 true:批量* */channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck = false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag,message)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");});}
}
首先执行生产者,由生产者首先创建队列:
再分别执行消费者1和消费者2:
由生产者发送消息:11和22
可以看到消费者1很快就接收到了消息:
消费者2在生产者发送22消息后30s才收到消息:
测试:
生产者发送两条消息,“33"和"44”,消费者1在1s后收到消息33,消费者2应该在30s后收到消息44,那么如果消费者2在30s内还没有收到消息时候出现宕机情况,那么消息44是否会丢失,还是会被其他的消费者(消费者1)所消费掉呢?
开始测试,生产者发送消息33和44
消费者1很快收到消息33
在消费者2还未收到消息44之前手动使其宕机
然后查看正常运行的消费者1,可以看到消费者1接收到消息44将其消费掉
由此可以得出结论,消息并未丢失,而是由其他消费者消费掉
Rabbitmq持久化方式
消息持久化和队列持久化的联系:
队列设置为持久化,那么在RabbitMQ重启之后,持久化的队列也会存在,并会保持和重启前一致的队列参数。
消息设置为持久化,在RabbitMQ重启之后,持久化的消息也会存在。
队列持久化
可以看到上面截图中ack_queue这个队列的Features属性为null,而其他下面的队列都有个D(durable),也就是该队列是非持久化的,一旦服务器中的mq宕机或者重启,该队列就会消失,这个时候我们就需要设置该队列持久化来防止mq宕机或者重启时该队列消失的问题
在我们前面创建的生产者task中将ack_queue队列更改为持久化队列
这个时候启动生产者会报错:
也就是说mq中该队列原先已经为非持久化,不可以直接更改为持久化,此时我们可以直接在mq中将该队列删除掉,然后重新创建该队列:
可以看到队列列表中已经没有该队列了:
重新启动生产者创建ack_queue队列:
消息持久化
队列发送消息一般存储在内存中,mq宕机和重启后消息也会消失,我们可以将消息进行持久化,将消息存储到磁盘中实现持久化
不公平分发
RabbitMQ分发消息默认采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大-部分时间处于空闲状态,而处理慢的那个消费者-直在干活, 这种分配方式在这种情况下其实就不太好,但是RabbitMQ并不知道这种情况它依然很公平的进行分发。
我们可以设置参数channel.basicQos(1);
在两个消费者中都进行设置:
启动生产者和消费者进行发送消息测试:
沉睡时间短的消费者1消费消息多
沉睡时间长的消费者2消费消息少,遵循能者多劳规则
预取值
表示分发时候,每个消费者可以分配到多少条消息,预取值为多少该消费者通道中就可以堆积多少条消息供其消费
设置消费者1
设置消费者2:
分别启动生产者消费者并生产消息:
生产者发送20条消息:
消费者1:
由于消费者1处理消息快,在消费者2堆积满5条或者还未满5条时消费者1就已经消费完队列中堆积的一条消息,队列中空出一个位置还可以堆积,于是造成消费者1多消费消息
但是仍然可以看到消费者2中堆积了5条消息
最后在消费者2消费完堆积的5条消息中的一条而又空出来一个位置后队列中就又会存进去等待消费
Rabbitmq简单模式和消息的手动应答以及Rabbitmq持久化相关推荐
- MQ消息的自动应答和手动应答| RabbitMQ系列(三)
相关文章 RabbitMQ系列汇总:RabbitMQ系列 前言 开始消息应答之前先思考几个问题 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会 ...
- [RabbitMQ]消息应答概念_消息手动应答代码
消息应答 概念 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况.RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为 ...
- RabbitMQ之消息的自动应答、手动应答和消息持久化(Java开发)
1.消息的自动和手动应答 boolean autoAck = true;//消息自动应答 channel.basicConsume(WQ_QUEUE,autoAck,consumer); 默认情况下, ...
- RabbitMQ消息应答实战(针对自动|手动应答常见问题进行模拟)
消息应答概念 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况.RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除 ...
- RabbitMQ消息手动应答生产者
/** 消息在手动应答时是不丢失.放回队列中重新消费* */public class Task2 {// 队列名称public static final String TASK_QUEUE_NAME ...
- RabbitMQ常见问题解决方案——消息丢失、重复消费、消费乱序、消息积压
文章目录 背景 RabbitMQ常见问题解决方案 1. RabbitMQ的可靠性(消息丢失问题) 1.1 生产者丢失消息 1.2 RabbitMQ弄丢消息 1.2.1 交换机持久化 1.2.2 队列持 ...
- 2.RabbitMQ 的可靠性消息的发送
本篇包含 1. RabbitMQ 的可靠性消息的发送 2. RabbitMQ 集群的原理与高可用架构的搭建 3. RabbitMQ 的实践经验 上篇包含 1.MQ 的本质,MQ 的作用 2.R ...
- RabbitMQ总结(一)--消息队列RabbitMQ应答模式(自动、手动)
原文链接 消息队列RabbitMQ应答模式(自动.手动) 为了确保消息不会丢失,RabbitMQ支持消息应答.消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了.RabbitM ...
- RabbitMQ之手动应答消息(消息不丢失)
RabbitMQ之手动应答消息 1.为什么需要手动应答 当消费者完成一个任务需要一段时间,如果其中一个消费者处理一个长的任务并且只处理了部分突然他挂掉了,会发生什么情况.RabbitMQ一旦向消费者传 ...
最新文章
- Linux内核分析 - 网络[十六]:TCP三次握手
- mysql大规模读写性能_十招搞定 MySQL 大规模数据库的性能和伸缩性优化
- Android中TimePicker时间选择器的使用和获取选择的时和分
- DHCP option 150与option 66的区别
- java接口抽象方法_Java 接口 抽象类 抽象方法
- iOS 开发一定要尝试的 Texture(ASDK) 1
- abaqus在岩土工程中的应用_什么是岩土锚固,看看在深基坑支护中如何应用
- 用Docker搭建Laravel和Vue项目的开发环境
- FusionChartsFree的JSP标签开发
- 电子邮件营销的优势和劣势有哪些?哪些邮箱适合电子邮件营销?
- 什么软件可以搜C语言题答案,大学c语音搜题app
- 视频目标分割数据集DAVIS(Denly-Annotated VIdeo Segmentation)解读
- 【web前端期末大作业】简单的学生网页作业源码 基于html css javascript jquery技术设计的音乐网站(44页)
- pytest 测试框架学习(14):pytest.warns
- springboot毕设项目牙无忧6ayy4(java+VUE+Mybatis+Maven+Mysql)
- mPaaS 服务端核心组件:移动分析服务 MAS 架构解析
- 在线API文档、技术文档工具ShowDoc
- 华南理工大学php,华南理工大学网络教育平台v3
- 语言-英语翻译(edx-datascientist _A Very Short History Of Data Science)
- 素数问题 java_JAVA素数问题
热门文章
- jdk7和jdk8HashMap主要的区别
- Python学习关键tip记录
- Android 仿iphone提醒事项(一)
- matlab 图片选取区域,利用MATLAB截取一张复杂图片中想要的区域
- 数据中台数据分析过程梳理
- :I/O中断处理过程包括哪几个阶段?中断服务程序流程分为哪几部分?
- 大数据、云计算系统高级架构师课程学习路线图
- 构建 AWS AMI 镜像(EC2 Image Builder + Terraform)
- Python基础047:Pycharm debug时设置断点但是不起作用怎么回事?
- 大数据三篇论文—Google Bigtable中文版