首先放上RocketMQ网络结构图,如下所示:

Producer与NameSrv随机建立长连接,定期从NameSrv获取topic路由信息。然后Producer还与Broker的Master结点建立长连接,用于发送消息。此外Producer还与Master维持了一个心跳。
Conumser与NamseSrv随机建立长连接,定期从NameSrv获取topic路由信息。然后Consumer还与Broker的Master和Slave结点建立长连接,用于订阅消息。此外Consumer还与Master和lslave维持了一个心跳。
以上就是RocketMQ所有的心跳机制。

客户端发送心跳

在Producer和Consumer启动时,会通过 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();发起心跳请求。此外在MQClientInstance启动时,会启动一个定时任务 this.startScheduledTask();。里面包含了各种各样的定时任务,其中就包括定期发送心跳信息到Broker。

 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); // 发送心跳,默认时间间隔30秒} catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

sendHeartbeanToAllBrokerWithLock在sendHeartbeatToAllBroker上加了锁,避免心跳混乱,没有什么特别之处。我们来看看sendHeartbeatToAllBroker做了什么:

 private void sendHeartbeatToAllBroker() {// 首先准备好心跳信息,主要是Producer和comsumer相关信息,内容后面会具体分析final HeartbeatData heartbeatData = this.prepareHeartbeatData();final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();if (producerEmpty && consumerEmpty) {log.warn("sending heartbeat, but no consumer and no producer");return;}if (!this.brokerAddrTable.isEmpty()) {long times = this.sendHeartbeatTimesTotal.getAndIncrement();Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, HashMap<Long, String>> entry = it.next();String brokerName = entry.getKey();HashMap<Long, String> oneTable = entry.getValue();if (oneTable != null) {for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {Long id = entry1.getKey();String addr = entry1.getValue();if (addr != null) {if (consumerEmpty) { // 如果没有conumser则剔除掉slave结点,因为producer只需要与master维持心跳即可if (id != MixAll.MASTER_ID)continue;}try {int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);if (!this.brokerVersionTable.containsKey(brokerName)) {this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));}this.brokerVersionTable.get(brokerName).put(addr, version);if (times % 20 == 0) {// 减少日志频率log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);log.info(heartbeatData.toString());}} catch (Exception e) {if (this.isBrokerInNameServer(addr)) {log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);} else {log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,id, addr);}}}}}}}}

这里主要做了两个工作:
1-预备好心跳信息
2-发送心跳
其中根据客户端的类型不同,发送的对象会又差别。如果是Producer启动,那么MQClientInstance里面的conumser是空的,那么会剔除掉Broker的slave结点,只向master发送心跳。如果是是Consumer启动,那么MQClientInstance里面的consumer不为空,就会向所有的broker结点发送心跳。
sendHearbeat()非常简单,包装RemotingCommand对象,然后就是RemotingClient的调用了,涉及到Netty通讯了。这个之前已经讨论过,具体可以参考:RocketMQ是如何通讯的?
发送心跳返回的是broker端MQ的版本号,拿到后会更新本地保存的broker版本控制信息。

心跳内容

心跳内容比较简单,包括客户端id,生产者信息和消费者信息,一般情况下生产者信息和消费者信息是互斥的,producerDataSet和consumerDataSet有一个为空。但也不排除有的应用既是生产者,也是消费者,这种情况下producerDataSet和consumerDataSet都不为空。

public class HeartbeatData extends RemotingSerializable {private String clientID;private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();}

接下来,我们一个个分析,先看看ProducerData是什么:

public class ProducerData {private String groupName;
}

这非常简单了。。。就一个groupName,不需要过多解释了。下面再看ConsumerData:

public class ConsumerData {private String groupName; // 分组private ConsumeType consumeType; // 消费类型,有推模式和拉模式两种private MessageModel messageModel;// 消息类型,广播和集群消费两种private ConsumeFromWhere consumeFromWhere; // 从何处开始消费,从一开始偏移量,从最后偏移量,或者按时间戳消费。private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();// 订阅信息private boolean unitMode; // 单元模式,默认是false。这个与topic有关,但是没看懂拿来干嘛的?
}

ConsumerData主要包括的消费者的一些配置信息,如果写过消费者代码,对这些还是很熟悉的。其中SubscriptData是订阅信息,结构如下:

public class SubscriptionData implements Comparable<SubscriptionData> {public final static String SUB_ALL = "*"; // 常量,默认订阅所有tag类型消息private boolean classFilterMode = false;private String topic;// 主题private String subString;// 订阅表达式,例如"taga || tagb"private Set<String> tagsSet = new HashSet<String>();// tag列表private Set<Integer> codeSet = new HashSet<Integer>();// tag的hashcode列表private long subVersion = System.currentTimeMillis();private String expressionType; //订阅表达式类型,有tag模式和sql模式@JSONField(serialize = false)private String filterClassSource;
}

订阅信息里面比较常用的就是topic和subString,我们消费订阅信息主要就是这俩。例如:

consumer.subscribe("topic", "TagA || TagC || TagD");

其中的"TagA || TagC || TagD"就是这里的subString,创建订阅信息的时候,subString会被分割成TagA、TagB、TagD,然后保存至tagsSet里面。他们的hashcode会保存到codeSet里面。
以上就是心跳的所有内容。

Broker处理心跳

Broker处理心跳是在ClientManageProcessor中处理的,对于ProducerData的内容处理很简单,直接注册producer,把producer的ClientChannelInfo保存下来,后面与producer通讯的时候会用到。对于Consumer的处理就稍微复杂一点,除了注册consumer之外,如果消费分组配置不为空的话,还会创建一个用于重试的topic,这个在消息重新消费时有用。这部分在后面介绍consumer消费消息时会再次提到。

  public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {RemotingCommand response = RemotingCommand.createResponseCommand(null);HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion());for (ConsumerData data : heartbeatData.getConsumerDataSet()) {SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());boolean isNotifyConsumerIdsChangedEnable = true;if (null != subscriptionGroupConfig) {isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();int topicSysFlag = 0;if (data.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);}String newTopic = MixAll.getRetryTopic(data.getGroupName());this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);}boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);if (changed) {log.info("registerConsumer info changed {} {}",data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}}for (ProducerData data : heartbeatData.getProducerDataSet()) {this.brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo);}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}

