欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channeln/


ChannelN是整个RabbitMQ客户端最核心的一个类了,其包含的功能点甚多,这里需要分类阐述。
首先来看看ChannelN的成员变量:

private final Map<String, Consumer> _consumers = Collections.synchronizedMap(new HashMap<String, Consumer>());
private volatile Consumer defaultConsumer = null;
private final ConsumerDispatcher dispatcher;private final Collection<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();private final Collection<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();private volatile CountDownLatch finishedShutdownFlag = null;private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
private long nextPublishSeqNo = 0L;
private final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
private volatile boolean onlyAcksReceived = true;

源代码中有关ChannelN的呈现顺序有所不同,这里博主为了区分开来,重新排了序。


processAsync(Command command)

在AMQChannel这个抽象类中唯一的抽象方法即为此方法,这个方法主要用来针对接受到broker的AMQCommand进行进一步的处理,至于怎么接受Socket,怎么封装成帧,怎么确定一个AMQComand已经封装完毕,都已在调用此方法前完成。此方法可以处理:Channel.Close, Basic.Deliver, Basic.Return, Channel.Flow, Basic.Ack, Basic.Nack, Basic.RecoverOk, Basic.Cancel, Channel.CloseOk等这些从broker端回传的AMQComand.
这个方法也比较长,下面也会涉及到这个方法内的内容。


Confirm.Select & Basic.Publish

在RabbitMQ之消息确认机制(事务+Confirm)这篇文章中,博主就讲到RabbitMQ的producer端确认机制分为事务机制和Confirm机制,这里就来阐述下Confirm机制的内部实现。
和Confirm机制有关的成员变量有:

private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
private long nextPublishSeqNo = 0L;
private final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
private volatile boolean onlyAcksReceived = true;

在使用Confirm机制的时候,首先要置Channel为Confirm模式,即向broker端发送Confirm.Select。
业务代码(DEMO实例):

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) throws IOException {//TODO}public void handleNack(long deliveryTag, boolean multiple) throws IOException {//TODO}
});
String message = "RabbitMQ Demo Test:" + System.currentTimeMillis();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.waitForConfirms();

在创建完Channel之后调用channel.confirmSelect()方法即可,confirmSelect()代码如下:

public Confirm.SelectOk confirmSelect()throws IOException
{if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;return (Confirm.SelectOk)exnWrappingRpc(new Confirm.Select(false)).getMethod();
}

这里的成员变量nextPublishSeqNo是用来为Confirm机制服务的,当Channel开启Confirm模式的时候,nextPublishSeqNo=1,标记第一条publish的序号,当Publish时:

public void basicPublish(String exchange, String routingKey,  boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException
{if (nextPublishSeqNo > 0) {unconfirmedSet.add(getNextPublishSeqNo());nextPublishSeqNo++;}BasicProperties useProps = props;if (props == null) {useProps = MessageProperties.MINIMAL_BASIC;}transmit(new AMQCommand(new Basic.Publish.Builder().exchange(exchange).routingKey(routingKey).mandatory(mandatory).immediate(immediate).build(),useProps, body));
}

client端向broker端Basic.Pubish发送消息并将当前的序号加入到unconfirmedSet中,并自加nextPublishSeqNo++等待下一个消息的发送。

有关Confirm.Select的详细用法可以参考:RabbitMQ之消息确认机制(事务+Confirm)

之后等待broker的确认回复(Basic.Ack/.Nack):channel.waitForConfirms()

public boolean waitForConfirms(long timeout)throws InterruptedException, TimeoutException {if (nextPublishSeqNo == 0L)throw new IllegalStateException("Confirms not selected");long startTime = System.currentTimeMillis();synchronized (unconfirmedSet) {while (true) {if (getCloseReason() != null) {throw Utility.fixStackTrace(getCloseReason());}if (unconfirmedSet.isEmpty()) {boolean aux = onlyAcksReceived;onlyAcksReceived = true;return aux;}if (timeout == 0L) {unconfirmedSet.wait();} else {long elapsed = System.currentTimeMillis() - startTime;if (timeout > elapsed) {unconfirmedSet.wait(timeout - elapsed);} else {throw new TimeoutException();}}}}
}

可以看到waitForConfirms其实本质上是在等待unconfirmedSet变成empty,否则就线程wait()。

当接收到broker端的ACK/NACK回复时,一步步的经过处理到达processAsync(Command command)方法,然后进而处理Basic.Ack/.Nack帧。

else if (method instanceof Basic.Ack) {Basic.Ack ack = (Basic.Ack) method;callConfirmListeners(command, ack);handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);return true;
} else if (method instanceof Basic.Nack) {Basic.Nack nack = (Basic.Nack) method;callConfirmListeners(command, nack);handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);return true;
}

