在工作队列中,我们有多个消息的消费者,每个消费者都会进行消息消费,在默认情况下,RabbitMQ会进行消息轮询发送给每一个消费者,因此每个消费者处理的消息数量是一致的。下面直接看我们的主要文件代码

一、pom文件

我们只需要引入RabbitMQ的依赖包即可

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>RabbitMQ</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency></dependencies></project>

二、消息生产者Producer

消息生产者代码与上一篇简单模式一样,步骤仍然是1创建获取连接,2创建渠道,3声明消息队列,4发送消息,5关闭连接步骤。

package com.xiaohui.rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者*/
public class Producer {public static final  String QUEUE_NAME = "work_queue";public  static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();//创建渠道Channel channel = connection.createChannel();//声明创建队列channel.queueDeclare(QUEUE_NAME,true,false, false, null);for (int i = 1; i <= 20; i++) {//发送消息String msg = "小兔子来了。。。。"+i;channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());}//释放资源(关闭渠道 以及连接)channel.close();connection.close();}
}

三、消息消费者代码:

在工作队列中我们使用了两个消息消费者进行处理消息。注意:此处我们分别使用两种不同的消息接收确认代码实现;

1,autoAck(ture) 一旦消息由服务端送达就作为消息接收确认,然后就会中消息队列中删除(Consumer1中实现);

2,autoAck(false)另一种我们设置为有消费客户端代码手动进行报送消息接收确认,如果没有接到到消息确认,MQ会将该消息重新进行入到消息队列中。然后重新在进行分发(Consumer2中实现)。

Consumer1 代码如下:

package com.xiaohui.rabbitmq.work;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer1 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道final Channel channel = connection.createChannel();//声明消费队列channel.queueDeclare(Producer.QUEUE_NAME, true,false,false,null);//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==================消费者1开始===================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));System.out.println("===================消费者1结束==================");try {Thread.sleep(1000);
//                    channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.QUEUE_NAME,true,consumer);}}

Consumer2 代码如下:

package com.xiaohui.rabbitmq.work;import com.rabbitmq.client.*;
import com.xiaohui.rabbitmq.utils.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Cunsumer2 {public static void main(String[] args) throws IOException, TimeoutException {//创建消费端链接Connection connection = ConnectionUtils.getConnection();//创建消费端渠道final Channel channel = connection.createChannel();//声明消费队列channel.queueDeclare(Producer.QUEUE_NAME, true,false,false,null);//监听消息DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==============消费者2开始=======================");System.out.println("路由的key为:"+envelope.getRoutingKey());System.out.println("交换机为:"+envelope.getExchange());System.out.println("消息ID为:"+envelope.getDeliveryTag());System.out.println("收到的消息为:"+new String(body,"UTF-8"));try {Thread.sleep(1000);System.out.println("================消费者2 任务执行结束=====================");} catch (InterruptedException e) {e.printStackTrace();}finally {channel.basicAck(envelope.getDeliveryTag(), false);}}};/*** 第二个参数表示是否 向mqserver自动回复收到* 第三个参数表示消息回调*/channel.basicConsume(Producer.QUEUE_NAME,false,consumer);}
}

我们在启动了 两个消费端代码后,我们启动发送端代码:

场景1:我们正常的让程序执行。测试结果为两个消费端,依次进行处理发送的消息。Consumer1 执行 1,3,5,7,9....奇数的消息。Consumer2 执行2,4,6,8,10 偶数的消息内容。

场景2:我们在运行过程中 停止Consumer1的程序,则表现为 Consumer1 执行了 部分消息(如:只有1,3,5)之后,再无消息执行;而Consumer2仍还是只执行了2,4,6,8,10等偶数的消息。消息 7,9,11..等未被执行的消息则会丢失。

场景3:我们在运行过程中 停止Consumer2(手动反馈消息接收确认)的程序后,从打印可以看出,在Consumer2 中未被确认的消息,重进进入消息队列,全部由Consumer1 完成执行接收。打印结果如下:

Consumer1:

==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:45
收到的消息为:小兔子来了。。。。1
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:46
收到的消息为:小兔子来了。。。。3
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:47
收到的消息为:小兔子来了。。。。5
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:48
收到的消息为:小兔子来了。。。。7
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:49
收到的消息为:小兔子来了。。。。9
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:50
收到的消息为:小兔子来了。。。。11
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:51
收到的消息为:小兔子来了。。。。13
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:52
收到的消息为:小兔子来了。。。。15
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:53
收到的消息为:小兔子来了。。。。17
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:54
收到的消息为:小兔子来了。。。。19
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:55
收到的消息为:小兔子来了。。。。10
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:56
收到的消息为:小兔子来了。。。。12
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:57
收到的消息为:小兔子来了。。。。14
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:58
收到的消息为:小兔子来了。。。。16
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:59
收到的消息为:小兔子来了。。。。18
===================消费者1结束==================
==================消费者1开始===================
路由的key为:work_queue
交换机为:
消息ID为:60
收到的消息为:小兔子来了。。。。20
===================消费者1结束==================

Consumer2 :

==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:1
收到的消息为:小兔子来了。。。。2
================消费者2 任务执行结束=====================
==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:2
收到的消息为:小兔子来了。。。。4
================消费者2 任务执行结束=====================
==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:3
收到的消息为:小兔子来了。。。。6
================消费者2 任务执行结束=====================
==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:4
收到的消息为:小兔子来了。。。。8
================消费者2 任务执行结束=====================
==============消费者2开始=======================
路由的key为:work_queue
交换机为:
消息ID为:5
收到的消息为:小兔子来了。。。。10Process finished with exit code -1

RabbitMQ(四) Work模式下的消息产生以及消费代码实现示例相关推荐

  1. LoRa模块E22-400T22S 四种模式下的电流分析和功耗评测

    LoRa模块E22-400T22S 4种模式下的功耗评测 E22-400T22S是全新一代的LoRa无线模块,是由EBYTE(亿佰特)设计研发的,它基于SEMTECH公司SX1268射频芯片的无线串口 ...

  2. [原创] 域模式下的ASP.NET 发邮件代码

    ASP.NET 下发邮件是个很把普通的功能,可是,在登陆域模式,在域组织的局域网中,有不同的写法.以前用163的邮箱发邮件,是在工作组模式下的.两种代码大致相同,但是域模式有 不同的地方 domain ...

  3. CDISC的ADaMIG (V1.2) 中英文对照【4】_第四章(下)实施问题,标准解决方案和示例

    本AdaMIG (v1.2)来自CDISC官网以下链接: https://www.cdisc.org/standards/foundational/adam/adam-implementation-g ...

  4. SD卡在SPI模式下的初始化和详细的代码分析

    SD卡在spi下的初始化: 1.初始化与SD卡链接的硬件条件(mcu的spi配置, IO口配置) 2.上电延时(>74个CLK) 3.复位卡(CMD0),进入idle状态 4.发送CMD8,检查 ...

  5. Angular开发模式下的编译器和运行时的代码比较

    IDE里的index.html里的app-root: 在浏览器里打开后,能看到app-root下面的几个子节点:app-top-bar和router-outlet, 以及app-product-lis ...

  6. RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器

    文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...

  7. RabbitMQ系列-顺序消费模式和迅速消息发送模式

    MQ使用过程中,有些业务场景需要我们保证顺序消费,而如果一个Producer,一个Queue,多个Consumer的情况下是无法保证顺序的; 举例: 1.业务上产生三条消息,分别是对数据的增加.修改. ...

  8. 一篇文章学会RabbitMQ。SpringAMQP操作RabbitMQ。RabbitMQ五种模式及其代码实现。

    目录 一.同步与异步调用: 一)同步调用: 二)异步调用: 三)使用建议: 四)MQ种类 二.SpringAMQP 1.导入依赖: 2.启动相关服务: 3.配置序列化: 三.Rabbit五种关系模式: ...

  9. RabbitMQ work quene 模式

    直连模式的缺点 当生产者生产消息过快,消费者消费过慢的情况下,会造成消息的大量堆积.因此这个时候就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息.队列的消息一旦消费就不会存在, ...

