KafkaConsumer从Kafka拉取消息的时候发送的请求时FetchRequest,在其中需要指定消费者希望拉取的起始消息的offset。为了消费者快速获取这个值,KafkaConsumer使用SubscriptionState来追踪TopicPartition与offset的关系:

一 SubscriptionType

NONE: 没有状态

AUTO_TOPICS:按照指定的topic 名字进行订阅,自动分配分区

AUTO_PATTERN:按照正则表达式匹配topic名字进行订阅,自动分配分区

USER_ASSIGNED:用户自己指定订阅的topic以及分区号

二TopicPartitionState

表示TopicPartition的消费状态,有以下几个字段:

position: 下一次要从kafka服务器端获取消息的offset

committed: 最近一次提交的offset

paused: 当前TopicPartition是否暂停消费

resetStrategy:重置position策略,该字段是否为空也表示是否需要重置position

三 SubscriptionState

3.1 比较核心的字段:

SubscriptionType:订阅类型

Pattern subscribedPattern:使用AUTO_PATTERN订阅模式,按照该字段的正则表达式进行topic匹配

Set<String> subscription: 针对自动分配分区用户请求的topic列表,我们可以通过changeSubscription方法添加topic

Set<TopicPartition> userAssignment:针对USER_ASSIGNED订阅模式,该集合包含分配给当前消费者的TopicPartition集合

PartitionStates<TopicPartitionState>assignment: 维护了一个<Topic

Partition->TopicPartitionState>的映射,而且还可以让这个map的元素可以旋转

Set<String> groupSubscription: 我们知道消费者组会有一个leader,它会使用该集合记录消费者组中所有消费者的订阅的topic,

而其他follower的该集合只保存自己的订阅的topic

boolean needsFetchCommittedOffsets: 我们需要从coordinator获取最近的提交的offset吗?

OffsetResetStrategy defaultResetStrategy: 默认的offset重设策略

ConsumerRebalanceListener listener: 用于监听分配分区操作

3.2 核心的方法

/**
 * 设置AUTO_TOPICS订阅类型,初始化监听器,检测topic是否发生变化,如果发生变化重置
 * @param topics
 * @param listener
 */
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListenercannot be null");
    // 设置订阅类型,是自动分配还是用户分配
   
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
    // 初始化消费者再平衡监听器
   
this.listener = listener;
    // 当订阅的topic集合发生变化时,重置订阅的主题集合,并且把所有的topic添加到groupSubscription
   
changeSubscription(topics);
}

/*** 设置AUTO_PATTERN订阅类型,检测topic是否发生变化,如果发生变化重置* @param topics*/
public void subscribeFromPattern(Set<String> topics) {if (subscriptionType != SubscriptionType.AUTO_PATTERN)throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " +subscriptionType);// 当订阅的topic集合发生变化时,重置订阅的主题集合,并且把所有的topic添加到groupSubscriptionchangeSubscription(topics);
}
/*** 当订阅的topic集合发生变化时,重置订阅的主题集合,并且把所有的topic添加到groupSubscription* @param topicsToSubscribe*/
private void changeSubscription(Set<String> topicsToSubscribe) {if (!this.subscription.equals(topicsToSubscribe)) {this.subscription = topicsToSubscribe;// 消费者自身订阅的topic添加到groupSubscriptionthis.groupSubscription.addAll(topicsToSubscribe);}
}
 
 
/*** leader收到JoinGroupResponse的时候,包含了全部消费者的topic,此时将topic添加到groupSubscription* @param topics*/
public void groupSubscribe(Collection<String> topics) {if (this.subscriptionType == SubscriptionType.USER_ASSIGNED)throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);this.groupSubscription.addAll(topics);
}/*** 重置groupSubscription的订阅,只包含该用户订阅的主题。*/
public void resetGroupSubscription() {this.groupSubscription.retainAll(subscription);
}
/**获取分配的分区的状态,然后看哪些分区是可以fetch数据,返回那些可以fetch数据的topic partition*/
public List<TopicPartition> fetchablePartitions() {List<TopicPartition> fetchable = new ArrayList<>();for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {if (state.value().isFetchable())fetchable.add(state.topicPartition());}return fetchable;
}
// 取消订阅
public void unsubscribe() {this.subscription = Collections.emptySet();this.userAssignment = Collections.emptySet();this.assignment.clear();this.subscribedPattern = null;this.subscriptionType = SubscriptionType.NONE;
}
/**根据coordinator返回的partitions改变这assignment */
public void assignFromSubscribed(Collection<TopicPartition> assignments) {if (!this.partitionsAutoAssigned())throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");for (TopicPartition tp : assignments)if (!this.subscription.contains(tp.topic()))throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");// after rebalancing, we always reinitialize the assignment valuethis.assignment.set(partitionToStateMap(assignments));this.needsFetchCommittedOffsets = true;
}
/**根据指定的partitions集合,构建一个<partition,TopicPartitionState>映射*/
private Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition> assignments) {Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size());for (TopicPartition tp : assignments)map.put(tp, new TopicPartitionState());return map;
}
/** 根据用户提供的指定的partitions 改变assignment */
public void assignFromUser(Set<TopicPartition> partitions) {setSubscriptionType(SubscriptionType.USER_ASSIGNED);if (!this.assignment.partitionSet().equals(partitions)) {this.userAssignment = partitions;Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();for (TopicPartition partition : partitions) {TopicPartitionState state = assignment.stateValue(partition);if (state == null)state = new TopicPartitionState();partitionToState.put(partition, state);}this.assignment.set(partitionToState);this.needsFetchCommittedOffsets = true;}
}
// 判断下一次消费的消息的offset是否为空
public boolean hasValidPosition() {return position != null;
}
// 判断下一次消费的位置是否为空
public boolean hasAllFetchPositions() {for (TopicPartitionState state : assignment.partitionStateValues())// 下一次消费的位置为空,返回falseif (!state.hasValidPosition())return false;return true;
}// 如果下一次消费位置没有,则返回那些TopicPartition集合
public Set<TopicPartition> missingFetchPositions() {Set<TopicPartition> missing = new HashSet<>();for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {if (!state.value().hasValidPosition())missing.add(state.topicPartition());}return missing;
}
// 判断该分区是否分配了,而且是否可以fetch数据了
public boolean isFetchable(TopicPartition tp) {return isAssigned(tp) && assignedState(tp).isFetchable();
}

