A look at Flink's InternalTimeServiceManager
Preface
This article takes a look at Flink's InternalTimeServiceManager.
InternalTimeServiceManager
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
```java
@Internal
public class InternalTimeServiceManager<K> {

    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";
    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;

    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;

    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

    private final boolean useLegacySynchronousSnapshots;

    InternalTimeServiceManager(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        PriorityQueueSetFactory priorityQueueSetFactory,
        ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {

        this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
        this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;

        this.timerServices = new HashMap<>();
    }

    @SuppressWarnings("unchecked")
    public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {

        InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);

        timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);

        return timerService;
    }

    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {

            timerService = new InternalTimerServiceImpl<>(
                localKeyGroupRange,
                keyContext,
                processingTimeService,
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));

            timerServices.put(name, timerService);
        }
        return timerService;
    }

    Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(timerServices);
    }

    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(
            name,
            timerSerializer);
    }

    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }

    // ------------------ Fault Tolerance Methods ------------------

    public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
        Preconditions.checkState(useLegacySynchronousSnapshots);
        InternalTimerServiceSerializationProxy<K> serializationProxy =
            new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);

        serializationProxy.write(stream);
    }

    public void restoreStateForKeyGroup(
        InputStream stream,
        int keyGroupIdx,
        ClassLoader userCodeClassLoader) throws IOException {

        InternalTimerServiceSerializationProxy<K> serializationProxy =
            new InternalTimerServiceSerializationProxy<>(
                this,
                userCodeClassLoader,
                keyGroupIdx);

        serializationProxy.read(stream);
    }

    // ------------------ Methods used ONLY IN TESTS ------------------

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            count += timerService.numProcessingTimeTimers();
        }
        return count;
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            count += timerService.numEventTimeTimers();
        }
        return count;
    }
}
```
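- InternalTimeServiceManager manages the timer services used by all keyed operators; it keeps an in-memory map (timerServices) from each timer service's name to its InternalTimerServiceImpl
- getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl with the given name, then calls timerService.startTimerService to initialize it, and returns it
- registerOrGetTimerService looks up the InternalTimerServiceImpl with the given name in the timerServices map; if there is none, it creates one and puts it into the map. When creating an InternalTimerServiceImpl, it calls createTimerPriorityQueue to create the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue and named PROCESSING_TIMER_PREFIX + name and EVENT_TIMER_PREFIX + name respectively; createTimerPriorityQueue in turn delegates to priorityQueueSetFactory
- advanceWatermark simply forwards the watermark's timestamp to every registered InternalTimerServiceImpl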
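To make the lookup-or-create flow and the watermark fan-out concrete, here is a dependency-free sketch of the same bookkeeping. It is a simplified model, not Flink's code: SketchTimeServiceManager and SketchTimerService are hypothetical names, each service holds a plain java.util.PriorityQueue of event-time timestamps instead of a KeyGroupedInternalPriorityQueue, and a due timer is only recorded in a list rather than handed to a Triggerable. Flink's registerOrGetTimerService uses an explicit get / null check / put; computeIfAbsent expresses the same logic in one call. InternalTimerServiceImpl performs a similar peek/poll loop over its event-time queue when the watermark advances.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical, dependency-free model of InternalTimeServiceManager's bookkeeping.
class SketchTimeServiceManager {

    static final String TIMER_STATE_PREFIX = "_timer_state";
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    /** Stand-in for InternalTimerServiceImpl: named queues plus an event-time heap of timestamps. */
    static final class SketchTimerService {
        final String processingQueueName;
        final String eventQueueName;
        final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
        final List<Long> fired = new ArrayList<>();

        SketchTimerService(String processingQueueName, String eventQueueName) {
            this.processingQueueName = processingQueueName;
            this.eventQueueName = eventQueueName;
        }

        void registerEventTimeTimer(long timestamp) {
            eventTimeTimers.add(timestamp);
        }

        /** "Fires" (here: records) every event-time timer whose timestamp is <= the watermark. */
        void advanceWatermark(long watermark) {
            Long head;
            while ((head = eventTimeTimers.peek()) != null && head <= watermark) {
                eventTimeTimers.poll();
                fired.add(head);
            }
        }
    }

    private final Map<String, SketchTimerService> timerServices = new HashMap<>();

    /** Returns the service registered under name, creating it (and its named queues) on first use. */
    SketchTimerService registerOrGetTimerService(String name) {
        return timerServices.computeIfAbsent(name, n ->
                new SketchTimerService(PROCESSING_TIMER_PREFIX + n, EVENT_TIMER_PREFIX + n));
    }

    /** Forwards the watermark timestamp to every registered service. */
    void advanceWatermark(long watermark) {
        for (SketchTimerService service : timerServices.values()) {
            service.advanceWatermark(watermark);
        }
    }

    Map<String, SketchTimerService> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(timerServices);
    }

    public static void main(String[] args) {
        SketchTimeServiceManager manager = new SketchTimeServiceManager();
        SketchTimerService a = manager.registerOrGetTimerService("window-timers");
        // A second lookup with the same name returns the same instance.
        System.out.println(a == manager.registerOrGetTimerService("window-timers")); // true
        System.out.println(a.eventQueueName); // _timer_state/event_window-timers
        a.registerEventTimeTimer(30L);
        a.registerEventTimeTimer(10L);
        manager.advanceWatermark(20L);
        System.out.println(a.fired); // [10]
    }
}
```

Because the map is keyed by name, repeated requests for the same name reuse one service and therefore one pair of timer queues.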
PriorityQueueSetFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
```java
public interface PriorityQueueSetFactory {

    @Nonnull
    <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
```
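- PriorityQueueSetFactory defines a create method that returns a KeyGroupedInternalPriorityQueue; the type parameter T is bounded so that it must implement all three interfaces HeapPriorityQueueElement, PriorityComparable, and Keyed at the same time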
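The bound `T extends HeapPriorityQueueElement & PriorityComparable & Keyed` is a Java intersection type: only a type that implements every listed interface is accepted. The following minimal sketch uses simplified local copies of the three interfaces (nested in a hypothetical IntersectionBoundDemo class, not the real Flink types) and a hypothetical create method with the same bound that just returns an ArrayList.

```java
import java.util.ArrayList;
import java.util.List;

class IntersectionBoundDemo {

    // Simplified local stand-ins for Flink's three interfaces.
    interface HeapPriorityQueueElement {
        int getInternalIndex();
        void setInternalIndex(int newIndex);
    }

    interface PriorityComparable<T> {
        int comparePriorityTo(T other);
    }

    interface Keyed<K> {
        K getKey();
    }

    // Same shape as PriorityQueueSetFactory#create: T must implement all three interfaces.
    static <T extends HeapPriorityQueueElement & PriorityComparable<T> & Keyed<?>> List<T> create(String stateName) {
        System.out.println("creating queue for state " + stateName);
        return new ArrayList<>();
    }

    /** A hypothetical element type that satisfies the bound. */
    static final class Timer implements HeapPriorityQueueElement, PriorityComparable<Timer>, Keyed<String> {
        final long timestamp;
        final String key;
        int index = Integer.MIN_VALUE;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override public int getInternalIndex() { return index; }
        @Override public void setInternalIndex(int newIndex) { index = newIndex; }
        @Override public int comparePriorityTo(Timer other) { return Long.compare(timestamp, other.timestamp); }
        @Override public String getKey() { return key; }
    }

    public static void main(String[] args) {
        List<Timer> queue = IntersectionBoundDemo.<Timer>create("_timer_state/event_demo");
        queue.add(new Timer(1L, "a"));
        System.out.println(queue.size()); // 1
        // IntersectionBoundDemo.<String>create("x") would not compile: String implements none of the three.
    }
}
```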
HeapPriorityQueueElement
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
```java
@Internal
public interface HeapPriorityQueueElement {

    /**
     * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
     * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
     * elements are removed from a {@link HeapPriorityQueue}.
     */
    int NOT_CONTAINED = Integer.MIN_VALUE;

    /**
     * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
     */
    int getInternalIndex();

    /**
     * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
     * {@link HeapPriorityQueue}.
     *
     * @param newIndex the new index in the timer heap.
     */
    void setInternalIndex(int newIndex);
}
```
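- HeapPriorityQueueElement defines the element type required by HeapPriorityQueue: getInternalIndex and setInternalIndex expose the element's current position in the heap's internal array, and the constant NOT_CONTAINED (Integer.MIN_VALUE) marks an element that is not held by any HeapPriorityQueue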
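Keeping the index inside the element is what lets a heap remove an arbitrary element in O(log n): it can jump straight to the element's slot instead of scanning the array. A minimal sketch of that idea, assuming a hypothetical Element class with a plain internalIndex field in place of the getter/setter pair and a 0-based binary min-heap; Flink's HeapPriorityQueue builds on the same idea, but its details differ.

```java
import java.util.ArrayList;
import java.util.List;

class IndexedHeapDemo {

    static final int NOT_CONTAINED = Integer.MIN_VALUE;

    /** Element that remembers its slot in the heap array (hypothetical, mirrors HeapPriorityQueueElement). */
    static final class Element {
        final long priority;
        int internalIndex = NOT_CONTAINED;
        Element(long priority) { this.priority = priority; }
    }

    /** Minimal binary min-heap that keeps every element's internalIndex up to date. */
    static final class IndexedMinHeap {
        private final List<Element> heap = new ArrayList<>();

        void add(Element e) {
            heap.add(e);
            e.internalIndex = heap.size() - 1;
            siftUp(e.internalIndex);
        }

        Element peek() { return heap.isEmpty() ? null : heap.get(0); }

        Element poll() {
            Element head = peek();
            if (head != null) { remove(head); }
            return head;
        }

        /** O(log n) removal: the stored index replaces a linear search. */
        boolean remove(Element e) {
            int i = e.internalIndex;
            if (i < 0 || i >= heap.size() || heap.get(i) != e) {
                return false; // not contained in this heap
            }
            int last = heap.size() - 1;
            swap(i, last);        // move the element to the end...
            heap.remove(last);    // ...and drop it
            e.internalIndex = NOT_CONTAINED;
            if (i < heap.size()) { // restore heap order for the element moved into slot i
                siftDown(i);
                siftUp(i);
            }
            return true;
        }

        int size() { return heap.size(); }

        private void siftUp(int i) {
            while (i > 0) {
                int parent = (i - 1) / 2;
                if (heap.get(i).priority >= heap.get(parent).priority) break;
                swap(i, parent);
                i = parent;
            }
        }

        private void siftDown(int i) {
            int n = heap.size();
            while (true) {
                int left = 2 * i + 1, right = left + 1, smallest = i;
                if (left < n && heap.get(left).priority < heap.get(smallest).priority) smallest = left;
                if (right < n && heap.get(right).priority < heap.get(smallest).priority) smallest = right;
                if (smallest == i) return;
                swap(i, smallest);
                i = smallest;
            }
        }

        /** Swaps two slots and updates both elements' stored indexes. */
        private void swap(int a, int b) {
            Element ea = heap.get(a), eb = heap.get(b);
            heap.set(a, eb);
            heap.set(b, ea);
            ea.internalIndex = b;
            eb.internalIndex = a;
        }
    }

    public static void main(String[] args) {
        IndexedMinHeap heap = new IndexedMinHeap();
        Element e30 = new Element(30), e10 = new Element(10), e20 = new Element(20);
        heap.add(e30);
        heap.add(e10);
        heap.add(e20);
        heap.remove(e10); // located directly via e10.internalIndex
        System.out.println(heap.poll().priority); // 20
        System.out.println(e10.internalIndex == NOT_CONTAINED); // true
    }
}
```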
PriorityComparable
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java
```java
public interface PriorityComparable<T> {

    int comparePriorityTo(@Nonnull T other);
}
```
- PriorityComparable defines comparePriorityTo, which compares this object with another one by priority
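comparePriorityTo follows the usual compareTo contract (negative, zero, positive), so it can be adapted to a java.util.Comparator with a one-line lambda, which is roughly what PriorityComparator.forPriorityComparableObjects() in HeapPriorityQueueSetFactory is used for. A minimal sketch, assuming a local copy of the interface and a hypothetical Deadline element where an earlier timestamp means higher priority:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

class PriorityComparableDemo {

    /** Local copy of the interface's shape (not the Flink type itself). */
    interface PriorityComparable<T> {
        int comparePriorityTo(T other);
    }

    /** Hypothetical element: an earlier deadline has a higher priority. */
    static final class Deadline implements PriorityComparable<Deadline> {
        final long timestamp;

        Deadline(long timestamp) { this.timestamp = timestamp; }

        @Override
        public int comparePriorityTo(Deadline other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    /** Adapts comparePriorityTo into a java.util.Comparator. */
    static <T extends PriorityComparable<T>> Comparator<T> byPriority() {
        return (left, right) -> left.comparePriorityTo(right);
    }

    public static void main(String[] args) {
        Comparator<Deadline> cmp = byPriority();
        PriorityQueue<Deadline> queue = new PriorityQueue<>(cmp);
        queue.add(new Deadline(300));
        queue.add(new Deadline(100));
        queue.add(new Deadline(200));
        System.out.println(queue.poll().timestamp); // 100
    }
}
```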
Keyed
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java
```java
public interface Keyed<K> {

    K getKey();
}
```
- Keyed defines getKey, which returns the key of the object
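Exposing the key through Keyed lets a key-grouped queue assign each element to a key group without knowing its concrete type. A minimal sketch, assuming a hypothetical Event type and a deliberately simplified assignment `Math.floorMod(key.hashCode(), totalKeyGroups)`; Flink's own key-group assignment additionally scrambles hashCode() with a murmur hash, so the group numbers here will not match Flink's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

class KeyedDemo {

    /** Local copy of the Keyed shape. */
    interface Keyed<K> {
        K getKey();
    }

    /** Hypothetical keyed element. */
    static final class Event implements Keyed<String> {
        final String user;
        final long timestamp;

        Event(String user, long timestamp) {
            this.user = user;
            this.timestamp = timestamp;
        }

        @Override
        public String getKey() { return user; }
    }

    /** Simplified key-group assignment: NOT Flink's formula (Flink also applies a murmur hash). */
    static int keyGroupFor(Object key, int totalKeyGroups) {
        return Math.floorMod(key.hashCode(), totalKeyGroups);
    }

    /** Buckets elements by key group, using only the key each element exposes through Keyed. */
    static <T extends Keyed<?>> Map<Integer, List<T>> groupByKeyGroup(List<T> elements, int totalKeyGroups) {
        Function<T, Object> keyExtractor = element -> element.getKey();
        Map<Integer, List<T>> groups = new TreeMap<>();
        for (T element : elements) {
            int group = keyGroupFor(keyExtractor.apply(element), totalKeyGroups);
            groups.computeIfAbsent(group, g -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("alice", 1L));
        events.add(new Event("bob", 2L));
        events.add(new Event("alice", 3L));
        Map<Integer, List<Event>> groups = groupByKeyGroup(events, 128);
        // Elements with the same key always land in the same key group.
        System.out.println(groups.get(keyGroupFor("alice", 128)).size()); // 2
    }
}
```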
InternalTimer
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java
```java
@Internal
public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {

    /** Function to extract the key from a {@link InternalTimer}. */
    KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;

    /** Function to compare instances of {@link InternalTimer}. */
    PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
        (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());

    /**
     * Returns the timestamp of the timer. This value determines the point in time when the timer will fire.
     */
    long getTimestamp();

    /**
     * Returns the key that is bound to this timer.
     */
    @Nonnull
    @Override
    K getKey();

    /**
     * Returns the namespace that is bound to this timer.
     */
    @Nonnull
    N getNamespace();
}
```
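- InternalTimer extends PriorityComparable and Keyed and defines getTimestamp, getKey, and getNamespace; it also provides two built-in constants: KEY_EXTRACTOR_FUNCTION, which extracts a timer's key, and TIMER_COMPARATOR, which orders timers by timestamp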
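Both constants are just functional-interface instances stored as interface fields (implicitly public static final). One consequence worth noting: TIMER_COMPARATOR looks only at the timestamp, so two timers for different keys at the same time have equal priority even though they are not equal timers. A minimal sketch, assuming java.util.function.Function and java.util.Comparator as stand-ins for Flink's KeyExtractorFunction and PriorityComparator, and a hypothetical SimpleTimer implementation:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

class TimerConstantsDemo {

    /** Simplified InternalTimer; Function/Comparator stand in for Flink's functional interfaces. */
    interface Timer<K, N> {
        // Interface fields are implicitly public static final, like the constants in InternalTimer.
        Function<Timer<?, ?>, Object> KEY_EXTRACTOR_FUNCTION = timer -> timer.getKey();
        Comparator<Timer<?, ?>> TIMER_COMPARATOR =
                (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());

        long getTimestamp();
        K getKey();
        N getNamespace();
    }

    /** Hypothetical immutable timer. */
    static final class SimpleTimer<K, N> implements Timer<K, N> {
        private final long timestamp;
        private final K key;
        private final N namespace;

        SimpleTimer(long timestamp, K key, N namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override public long getTimestamp() { return timestamp; }
        @Override public K getKey() { return key; }
        @Override public N getNamespace() { return namespace; }
    }

    public static void main(String[] args) {
        List<Timer<?, ?>> timers = new ArrayList<>();
        timers.add(new SimpleTimer<>(200L, "bob", "window-1"));
        timers.add(new SimpleTimer<>(100L, "alice", "window-1"));
        timers.add(new SimpleTimer<>(100L, "carol", "window-2"));

        // List.sort is stable, so the two timers at 100 keep their insertion order.
        timers.sort(Timer.TIMER_COMPARATOR);
        for (Timer<?, ?> t : timers) {
            System.out.println(t.getTimestamp() + " " + Timer.KEY_EXTRACTOR_FUNCTION.apply(t));
        }
        // Prints "100 alice", "100 carol", "200 bob", one per line.

        // Same timestamp, different keys: equal priority for the comparator.
        System.out.println(Timer.TIMER_COMPARATOR.compare(timers.get(0), timers.get(1))); // 0
    }
}
```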
TimerHeapInternalTimer
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
```java
@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {

    /** The key for which the timer is scoped. */
    @Nonnull
    private final K key;

    /** The namespace for which the timer is scoped. */
    @Nonnull
    private final N namespace;

    /** The expiration timestamp. */
    private final long timestamp;

    private transient int timerHeapIndex;

    TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
        this.timerHeapIndex = NOT_CONTAINED;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Nonnull
    @Override
    public K getKey() {
        return key;
    }

    @Nonnull
    @Override
    public N getNamespace() {
        return namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }

        if (o instanceof InternalTimer) {
            InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
            return timestamp == timer.getTimestamp()
                && key.equals(timer.getKey())
                && namespace.equals(timer.getNamespace());
        }

        return false;
    }

    @Override
    public int getInternalIndex() {
        return timerHeapIndex;
    }

    @Override
    public void setInternalIndex(int newIndex) {
        this.timerHeapIndex = newIndex;
    }

    void removedFromTimerQueue() {
        setInternalIndex(NOT_CONTAINED);
    }

    @Override
    public int hashCode() {
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + key.hashCode();
        result = 31 * result + namespace.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "Timer{" +
            "timestamp=" + timestamp +
            ", key=" + key +
            ", namespace=" + namespace +
            '}';
    }

    @Override
    public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
        return Long.compare(timestamp, other.getTimestamp());
    }
}
```
- TimerHeapInternalTimer implements InternalTimer and HeapPriorityQueueElement; its removedFromTimerQueue method calls setInternalIndex(NOT_CONTAINED), resetting the timer's index to NOT_CONTAINED to mark it as no longer contained in any heap
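Note that equals and hashCode use only timestamp, key, and namespace, never the transient timerHeapIndex. Two timers for the same key, namespace, and time are therefore equal wherever they sit in a heap, which is what allows a set-like queue to recognize a duplicate registration. A minimal sketch with a hypothetical HeapTimer class that copies this identity logic; unlike Flink's equals, which accepts any InternalTimer, it only compares against HeapTimer.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

class TimerIdentityDemo {

    static final int NOT_CONTAINED = Integer.MIN_VALUE;

    /** Simplified copy of TimerHeapInternalTimer's identity logic (hypothetical class). */
    static final class HeapTimer<K, N> {
        final long timestamp;
        final K key;
        final N namespace;
        int timerHeapIndex = NOT_CONTAINED; // heap bookkeeping, excluded from equals/hashCode

        HeapTimer(long timestamp, K key, N namespace) {
            this.timestamp = timestamp;
            this.key = Objects.requireNonNull(key);
            this.namespace = Objects.requireNonNull(namespace);
        }

        void removedFromTimerQueue() {
            timerHeapIndex = NOT_CONTAINED;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof HeapTimer)) return false;
            HeapTimer<?, ?> other = (HeapTimer<?, ?>) o;
            return timestamp == other.timestamp && key.equals(other.key) && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            int result = (int) (timestamp ^ (timestamp >>> 32)); // same value as Long.hashCode(timestamp)
            result = 31 * result + key.hashCode();
            result = 31 * result + namespace.hashCode();
            return result;
        }
    }

    public static void main(String[] args) {
        HeapTimer<String, String> first = new HeapTimer<>(1000L, "user-1", "window");
        HeapTimer<String, String> again = new HeapTimer<>(1000L, "user-1", "window");
        first.timerHeapIndex = 3; // pretend it sits at slot 3 of a heap

        Set<HeapTimer<String, String>> registered = new HashSet<>();
        registered.add(first);
        System.out.println(registered.add(again)); // false: a duplicate despite the different index
        first.removedFromTimerQueue();
        System.out.println(first.timerHeapIndex == NOT_CONTAINED); // true
    }
}
```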
HeapPriorityQueueSetFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
```java
public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {

    @Nonnull
    private final KeyGroupRange keyGroupRange;

    @Nonnegative
    private final int totalKeyGroups;

    @Nonnegative
    private final int minimumCapacity;

    public HeapPriorityQueueSetFactory(
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups,
        @Nonnegative int minimumCapacity) {

        this.keyGroupRange = keyGroupRange;
        this.totalKeyGroups = totalKeyGroups;
        this.minimumCapacity = minimumCapacity;
    }

    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {

        return new HeapPriorityQueueSet<>(
            PriorityComparator.forPriorityComparableObjects(),
            KeyExtractorFunction.forKeyedObjects(),
            minimumCapacity,
            keyGroupRange,
            totalKeyGroups);
    }
}
```
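- HeapPriorityQueueSetFactory implements PriorityQueueSetFactory; its create method returns a HeapPriorityQueueSet built from PriorityComparator.forPriorityComparableObjects(), KeyExtractorFunction.forKeyedObjects(), and the factory's minimumCapacity, keyGroupRange, and totalKeyGroups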
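As its name says, HeapPriorityQueueSet is a heap-backed priority queue that also behaves like a set: it deduplicates elements per key group, so registering an identical timer twice stores it only once. The following is a minimal sketch of that combination, not Flink's implementation: DedupPriorityQueueSet and Timer are hypothetical, the heap is a java.util.PriorityQueue, each key group gets its own HashSet, key groups are computed with a simplified `Math.floorMod(hashCode, totalKeyGroups)` over all groups rather than a local KeyGroupRange, and having add report whether the new element became the head is a choice made for this sketch.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Function;

class PriorityQueueSetDemo {

    /** Hypothetical heap-backed priority queue that ignores duplicates, with one dedup set per key group. */
    static final class DedupPriorityQueueSet<T> {
        private final PriorityQueue<T> heap;
        private final Comparator<T> comparator;
        private final Function<T, Object> keyExtractor;
        private final Set<T>[] dedupByKeyGroup;

        @SuppressWarnings("unchecked")
        DedupPriorityQueueSet(Comparator<T> comparator, Function<T, Object> keyExtractor, int totalKeyGroups) {
            this.heap = new PriorityQueue<>(comparator);
            this.comparator = comparator;
            this.keyExtractor = keyExtractor;
            this.dedupByKeyGroup = new Set[totalKeyGroups];
            for (int i = 0; i < totalKeyGroups; i++) {
                dedupByKeyGroup[i] = new HashSet<>();
            }
        }

        private Set<T> dedupSetFor(T element) {
            // Simplified key-group assignment (not Flink's formula).
            int keyGroup = Math.floorMod(keyExtractor.apply(element).hashCode(), dedupByKeyGroup.length);
            return dedupByKeyGroup[keyGroup];
        }

        /** Adds element unless an equal one is present; returns true if it became the new head. */
        boolean add(T element) {
            if (!dedupSetFor(element).add(element)) {
                return false; // duplicate: ignored
            }
            T oldHead = heap.peek();
            heap.add(element);
            return oldHead == null || comparator.compare(element, oldHead) < 0;
        }

        T poll() {
            T head = heap.poll();
            if (head != null) {
                dedupSetFor(head).remove(head); // keep the dedup set in sync with the heap
            }
            return head;
        }

        int size() { return heap.size(); }
    }

    /** Hypothetical timer with value-based identity. */
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override public boolean equals(Object o) {
            return o instanceof Timer && ((Timer) o).timestamp == timestamp && ((Timer) o).key.equals(key);
        }

        @Override public int hashCode() { return Objects.hash(timestamp, key); }
    }

    public static void main(String[] args) {
        Comparator<Timer> byTimestamp = Comparator.comparingLong(t -> t.timestamp);
        DedupPriorityQueueSet<Timer> queue = new DedupPriorityQueueSet<Timer>(byTimestamp, t -> t.key, 128);
        System.out.println(queue.add(new Timer(200L, "a"))); // true: first element is the head
        System.out.println(queue.add(new Timer(100L, "b"))); // true: earlier timestamp, new head
        System.out.println(queue.add(new Timer(100L, "b"))); // false: duplicate ignored
        System.out.println(queue.add(new Timer(300L, "c"))); // false: stored, but head unchanged
        System.out.println(queue.size()); // 3
    }
}
```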
Summary
- InternalTimeServiceManager manages the timer services used by all keyed operators, keeping an in-memory map from each timer service's name to its InternalTimerServiceImpl; getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl with the given name, then calls timerService.startTimerService to initialize it and returns it
- registerOrGetTimerService looks up the InternalTimerServiceImpl with the given name in the timerServices map and, if there is none, creates one and puts it into the map; when creating an InternalTimerServiceImpl, it calls createTimerPriorityQueue to create the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue, and createTimerPriorityQueue delegates to priorityQueueSetFactory
- PriorityQueueSetFactory defines create, which returns a KeyGroupedInternalPriorityQueue whose element type T must implement HeapPriorityQueueElement, PriorityComparable, and Keyed (InternalTimer extends PriorityComparable and Keyed, and TimerHeapInternalTimer implements InternalTimer and HeapPriorityQueueElement, so TimerHeapInternalTimer satisfies the bound); HeapPriorityQueueSetFactory implements PriorityQueueSetFactory, and its create method returns a HeapPriorityQueueSet
doc
- InternalTimeServiceManager