最新文章

  1. java里冒泡排序编程案例_java冒泡排序小实例
  2. 2019年,人工智能要落地,更要小心被“摔死”
  3. 在Sql2005中,向表中插入数据时遇到uniqueidentifier列,如何插入数据?
  4. python requests返回值为200 但是text无内容_爬取高清无版权美图
  5. SAP UI5 应用开发教程之二十七 - SAP UI5 应用的单元测试工具 QUnit 介绍
  6. 实验二简化版C语言中文理解程序文法
  7. 【计算机网络复习 物理层】2.1.1 物理层基本概念
  8. linux下 java 文本_Java中如何将输入的信息写入文本中
  9. 基于ADS的c语言程序设计实验,实验一:基于ADS软件传输线理论仿真设计与分析.docx...
  10. 解决IDEA Maven项目无法下载依赖
  11. rtsp服务器如何低延时linux,web实现RTSP无插件低延迟播放方案整理
  12. UP及按照UP进行软件开发的流程
  13. datagrip 导出数据库备份到本地
  14. 解决复制项目后名称不改变的问题:org.eclipse.wst.common.component
  15. 计算几何-判断两条线段是否相交
  16. BIND rndc—使用说明
  17. webrtc2sip项目说明
  18. jbutton java_Java JButton
  19. ApacheCN 翻译活动进度公告 2019.5.31
  20. mount –o remount,rw /

热门文章

  1. vue-cli4.0+Echarts 3D
  2. Mybatis调用oracle 存储过程
  3. ai直线怎么变折线_如何在 AI中设计可编辑折线图
  4. 数据结构与算法简单总结()
  5. String 属于基础的数据类型吗?
  6. c语言全国计算机真题及答案,全国计算机C语言考试真题及答案.doc
  7. java调用FFmpeg及mencoder转换视频为FLV并截图
  8. 【云栖大会】阿里云未来走势 看当家的怎么说?
  9. SQL Server 分离与附加数据库
  10. ArcGIS Server 10.1发布结果地图服务——与10.0的区别及过程