work queues也成为task queues,任务模型。当消息处理比较耗时的时候,可能生产消息的速度远远大于消费速度,长此以往,消息就会堆积,无法及时处理。此时,就恶意使用work模型,让多个消费者绑定到一个队列,共同消费队列的消息。消息队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

通过官方文档我们可以很直观的认识到这种模型结构,一个消费者发送多条消息至消息队列,对应的多个消费者同时工作,消费消息。
这种模型和我们之前提到的hello word直连简单模型非常相似,只是消费者从一个变成了多个,以此提高消息消费的效率。

此处省略虚拟主机用户等信息的创建、依赖以及工具类。
可参考:java实现rabbitmq简单队列模型,生产者 消费者 消息队列

  1. 创建生产者
    较之前的生产者类唯一变化的是同时发布多条消息,以便观察多个消费者消费消息的情况。
public class Provider {public void send() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();channel.queueDeclare("wuwl",true,false,false,null);for (int i = 0; i < 20; i++) {channel.basicPublish("","wuwl",null,("info index :" + i).getBytes());}}finally {if (channel !=null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}}}public static void main(String[] args) throws IOException, TimeoutException {Provider provider = new Provider();provider.send();}
}
  1. 创建多个消费者
    消费者一:
public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}}

消费者二:

public class Consumer02 {public void consume() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer02 consumer = new Consumer02();consumer.consume();}}

此时的消费者一和消费者二完全一致

  1. 启动消费者
    将两个消费者先于生产者启动,等待消息的发送

  2. 启动生产者
    启动生产者,发送20条消息至消息队列

  3. 分析消费者
    消费者一:
    消费者二:

    消费者一与消费者二均匀地消费消息队列中的消息,即使两个消费者在消费消息的效率不一样,也是均匀消费。将消费者一加上线程睡眠延迟,可以发现两个消费者消费的信息数量不变,依旧是循环交替。这一点从官方的文档得到了证实。

    所以,work queues无法根据消费者的能力来分配消息,只能平均分派消息,如果因为某一消费者效率低下导致消息堆积,就会比较麻烦。这个主要是有消息确认机制决定的,默认地,消费者接收到消息即确认,队列中就会把消息移除,此时,消费者是否真的消费完成了消息是未知的。比如,其中一个消费者拿到了5条待处理的消息,只处理了其中2条,服务器即发生故障,丢失的3条消息无法找回。

修改消费者消息确认机制。
消费者一:

public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 每次只能消费一个消息channel.basicQos(1);channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}//参数一:确认队列中的那个消息  参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}}

消费者二:

public class Consumer02 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 每次只能消费一个消息channel.basicQos(1);channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));//参数一:确认队列中的那个消息  参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer02 consumer = new Consumer02();consumer.consume();}}

