导语
  在前面的分析中分析了关于Producer发送消息的逻辑,并且追踪到了在DefaultMQPushConsumerImpl 类中的有对应的消息监听方法,这个消息监听的方法是从Consumer调用 start()方法就开始启动的,但是从Consumer的示例代码中会看到,并没有像是Producer那样的明显表示Receive的方法而是通过我们下面要分析的方法来实现。

文章目录

  • Consumer示例
    • 消息接收逻辑
    • ConsumeMessageConcurrentlyService 类
    • consumeMessageDirectly()
    • 方法调用链分析
    • @ChannelHandler.Sharable说明
  • 总结

Consumer示例

  下面就是在RocketMQ中提供的一段示例代码,可以看到大致分为三个步骤

  • 1、Instantiate with specified consumer group name.
  • 2、Subscribe one more more topics to consume.
  • 3、Register callback to execute on arrival of messages fetched from brokers.
  • 3、Launch the consumer instance.

  这里简单的来对比一下Producer的操作,如下

  • 1、Instantiate with a producer group name.
  • 2、Launch the instance.
  • 3、Create a message instance, specifying topic, tag and message body.
  • 4、Call send message to deliver message to one of brokers.

  通过上面的官方提示,对于Producer的SendMessage之前已经进行了追踪,那么Consumer端其他的操作在什么地方呢?


public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {/** Instantiate with specified consumer group name.*/DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");consumer.setNamesrvAddr("localhost:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/** Subscribe one more more topics to consume.*/
//        consumer.subscribe("TopicTest", "*");consumer.subscribe("World", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});/**  Launch the consumer instance.*/consumer.start();System.out.printf("Consumer Started.%n");}
}

消息接收逻辑

  在上面代码中有一段逻辑,并且从日志的输出效果中我们会看到这段逻辑的不同之处,从下面截图中会看到,在Consumer测试类最后加了一段输出代码,按照正常的输出流程,这段代码会在所有的消息都发送完成之后才会输出。因为代码是从上到下执行的,但是有一种情况这段代码的输出不会影响其他逻辑,那就是多线程,其他线程不会影响主线程的正常执行。那么这段多线程在什么地方执行呢?

  通过输出结果的跟进,会看到实际上进行输出结果的是在下面这段代码中,这段代码consumer注册了一个MessageListener,消息监听者,并且传入了一个 并发消息监听。既然是监听,那它监听的肯定是一个服务,到底是个什么样的服务呢?继续来跟进

跟进注册方法
  进入到注册方法中会看到,这个注册方法是由DefaultMQPushConsumer类来提供,并且传入的这个Listener被两个地方使用了

  • DefaultMQPushConsumer 类自身的 messageListener对象
  • DefaultMQPushConsumerImpl 类的registerMessageListener()方法所使用

      到这里,隐约的会觉得这个事情不是很简单。观察一下在注册的时候传入的对象其实是直接由接口实现的,而这个消息的读取也是直接从所定义的接口方法中来获取到的。在这个类的实现类中直接找到对应方法中并没有实际的调用逻辑

      通过查看类继承关系发现,直接使用匿名实现类的方式比较多一点,而实现类却是很少。

      通过上面的分析可以知道我们需要追踪的并不是MessageListenerConcurrently 接口的实现类。而是需要追踪它里面唯一的一个实现方法。如下。

      会发现这个方法被ConsumeMessageConcurrentlyService类中所调用。并且调用的对象是this.messageListener,从这里可以知道,其实MessageListener也实现了这个接口。真是这样么?其实并不是这样这个监听还是就是 MessageListenerConcurrently 对象;这里就来分析一下这个实现模式。

ConsumeMessageConcurrentlyService 类

  Consumer消息并发服务类,这个类实现了一个接口ConsumeMessageService,这个接口有如下的几个方法


public interface ConsumeMessageService {void start();void shutdown();void updateCorePoolSize(int corePoolSize);void incCorePoolSize();void decCorePoolSize();int getCorePoolSize();ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume);
}

  并且有如下的两个实现类,这两个实现类其中一个就是我们上面提到的。

  其实可以看到,这个接口提供的规则里面最为重要就是最后的两个方法。

  • consumeMessageDirectly() 直接使用消息
  • submitConsumeRequest() 提交Consumer请求

  下面就来分析这两个方法在ConsumeMessageConcurrentlyService类中的实现

consumeMessageDirectly()

  进入这个方法之前先来分析这个方法的两个参数

  • MessageExt: 消息扩展类
  • brokerName:Broker的Name,这个是通过配置文件中定义的

public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName)

方法调用链分析

  在分析这个方法之前先来整理一下方法的调用链,通过方法调用链来找到具体整个的数据传递的流程。
