SubscriptionState分析
KafkaConsumer从Kafka拉取消息的时候发送的请求时FetchRequest,在其中需要指定消费者希望拉取的起始消息的offset。为了消费者快速获取这个值,KafkaConsumer使用SubscriptionState来追踪TopicPartition与offset的关系:
一 SubscriptionType
NONE: 没有状态
AUTO_TOPICS:按照指定的topic 名字进行订阅,自动分配分区
AUTO_PATTERN:按照正则表达式匹配topic名字进行订阅,自动分配分区
USER_ASSIGNED:用户自己指定订阅的topic以及分区号
二TopicPartitionState
表示TopicPartition的消费状态,有以下几个字段:
position: 下一次要从kafka服务器端获取消息的offset
committed: 最近一次提交的offset
paused: 当前TopicPartition是否暂停消费
resetStrategy:重置position策略,该字段是否为空也表示是否需要重置position
三 SubscriptionState
3.1 比较核心的字段:
SubscriptionType:订阅类型
Pattern subscribedPattern:使用AUTO_PATTERN订阅模式,按照该字段的正则表达式进行topic匹配
Set<String> subscription: 针对自动分配分区用户请求的topic列表,我们可以通过changeSubscription方法添加topic
Set<TopicPartition> userAssignment:针对USER_ASSIGNED订阅模式,该集合包含分配给当前消费者的TopicPartition集合
PartitionStates<TopicPartitionState>assignment: 维护了一个<Topic
Partition->TopicPartitionState>的映射,而且还可以让这个map的元素可以旋转
Set<String> groupSubscription: 我们知道消费者组会有一个leader,它会使用该集合记录消费者组中所有消费者的订阅的topic,
而其他follower的该集合只保存自己的订阅的topic
boolean needsFetchCommittedOffsets: 我们需要从coordinator获取最近的提交的offset吗?
OffsetResetStrategy defaultResetStrategy: 默认的offset重设策略
ConsumerRebalanceListener listener: 用于监听分配分区操作
3.2 核心的方法
/**
* 设置AUTO_TOPICS订阅类型,初始化监听器,检测topic是否发生变化,如果发生变化重置
* @param topics
* @param listener
*/
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListenercannot be null");
// 设置订阅类型,是自动分配还是用户分配
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
// 初始化消费者再平衡监听器
this.listener = listener;
// 当订阅的topic集合发生变化时,重置订阅的主题集合,并且把所有的topic添加到groupSubscription
changeSubscription(topics);
}
/*** 设置AUTO_PATTERN订阅类型,检测topic是否发生变化,如果发生变化重置* @param topics*/ public void subscribeFromPattern(Set<String> topics) {if (subscriptionType != SubscriptionType.AUTO_PATTERN)throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " +subscriptionType);// 当订阅的topic集合发生变化时,重置订阅的主题集合,并且把所有的topic添加到groupSubscriptionchangeSubscription(topics); }
/*** 当订阅的topic集合发生变化时,重置订阅的主题集合,并且把所有的topic添加到groupSubscription* @param topicsToSubscribe*/ private void changeSubscription(Set<String> topicsToSubscribe) {if (!this.subscription.equals(topicsToSubscribe)) {this.subscription = topicsToSubscribe;// 消费者自身订阅的topic添加到groupSubscriptionthis.groupSubscription.addAll(topicsToSubscribe);} }
/*** leader收到JoinGroupResponse的时候,包含了全部消费者的topic,此时将topic添加到groupSubscription* @param topics*/ public void groupSubscribe(Collection<String> topics) {if (this.subscriptionType == SubscriptionType.USER_ASSIGNED)throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);this.groupSubscription.addAll(topics); }/*** 重置groupSubscription的订阅,只包含该用户订阅的主题。*/ public void resetGroupSubscription() {this.groupSubscription.retainAll(subscription); }
/**获取分配的分区的状态,然后看哪些分区是可以fetch数据,返回那些可以fetch数据的topic partition*/ public List<TopicPartition> fetchablePartitions() {List<TopicPartition> fetchable = new ArrayList<>();for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {if (state.value().isFetchable())fetchable.add(state.topicPartition());}return fetchable; }
// 取消订阅 public void unsubscribe() {this.subscription = Collections.emptySet();this.userAssignment = Collections.emptySet();this.assignment.clear();this.subscribedPattern = null;this.subscriptionType = SubscriptionType.NONE; }
/**根据coordinator返回的partitions改变这assignment */ public void assignFromSubscribed(Collection<TopicPartition> assignments) {if (!this.partitionsAutoAssigned())throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");for (TopicPartition tp : assignments)if (!this.subscription.contains(tp.topic()))throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");// after rebalancing, we always reinitialize the assignment valuethis.assignment.set(partitionToStateMap(assignments));this.needsFetchCommittedOffsets = true; } /**根据指定的partitions集合,构建一个<partition,TopicPartitionState>映射*/ private Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition> assignments) {Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size());for (TopicPartition tp : assignments)map.put(tp, new TopicPartitionState());return map; }
/** 根据用户提供的指定的partitions 改变assignment */ public void assignFromUser(Set<TopicPartition> partitions) {setSubscriptionType(SubscriptionType.USER_ASSIGNED);if (!this.assignment.partitionSet().equals(partitions)) {this.userAssignment = partitions;Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();for (TopicPartition partition : partitions) {TopicPartitionState state = assignment.stateValue(partition);if (state == null)state = new TopicPartitionState();partitionToState.put(partition, state);}this.assignment.set(partitionToState);this.needsFetchCommittedOffsets = true;} }
// 判断下一次消费的消息的offset是否为空 public boolean hasValidPosition() {return position != null; }
// 判断下一次消费的位置是否为空 public boolean hasAllFetchPositions() {for (TopicPartitionState state : assignment.partitionStateValues())// 下一次消费的位置为空,返回falseif (!state.hasValidPosition())return false;return true; }// 如果下一次消费位置没有,则返回那些TopicPartition集合 public Set<TopicPartition> missingFetchPositions() {Set<TopicPartition> missing = new HashSet<>();for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {if (!state.value().hasValidPosition())missing.add(state.topicPartition());}return missing; }
// 判断该分区是否分配了,而且是否可以fetch数据了 public boolean isFetchable(TopicPartition tp) {return isAssigned(tp) && assignedState(tp).isFetchable(); }
SubscriptionState分析相关推荐
- 消费者rebalance机制分析
一 触发rebalance的时机 # 有新的消费者加入 # 有消费者宕机或者下线 # 消费者主动退出消费者组 # 消费者组订阅的topic出现分区数量变化 # 消费者调用unsubscrible取消对 ...
- Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析
文章目录 前言 1. 消费者负载均衡的实现 2. 源码分析 2.1 KafkaConsumer 的初始化 2.2 KafkaConsumer 的消息拉取 2.2.1 消息拉取的准备及入口 2.2.2 ...
- 你绝对能看懂的Kafka源代码分析-KafkaConsumer类代码分析
目录: <Kafka Producer设计分析> <KafkaProducer类代码分析> <RecordAccumulator类代码分析> <Sender类 ...
- linphone-LinphoneManager.java文件分析
介绍 本篇主要是对个人对LinphoneManger类的理解及对上面的注释,这是对linphone研究的一个开始. 会慢慢对linphone逐步分析, 随着时间的推进, 我会对linphone有进一步 ...
- 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析
目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...
- 2022-2028年中国自动驾驶系统行业现状调研分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...
- 2022-2028年中国阻尼涂料市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...
- 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...
- 2022-2028年全球与中国漂白吸水棉市场研究及前瞻分析报告
[报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国漂白吸水棉行业市场行业相关概述.全 ...
最新文章
- 如果当前没有拿得出手的简历,也别慌,努力的话最多两年情况就能改变
- Opengl-基本章节的学习成果
- 6段Python代码刻画深度学习历史:从最小二乘法到深度神经网络
- js定时器和linux命令locate
- 对比两个字符串相等_字符串匹配问题
- Oracle对象被锁如何处理
- mysql applicationcontext.xml_配置applicationcontext.xml文件
- win7录屏_学用系列|清晰、体积小,这些录屏工具适合正在为录屏苦恼的你
- java 获取图片路径_Java获取文件路径的几种方式
- oracle如何恢复被误误删除的pdb
- BZOJ4198: [Noi2015]荷马史诗(哈夫曼树)
- #GeekPoint# 苹果的 AR 眼镜
- 安卓ui xml_创建声明性XML UI语言
- mail.yahoo.com.cn:yahoo邮箱用outlook无法发信问题的解决办法
- Android OpenGL ES 3.0 粒子特效
- SystemVerilog学习-02-数据类型
- word换pdf并且自动生成目录
- 软件测试之搜索框功能点用例梳理
- 自动拖取win10聚焦壁纸到桌面
- 美国python课程 得a_干货:五门CS基础课推荐(价值两万多美元的美国名校课程,零基础,转专业都可学!...
热门文章
- php保存ppt,ppt怎么保存到电脑桌面?
- 池化技术及jdk的线程池讲解
- MAC 安装brew raw.githubusercontent.com port 443: Connection refused 本人亲自认证过,踩过多种方案,最终认证的解决方案
- jena 开发之 mysql数据导入_在Jena框架下基于MySQL数据库实现本体的存取操作
- PostgreSQL中的pg_hba.conf
- word 2007如何插入参考文献引用
- 华为开发者大会鸿蒙2.0系统,鸿蒙2.0来了!华为开发者大会HDC 2020宣布
- elasticsearch 条件去重_Elasticsearch学习之查询去重
- pandas series 判断是否包含某个值
- 华为Android10版怎么截屏,安卓手机截图方法 华为手机如何截图 - 云骑士一键重装系统...