实际使用RabbitMQ过程中,如果完全不配置QoS,这样Rabbit会尽可能快速地

发送队列中的所有消息到client端。因为consumer在本地缓存所有的message,

从而极有可能导致OOM或者导致服务器内存不足影响其它进程的正常运行。所以我们

需要通过设置Qos的prefetch count来控制consumer的流量。同时设置得当也会提高consumer的吞吐量。

prefetch与消息投递

prefetch允许为每个consumer指定最大的unacked messages数目。简单来说就是用来指定一个consumer一次可以从Rabbit中获取多少条message并缓存在client中(RabbitMQ提供的各种语言的client library)。一旦缓冲区满了,Rabbit将会停止投递新的message到该consumer中直到它发出ack。

假设prefetch值设为10,共有两个consumer。意味着每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。

总的来说,consumer负责不断处理消息,不断ack,然后只要unacked数少于prefetch * consumer数目,broker就不断将消息投递过去。

如何设置

官方提供的java client可以通过channel来设置:

channel = connection.createChannel();

channel.basicQos(prefetch);

spring-amqp的话可通过配置文件来配置

这里需要注意的是,spring-amqp中的prefetch默认值是250。

客户端源码剖析

官方Java客户端提供了DefaultConsumer和QueueingConsumer两种类来从queue中获取消息。 其中QueueingConsumer内部维护了一个阻塞队列BlockingQueue,此队列就是用来缓存从queue获取的message。当调用 channel.basicConsume后,broker就会不断往consumer投递message,直到prefetch条。

初始化的时候,如果不指定BlockingQueue的长度,默认值会设为Integer.MAX_VALUE,所以这就解释了文章开头所说的如果不设置Qos的话为什么会有可能导致OOM,因为此时BlockingQueue会不断膨胀,消耗内存。所以设置了prefetch后,建议BlockingQueue的长度(capacity)也初始化为prefetch。

另外需要注意的是,在调用channel.basicConsume之后,consumer是通过异步方式来抓取message的,通过debug可以发现BlockingQueue的size是在异步地不断增长直到prefetch。而客户端代码可以通过consumer.nextDelivery()或consumer.nextDelivery(long timeout)方法来获取message,其对应的就是BlockingQueue的take()和poll(long timeout)方法。

再来看看spring-amqp的comsumer,大致也一样。核心类BlockingQueueConsumer

public class BlockingQueueConsumer {

private final BlockingQueue queue;

//some code

...

public BlockingQueueConsumer(ConnectionFactory connectionFactory,

MessagePropertiesConverter messagePropertiesConverter,

ActiveObjectCounter activeObjectCounter, AcknowledgeMode acknowledgeMode,

boolean transactional, int prefetchCount, boolean defaultRequeueRejected,

Map consumerArgs, boolean exclusive, String... queues) {

//... some code

this.queue = new LinkedBlockingQueue(prefetchCount);

}

BlockingQueueConsumer的构造函数清楚说明了每个消费者内部的队列大小就是prefetch的大小。

吞吐量、延迟

prefetch并不是说设置得越大越好。过大可能导致consumer处理不过来,一直在本地缓存的BlockingQueue里呆太久,这样消息在客户端的延迟就大大增加;而对于多个consumer的情况,则会分配不均匀,导致有些consumer一直在忙,有些则非常空闲。

然而设置的过小,又会令到consumer不能充分工作,因为我们总想它100%的时间都是处于繁忙状态,而这时可能会在处理完一条消息后,BlockingQueue为空,因为新的消息还未来得及到达,所以consumer就处于空闲状态了。

prefetch应该设置多大,具体可参考这篇文章

里面详细论述吞吐量与prefetch之间的关系。prefetch的设置与以下几点有关:

客户端服务端之间网络传输时间

consumer消耗一条消息所执行的业务逻辑的耗时

网络状况

【完】

如有纰漏,欢迎指出

参考资料:

php rabbitmq qos,RabbitMQ之Qos prefetch相关推荐

  1. RabbitMQ有关限流QOS的理解

    RabbitMQ有关限流QOS的理解 在我的理解中rabbitmq的qos设置对于我们mq队列的速度和性能方面有一定的影响 假如消费者都down机了,或者生产者生产的数量越来越多,队列拼命堆积,如果不 ...

  2. rabbitmq多个消费者_为什么要选择RabbitMQ,RabbitMQ简介,各种MQ选型对比