接口方法第一次被调用
  根据方法的追踪接口方法this.messageListener.consumeMessage(),第一次被调用是在ConsumeMessageConcurrentlyService 类的consumeMessageDirectly()方法中。进入到该方法,发现如果要使用下面这个方法的话需要有人调用这个ConsumeMessageConcurrentlyService类中的方法,那么在什么地方进行调用的呢?继续跟进

ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);

MQClientInstance中的consumeMessageDirectly()
  ConsumeMessageConcurrentlyService中的consumeMessageDirectly()的方法调用是被MQClientInstance类中的consumeMessageDirectly() 方法调用。是通过链式调用的方式,但是这里的链式调用并不需要太关心,应为既然调用了ConsumeMessageConcurrentlyService类中的方法那么它的上一步返回的就一定是对应的对象。这里我们关系在MQClientInstance中调用的方法。

ClientRemotingProcessor中的consumeMessageDirectly()
  在MQClientInstance中调用的方法是肯定是被其他跌俄方调用的,在ClientRemotingProcessor类中的consumeMessageDirectly()方法被调用了,跟之前一样,会看到,是一个MQClientInstance的对象,这个对象是Client和Server共同使用的。既然是ClientRemotingProcessor类中的方法就说明是Client的一个处理线程。

ClientRemotingProcessor的processRequest()方法
  继续跟进上面的方法,既然是一个线程处理方法,就需要有调用这个方法的入口,就追踪到了它的调用实现方法processRequest()。并且返回的是一个RemotingComand对象,这个对象之前说过的,被Request和Response共同使用。到这里会发现传入的参数都是一个Request为命名规则。

@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {switch (request.getCode()) {case RequestCode.CHECK_TRANSACTION_STATE:return this.checkTransactionState(ctx, request);case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:return this.notifyConsumerIdsChanged(ctx, request);case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:return this.resetOffset(ctx, request);case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:return this.getConsumeStatus(ctx, request);case RequestCode.GET_CONSUMER_RUNNING_INFO:return this.getConsumerRunningInfo(ctx, request);case RequestCode.CONSUME_MESSAGE_DIRECTLY:return this.consumeMessageDirectly(ctx, request);default:break;}return null;}

NettyRemotingAbstract中的processRequestCommand方法调用
  追踪上面的request方法,进入到了一个Netty的抽象类,在这个抽象类中方法的调用是通过多线程的方式来进行的,这里看到了我们向往已久的东西,就是Response类,这个就是在Producer端我们分析的时候获取到的对象。也就是说从这里开始对象就进行了从Request到Response的交接。更为重要的一点是,这个多线程的调用。

  上面方法的调用是被如下的一个方法调用,传入和ChannelHandlerContext以及Request参数,在一个什么样上下文中处理一个Request参数,这个方法的功能很明确。方法体中对消息的类型进行了判断。但是这个地方到底在什么地方被调用的呢。就到了最终的调用方


public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception


最终调用
  会看到最终的调用被封装到了NettyClientHandler对象中,但是这个对象实现了SimpleChannelInboundHandler类中的方法,而NettyClientHandler类是一个内部类,分别在NettyRemotingServer和NettyRemotingClient都有定义,并且实现的方法都是一样的,但是在NettyRemotingServer端定义的时候有如下的一个注解@ChannelHandler.Sharable,对于这个注解在后面的分析中会做说明,它是由Netty提供的。

@ChannelHandler.Sharable说明

  在上面的逻辑中出现了一个@ChannelHandler.Sharable注解,这个注解被Netty所提供。当某个ChannelInboundHandler的实现重写channelRead()方法时,它将负责显式地释放与池化的ByteBuf实例相关的内存。Netty为此提供了一个实用方法ReferenceCountUtil.release() 但是以这种方式管理资源可能很繁琐。
  一个更加简单的方式是使用SimpleChannelInboundHandler。 由于SimpleChannelInboundHandler会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。而上面这个注解,标注一个channel handler可以被多个channel安全地共享。
  ChannelHandlerAdapter还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline中。因为一个ChannelHandler可以从属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例。用于这种用法的ChannelHandler必须要使用@Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常。显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。

  到这里就可以理解为什么要使用这个注解了,在之前的Producer分析中再客户端start()方法中有这样一段代码,由于IDEA比较智能,它对未使用的对象所持有的态度就是不给它颜色。会看到这里的handler对象并没有被使用到,也就是说所有的对于这个对象的配置就只是简单的配置一下。

  但是,在NettyRemotingServer类的实现中有下面这个方法,这个方法的操作是在start()方法中被调用。

 private void prepareSharableHandlers() {handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);encoder = new NettyEncoder();connectionManageHandler = new NettyConnectManageHandler();serverHandler = new NettyServerHandler();}

