目录

prefetch_count参数的含义

RabbitMQ客户端中prefetch_count源码跟踪

prefetch_count参数使用

prefetch_count参数最佳实践

小结


prefetch_count参数的含义

先从AMQP(Advanced Message Queuing Protocol,及高级消息队列协议,RabbitMQ实现了此协议的0-9-1版本的大部分内容)和RabbitMQ的具体实现去理解prefetch_count参数的含义,可以查阅对应的文档(见文末参考资料)。AMQP 0-9-1定义了basic.qos方法去限制消费者基于某一个Channel或者Connection上未进行ack的最大消息数量上限。basic.qos方法支持两个参数:

  • global:布尔值。
  • prefetch_count:整数。

这两个参数在AMQP 0-9-1定义中的含义和RabbitMQ具体实现时有所不同,见下表:

或者用简洁的英文表格理解:

这里画一个图理解一下:

上图仅仅为了区分协议本身和RabbitMQ中实现的不同,接着说说prefetch_count对于消费者(线程)和待消费消息的作用。假定一个前提:RabbitMQ客户端从RabbitMQ服务端获取到队列消息的速度比消费者线程消费速度快,目前有两个消费者线程共用一个Channel实例。当global参数为false时候,效果如下:

而当global参数为true时候,效果如下:

在消费者线程处理速度远低于RabbitMQ客户端从RabbitMQ服务端获取到队列消息的速度的场景下,prefetch_count条未进行ack的消息会暂时存放在一个队列(准确来说是阻塞队列,然后阻塞队列中的消息任务会流转到一个列表中遍历回调消费者句柄,见下一节的源码分析)中等待被消费者处理。这部分消息会占据JVM的堆内存,所以在性能调优或者设定应用程序的初始化和最大堆内存的时候,如果刚好用到RabbitMQ的消费者,必须要考虑这些"预取消息"的内存占用量。不过值得注意的是:prefetch_count是RabbitMQ服务端的参数,它的设置值或者快照都不会存放在RabbitMQ客户端。同时需要注意prefetch_count生效的条件和特性(从参数设置的一些demo和源码上感知):

  • prefetch_count参数仅仅在 basic.consume的 autoAck参数设置为 false的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。
  • basic.consume如果在非自动确认模式下忘记了手动调用 basic.ack,那么 prefetch_count正是未 ack消息数量的最大上限。
  • prefetch_count是由 RabbitMQ服务端控制,一般情况下能保证各个消费者线程中的未 ack消息分发是均衡的,这点笔者猜测是 consumerTag起到了关键作用。

RabbitMQ客户端中prefetch_count源码跟踪

编写本文的时候引入的RabbitMQ客户端版本为:
com.rabbitmq:amqp-client:5.9.0

上面说了这么多都只是根据官方的文档或者博客中的理论依据进行分析,其实更加根本的分析方法是直接阅读RabbitMQ的Java客户端源码,主要是针对basic.qos和basic.consume两个方法,对应的是
com.rabbitmq.client.impl.ChannelN#basicQos()和
com.rabbitmq.client.impl.ChannelN#basicConsume()两个方法。先看ChannelN#basicQos():

这里的basicQos()方法多了一个prefetchSize参数,用于限制分发内容的大小上限,默认值0代表无限制,而prefetchCount的取值范围是[0,65535],取值为0也是代表无限制。这里的ChannelN#basicQos()实现中直接封装basic.qos方法参数进行一次RPC调用,意味着直接更变RabbitMQ服务端的配置,即时生效,同时参数值完全没有保存在客户端代码中,印证了前面一节的结论。接着看ChannelN#basicConsume()方法:

上图已经把关键部分用红圈圈出,因为整个消息消费过程是异步的,涉及太多的类和方法,这里不全量贴出,整理了一个流程图:

整个消息消费过程,prefetch_count参数并未出现在客户端代码中,又再次印证了前面一节的结论,即prefetch_count参数的行为和作用完全由RabbitMQ服务端控制。而最终Customer或者常用的DefaultCustomer句柄是在WorkPoolRunnable中回调的,这类任务的执行线程来自于ConsumerWorkService内部的线程池,而这个线程池又使用了
Executors.newFixedThreadPool()去构建,使用了默认的线程工厂类,因此在Customer#handleDelivery()方法内部打印的线程名称的样子是pool-1-thread-*。