SubscriptionState分析相关推荐

  1. 消费者rebalance机制分析

    一 触发rebalance的时机 # 有新的消费者加入 # 有消费者宕机或者下线 # 消费者主动退出消费者组 # 消费者组订阅的topic出现分区数量变化 # 消费者调用unsubscrible取消对 ...

  2. Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析

    文章目录 前言 1. 消费者负载均衡的实现 2. 源码分析 2.1 KafkaConsumer 的初始化 2.2 KafkaConsumer 的消息拉取 2.2.1 消息拉取的准备及入口 2.2.2 ...

  3. 你绝对能看懂的Kafka源代码分析-KafkaConsumer类代码分析

    目录: <Kafka Producer设计分析> <KafkaProducer类代码分析> <RecordAccumulator类代码分析> <Sender类 ...

  4. linphone-LinphoneManager.java文件分析

    介绍 本篇主要是对个人对LinphoneManger类的理解及对上面的注释,这是对linphone研究的一个开始. 会慢慢对linphone逐步分析, 随着时间的推进, 我会对linphone有进一步 ...

  5. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  6. 2022-2028年中国自动驾驶系统行业现状调研分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...

  7. 2022-2028年中国阻尼涂料市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...

  8. 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...

  9. 2022-2028年全球与中国漂白吸水棉市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国漂白吸水棉行业市场行业相关概述.全 ...

最新文章

  1. 如果当前没有拿得出手的简历,也别慌,努力的话最多两年情况就能改变
  2. Opengl-基本章节的学习成果
  3. 6段Python代码刻画深度学习历史:从最小二乘法到深度神经网络
  4. js定时器和linux命令locate
  5. 对比两个字符串相等_字符串匹配问题
  6. Oracle对象被锁如何处理
  7. mysql applicationcontext.xml_配置applicationcontext.xml文件
  8. win7录屏_学用系列|清晰、体积小,这些录屏工具适合正在为录屏苦恼的你
  9. java 获取图片路径_Java获取文件路径的几种方式
  10. oracle如何恢复被误误删除的pdb
  11. BZOJ4198: [Noi2015]荷马史诗(哈夫曼树)
  12. #GeekPoint# 苹果的 AR 眼镜
  13. 安卓ui xml_创建声明性XML UI语言
  14. mail.yahoo.com.cn:yahoo邮箱用outlook无法发信问题的解决办法
  15. Android OpenGL ES 3.0 粒子特效
  16. SystemVerilog学习-02-数据类型
  17. word换pdf并且自动生成目录
  18. 软件测试之搜索框功能点用例梳理
  19. 自动拖取win10聚焦壁纸到桌面
  20. 美国python课程 得a_干货:五门CS基础课推荐(价值两万多美元的美国名校课程,零基础,转专业都可学!...

热门文章

  1. php保存ppt,ppt怎么保存到电脑桌面?
  2. 池化技术及jdk的线程池讲解
  3. MAC 安装brew raw.githubusercontent.com port 443: Connection refused 本人亲自认证过,踩过多种方案,最终认证的解决方案
  4. jena 开发之 mysql数据导入_在Jena框架下基于MySQL数据库实现本体的存取操作
  5. PostgreSQL中的pg_hba.conf
  6. word 2007如何插入参考文献引用
  7. 华为开发者大会鸿蒙2.0系统,鸿蒙2.0来了!华为开发者大会HDC 2020宣布
  8. elasticsearch 条件去重_Elasticsearch学习之查询去重
  9. pandas series 判断是否包含某个值
  10. 华为Android10版怎么截屏,安卓手机截图方法 华为手机如何截图 - 云骑士一键重装系统...