


在我的博文《RabbitMQ之Consumer消费模式(Push & Pull)》中讲到,Consumer的消费模式有Pull 和 Push两种,而经常用到的就是Push模式,Push模式在3.x的用法demo如下:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [X] Received '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);break;


boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String routingKey = envelope.getRoutingKey();String contentType = properties.getContentType();long deliveryTag = envelope.getDeliveryTag();// (process the message components here ...)channel.basicAck(deliveryTag, false);}


QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers ware made on the Connection’s thread. This had two main drawbacks. Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the Client would deadlock.

QueuingConsumer provided client code with an easy way to obviate the problem by queueing incoming messages and processing them on a separate, application-managed thread.

The threading behaviour of Connection and Channel has been changed so that each Channel uses a distinct thread for dispatching to Consumers. This prevents Consumers on one Channel holding up Consumers on another and it also prevents recursive calls from deadlocking the client.

As such, it is now safe to implement Consumer directly of to extend DefaultConsumer and QueueingConsumer is a lot less relevant.


  1. the Consumer could stall the processing of all Channels on the Connection. =>QueueingConsumer会拖累Connection的所有Channels的操作
  2. if a Consumer made a recursive synchronous call into its Channel the Client would deadlock.=>同步递归调用时会产生死锁

对于这两句简单的言辞,博主没有停下追求真理的脚步,既而去github上发问,当我咨询rabbitmq-java-client的作者时(issue @265),他是这么回复的:

Search rabbitmq-users archives. That consumer implementation was merely a workaround for the consumer operation dispatch deficiency that no longer exists. It has significant limitations in that automatic connection recovery does not support it and when deliveries happen faster than consumers actually process them, its internal j.u.c. queue data structure can grow very large. It has been deprecated for years prior to the removal.

上面提及的rabbitmq-users的链接是:https://groups.google.com/forum/#!forum/rabbitmq-users,当然在我大天朝是访问不了的。博主的翻墙软件也失效了,就没法search,有兴趣的小伙伴search到的话麻烦告知下(下方留言,私信,或者留下你的资料地址~)。不过作者也提交了两点:1. automatic connection recovery不支持QueueingConsumer的这种形式;2. 内存溢出问题。


这里我们来看一段英文介绍: You have a queue in RabbitMQ. You have some clients consuming from that queue. If you don’t set a Qos setting at all (Basic.Qos), then RabbitMQ will push all the queue’s messages to the client as fast as the network and the clients will allow. 也就是说,如果由于某些原因,队列中堆积了比较多的消息,就可能导致Consumer内存溢出卡死,于是发生恶性循环,队列消息不断堆积得不到消化,彻底地悲剧了。其实这个问题可以通过设置Basic.Qos来很好的解决。


QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);






