This article takes a look at Flink's InternalTimeServiceManager.

InternalTimeServiceManager

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java

@Internal
public class InternalTimeServiceManager<K> {

    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";

    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";

    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;

    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;

    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

    private final boolean useLegacySynchronousSnapshots;

    InternalTimeServiceManager(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        PriorityQueueSetFactory priorityQueueSetFactory,
        ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {

        this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
        this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;

        this.timerServices = new HashMap<>();
    }

    @SuppressWarnings("unchecked")
    public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {

        InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);

        timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);

        return timerService;
    }

    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {

            timerService = new InternalTimerServiceImpl<>(
                localKeyGroupRange,
                keyContext,
                processingTimeService,
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));

            timerServices.put(name, timerService);
        }
        return timerService;
    }

    Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(timerServices);
    }

    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(
            name,
            timerSerializer);
    }

    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }

    // Fault Tolerance Methods

    public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
        Preconditions.checkState(useLegacySynchronousSnapshots);
        InternalTimerServiceSerializationProxy<K> serializationProxy =
            new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);

        serializationProxy.write(stream);
    }

    public void restoreStateForKeyGroup(
        InputStream stream,
        int keyGroupIdx,
        ClassLoader userCodeClassLoader) throws IOException {

        InternalTimerServiceSerializationProxy<K> serializationProxy =
            new InternalTimerServiceSerializationProxy<>(
                this,
                userCodeClassLoader,
                keyGroupIdx);

        serializationProxy.read(stream);
    }

    // Methods used ONLY IN TESTS

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            count += timerService.numProcessingTimeTimers();
        }
        return count;
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            count += timerService.numEventTimeTimers();
        }
        return count;
    }
}
  • InternalTimeServiceManager manages the timer services used by all keyed operators; it keeps an in-memory map from each timer service's name to its InternalTimerServiceImpl
  • getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl for the given name, then calls timerService.startTimerService to initialize it, and finally returns it
  • registerOrGetTimerService first looks up the InternalTimerServiceImpl for the given name in the timerServices map; if there is none, it creates one and puts it into the map. When creating the InternalTimerServiceImpl, it calls createTimerPriorityQueue twice to build the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue; createTimerPriorityQueue in turn delegates to priorityQueueSetFactory
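The get-or-create lookup described above can be sketched without any Flink dependency. This is only an illustration of the pattern: SketchTimerService and SketchTimeServiceManager are hypothetical stand-ins, not Flink classes, and the real InternalTimerServiceImpl takes queues, serializers and a Triggerable that are omitted here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for InternalTimerServiceImpl (not a Flink class).
class SketchTimerService {
    final String name;
    boolean started;

    SketchTimerService(String name) {
        this.name = name;
    }

    // Plays the role of startTimerService: called on every lookup.
    void start() {
        started = true;
    }
}

// Hypothetical stand-in for InternalTimeServiceManager (not a Flink class).
class SketchTimeServiceManager {
    private final Map<String, SketchTimerService> timerServices = new HashMap<>();

    // Get or create the service for this name, initialize it, return it.
    SketchTimerService getTimerService(String name) {
        SketchTimerService service = registerOrGet(name);
        service.start();
        return service;
    }

    // Look the name up in the map; create and register a service only if absent.
    private SketchTimerService registerOrGet(String name) {
        SketchTimerService service = timerServices.get(name);
        if (service == null) {
            service = new SketchTimerService(name);
            timerServices.put(name, service);
        }
        return service;
    }

    int registeredCount() {
        return timerServices.size();
    }
}

class RegisterOrGetDemo {
    public static void main(String[] args) {
        SketchTimeServiceManager manager = new SketchTimeServiceManager();
        SketchTimerService first = manager.getTimerService("window-timers");
        SketchTimerService second = manager.getTimerService("window-timers");
        // Both lookups resolve to the same instance because the name is the map key.
        System.out.println(first == second);
    }
}
```

Asking twice for the same name yields the same instance, which is why the name passed to getInternalTimerService effectively identifies a timer service within one manager.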

PriorityQueueSetFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java

public interface PriorityQueueSetFactory {

    @Nonnull
    <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
  • PriorityQueueSetFactory defines a create method that returns a KeyGroupedInternalPriorityQueue; its type parameter T must implement all three interfaces HeapPriorityQueueElement, PriorityComparable and Keyed at the same time
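The bound on T is a Java intersection type (`A & B & C`). As a minimal sketch of how such a bound behaves, the following uses three hypothetical interfaces (HasIndex, HasPriority, HasKey) in place of the Flink ones; all names here are invented for illustration.

```java
// Hypothetical interfaces standing in for HeapPriorityQueueElement,
// PriorityComparable and Keyed (not the Flink types).
interface HasIndex {
    int getInternalIndex();
}

interface HasPriority<T> {
    int comparePriorityTo(T other);
}

interface HasKey<K> {
    K getKey();
}

// A type that satisfies all three bounds at once.
class SketchTimer implements HasIndex, HasPriority<SketchTimer>, HasKey<String> {
    private final String key;
    private final long timestamp;
    private final int index;

    SketchTimer(String key, long timestamp, int index) {
        this.key = key;
        this.timestamp = timestamp;
        this.index = index;
    }

    @Override
    public int getInternalIndex() {
        return index;
    }

    @Override
    public int comparePriorityTo(SketchTimer other) {
        return Long.compare(timestamp, other.timestamp);
    }

    @Override
    public String getKey() {
        return key;
    }
}

class IntersectionBoundDemo {
    // T must implement every interface in the bound, so methods from
    // all three are available on a and b inside this method.
    static <T extends HasIndex & HasPriority<T> & HasKey<String>> String describeEarlier(T a, T b) {
        T first = a.comparePriorityTo(b) <= 0 ? a : b;
        return first.getKey() + "@" + first.getInternalIndex();
    }

    public static void main(String[] args) {
        SketchTimer late = new SketchTimer("k1", 10L, 0);
        SketchTimer early = new SketchTimer("k2", 5L, 1);
        System.out.println(describeEarlier(late, early));
    }
}
```

A type that implements only one or two of the interfaces is rejected at compile time, which is how the factory guarantees that every queue element can be indexed, compared and mapped to a key.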

HeapPriorityQueueElement

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java

@Internal
public interface HeapPriorityQueueElement {

    /**
     * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
     * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
     * elements are removed from a {@link HeapPriorityQueue}.
     */
    int NOT_CONTAINED = Integer.MIN_VALUE;

    /**
     * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
     */
    int getInternalIndex();

    /**
     * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
     * {@link HeapPriorityQueue}.
     *
     * @param newIndex the new index in the timer heap.
     */
    void setInternalIndex(int newIndex);
}
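  • The HeapPriorityQueueElement interface describes the element type that HeapPriorityQueue requires; it declares the getInternalIndex and setInternalIndex methods, through which an element records its own position in the queue's internal array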
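Why would an element remember its own array index? One plausible reason is that it lets the heap remove an arbitrary element without first searching for it. The sketch below is a small binary min-heap built on that idea; IndexedItem and IndexedMinHeap are hypothetical names, and this is not Flink's HeapPriorityQueue implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical element type that remembers its position in the heap array.
class IndexedItem {
    static final int NOT_CONTAINED = Integer.MIN_VALUE;
    final long priority;
    int index = NOT_CONTAINED;

    IndexedItem(long priority) {
        this.priority = priority;
    }
}

// Minimal binary min-heap; a sketch only, not Flink's HeapPriorityQueue.
class IndexedMinHeap {
    private final List<IndexedItem> items = new ArrayList<>();

    int size() {
        return items.size();
    }

    IndexedItem peek() {
        return items.isEmpty() ? null : items.get(0);
    }

    void add(IndexedItem item) {
        items.add(item);
        item.index = items.size() - 1;
        siftUp(item.index);
    }

    // The stored index replaces a linear search, so removal costs O(log n).
    boolean remove(IndexedItem item) {
        int i = item.index;
        if (i < 0 || i >= items.size() || items.get(i) != item) {
            return false;
        }
        int last = items.size() - 1;
        swap(i, last);
        items.remove(last);
        item.index = IndexedItem.NOT_CONTAINED;
        if (i < items.size()) {
            // The element moved into slot i may need to travel either way.
            siftUp(i);
            siftDown(i);
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (items.get(parent).priority <= items.get(i).priority) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = items.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && items.get(left).priority < items.get(smallest).priority) {
                smallest = left;
            }
            if (right < n && items.get(right).priority < items.get(smallest).priority) {
                smallest = right;
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    // Every move updates the stored index so it always matches the array slot.
    private void swap(int a, int b) {
        IndexedItem x = items.get(a);
        IndexedItem y = items.get(b);
        items.set(a, y);
        y.index = a;
        items.set(b, x);
        x.index = b;
    }
}
```

Deleting a timer is a case where this matters: the element to drop is usually somewhere in the middle of the heap, not at the head.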

PriorityComparable

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java

public interface PriorityComparable<T> {

    int comparePriorityTo(@Nonnull T other);
}
  • PriorityComparable defines the comparePriorityTo method, which compares two objects by priority

Keyed

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java

public interface Keyed<K> {

    K getKey();
}
  • The Keyed interface defines the getKey method, which returns the key of the object

InternalTimer

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java

@Internal
public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {

    /** Function to extract the key from a {@link InternalTimer}. */
    KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;

    /** Function to compare instances of {@link InternalTimer}. */
    PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
        (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());

    /**
     * Returns the timestamp of the timer. This value determines the point in time when the timer will fire.
     */
    long getTimestamp();

    /**
     * Returns the key that is bound to this timer.
     */
    @Nonnull
    @Override
    K getKey();

    /**
     * Returns the namespace that is bound to this timer.
     */
    @Nonnull
    N getNamespace();
}
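  • InternalTimer extends the PriorityComparable and Keyed interfaces; it declares the getTimestamp, getKey and getNamespace methods and also provides two built-in constants, KEY_EXTRACTOR_FUNCTION and TIMER_COMPARATOR (which orders timers by timestamp)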
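Ordering timers by timestamp is what lets a timer service fire everything that is due when the watermark advances. The following sketch assumes that behavior (timers with a timestamp at or below the watermark are polled in timestamp order); it uses java.util.PriorityQueue and a hypothetical DemoTimer class rather than Flink's queue and timer types.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical timer type (not Flink's TimerHeapInternalTimer).
class DemoTimer {
    final long timestamp;
    final String key;

    DemoTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }
}

class WatermarkDemo {
    // Same comparison as TIMER_COMPARATOR in the listing: earliest timestamp first.
    static final Comparator<DemoTimer> BY_TIMESTAMP =
        (left, right) -> Long.compare(left.timestamp, right.timestamp);

    // Assumption: a timer is due once its timestamp is <= the watermark.
    static List<DemoTimer> fireUpTo(PriorityQueue<DemoTimer> queue, long watermark) {
        List<DemoTimer> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            fired.add(queue.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        PriorityQueue<DemoTimer> queue = new PriorityQueue<>(BY_TIMESTAMP);
        queue.add(new DemoTimer(30L, "a"));
        queue.add(new DemoTimer(10L, "b"));
        queue.add(new DemoTimer(20L, "c"));
        for (DemoTimer timer : fireUpTo(queue, 20L)) {
            System.out.println(timer.key + " fired at " + timer.timestamp);
        }
    }
}
```

Because the queue head is always the earliest timer, the loop can stop at the first timer that is not yet due instead of scanning the whole queue.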

TimerHeapInternalTimer

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java

@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {

    /** The key for which the timer is scoped. */
    @Nonnull
    private final K key;

    /** The namespace for which the timer is scoped. */
    @Nonnull
    private final N namespace;

    /** The expiration timestamp. */
    private final long timestamp;

    private transient int timerHeapIndex;

    TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
        this.timerHeapIndex = NOT_CONTAINED;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Nonnull
    @Override
    public K getKey() {
        return key;
    }

    @Nonnull
    @Override
    public N getNamespace() {
        return namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }

        if (o instanceof InternalTimer) {
            InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
            return timestamp == timer.getTimestamp()
                && key.equals(timer.getKey())
                && namespace.equals(timer.getNamespace());
        }

        return false;
    }

    @Override
    public int getInternalIndex() {
        return timerHeapIndex;
    }

    @Override
    public void setInternalIndex(int newIndex) {
        this.timerHeapIndex = newIndex;
    }

    void removedFromTimerQueue() {
        setInternalIndex(NOT_CONTAINED);
    }

    @Override
    public int hashCode() {
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + key.hashCode();
        result = 31 * result + namespace.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "Timer{" +
            "timestamp=" + timestamp +
            ", key=" + key +
            ", namespace=" + namespace +
            '}';
    }

    @Override
    public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
        return Long.compare(timestamp, other.getTimestamp());
    }
}
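  • TimerHeapInternalTimer implements the InternalTimer and HeapPriorityQueueElement interfaces; its removedFromTimerQueue method calls setInternalIndex(NOT_CONTAINED), i.e. it resets the timer's index to NOT_CONTAINED, which marks the timer as no longer held by any queue (a logical removal)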
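The equals and hashCode methods in the listing compare timestamp, key and namespace, so two timer objects with the same three values count as the same timer. A minimal sketch of what that buys, using a plain HashSet and a hypothetical DedupTimer class (both are illustration choices, not how Flink necessarily stores timers):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical timer with the same equality rule as the listing:
// equal when timestamp, key and namespace all match.
class DedupTimer {
    final long timestamp;
    final String key;
    final String namespace;

    DedupTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DedupTimer)) {
            return false;
        }
        DedupTimer other = (DedupTimer) o;
        return timestamp == other.timestamp
            && key.equals(other.key)
            && namespace.equals(other.namespace);
    }

    @Override
    public int hashCode() {
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + key.hashCode();
        result = 31 * result + namespace.hashCode();
        return result;
    }
}

class DedupDemo {
    // Returns how many of the given timers were actually new to the set.
    static int registerAll(Set<DedupTimer> timers, DedupTimer... toRegister) {
        int added = 0;
        for (DedupTimer timer : toRegister) {
            if (timers.add(timer)) {
                added++;
            }
        }
        return added;
    }

    public static void main(String[] args) {
        Set<DedupTimer> timers = new HashSet<>();
        int added = registerAll(
            timers,
            new DedupTimer(100L, "user-1", "window-a"),
            new DedupTimer(100L, "user-1", "window-a"),
            new DedupTimer(200L, "user-1", "window-a"));
        System.out.println(added + " distinct timers registered");
    }
}
```

Registering the same (timestamp, key, namespace) combination twice therefore leaves a single entry, while changing any one of the three values produces a distinct timer.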

HeapPriorityQueueSetFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java

public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {

    @Nonnull
    private final KeyGroupRange keyGroupRange;

    @Nonnegative
    private final int totalKeyGroups;

    @Nonnegative
    private final int minimumCapacity;

    public HeapPriorityQueueSetFactory(
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups,
        @Nonnegative int minimumCapacity) {

        this.keyGroupRange = keyGroupRange;
        this.totalKeyGroups = totalKeyGroups;
        this.minimumCapacity = minimumCapacity;
    }

    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {

        return new HeapPriorityQueueSet<>(
            PriorityComparator.forPriorityComparableObjects(),
            KeyExtractorFunction.forKeyedObjects(),
            minimumCapacity,
            keyGroupRange,
            totalKeyGroups);
    }
}
  • HeapPriorityQueueSetFactory implements the PriorityQueueSetFactory interface; its create method returns a HeapPriorityQueueSet

Summary

  • InternalTimeServiceManager manages the timer services used by all keyed operators; it keeps an in-memory map from each timer service's name to its InternalTimerServiceImpl. getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl for the given name, then calls timerService.startTimerService to initialize it, and finally returns it
  • registerOrGetTimerService first looks up the InternalTimerServiceImpl for the given name in the timerServices map; if there is none, it creates one and puts it into the map. When creating the InternalTimerServiceImpl, it calls createTimerPriorityQueue twice to build the processingTimeTimersQueue and the eventTimeTimersQueue, both of type KeyGroupedInternalPriorityQueue; createTimerPriorityQueue in turn delegates to priorityQueueSetFactory
  • PriorityQueueSetFactory defines a create method that returns a KeyGroupedInternalPriorityQueue; its type parameter T must implement all three interfaces HeapPriorityQueueElement, PriorityComparable and Keyed (InternalTimer extends PriorityComparable and Keyed, and TimerHeapInternalTimer implements InternalTimer and HeapPriorityQueueElement). HeapPriorityQueueSetFactory implements PriorityQueueSetFactory, and its create method returns a HeapPriorityQueueSet

doc

  • InternalTimeServiceManager
