RabbitMQ——work queue
工作队列
包含一个生产者和多个消费者的MQ。生产者发送消息非常轻松,消费者和业务结合,需要花费时间。
下面介绍一种轮询方式的工作队列:
首先定义生产者:
public class Send {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);for(int i = 0; i<50; i++){String message = "work+"+i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());Thread.sleep(i*20);}}
}
生产50条消息,给下面的两个消费者进行食用:
消费者One(这里我们消费一条消息就sleep 1s)
public class ReceiveOne {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [ConsumerOne is] Received '" + message + "'");try {Thread.sleep(1000); } catch (InterruptedException e) {e.printStackTrace();}};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}
消费者Two (sleep 2s)
public class ReceiveTwo {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [ConsumerTwo is] Received '" + message + "'");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}
下面我们看两个控制台的输出:
ReceiveOne
ReceiveTwo
事实证明,设置两个不同的sleep时间,并没有影响两个消费者轮流消费,这种方式称之为轮询。
这样看起来不太合理,消费快的消费者理应多消费一些,所谓“能者多劳”,为了解决这个问题,下面我们引入了公平分发:
公平分发,它是由消费者主动发送ACK应答,告诉生产者:“我已经消费完了,快点给我下一个”,之前的都是,自动发送ACK,并非手动的。下面我们实现一下手动的。
只需要在前面的代码修改部分即可:
发送者:
public class Send {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消费者发送确认消息(ACK)之前,只发送一个消息给你channel.basicQos(1);for(int i = 0; i<50; i++){String message = "work+"+i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());Thread.sleep(i*20);}}
}
消费者1:
public class ReceiveOne {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1); //需要修改DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [ConsumerOne is] Received '" + message + "'");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); //这里是手动应答}};boolean autoAck = false; //这里需要修改至手动应答 channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });}
}
同理2也是一样的,这里就不再重复了。
很明显,消费者1的处理速度比消费者2要快,因为消费者1的sleep时间较短,这就和上面的轮询立判高下了,体现了能者多劳。
下面我们有一个问题,我们设置 boolean autoAck = false;
这个是有隐患的,假设我们我的消费者1挂了,而消息已经发送过去了,这时候,我们该怎么办?这时候这个消息是丢失的。为了应对这种情况,我们可以设置autoack = true
,能解决问题。但是我们又想公平分发,这就有点麻烦了,怎么解决这个问题,暂时没有想法,容我继续学习。
还有一个问题,如果RabbitMQ挂了,这时候怎么办?这里我们就要设置另外一个参数了,那就是durable
,表示可持久化。
在声明队列的时候,就要设置好,等队列已经存在了再设置的话,就会报错。这点注意,如果已经报错了,就删除队列重新声明。
boolean durable = false; //声明数据可以持久化channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
RabbitMQ——work queue相关推荐
- RabbitMQ 声明Queue时的参数们的Power
RabbitMQ 声明Queue时的参数们的Power 参数们的Power 在声明队列的时候会有很多的参数 public static QueueDeclareOk QueueDeclare(this ...
- 【原创】RabbitMQ之Queue属性测试
2019独角兽企业重金招聘Python工程师标准>>> 常用queue属性 在 rabbitmq-c代码中可以看到如下代码 上图所示为queue声明时使用的结构体.其中最容易让使用者 ...
- RabbitMq队列 queue
目录 RabbitMq队列 消息确认机制 负载均衡 生产者代码 消费者1 消费者2 RabbitMq队列 在上篇文章中讲了mq的队列,这篇用代码实现.在例子中存在一个生产者,和两个消费者.生产者将生产 ...
- rabbitmq监控queue中message数量
2019独角兽企业重金招聘Python工程师标准>>> 头儿让整一个jar包干这活,学了一下rabbitmq,但这东西中文文档很少,就认真读了读官方的英文文档.官方文档提供了两种方法 ...
- RabbitMQ的Queue详解;
一.前言 Queue(队列)是RabbitMQ的内部对象,用于存储消息队列,并将它们转发给消费者: 二.Queue队列 队列跟交换机共享某些属性,但是队列也有一些另外的属性 Name:队列的名称 Du ...
- RabbitMQ Exchange Queue RoutingKey BindingKey解析
许多新手在刚接触RabbitMQ的时候,会被各种名词弄晕,包括ConnectionFactory .Connection .Channel.Exchange.Queue.RoutingKey.Bind ...
- RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失
1. 消息丢失源头 RabbitMQ 消息丢失的源头主要有以下三个: 生产者丢失消息 RabbitMQ 丢失消息 消费者丢失消息 下面主要从 3 个方面进行说明并提供应对措施 2. 生产者丢失消息 R ...
- Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy
Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度 ...
- RabbitMQ(一):RabbitMQ快速入门
RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用.作为一名合格的开发者,有必要对RabbitMQ有所了解,本文是RabbitMQ快速入门文章,主要内容包括Rab ...
最新文章
- 车牌识别的分类器文件目录
- Android中的那些权限
- mysql系统搭建互备DB(双主)记录
- iText简介(转)
- 设计师找灵感?集设用作品打动世界的窗口
- TZOJ 4813 机器翻译(模拟数组头和尾)
- linux删除目录下文件的几种方法
- 深度学习13-cnn介绍(卷积神经网络简介)
- 【论文阅读】xgboost
- python模拟别人说话的声音_现在你可以通过深度学习用别人的声音来说话了
- cocos creator播放声音控制台显示Simulator: jsb: ERROR
- java解析HL7协议报文工具(v24版)
- jvm参数调优_3_问题排查
- 四轴无人机动力学模型
- 大家都在用的视频音频提取器,免费用!
- PDF文件在线预览之pdf.js
- java编程——吸血鬼数字(四位)
- 网络连接成功,但浏览器显示网络未连接
- 一个很好的省市县三级联动js文件,使用很方便
- Root你的设备(二)