这里
VariableLinkedBlockingQueue就是前一节中的message queue的原型

prefetch_count参数使用

设置prefetch_count参数比较简单,就是调用Channel#basicQos()方法:

public class RabbitQos {static String QUEUE = "qos.test";public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE, true, false, false, null);channel.basicQos(2);channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("1------" + Thread.currentThread().getName());sleep();}});channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("2------" + Thread.currentThread().getName());sleep();}});for (int i = 0; i < 20; i++) {channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes());}sleep();}private static void sleep() {try {Thread.sleep(Long.MAX_VALUE);} catch (Exception ignore) {}}
}

上面是原生的amqp-client的写法,如果使用了spring-amqp(spring-boot-starter-amqp),可以通过配置文件中的
spring.rabbitmq.listener.direct.prefetch属性指定所有消费者线程的prefetch_count,如果要针对部分消费者线程进行该属性的设置,则需要针对
RabbitListenerContainerFactory进行改造。

prefetch_count参数最佳实践

关于prefetch_count参数的设置,RabbitMQ官方有一篇文章进行了分析:《Finding bottlenecks with RabbitMQ 3.3》。该文章分析了消息流控的整个流程,其中提到了prefetch_count参数的一些指标:

这里指出了,如果prefetch_count的值超过了30,那么网络带宽限制开始占主导地位,此时进一步增加prefetch_count的值就会变得收效甚微。也就是说,「官方是建议把prefetch_count设置为30」。这里再参看一下spring-boot-starter-amqp中对此参数定义的默认值,具体是
AbstractMessageListenerContainer中的DEFAULT_PREFETCH_COUNT:

如果没有通过
spring.rabbitmq.listener.direct.prefetch进行覆盖,那么使用spring-boot-starter-amqp中的注解定义的消费者线程中设置的prefetch_count就是250。

笔者认为,应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑prefetch_count的设置。总结如下(个人经验仅供参考):

  • 当消费者线程的处理速度十分慢,而队列的消息量十分少的场景下,可以考虑把 prefetch_count设置为 1。
  • 当队列中的每条消息的数据报十分大的时候,要计算好客户端可以容纳的未 ack总消息量的内存极限,从而设计一个合理的 prefetch_count值。
  • 当消费者线程的处理速度十分快,远远大于 RabbitMQ服务端的消息分发,在网络带宽充足的前提下,设置可以把 prefetch_count值设置为 0,不做任何的消息流控。
  • 一般场景下,建议使用 RabbitMQ官方的建议值 30或者 spring-boot-starter-amqp中的默认值 250。

小结

小结一下:

  • prefetch_count是 RabbitMQ服务端的参数,设置后即时生效。
  • prefetch_count对于 AMQP-0-9-1中的定义与 RabbitMQ中的实现不完全相同。
  • prefetch_count值设置建议使用框架提供的默认值或者通过分组实验结合数据报大小进行计算和评估出一个合理值。

