欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channelmanager/


关于ChannelManager,官方注解:Manages a set of channels, indexed by channel number (1… _channelMax)。

ChannelManager类的代码量不是很多,主要用来管理Channel的,channelNumber=0的除外,应为channelNumber=0是留给Connection的特殊的channelNumber。

下面是ChannelManager的成员变量:

/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
private final Object monitor = new Object();/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();private final IntAllocator channelNumberAllocator;private final ConsumerWorkService workService;private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();/** Maximum channel number available on this connection. */
private final int _channelMax;
private final ThreadFactory threadFactory;

这上面的成员变量下面会有涉及。


对于ChannelManager的使用,是AMQConnection中的成员变量:

/** Object that manages a set of channels */
private volatile ChannelManager _channelManager;

AMQConnection中start()的_channelManager中对其初始化:

protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {return new ChannelManager(this._workService, channelMax, threadFactory);
}

再调用其构造函数:

public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {if (channelMax == 0) {// The framing encoding only allows for unsigned 16-bit integers// for the channel numberchannelMax = (1 << 16) - 1;}_channelMax = channelMax;channelNumberAllocator = new IntAllocator(1, channelMax);this.workService = workService;this.threadFactory = threadFactory;
}

这里的ConsumerWorkService也在AMQConnection的start()方法中初始化——initializeConsumerWorkService():

private void initializeConsumerWorkService() {this._workService  = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
}

再回到构造函数。

channelMax参数是在client接收到broker的Connection.Tune帧中的“Channel-Max”参数之后设置的,如果为0则表示没有限制,这里就会设置为默认的最大值:2的16次方-1。
threadFactory参数是指:Executors.defaultThreadFactory();

关于ConsumerWorkService请参考文章末尾处。


使用过RabbitMQ的同学知道要生产或者消费消息之前必须要初始化Channel,如下:

Channel channel = connection.createChannel();

这个createChannel()是AMQConnection中的方法:

public Channel createChannel(int channelNumber) throws IOException {ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;return cm.createChannel(this, channelNumber);
}
public Channel createChannel() throws IOException {ensureIsOpen();ChannelManager cm = _channelManager;if (cm == null) return null;return cm.createChannel(this);
}

这里就是调用了ChannelManager的createChannel方法。

下面是ChannelManager中关于创建Channel的代码:

public ChannelN createChannel(AMQConnection connection) throws IOException {ChannelN ch;synchronized (this.monitor) {int channelNumber = channelNumberAllocator.allocate();if (channelNumber == -1) {return null;} else {ch = addNewChannel(connection, channelNumber);}}ch.open(); // now that it's been safely addedreturn ch;
}public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {ChannelN ch;synchronized (this.monitor) {if (channelNumberAllocator.reserve(channelNumber)) {ch = addNewChannel(connection, channelNumber);} else {return null;}}ch.open(); // now that it's been safely addedreturn ch;
}private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {if (_channelMap.containsKey(channelNumber)) {// That number's already allocated! Can't do it// This should never happen unless something has gone// badly wrong with our implementation.throw new IllegalStateException("We have attempted to "+ "create a channel with a number that is already in "+ "use. This should never happen. "+ "Please report this as a bug.");}ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);_channelMap.put(ch.getChannelNumber(), ch);return ch;
}protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {return new ChannelN(connection, channelNumber, workService);
}

上面有两个createChannel方法,一个是带了channelNumber的,一个是自动分片channelNumber的,分别对应AMQConnection中的两个方法。最后都调用addNewChannel方法。

注意两个createChannel方法中都有这样一句代码:

ch.open();

这个是什么呢?其实是调用ChannelN的open方法:

/*** Package method: open the channel.* This is only called from {@link ChannelManager}.* @throws IOException if any problem is encountered*/
public void open() throws IOException {// wait for the Channel.OpenOk response, and ignore itexnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
}

这样就调用了AMQChannel的rpc方法,向broker发送了一个Channel.Open帧。

addNewChannel方法实际上是创建了一个ChannelN对象,然后置其于ChannelManager中的_channelMap中,方便管理。

channelNumberAllocator是channelNumber的分配器,其原理是采用BitSet来实现channelNumber的分配,有兴趣的同学可以深究进去看看。

关于ChannelN类会有专门一篇博文来讲述,其实整个RabbitMQ-client的代码最关键的就是ChannelN这个类,需要着重讲述。

细心的朋友可能会发现关于ConsumerWorkService这个,我并没有做什么阐述。这个主要牵涉到Channel层面的处理,涉及到的类有AMQConnection, ChannelN, ConsumerDispatcher等。ConsumerWorkService是在AMQConnection中初始化,在ChannelManager中引用。至于这里怎么理解,在ChannelN中这么解释:
service for managing this channel’s consumer callbacks。意思是管理消费回调的服务。
综述,ChannelManager主要用来管理Channel, 包括channelNumber与Channel之间的映射关系。


附:本系列全集

  1. [Conclusion]RabbitMQ-客户端源码之总结
  2. [一]RabbitMQ-客户端源码之ConnectionFactory
  3. [二]RabbitMQ-客户端源码之AMQConnection
  4. [三]RabbitMQ-客户端源码之ChannelManager
  5. [四]RabbitMQ-客户端源码之Frame
  6. [五]RabbitMQ-客户端源码之AMQChannel
  7. [六]RabbitMQ-客户端源码之AMQCommand
  8. [七]RabbitMQ-客户端源码之AMQPImpl+Method
  9. [八]RabbitMQ-客户端源码之ChannelN
  10. [九]RabbitMQ-客户端源码之Consumer

欢迎跳转到本文的原文链接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channelmanager/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


[三]RabbitMQ-客户端源码之ChannelManager相关推荐

  1. RabbitMQ 客户端源码系列 - Channel

    前言 续上次分享 RabbitMQ 客户端源码系列 - Connection ,继续分享Channel相关的源码分析 (com.rabbitmq:amqp-client:4.8.3) 友情提醒:本次分 ...

  2. RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码

    目录 第十章-RabbitMQ之Spring客户端源码 1. 前言 2. 客户端消费代码 2.1 消费的实现方式 2.2 消费中注解解释 2.3 推测Spring实现过程 3.MQ消费源码分析 3.1 ...

  3. Spring源码深度解析(郝佳)-学习-Spring消息-整合RabbitMQ及源码解析

      我们经常在Spring项目中或者Spring Boot项目中使用RabbitMQ,一般使用的时候,己经由前人将配置配置好了,我们只需要写一个注解或者调用一个消息发送或者接收消息的监听器即可,但是底 ...

  4. grpc-go客户端源码分析

    grpc-go客户端源码分析 代码讲解基于v1.37.0版本. 和grpc-go服务端源码分析一样,我们先看一段示例代码, const (address = "localhost:50051 ...

  5. TeamTalk客户端源码分析七

    TeamTalk客户端源码分析七 一,CBaseSocket类 二,select模型 三,样例分析:登录功能 上篇文章我们分析了network模块中的引用计数,智能锁,异步回调机制以及数据的序列化和反 ...

  6. ZooKeeper客户端源码(零)——客户端API使用

    首发CSDN:徐同学呀,原创不易,转载请注明源链接.我是徐同学,用心输出高质量文章,希望对你有所帮助. 本篇源码基于ZooKeeper3.7.0版本. 一.建立连接和会话 客户端可以通过创建一个 Zo ...

  7. Libcurl的编译_HTTP/HTTPS客户端源码示例

    HTTP/HTTPS客户端源码示例 环境:  zlib-1.2.8  openssl-1.0.1g  curl-7.36 Author:  Kagula LastUpdateDate: 2016-05 ...

  8. 第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习

    第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第三课 k8s源码学习和二 ...

  9. maven快速入门番外篇——Eclipse下载GitHub上FastDFS-Client客户端源码并转化成maven工程以及打包到本地maven仓库

    由于fastdfs-client的jar包目前在中央仓库是没有坐标的,而在项目中要想实现文件的上传和下载就得使用到它,这不禁就让我们头疼,所以为了解决这个问题,我写下了这篇文章,希望对读者能有所帮助. ...

最新文章

  1. 12 c for. oracle rac,【案例】Oracle RAC FOR AIX搭建执行root.sh时两次报错的解决办法
  2. 计算机应用技术环境评估,计算机应用教程(第7版)(Windows 7与Office 2007环境)习题解答与上机练习...
  3. STM32 利用空闲中断接收数据
  4. 心离钱越远,钱离口袋越近,心离钱越近,钱离口袋越远
  5. java 输出一个爱心_java基础 之 几个常用的类
  6. 微信公众号在线答题小程序系统怎么做答题游戏活动
  7. 流媒体协议:互联网视频分发协议介绍(渐进式、HLS、DASH、HDS、RTMP协议)
  8. python求矩阵逆、伪逆、转置、矩阵乘法
  9. 鸿蒙初开再往前是什么,鸿蒙初开造句,用鸿蒙写一句话
  10. 杂牌机搞机之旅(一)——获得root权限(刷入magisk)
  11. mysql配置数据库邮件_SqlServer2008怎么配置数据库邮件?
  12. uni-app 二维码生成器
  13. 槟城usm大学计算机专业怎么样,马来西亚理科大学USM比你想的好太多了!
  14. 机械专业夹具类毕业设计题目汇总/组合机床、车床拨叉、飞锤支架、连接座、倒挡拨叉、盖、法兰盘、铜衬轴套、心轴零件、曲轴箱零件、托板、发动机曲轴、方刀架、车床变速箱、柴油机机体、车床滤油器、方刀架……
  15. 国产的蓝光存储设备能算信创产品吗?
  16. 你要的开源报修系统V2版本已发布,请及时更新最新源码。
  17. 如何获取各地的日照强度(太阳辐射)数据?
  18. 20200523_01_Multisim14.2+仿真+入门
  19. StyleGAN2探骊得珠(一):论文精读与注释,文中的SCALE这个词到底是什么意思?
  20. VC调试--输出调试字符串(含示例代码)

热门文章

  1. skywalking环境搭建
  2. 为什么人们默认 x86 代表 32 位处理器
  3. 哈希表-map(对于python来说是字典)
  4. 2019测试指南-测试测试原理
  5. 有关sublime的一些使用
  6. python模块详解 time与date time
  7. tableview的reloadData 产生的问题
  8. 在xml文件的Preference标签中,用extra给intent标签加参数
  9. (转)开源 Apache 服务器安全防护技术精要及实战
  10. 试着用windows live writer来写篇日志