&emps; 总结一下就是,所有的通过Channel的Connection创建以及Channel的创建都是来自于this.mQClientAPIImpl.start();方法而这个方法是被MQClientInstance中的start()方法调用,而MQClientInstance类则是把Producer 发送消息和 Consumer接收消息连接起来的关键类。

总结

  上面方法的整个的调用过程看上去很乱,但是实际上很简单,就只是将方法调用串联起来,并没有涉及到其中的一些其他参数以及配置的讨论,顺着上面的思路可以很好的了解Consumer的消息接收逻辑,总结一下就是从消息发送开始就已经准备开始接收了。其中有很多保证高效性的操作。在后续的分析中还会进行详解。这篇博客中最为关键的一点就是@ChannelHandler.Sharable注解的说明,以及this.mQClientAPIImpl.start()方法的使用时机。

从源码分析RocketMQ系列-Consumer消息接收逻辑相关推荐

  1. 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解

    导语   在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...

  2. 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解

    导语   在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...

  3. 从源码分析RocketMQ系列-Remoting通信架构源码详解

    导语   这篇博客要从官方给出的一张图开始说起,之前的分析我们都是简单的分析了一下消息传递的流程,以及消息传递流程过程中出现的一些类的封装,并且提出,所有的封装操作都是为了更加高效的服务于NameSe ...

  4. 从源码分析RocketMQ系列-RocketMQ消息设计详解

    1 消息存储   消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架构.PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘方式三 ...

  5. 从源码分析RocketMQ系列-MQClientInstance类详解

    导语   在之前的分析中,看到有一个类MQClientInstance,这个无论是在Producer端还是在Consumer端都是很重要的一个类,很多的功能都是从这个类发起的,这边分享中就来详细的看看 ...

  6. 从源码分析RocketMQ系列-Producer的SendResult来自哪里?

    导语   对于消息中间件大家都应该不陌生,现在比较主流的消息中间件有Kafka.RabbitMQ.RocketMQ.ActiveMQ等等.前段时间花了很长时间分析了关于RocketMQ源码,之前也分享 ...

  7. 从源码分析RocketMQ系列-start()方法详解

    导语   在之前的分析中主要介绍的是关于Producer 发送消息的逻辑,但是在实例代码中有一个操作是producer.start()方法,在Consumer中看到的方法是consumer.start ...

  8. 从源码分析RocketMQ系列-Producer的SendResult的封装

    导语   通过之前博客的Producer的SendResult来自哪里分析到发送的核心机制,了解了在发送之前被使用的几个Hook,以及发送消息的RequestHeader的封装,但是这些封装都被一个t ...

  9. 从源码分析RocketMQ系列-Producer的invokeSync()方法

    导语   在之前的博客中通过对于Producer中SendResult的跟踪找到了在Client模块下的所有的封装以及消费的过程,深入到对接Remoting模块的接口中对消息的封装以及发送回收等.但是 ...

最新文章

  1. 彩虹物语服务器维护,11.19《彩虹物语》服务器维护及数据互通公告
  2. Android studio 4.1 不显示光标当前的类名、方法名
  3. 数据中心背后的地缘政治学
  4. Anaconda简介及其下载 安装 配置 使用 卸载
  5. 六十三、Vue中非父子(兄弟)组件间传值,插槽的使用和作用域插槽(非常重要)
  6. TCP通信过程大讨论
  7. 修改Android Studio默认的gradle配置文件
  8. Bailian2765 POJ NOI0113-03 八进制小数【进制】
  9. Linux终端基本命令
  10. 78. Subsets 1
  11. 盲人画家:人真的有第三只眼
  12. 8大排序算法图文解说
  13. 数据库课程设计——实验报告管理系统(超详细)
  14. Java好还是网优好,java和seo哪个好
  15. 2022茶艺师(初级)试题及在线模拟考试
  16. 使用js脚本实现网页版微信定时发送信息
  17. 一种更简单的求最小平方均值函数(MSE)的方法 -- 梯度下降法。
  18. RT_Thread_空闲线程
  19. 计算机ip怎么换路由器,教你如何修改路由器LAN口IP地址的方法
  20. 巧用友盟UShare、ULink玩转裂变营销

热门文章

  1. 记一次简单的vue组件单元测试
  2. 看看80万程序员怎么评论:前端程序员会不会失业?
  3. Debian下PostgreSQL修改密码与配置详解
  4. 一个图的带权邻接表存储结构的应用
  5. MSSQL中使用CASE函数来灵活返回结果
  6. 220v转5v阻容降压电路
  7. 微信支付,银联支付,支付宝支付——三大支付总结
  8. 深入实践Spring Boot1.3 使用Spring Boot
  9. MapReduce编程(四) 求均值
  10. 【Android】1.1 开发环境安装和配置