深入理解RabbitMQ中的prefetch_count参数相关推荐

  1. python参数传递方法_深入理解python中函数传递参数是值传递还是引用传递

    python 的 深入理解python中函数传递参数是值传递还是引用传递 目前网络上大部分博客的结论都是这样的: Python不允许程序员选择采用传值还是传 引用.Python参数传递采用的肯定是&q ...

  2. 解惑(三)----- 深入理解Python中的self参数和__init__(self)方法--通过类比Java语言

    一.前言 在这里我想通过用Python和Java语言的类比来对Python中的self参数和__init__(self)方法做一个深入的解释.这样可以加深对self参数和__init__(self)方 ...

  3. 理解nodejs中函数的参数的来由

    看一段创建并启动nodejs服务的代码,如下: var http = require('http');http.createServer(function (request, response) {r ...

  4. python怎么理解函数的参数_理解Python中函数的参数

    定义函数的时候,我们把参数的名字和位置确定下来,函数的接口定义就完成了.对于函数的调用者来说,只需要知道如何传递正确的参数,以及函数将返回什么样的值就够了,函数内部的复杂逻辑被封装起来,调用者无需了解 ...

  5. 参数 中_理解JavaScript中函数的参数

    1,arguments JavaScript的函数的参数(arguments)在函数体的内部表现为一个类似数组的对象.就是它拥有数组的方法,却不是Array的实例. 例1 我们直接打印出argumen ...

  6. 理解RabbitMQ中的AMQP模型,知乎上已获万赞

    前言 在实际开发,Redis使用会频繁,那么在使用过程中我们该如何正确抉择数据类型呢?哪些场景下适用哪些数据类型.而且在面试中也很常会被面试官问到Redis数据结构方面的问题: Redis为什么快呢? ...

  7. python魔法参数_python中的魔法参数:*args和**kwargs

    def foo(*args, **kwargs): print 'args = ', args print 'kwargs = ', kwargs print '------------------- ...

  8. RabbitMQ 中的 VirtualHost 该如何理解

    当我们第一次安装好一个 RabbitMQ 之后,我们可能都会通过 Web 页面去管理这个 RabbitMQ,默认情况下,我们第一次使用的默认用户是 guest. 登录成功后,在 admin 选项卡可以 ...

  9. lstm 输入数据维度_理解Pytorch中LSTM的输入输出参数含义

    本文不会介绍LSTM的原理,具体可看如下两篇文章 Understanding LSTM Networks DeepLearning.ai学习笔记(五)序列模型 -- week1 循环序列模型 1.举个 ...

最新文章

  1. 传统方法 + 深度学习发威! | 2021瓷砖缺陷检测总决赛冠军思路分享
  2. stp:spanning tree protocol 生成树基本原理
  3. aws搭建java项目_AWS下S3之java开发
  4. html5中高德、腾讯、百度 地图api调起手机app
  5. android.provider.documentscontract cannot be resolved.
  6. java printstacktrace_为什么异常. printStackTrace() 被认为是不好的实践?_java_酷徒编程知识库...
  7. 试用Mono Beta 1.0
  8. 被骂十几年,躺赚上千亿!这个“土匪”行业,还能浪多久?
  9. L2-004. 这是二叉搜索树吗?-PAT团体程序设计天梯赛GPLT
  10. win7计算机记忆窗口,Win7系统关闭和打开搜索记忆功能的方法(图文教程)
  11. BGP路由反射器原理及配置实例
  12. 一道学吧上的题 ^ 题目:不允许重复的实验 - 从数字1、2、3、4、5中随机抽取3次数字(不允许重复)组成一个三位数,则其各位数字之和等于n的概率为________ 输入整数 输出一个小数(保留
  13. android 修复工具,牛学长安卓手机修复工具(安卓手机修复助手)V2.4.0.11 免费版
  14. 电磁场理论-麦克斯韦方程组
  15. RestClient操作索引库
  16. Python xlwt 操作 excel 表格基础(三):单元格格式、字体格式、对齐方式、边框及填充等
  17. Android强大的控件——RecyclerView
  18. 可怜的RSA【网络攻防CTF】(保姆级图文)
  19. php adodb smarty,ADODB结合SMARTY使用~超级强
  20. 史上最全亚马逊申诉模板!!!!

热门文章

  1. 用vulkan写个引擎 (三)ui组件
  2. SVG使用XML格式定义图像
  3. Docker数据持久化
  4. 简单对比一下Cookie和Session的主要区别
  5. 【分享基金考试资料】如何才能取得基金从业人员资格?结果多久有效?
  6. pandas - merge 函数
  7. 前言 - 让彼得·潘常驻心间——《与时间同行》(1)
  8. c语言结构体数组放入文件中,c-从文件中读取数据并存储到结构数组中
  9. 解决所有浏览器被“hao 123”拦截的终极大法,试过很多很多方法都不管用,最终这个管用。
  10. java -cp classpath_如何正确配置classpath