工作队列


包含一个生产者和多个消费者的MQ。生产者发送消息非常轻松,消费者和业务结合,需要花费时间。
下面介绍一种轮询方式的工作队列:

首先定义生产者:

public class Send {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);for(int i = 0; i<50; i++){String message = "work+"+i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());Thread.sleep(i*20);}}
}

生产50条消息,给下面的两个消费者进行食用:

消费者One(这里我们消费一条消息就sleep 1s

public class ReceiveOne {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [ConsumerOne is] Received '" + message + "'");try {Thread.sleep(1000);   } catch (InterruptedException e) {e.printStackTrace();}};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

消费者Two (sleep 2s)

public class ReceiveTwo {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [ConsumerTwo is] Received '" + message + "'");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

下面我们看两个控制台的输出:

ReceiveOne

ReceiveTwo

事实证明,设置两个不同的sleep时间,并没有影响两个消费者轮流消费,这种方式称之为轮询。

这样看起来不太合理,消费快的消费者理应多消费一些,所谓“能者多劳”,为了解决这个问题,下面我们引入了公平分发:

公平分发,它是由消费者主动发送ACK应答,告诉生产者:“我已经消费完了,快点给我下一个”,之前的都是,自动发送ACK,并非手动的。下面我们实现一下手动的。

只需要在前面的代码修改部分即可:

发送者:

public class Send {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消费者发送确认消息(ACK)之前,只发送一个消息给你channel.basicQos(1);for(int i = 0; i<50; i++){String message = "work+"+i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());Thread.sleep(i*20);}}
}

消费者1:

public class ReceiveOne {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);  //需要修改DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [ConsumerOne is] Received '" + message + "'");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);  //这里是手动应答}};boolean autoAck = false;  //这里需要修改至手动应答 channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });}
}

同理2也是一样的,这里就不再重复了。


很明显,消费者1的处理速度比消费者2要快,因为消费者1的sleep时间较短,这就和上面的轮询立判高下了,体现了能者多劳。


下面我们有一个问题,我们设置 boolean autoAck = false;这个是有隐患的,假设我们我的消费者1挂了,而消息已经发送过去了,这时候,我们该怎么办?这时候这个消息是丢失的。为了应对这种情况,我们可以设置autoack = true,能解决问题。但是我们又想公平分发,这就有点麻烦了,怎么解决这个问题,暂时没有想法,容我继续学习。

还有一个问题,如果RabbitMQ挂了,这时候怎么办?这里我们就要设置另外一个参数了,那就是durable,表示可持久化。

在声明队列的时候,就要设置好,等队列已经存在了再设置的话,就会报错。这点注意,如果已经报错了,就删除队列重新声明。

 boolean durable = false; //声明数据可以持久化channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

RabbitMQ——work queue相关推荐

  1. RabbitMQ 声明Queue时的参数们的Power

    RabbitMQ 声明Queue时的参数们的Power 参数们的Power 在声明队列的时候会有很多的参数 public static QueueDeclareOk QueueDeclare(this ...

  2. 【原创】RabbitMQ之Queue属性测试

    2019独角兽企业重金招聘Python工程师标准>>> 常用queue属性 在 rabbitmq-c代码中可以看到如下代码 上图所示为queue声明时使用的结构体.其中最容易让使用者 ...

  3. RabbitMq队列 queue

    目录 RabbitMq队列 消息确认机制 负载均衡 生产者代码 消费者1 消费者2 RabbitMq队列 在上篇文章中讲了mq的队列,这篇用代码实现.在例子中存在一个生产者,和两个消费者.生产者将生产 ...

  4. rabbitmq监控queue中message数量

    2019独角兽企业重金招聘Python工程师标准>>> 头儿让整一个jar包干这活,学了一下rabbitmq,但这东西中文文档很少,就认真读了读官方的英文文档.官方文档提供了两种方法 ...

  5. RabbitMQ的Queue详解;

    一.前言 Queue(队列)是RabbitMQ的内部对象,用于存储消息队列,并将它们转发给消费者: 二.Queue队列 队列跟交换机共享某些属性,但是队列也有一些另外的属性 Name:队列的名称 Du ...

  6. RabbitMQ Exchange Queue RoutingKey BindingKey解析

    许多新手在刚接触RabbitMQ的时候,会被各种名词弄晕,包括ConnectionFactory .Connection .Channel.Exchange.Queue.RoutingKey.Bind ...

  7. RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失

    1. 消息丢失源头 RabbitMQ 消息丢失的源头主要有以下三个: 生产者丢失消息 RabbitMQ 丢失消息 消费者丢失消息 下面主要从 3 个方面进行说明并提供应对措施 2. 生产者丢失消息 R ...

  8. Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

    Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度 ...

  9. RabbitMQ(一):RabbitMQ快速入门

    RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用.作为一名合格的开发者,有必要对RabbitMQ有所了解,本文是RabbitMQ快速入门文章,主要内容包括Rab ...

最新文章

  1. 车牌识别的分类器文件目录
  2. Android中的那些权限
  3. mysql系统搭建互备DB(双主)记录
  4. iText简介(转)
  5. 设计师找灵感?集设用作品打动世界的窗口
  6. TZOJ 4813 机器翻译(模拟数组头和尾)
  7. linux删除目录下文件的几种方法
  8. 深度学习13-cnn介绍(卷积神经网络简介)
  9. 【论文阅读】xgboost
  10. python模拟别人说话的声音_现在你可以通过深度学习用别人的声音来说话了
  11. cocos creator播放声音控制台显示Simulator: jsb: ERROR
  12. java解析HL7协议报文工具(v24版)
  13. jvm参数调优_3_问题排查
  14. 四轴无人机动力学模型
  15. 大家都在用的视频音频提取器,免费用!
  16. PDF文件在线预览之pdf.js
  17. java编程——吸血鬼数字(四位)
  18. 网络连接成功,但浏览器显示网络未连接
  19. 一个很好的省市县三级联动js文件,使用很方便
  20. Root你的设备(二)

热门文章

  1. 在 CentOS 5.4 下编译安装MySQL时
  2. 通过Rancher部署并扩容Kubernetes集群基础篇一
  3. Dynamic CRM 2013学习笔记(四十二)流程5 - 实时/同步工作流(Workflow)用法图解...
  4. Windows下svn服务器安装
  5. 转载:jQuery 1.3.3 新功能
  6. 配置Apache虚拟机
  7. 创新性应用深度学习,IBM在语音识别领域取得了里程碑式突破
  8. JS 设计模式 一(接口)
  9. C++学习笔记——虚函数
  10. WebSocket科普