为什么Producer不与NameSrv维持心跳呢

这个问题的同类问题是,为什么Consumer不与NameSrv维持心跳?或者说,为什么Broker不与NameSrv维持心跳?其实Producer、Consumer、Broker都与NameSrv有“维持心跳”的动作,就是Producer、Consumer定期从NameSrv拉取Topic路由信息,Broker定期向NameSrv注册包装了Topic路由的broker信息,只是它们没有明显的使用HeartbeatData相关的写法。HeartbeatData相关的内容都在common工程下的protocol.heartbeat包下:

有个可能的原因是,客户端和broker的心跳维持信息比较复杂,不像客户端与NameSrv、Broker与NameSrv那样需求几乎稳定不变,所以作者单独写了心跳模块。至于真实的原因是什么,我目前也不知道。

RocketMQ如何维持心跳相关推荐

  1. Java总结 - 基础知识

    文章目录 - - -计算机技术演化- - - 1 编程语言演化 1.1 写在最前 1.2 汇编 1.3 VB->C->C++ 1.4 Java(Sun公司) 1.5 Java演变 2 技术 ...

  2. rocketmq源码分析之broker心跳检测

    1.BrokerController发送心跳包 org.apache.rocketmq.broker.BrokerController#start() // 向所有的NameSrv注册Broker信息 ...

  3. rocketmq 组监听_最全的RocketMQ学习指南,程序员必备的中间件技能

    一.简介 RocketMq是阿里开发出来的一个消息中间件,后捐献给Apache.官网上是这样介绍的: Apache RocketMQ™ is a unified messaging engine, l ...

  4. 17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列

    点击上方"方志朋",选择"置顶公众号" 技术文章第一时间送达! 作者:28cm不含头(来自:知乎) 原文链接: https://www.zhihu.com/qu ...

  5. RocketMQ 基本概念

    2019独角兽企业重金招聘Python工程师标准>>> 名称解析 Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息. Consumer:消息消费者,负责消费消息 ...

  6. 深入剖析RocketMQ源码-NameServer

    作者:vivo互联网服务器团队-Ye Wenhao 一.RocketMQ架构简介 1.1 逻辑部署图 (图片来自网络) 1.2 核心组件说明 通过上图可以看到,RocketMQ的核心组件主要包括4个, ...

  7. 进阶必看的 RocketMQ ,就这篇了

    RocketMQ 整体架构设计 整体的架构设计主要分为四大部分,分别是:Producer.Consumer.Broker.NameServer. 为了更贴合实际,我画的都是集群部署,像 Broker ...

  8. 大写的服,看完这篇你还不懂RocketMQ算我输

    目录 RocketMQ介绍 RocketMQ概念 为什么要用RocketMQ? 异步解耦 削峰填谷 分布式事务最终一致性 数据分发 RocketMQ架构 RocketMQ消息类型 普通消息 顺序消息 ...

  9. RocketMq 的最佳实践

    1 生产者 1.1 发送消息注意事项 1 Tags的使用 一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识.tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订 ...

  10. Spring Cloud Alibaba 消息队列:基于 RocketMQ 实现服务异步通信

    本讲咱们将学习以下三方面内容: 介绍消息队列与 Alibaba RocketMQ: 掌握 RocketMQ 的部署方式: 讲解微服务接入 RocketMQ 的开发技巧: 首先咱们先来认识什么是消息队列 ...