    MQ 是什么?队列是什么,MQ 我们可以理解为消息队列,队列我们可以理解为管道.以管道的方式做消息传递. 场景: 1.其实我们在双11的时候,当我们凌晨大量的秒杀和抢购商品,然后去结算的时候,就会发现 ...

  3. RabbitMQ【RabbitMQ】

    RabbitMQ[RabbitMQ] 前言 说明 推荐 RabbitMQ 一.中间件 1.什么是中间件 2.中间件技术及架构的概述 3.基于消息中间件的分布式系统的架构 4.消息队列协议 5.消息队列 ...

  4. Spring Boot2.x-15 整合RabbitMQ 及RabbitMQ的基本使用

    文章目录 概述 在Docker CE中安装RabbitMQ 依赖 配置 基本使用 手工创建队列,发送消息到指定的队列 自动创建队列,发送消息到指定的队列 自动创建队列,Exchange和队列绑定 自动 ...

  5. RabbitMQ原理RabbitMQ各组件作用RabbitMQ使用场景

    RabbitMQ原理 RabbitMQ介绍 MQ全称为Message Queue, 是一种分布式应用程序的的通信方法,它是消费-生产者模型的一个典型的代表 producer往消息队列中不断写入消息,而 ...

  6. 初识RabbitMQ,附RabbitMQ+PHP演示实例(亲测通过)

    RabbitMQ是一个在AMQP基础上实现的企业级消息系统.何谓消息系统,就是消息队列系统,消息队列是""消费-生产者模型""的一个典型的代表,一端往消息队列中 ...

  7. RabbitMQ系列——Rabbitmq Plugin configuration unchanged. 解决方案

    RabbitMQ系列--Rabbitmq Plugin configuration unchanged. 解决方案 一.问题概述 cmd命令窗口执行rabbitmq-plugins enable ra ...

  8. QoS服务质量四QoS边界行为之流量监管

    QoS服务质量四QoS边界行为之流量监管 二.QoS域边界节点的功能和行为 1.流分类 1.1.简单流分类 1.2.复杂流分类 2.流量监管 2.1.标记 2.1.1.流量监管工具CAR 2.1.1. ...

  9. 【RabbitMQ】RabbitMQ架构模型

    目录 RabbitMQ架构模型 Producer:生产者 Consumer:消费方 Broker:服务节点 Queue队列: Exchange:交换器 --fanout广播 --topic主题 --d ...

  10. QoS服务质量五QoS边界行为之流量整形

    QoS服务质量五QoS边界行为之流量整形 3.流量整形 3.1.GTS的原理 3.2.自适应流量整形 3.3流量监管和流量整形应用场景 3.4.配置限速(流量监管和流量整形) 3.4.1.配置流量整形 ...

最新文章

  1. 更智能:人工智能与能源行业的革命
  2. 【C++语法】回车与换行(vs2008)
  3. 2020.12.07.记录
  4. 【solr基础教程之九】客户端
  5. SAP CRM how is db table CRMD_PRODUCT_I read
  6. 听说面试很少有人答出:距离最近点对问题
  7. LuaForUnity10:框架配置与AssetBundle
  8. 如何在 React Native 中使用 NFC 标签
  9. LoadRunner压力测试:测试报告结果分析
  10. python pyecharts绘制网络关系图
  11. html5中translate,css3 中translate和transition的使用方法
  12. 个人理财管理系统代码
  13. typeof和instanceof的区别
  14. [转载]丢掉鼠标-Mac神软Alfred使用手册1_我是亲民_新浪博客
  15. 正则表达式的基本用法
  16. 泛微OA集成ERP,助力制造业实现供应商、销售全面数字化管理
  17. python输入长和宽输出面积_请用C++编写 从键盘上输入长方形的长和宽,输出周长和面积...
  18. https与http的区别以及https加密原理
  19. 多线程Monitor工作原理
  20. 统计字符串中数字字符的个数

热门文章

  1. python3免费下载小说案例
  2. 控制 html元素 显示/隐藏
  3. DNS轮询+泛域名解析
  4. 关于格雷码的一点思考
  5. Create SOCKET connection failure
  6. HttpClient API常用方法
  7. Fragment是什么,怎么用?
  8. MFC简单使用PostMessage
  9. 动画:链表快慢指针解题技巧
  10. ES7~ES13新特性(二)