首先是将相应的Method做一下转换,之后callConfirmListeners(),这个方法是调用成员变量confirmListeners这个list里的所有的ConfirmListener:

private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();

这个ConfirmListener的list就需要在channel.basicPushlish()调用之前先:

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) throws IOException {//TODO}public void handleNack(long deliveryTag, boolean multiple) throws IOException {//TODO}
});

在调用完ConfirmListener之后继续调用handleAckNack方法:

private void handleAckNack(long seqNo, boolean multiple, boolean nack) {if (multiple) {unconfirmedSet.headSet(seqNo + 1).clear();} else {unconfirmedSet.remove(seqNo);}synchronized (unconfirmedSet) {onlyAcksReceived = onlyAcksReceived && !nack;if (unconfirmedSet.isEmpty())unconfirmedSet.notifyAll();}
}

这个方法本意上是对收到某条消息的ACK或者NACK的处理,发送消息时Basic.Publish的nextPublishNo对应于相应的ACK/NACK的deliveryTag,将其从unconfirmedSet中删除即可,如果有NACK帧,则将其相应的标识onlyAcksReceived设置为false,判断此时unconfirmedSet是否为空,如果条件成立则notifyAll(),将waitForConfirm唤起,返回onlyAcksReceived的状态。

如果channel.waitForConfirm()返回为false,则说明broker没有接受client发送的消息,此时需要在业务代码中做进一步处理,比如重发。


Basic.Qos

消费者在开启ACK的情况下,对接受到的消息可以根据业务的需要异步对消息进行确认。
然而在实际使用过程中,由于消费者自身处理能力有限,从RabbitMQ获取一定数量的消息好厚,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接受来自队列的消息。在这种场景下,我们可以设置Basic.Qos中的prefetch_count来达到这个效果。


Basic.Consume

与消费有关的成员变量:

private final Map<String, Consumer> _consumers =Collections.synchronizedMap(new HashMap<String, Consumer>());
private volatile Consumer defaultConsumer = null;
private final ConsumerDispatcher dispatcher;

源码如下:

/*** Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}* method.* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @param consumerTag a client-generated consumer tag to establish context* @param noLocal true if the server should not deliver to this consumer* messages published on this channel's connection* @param exclusive true if this is an exclusive consumer* @param callback an interface to the consumer object* @param arguments a set of arguments for the consume* @return the consumerTag associated with the new consumer* @throws java.io.IOException if an error is encountered* @see com.rabbitmq.client.AMQP.Basic.Consume* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk*/
public String basicConsume(String queue, boolean autoAck, String consumerTag,boolean noLocal, boolean exclusive, Map<String, Object> arguments,final Consumer callback)throws IOException
{BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>() {public String transformReply(AMQCommand replyCommand) {String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();_consumers.put(actualConsumerTag, callback);dispatcher.handleConsumeOk(callback, actualConsumerTag);return actualConsumerTag;}};rpc(new Basic.Consume.Builder().queue(queue).consumerTag(consumerTag).noLocal(noLocal).noAck(autoAck).exclusive(exclusive).arguments(arguments).build(),k);try {return k.getReply();} catch(ShutdownSignalException ex) {throw wrap(ex);}
}

这个方法最精简的只要两个参数,即String queue和Consumer callback:public String basicConsume(String queue, Consumer callback)。

方法主要是发送Basic.Consume帧,然后等待Basic.ConsumeOk帧。待收到broker端的Basic.ConsumeOk帧之后,触发BlockingRpcContinuation中的transformReply()方法。有关BlockingRpcContinuation在[五]RabbitMQ-客户端源码之AMQChannel中有陈述。transformReply()方法先是提取consumerTag,这个consumerTag是在channel.basicConsume()方法中设置的,是其中的一个参数,如果设置了此参数,那么consumerTag就是这个参数的值;如果没有设置这个consumerTag,Broker会返回一个consumerTag,类似:amq.ctag-Mg0eSv2GgfG6UzfncD8E9g。然后作为key和Consumer这个回调函数一起放置到_consumer这个回调函数中以备后面检索调用。这个consumerTag还作为transformReply()方法的返回值,存入到BlockingRpcContinuation对象中,既而在basicConsume这个方法最后调用k.getReply()方法是获取其值,也就是说basicConsume方法的返回值就是consumerTag。