改动一:channel.basicQos(1),限制队列向消费者最大消息发送量为1
改动二: channel.basicConsume("wuwl", false, new DefaultConsumer(channel),第二个参数改成false,取消消息自动确认
改动三:channel.basicAck(envelope.getDeliveryTag(),false)消息手动确认
改动四:Thread.sleep(10)消费者一消费一条休息休眠10毫秒,太长看不出来效果,被其它消费者抢先全部消费掉
改动五:将生产者消息发送数量改成100

运行效果:

消费者一消费了100条消息中的6条
消费者消费了其余的94条

此时实现了能者多劳,队列中的消息也不会出现未被真正消费而丢失,数据安全。

java实现rabbitmq任务模型(work queues), 生产者 消费者 消息队列 能者多劳相关推荐

  1. Java多线程之线程通信之生产者消费者阻塞队列版

    Java多线程之线程通信之生产者消费者传统版和阻塞队列版 目录 线程通信之生产者消费者传统版 线程通信之生产者消费者阻塞队列版 1. 线程通信之生产者消费者传统版 题目: 一个初始值为零的变量,两个线 ...

  2. java实现rabbitmq简单队列模型,生产者 消费者 消息队列

    生产者向队列发送消息,随机消费者从队列中接收消息 创建用户和虚拟主机 通过rabbitmq提供的用户管理界面可以很轻松的创建用户和虚拟主机,并且需要将用户绑定到对应的虚拟主机.自带有guest用户和/ ...

  3. java 读者写者_Java实现生产者消费者问题与读者写者问题详解

    1.生产者消费者问题 生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品.解决生产者/消费者问题的方法可分为两 ...

  4. 线程通信之生产者消费者阻塞队列版

    线程通信之生产者消费者阻塞队列版 ProdConsumer_BlockQueueDemo.java import java.util.concurrent.ArrayBlockingQueue; im ...

  5. rabbitmq怎样确认是否已经消费了消息_【朝夕专刊】RabbitMQ生产者/消费者消息确认...

    欢迎大家阅读<朝夕Net社区技术专刊> 我们致力于.NetCore的推广和落地,为更好的帮助大家学习,方便分享干货,特创此刊!很高兴你能成为忠实读者,文末福利不要错过哦! 上篇文章介绍了R ...

  6. 分布与并行计算—生产者消费者模型队列(Java)

    在生产者-消费者模型中,在原有代码基础上,把队列独立为1个类实现,通过公布接口,由生产者和消费者调用. public class Consumer implements Runnable {int n ...

  7. Java(二十二) -- 生产者消费者模式

    目录 生产者消费者模式 汉堡类 容器类 生产者 消费者 测试类 案例:多线程并发卖票 生产者消费者模式 在一个生产环境中,生产者和消费者在同一时间段内共享同一块缓冲区,生产者负责向缓冲区添加数据,消费 ...

  8. java多线程并发之旅-09-java 生产者消费者 Producer/Consumer 模式

    生产者消费者模式 在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产 ...

  9. java中synchronized同步锁实现生产者消费者模式

    synchronized介绍 一.基本概念 synchronized关键字是java里面用来在多线程环境下保证线程安全的同步锁:java里面有对象锁和类锁,对象锁是用在对象实例的方法上或者一个对象实例 ...

最新文章

  1. 记一次网站部署遇到的问题
  2. 三大运营商齐发力大数据
  3. 阅读redis源代码的一些体会
  4. PHP下socket编程
  5. poj2823 线段树模板题 点修改(也可以用单调队列)
  6. 文件上传测试 bugku
  7. 【转】补零与离散傅里叶变换的分辨率
  8. Linux 学习笔记_12_Windows与Linux文件共享服务_1.1_--Samba(下)Samba经典应用案例
  9. php开发uki引流脚本,UKI引流脚本
  10. mysql在jsp的导包语句_JSP+MYSQL中如何正确使用JDBC包?
  11. java生成和识别二维码
  12. python-图书管理系统4-最终完成界面代码文件
  13. 阿里ICON图标,使用教程
  14. (判断题)两台路由器之间转发的数据包一定不携带VLAN TAG?
  15. mac谷歌浏览器实现跨域
  16. Help Hanzo LightOJ - 1197(素数筛法)
  17. 用 Unity 进行网络游戏开发(一)
  18. docker安装kong和konga并简单使用
  19. AMM终极笔记——五大类无常损失解决方案
  20. python哪个方向工资高_深圳python工资高还是java

热门文章

  1. 如何在'纯'Swift中创建弱协议引用(不带@objc)
  2. win10如何解决浏览器出现“正在解析主机”的问题,很大原因是虚拟机,虚拟网卡,小米随身wifi导致的,DNS优选下载,
  3. 每个叶子节点(nil)是黑色。_填充每个节点的下一个右侧节点指针
  4. AT指令:AT+CMGF
  5. STM32:堆和栈(Heap Stack)及SRAM存储使用
  6. MTK:架构和消息机制(必看)
  7. java axis2 jar_Java axis2.jar包详解及缺少jar包错误分析
  8. 网页页面禁止用户复制源代码
  9. ❤️再也不用为了重写方法而苦恼了,Lombok帮你解决!
  10. unity3D-Gear VR字体由小变大效果