RabbitMQ(四) Work模式下的消息产生以及消费代码实现示例
在工作队列中,我们有多个消息的消费者,每个消费者都会进行消息消费,在默认情况下,RabbitMQ会进行消息轮询发送给每一个消费者,因此每个消费者处理的消息数量是一致的。下面直接看我们的主要文件代码
一、pom文件
我们只需要引入RabbitMQ的依赖包即可
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>RabbitMQ</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency></dependencies></project>
二、消息生产者Producer
消息生产者代码与上一篇简单模式一样,步骤仍然是1创建获取连接,2创建渠道,3声明消息队列,4发送消息,5关闭连接步骤。
package com.xiaohui.rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者*/
public class Producer {public static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();//创建渠道Channel channel = connection.createChannel();//声明创建队列channel.queueDeclare(QUEUE_NAME,true,false, false, null);for (int i = 1; i <= 20; i++) {//发送消息String msg = "小兔子来了。。。。"+i;channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());}//释放资源(关闭渠道 以及连接)channel.close();connection.close();}
}
三、消息消费者代码:
在工作队列中我们使用了两个消息消费者进行处理消息。注意:此处我们分别使用两种不同的消息接收确认代码实现;
1,autoAck(ture) 一旦消息由服务端送达就作为消息接收确认,然后就会中消息队列中删除(Consumer1中实现);
2,autoAck(false)另一种我们设置为有消费客户端代码手动进行报送消息接收确认,如果没有接到到消息确认,MQ会将该消息重新进行入到消息队列中。然后重新在进行分发(Consumer2中实现)。
Consumer1 代码如下:
package com.xiaohui.rabbitmq.work;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer1 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道final Channel channel = connection.createChannel();//声明消费队列channel.queueDeclare(Producer.QUEUE_NAME, true,false,false,null);//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================消费者1开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================消费者1结束==================");try {Thread.sleep(1000);
// channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.QUEUE_NAME,true,consumer);}}
Consumer2 代码如下:
package com.xiaohui.rabbitmq.work;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer2 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道final Channel channel = connection.createChannel();//声明消费队列channel.queueDeclare(Producer.QUEUE_NAME, true,false,false,null);//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==============消费者2开始=======================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));try {Thread.sleep(1000);System.out.println("================消费者2 任务执行结束=====================");} catch (InterruptedException e) {e.printStackTrace();}finally {channel.basicAck(envelope.getDeliveryTag(), false);}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.QUEUE_NAME,false,consumer);}
}
我们在启动了 两个消费端代码后,我们启动发送端代码:
场景1:我们正常的让程序执行。测试结果为两个消费端,依次进行处理发送的消息。Consumer1 执行 1,3,5,7,9....奇数的消息。Consumer2 执行2,4,6,8,10 偶数的消息内容。
场景2:我们在运行过程中 停止Consumer1的程序,则表现为 Consumer1 执行了 部分消息(如:只有1,3,5)之后,再无消息执行;而Consumer2仍还是只执行了2,4,6,8,10等偶数的消息。消息 7,9,11..等未被执行的消息则会丢失。
场景3:我们在运行过程中 停止Consumer2(手动反馈消息接收确认)的程序后,从打印可以看出,在Consumer2 中未被确认的消息,重进进入消息队列,全部由Consumer1 完成执行接收。打印结果如下:
Consumer1:
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:45
收到的消息为:小兔子来了。。。。1
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:46
收到的消息为:小兔子来了。。。。3
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:47
收到的消息为:小兔子来了。。。。5
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:48
收到的消息为:小兔子来了。。。。7
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:49
收到的消息为:小兔子来了。。。。9
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:50
收到的消息为:小兔子来了。。。。11
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:51
收到的消息为:小兔子来了。。。。13
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:52
收到的消息为:小兔子来了。。。。15
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:53
收到的消息为:小兔子来了。。。。17
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:54
收到的消息为:小兔子来了。。。。19
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:55
收到的消息为:小兔子来了。。。。10
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:56
收到的消息为:小兔子来了。。。。12
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:57
收到的消息为:小兔子来了。。。。14
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:58
收到的消息为:小兔子来了。。。。16
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:59
收到的消息为:小兔子来了。。。。18
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:60
收到的消息为:小兔子来了。。。。20
===================消费者1结束==================
Consumer2 :
==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:1
收到的消息为:小兔子来了。。。。2
================消费者2 任务执行结束=====================
==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:2
收到的消息为:小兔子来了。。。。4
================消费者2 任务执行结束=====================
==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:3
收到的消息为:小兔子来了。。。。6
================消费者2 任务执行结束=====================
==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:4
收到的消息为:小兔子来了。。。。8
================消费者2 任务执行结束=====================
==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:5
收到的消息为:小兔子来了。。。。10Process finished with exit code -1
RabbitMQ(四) Work模式下的消息产生以及消费代码实现示例相关推荐
- LoRa模块E22-400T22S 四种模式下的电流分析和功耗评测
LoRa模块E22-400T22S 4种模式下的功耗评测 E22-400T22S是全新一代的LoRa无线模块,是由EBYTE(亿佰特)设计研发的,它基于SEMTECH公司SX1268射频芯片的无线串口 ...
- [原创] 域模式下的ASP.NET 发邮件代码
ASP.NET 下发邮件是个很把普通的功能,可是,在登陆域模式,在域组织的局域网中,有不同的写法.以前用163的邮箱发邮件,是在工作组模式下的.两种代码大致相同,但是域模式有 不同的地方 domain ...
- CDISC的ADaMIG (V1.2) 中英文对照【4】_第四章(下)实施问题,标准解决方案和示例
本AdaMIG (v1.2)来自CDISC官网以下链接: https://www.cdisc.org/standards/foundational/adam/adam-implementation-g ...
- SD卡在SPI模式下的初始化和详细的代码分析
SD卡在spi下的初始化: 1.初始化与SD卡链接的硬件条件(mcu的spi配置, IO口配置) 2.上电延时(>74个CLK) 3.复位卡(CMD0),进入idle状态 4.发送CMD8,检查 ...
- Angular开发模式下的编译器和运行时的代码比较
IDE里的index.html里的app-root: 在浏览器里打开后,能看到app-root下面的几个子节点:app-top-bar和router-outlet, 以及app-product-lis ...
- RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器
文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...
- RabbitMQ系列-顺序消费模式和迅速消息发送模式
MQ使用过程中,有些业务场景需要我们保证顺序消费,而如果一个Producer,一个Queue,多个Consumer的情况下是无法保证顺序的; 举例: 1.业务上产生三条消息,分别是对数据的增加.修改. ...
- 一篇文章学会RabbitMQ。SpringAMQP操作RabbitMQ。RabbitMQ五种模式及其代码实现。
目录 一.同步与异步调用: 一)同步调用: 二)异步调用: 三)使用建议: 四)MQ种类 二.SpringAMQP 1.导入依赖: 2.启动相关服务: 3.配置序列化: 三.Rabbit五种关系模式: ...
- RabbitMQ work quene 模式
直连模式的缺点 当生产者生产消息过快,消费者消费过慢的情况下,会造成消息的大量堆积.因此这个时候就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息.队列的消息一旦消费就不会存在, ...
最新文章
- java里冒泡排序编程案例_java冒泡排序小实例
- 2019年,人工智能要落地,更要小心被“摔死”
- 在Sql2005中,向表中插入数据时遇到uniqueidentifier列,如何插入数据?
- python requests返回值为200 但是text无内容_爬取高清无版权美图
- SAP UI5 应用开发教程之二十七 - SAP UI5 应用的单元测试工具 QUnit 介绍
- 实验二简化版C语言中文理解程序文法
- 【计算机网络复习 物理层】2.1.1 物理层基本概念
- linux下 java 文本_Java中如何将输入的信息写入文本中
- 基于ADS的c语言程序设计实验,实验一:基于ADS软件传输线理论仿真设计与分析.docx...
- 解决IDEA Maven项目无法下载依赖
- rtsp服务器如何低延时linux,web实现RTSP无插件低延迟播放方案整理
- UP及按照UP进行软件开发的流程
- datagrip 导出数据库备份到本地
- 解决复制项目后名称不改变的问题:org.eclipse.wst.common.component
- 计算几何-判断两条线段是否相交
- BIND rndc—使用说明
- webrtc2sip项目说明
- jbutton java_Java JButton
- ApacheCN 翻译活动进度公告 2019.5.31
- mount –o remount,rw /