当发送Basic.Consume帧之后,由broker返回的是Basic.ConsumeOk帧+Basic.Deliver帧,Basic.ConsumerOk帧由上面方法处理,Basic.Deliver帧由processAsync处理。

说到basicConsume方法,还有一个重要的就是设置Consumer这个回调函数。一般为了方便直接使用RabbitMQ客户端自带的QueueingConsumer来处理,当然也可以实现一个自定义的Consumer,当然了需要实现Consumer这个接口,可以参考QueueingConsumer的父类DefaultConsumer, 有关Consumer相关的更多细节,可以参考:[九]RabbitMQ-客户端源码之Consumer。

dispatcher.handleConsumeOk(callback, actualConsumerTag);这段代码实际上就是:callback.handleConsumeOk(actualConsumerTag),这个还是调用到Consumer的方法处理。


Basic.Get

上面的Basic.Consume是基于push模式的,而Basic.Get是基于pull模式的。相关的代码如下:

public GetResponse basicGet(String queue, boolean autoAck)throws IOException
{AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder().queue(queue).noAck(autoAck).build());Method method = replyCommand.getMethod();if (method instanceof Basic.GetOk) {Basic.GetOk getOk = (Basic.GetOk)method;Envelope envelope = new Envelope(getOk.getDeliveryTag(),getOk.getRedelivered(),getOk.getExchange(),getOk.getRoutingKey());BasicProperties props = (BasicProperties)replyCommand.getContentHeader();byte[] body = replyCommand.getContentBody();int messageCount = getOk.getMessageCount();return new GetResponse(envelope, props, body, messageCount);} else if (method instanceof Basic.GetEmpty) {return null;} else {throw new UnexpectedMethodError(method);}
}

基本上就是客户端发送Basic.Get至Broker,Broker返回Basic.GetOK并携带数据。注意方法最后返回GetResponse对象,这个对象就是包装了一下数据。


事务

和事务有关的代码:

/** Public API - {@inheritDoc} */
public Tx.SelectOk txSelect()throws IOException
{return (Tx.SelectOk) exnWrappingRpc(new Tx.Select()).getMethod();
}/** Public API - {@inheritDoc} */
public Tx.CommitOk txCommit()throws IOException
{return (Tx.CommitOk) exnWrappingRpc(new Tx.Commit()).getMethod();
}/** Public API - {@inheritDoc} */
public Tx.RollbackOk txRollback()throws IOException
{return (Tx.RollbackOk) exnWrappingRpc(new Tx.Rollback()).getMethod();
}

这里可以看到基本对于事务的处理是采用rpc的方法一对一的进行交互,有关RabbitMQ的事务机制可以参考:RabbitMQ之消息确认机制(事务+Confirm)。


其余

ChannelN还有:

  • 关于Exchange,Queue的申明创建,删除,绑定,解绑
  • 关闭处理
  • Basic.Return
  • Basic.Flow
  • Basic.Recover
  • Basic.Cancel
  • Basic.Ack/.Nack/.Reject

这些就不做详细介绍了。有兴趣的同学可以继续翻阅源码,这些都比较简单。


附:本系列全集

  1. [Conclusion]RabbitMQ-客户端源码之总结
  2. [一]RabbitMQ-客户端源码之ConnectionFactory
  3. [二]RabbitMQ-客户端源码之AMQConnection
  4. [三]RabbitMQ-客户端源码之ChannelManager
  5. [四]RabbitMQ-客户端源码之Frame
  6. [五]RabbitMQ-客户端源码之AMQChannel
  7. [六]RabbitMQ-客户端源码之AMQCommand
  8. [七]RabbitMQ-客户端源码之AMQPImpl+Method
  9. [八]RabbitMQ-客户端源码之ChannelN
  10. [九]RabbitMQ-客户端源码之Consumer

欢迎跳转到本文的原文链接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channeln/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


