消息消费流程图

当消息存储到broker后,启动消费者消费,每个消费者都是一个DefaultMQPushConsumer,都要实现consumeMessage方法,每次去拉取新消息,

消费者启动初始化

  • DefaultMQPushConsumerImpl#start方法
    public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;
//检测配置是否合法this.checkConfig();
//复制订阅信息this.copySubscription();
//设置消费者客户端实例名称为进程IDif (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}
//创建MQClient实例用于请求broker端this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
//创建负载均衡器this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {//消息消费广播模式,将消费进度保存在本地case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;//消息消费集群模式,将消费进度保存在远端Broker,默认为集群模式case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();//创建顺序消息消费服务if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());}//创建并发消息消费服务else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;//把我们实例里面的consume方法注册到该方法里的messageListener初始化this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}//消息消费服务启动this.consumeMessageService.start();//注册消费者实例boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}//启动消费者客户端mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();}
  • mQClientFactory.start()这个方法(MQClientInstance#start)
   public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// 拉数据this.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}
  • 向broker发送消息请求最新

作者:hcq0514
链接:https://www.jianshu.com/p/4757079f871f
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

RocketMQ 消息发送与消费源码分析 - 简书

RocketMQ消息消费源码分析相关推荐

  1. RocketMQ消息消费源码分析(二消息的消费)

    首先回到DefaultMQPushConsumerImpl  start方法 public synchronized void start() throws MQClientException {sw ...

  2. 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解

    导语   在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...

  3. producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)

    有序消息 消息有序指的是可以按照消息的发送顺序来消费. RocketMQ可以严格的保证消息有序.但这个顺序,不是全局顺序,只是分区(queue)顺序. 顺序消息生产者 public static vo ...

  4. RocketMQ:Producer启动流程与消息发送源码分析

    文章目录 Producer 1.方法和属性 2.启动流程 3.消息发送 3.1验证消息 3.2查找路由 3.3选择队列 3.4发送消息 3.5发送批量消息 Producer 在RocketMQ中,消息 ...

  5. 消息长度_nsq消息队列源码分析

    nsq的源码比较简单,值得一读,特别是golang开发人员,下面重点介绍nsqd,nsqd是nsq的核心,其他的都是辅助工具,看完这篇文章希望你能对消息队列的原理和实现有一定的了解. nsqd是一个守 ...

  6. Android Handler消息机制源码分析

    一,前言 众多周知, Android 只允许在主线程中更新UI,因此主线程也称为UI线程(ActivityThread). 如此设计原因有二: (1) 由于UI操作的方法都不是线程安全的,如果多个线程 ...

  7. rocketmq client端源码分析(1)-consumer实现

    rocketmq客户端实现如果集成了spring-boot则写一个监听就可以实现业务逻辑.这个流程是这样的呢. 首先我们实现了监听接口RocketMQListener或者RocketMQReplyLi ...

  8. 跟我学RocketMQ之批量消息发送源码解析

    上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送.本文中,我们就一起来集中分析一下批量消息的发送是怎样的 ...

  9. 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 二 )

    Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...

  10. DDPush开源推送框架源码分析之APPServer到DDPush

    DDPush 任意门推送 DDPush是什么 DDPush (Dimension Door Push),任意门推送,是一款开源免费的单机千万级实时信息推送服务器,使用Java语言开发,具有简单.稳定. ...

最新文章

  1. 与基础事务管理器通讯失败
  2. IT人 不要一辈子靠技术生存(转)
  3. JavaScript正则表达式-基础入门
  4. 2017年上半年全国高等学校安徽考区计算机水平考试,教务处关于2017年上半年全国高等学校(安徽考区)计算机水平考试报名的通知-教务处...
  5. 如何在Ubuntu 16.04上安装和配置GitLab
  6. python调用activateMQ进行数据传输
  7. zabbix 3.2 mysql_zabbix3.2的server和zabbix-agent2.2怎么监控MySQL的办法
  8. sql 语句中where条件和jion on条件的区别
  9. 膜拜高手!Python竟然开发命令行版网易云音乐!
  10. python破解b站验证码实现登陆
  11. 将网站上的web字体base,woff2转换成.ttf系统字体
  12. JavaScript之堆栈溢出
  13. 用友U9 UAP平台解析
  14. C#窗体调用地图(高德地图)-实现公交线路查询
  15. 电视电脑盒子,一机两用,电视秒变电脑
  16. 数据结构上机实验6.29
  17. 使用MQTTNet包实现客户端与服务端通讯
  18. 为FireFox增加自定义搜索引擎
  19. 计量经济学复习笔记(六)updated
  20. 微信气泡主题设置_微信主题! 米老鼠微信主题气泡设置教程方法

热门文章

  1. 如何在Windows上制作一个包含.lib和.dll的Rust Crate包
  2. jqgrid 获取所有行数据
  3. Atitit. Atiposter 发帖机 新特性 poster new feature   v7 q39
  4. ecplise反编译插件
  5. Autodesk MapGuide Enterprise 2012开发技术入门培训视频录像下载
  6. Windows服务器系统的端口要求
  7. TCP/IP源码分析
  8. linux内核分析及应用 -- Linux 网络层数据流分析(上)
  9. STM32相关问题解决方法
  10. WIN10下搭建vue开发环境