[九]RabbitMQ-客户端源码之Consumer
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
欢迎跳转到本文的原文链接:https://honeypps.com/mq/rabbitmq-client-source-code-of-consumer/
在[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。在用户使用时可以简单的采用QueueingConsumer或者采用DefaultConsumer来重写某些方法。
这里先来看下消费者客户端的关键代码:
QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicQos(32);channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer)while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [X] Received '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}
可以看到QueueingConsumer作为channel.basicConsume的回调函数,之后再进行处理。
在AMQConnection中有关MainLoop的主线程,专门用来"第一线"的处理Broker发送回客户端从帧。当Basic.Consume/.ConsumeOk开启消费模式之后,Broker主动的向客户端发送Basic.Delivery帧,MainLoop线程一步步的调用,最后到ChannelN的processAsync()方法中有:
if (method instanceof Basic.Deliver) {processDelivery(command, (Basic.Deliver) method);return true;
}
之后调用processDelivery方法:
protected void processDelivery(Command command, Basic.Deliver method) {Basic.Deliver m = method;Consumer callback = _consumers.get(m.getConsumerTag());if (callback == null) {if (defaultConsumer == null) {throw new IllegalStateException("Unsolicited delivery -" + " see Channel.setDefaultConsumer to handle this" + " case.");}else {callback = defaultConsumer;}}Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(),m.getExchange(),m.getRoutingKey());try {this.dispatcher.handleDelivery(callback, m.getConsumerTag(),envelope, (BasicProperties) command.getContentHeader(),command.getContentBody());} catch (Throwable ex) {getConnection().getExceptionHandler().handleConsumerException(this, ex,callback,m.getConsumerTag(), "handleDelivery");}
}
这个方法首先根据consumerTag从ChannelN中的_consumer这个HashMap中获取相应的Consumer回调函数,然后调用这个回调函数的handleDeliver()方法进行处理,这里有些同学会有疑问,明明是调用ConsumerDispatcher dispatcher的handleDeliver()方法,其实这里只是包了一层皮,ConsumerDispatcher的handleDeliver()方法就是调用了Consumer的handleDeliver()方法。
我们接下去看看QueueingConsumer这个实现Consumer接口的类是怎么处理的:
@Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException
{checkShutdown();this._queue.add(new Delivery(envelope, properties, body));
}
这里的queue就是一个LinkedBlockingQueue,客户端程序通过调用nextDelivery()方法来获取数据:
public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
{return handle(_queue.take());
}private Delivery handle(Delivery delivery) {if (delivery == POISON ||delivery == null && (_shutdown != null || _cancelled != null)) {if (delivery == POISON) {_queue.add(POISON);if (_shutdown == null && _cancelled == null) {throw new IllegalStateException("POISON in queue, but null _shutdown and null _cancelled. " +"This should never happen, please report as a BUG");}}if (null != _shutdown)throw Utility.fixStackTrace(_shutdown);if (null != _cancelled)throw Utility.fixStackTrace(_cancelled);}return delivery;
}
这个nextDelivery方法说白就是一个LinkedBlockingQueue的take()操作,也就是一个可能会阻塞等待的操作。
附:本系列全集
- [Conclusion]RabbitMQ-客户端源码之总结
- [一]RabbitMQ-客户端源码之ConnectionFactory
- [二]RabbitMQ-客户端源码之AMQConnection
- [三]RabbitMQ-客户端源码之ChannelManager
- [四]RabbitMQ-客户端源码之Frame
- [五]RabbitMQ-客户端源码之AMQChannel
- [六]RabbitMQ-客户端源码之AMQCommand
- [七]RabbitMQ-客户端源码之AMQPImpl+Method
- [八]RabbitMQ-客户端源码之ChannelN
- [九]RabbitMQ-客户端源码之Consumer
欢迎跳转到本文的原文链接:https://honeypps.com/mq/rabbitmq-client-source-code-of-consumer/
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
[九]RabbitMQ-客户端源码之Consumer相关推荐
- RabbitMQ 客户端源码系列 - Channel
前言 续上次分享 RabbitMQ 客户端源码系列 - Connection ,继续分享Channel相关的源码分析 (com.rabbitmq:amqp-client:4.8.3) 友情提醒:本次分 ...
- RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码
目录 第十章-RabbitMQ之Spring客户端源码 1. 前言 2. 客户端消费代码 2.1 消费的实现方式 2.2 消费中注解解释 2.3 推测Spring实现过程 3.MQ消费源码分析 3.1 ...
- Spring源码深度解析(郝佳)-学习-Spring消息-整合RabbitMQ及源码解析
我们经常在Spring项目中或者Spring Boot项目中使用RabbitMQ,一般使用的时候,己经由前人将配置配置好了,我们只需要写一个注解或者调用一个消息发送或者接收消息的监听器即可,但是底 ...
- boost::asio异步模式的C/S客户端源码实现
异步模式的服务器源码 //g++ -g async_tcp_server.cpp -o async_tcp_server -lboost_system //#include <iostream& ...
- zookeeper 客户端_zookeeper进阶-客户端源码详解
流程图 先看一下客户端源码的流程图 总体流程 总体流程 开启SendThread线程 开启EventThread 总结 下面根据源码讲解,大家整合源码和流程图一起看最好,本篇内容比较多建议收藏起来看. ...
- grpc-go客户端源码分析
grpc-go客户端源码分析 代码讲解基于v1.37.0版本. 和grpc-go服务端源码分析一样,我们先看一段示例代码, const (address = "localhost:50051 ...
- WordPress Blog Android客户端源码分析(一)
一直想找一个大型的Android开源项目进行分析,由于自身和导师课程需要选择了wordpress的Android客户端源码进行学习和解读.源码github官方下载地址:开源项目地址.分析源码的最佳手段 ...
- 使用live555客户端源码遇到的问题及解决方法
使用live555客户端源码拉rtsp流遇到两个问题,正常测试拉取海康摄像头没问题: 1.拉有些厂商的rtsp流会间隔一段时间断开连接: 2.与大华摄像头建立连接时,发送DESCRIBE命令后很长时间 ...
- swift实现饭否应用客户端源码
swift 版 iOS 饭否客户端 源码下载:http://code.662p.com/view/13318.html 饭否是中国大陆地区第一家提供微博服务的网站,被称为中国版Twitter.用户可通 ...
最新文章
- 一份可以让 Python 变得更快的工具清单
- 轻松 [2007年4月22日]
- Excel应该这么玩——5、三种数据:Excel也是系统
- 【Python】Pandas数据挖掘与分析时的常用方法
- 前端学习(1344):用户的增删改查操作1
- 为什么很多人C语言学不下去
- 文件操作,列表实例NiceHexSpiral
- shell备份mysql思路_写一个shell脚本备份mysql数据库的步骤
- MATLAB生成数组
- 【缺陷识别】基于matlab GUI SVM金属表面缺陷分类与测量【含Matlab源码 682期】
- BSN联盟链专有节点服务介绍
- JAVA优秀开源框架收集
- 小型超市管理系统(JAVA,JSP,SSH,MYSQL)
- zhu的Oracle数据库笔记
- debian安装vim提示Package vim is not available, but is referred to by another package的解决方案
- 中小学生安全专用手机
- Axure获取焦点动画
- 我的世界 win10c语言版,Minecraft Win10版下载
- js-xlsx插件导出的excel头部有一行序号解决办法
- 这些你必须知道的 Linux 技能
热门文章
- bt5重启网卡命令_BackTrack 5 简单网络配置命令
- pytorch教程龙曲良26-30
- 201103阶段二linux gdb调试与ftp配置
- IDAE启动报错:Intellij idea Cannot start internal HTTP server. Git integration, JavaScript debugger...
- 【报错笔记】 启动tomcat服务器报错Context initialization failed
- (转载)java list排序
- yield next和yield* next的区别
- C语言之从内存角度理解不同类型的变量
- python高阶函数和匿名函数
- 《无人机DIY》——4.2 项目1:MakerBeam机身