[八]RabbitMQ-客户端源码之ChannelN相关推荐

  1. RabbitMQ 客户端源码系列 - Channel

    前言 续上次分享 RabbitMQ 客户端源码系列 - Connection ,继续分享Channel相关的源码分析 (com.rabbitmq:amqp-client:4.8.3) 友情提醒:本次分 ...

  2. RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码

    目录 第十章-RabbitMQ之Spring客户端源码 1. 前言 2. 客户端消费代码 2.1 消费的实现方式 2.2 消费中注解解释 2.3 推测Spring实现过程 3.MQ消费源码分析 3.1 ...

  3. ZooKeeper客户端源码(零)——客户端API使用

    首发CSDN:徐同学呀,原创不易,转载请注明源链接.我是徐同学,用心输出高质量文章,希望对你有所帮助. 本篇源码基于ZooKeeper3.7.0版本. 一.建立连接和会话 客户端可以通过创建一个 Zo ...

  4. maven快速入门番外篇——Eclipse下载GitHub上FastDFS-Client客户端源码并转化成maven工程以及打包到本地maven仓库

    由于fastdfs-client的jar包目前在中央仓库是没有坐标的,而在项目中要想实现文件的上传和下载就得使用到它,这不禁就让我们头疼,所以为了解决这个问题,我写下了这篇文章,希望对读者能有所帮助. ...

  5. Spring源码深度解析(郝佳)-学习-Spring消息-整合RabbitMQ及源码解析

      我们经常在Spring项目中或者Spring Boot项目中使用RabbitMQ,一般使用的时候,己经由前人将配置配置好了,我们只需要写一个注解或者调用一个消息发送或者接收消息的监听器即可,但是底 ...

  6. boost::asio异步模式的C/S客户端源码实现

    异步模式的服务器源码 //g++ -g async_tcp_server.cpp -o async_tcp_server -lboost_system //#include <iostream& ...

  7. zookeeper 客户端_zookeeper进阶-客户端源码详解

    流程图 先看一下客户端源码的流程图 总体流程 总体流程 开启SendThread线程 开启EventThread 总结 下面根据源码讲解,大家整合源码和流程图一起看最好,本篇内容比较多建议收藏起来看. ...

  8. grpc-go客户端源码分析

    grpc-go客户端源码分析 代码讲解基于v1.37.0版本. 和grpc-go服务端源码分析一样,我们先看一段示例代码, const (address = "localhost:50051 ...

  9. 第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理

    第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第八课 ...

最新文章

  1. New Phytologist:王建军等综述全球变化下的山地微生物组和生态系统功能
  2. Windows 2008 R2 SP1部署Exchange2010 SP1(原创)
  3. IDEA 2020 本土化,真的是全中文了,新手,开心了!
  4. Hi3516A开发-- UBI 文件系统使用指南
  5. JQuery中的ID选择器
  6. Spring jndi连接数据库
  7. 利用Lucene.net搭建站内搜索(3)---创建索引
  8. Java Web学习总结(26)——Servlet不同版本之间的区别
  9. python 字典转对象
  10. Algs4-1.2.8引用型变量赋值-数组复制
  11. Maven的Pom文件 ( Eclipse中创建Maven工程, 使用注意点,DevOps相关)
  12. Gradle 构建 android 应用常见问题解决指南
  13. linux 开启allow_url_fopen,如何开启allow_url_fopen函数
  14. GitHub 爬虫项目
  15. An动画优化之遮罩层动画
  16. 没有自制力的人有什么资格谈努力
  17. mate桌面暗色调超好看的配置
  18. C#执行js中的函数的问题,以sohu邮箱登陆密码js的MD5为例
  19. 结构光三维重建之光栅图像相位解算(MATLAB)
  20. Endnote插入参考文献时中英文混排字体设置

热门文章

  1. java重置_JAVA復制數組和重置數組大小
  2. Linux--结构体的详细学习
  3. solidity智能合约[37]-以太坊虚拟机数据存储
  4. corosync+pacemaker在centos7上的安装,配置简述
  5. 图像特效——摩尔纹 moir
  6. jQuery获取iframe的document对象的方法
  7. JVM学习笔记(一)------基本结构
  8. Requirements of an SAP system administrator
  9. SQL 备份还原单个表
  10. 人人网 6.0 版申请页面随着滚动条拖动背景图片滚动出现,具体使用 JavaScript 和 CSS 原理是什么?...