最新文章

  1. 跨时钟域设置set_false_path的问题
  2. bootstrap-翻页-对齐链接
  3. java transaction cn,JAVA中如何用TRANSACTION来对数据库进行
  4. python入门基础系列_Python3基础系列-基本入门语法
  5. RequestBody注解
  6. c/c++整理--c++面向对象(5)
  7. 微信公众开放平台开发08---纯java 实现微信开发:编写自定义菜单
  8. sqlserver中某列转成以逗号连接的字符串及逆转、数据行转列列转行
  9. ES权威指南[官方文档学习笔记]-12 more complicated searches
  10. CODEVS 3657 括号序列
  11. eclipse中文版设置字体大小
  12. JAVA中无法加载主类什么意思_找不到或无法加载主类什么意思?
  13. LinuxC:锁、条件变量、信号量实现线程间的同步 生产者与消费者 pthread_mutex_init pthread_cond_init sem_init
  14. 我要拿走你的蜡烛 1004
  15. 使用flask从零构建自动化运维平台系列三
  16. netlink使用方法
  17. 如何利用PuTTY连接Windows主机和Linux虚拟机
  18. 最易理解的C语言教学 第一章
  19. [JZOJ5952] 凯旋而归 ([BZOJ 5092]【Lydsy1711月赛】分割序列)【高维前缀和】【DP】
  20. 轮廓线DP(插头DP 裸 经典骨牌)

热门文章

  1. 手机版的python如何运行常用数列结构_Python新手学习基础之数据结构-列表1
  2. F5 LTM1600 HA
  3. Rplot函数图形参数设置
  4. 傻瓜式自制鼠标光标,超简单
  5. 英语思维导图大全 介词(七)
  6. 使用链表实现栈stack
  7. ARP缓存表过期问题
  8. Serialization assertion safeVersionRead == safeSerializationVersion failed.
  9. 股票控件android,一个Android股票应用(含源码)
  10. AWS的下一站:3.8万亿美元的企业IT市场