This article takes a look at consecutive windowed operations in Flink.

Example

DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());
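  • This example first partitions the stream by key and aggregates each key's values within the given window (here with reduce(new Summer())). It then applies windowAll to the resulting DataStream, using the same time-based WindowAssigner as before. Within the same time window, data is therefore aggregated first per partition and then globally, which helps with problems such as finding the top-k elements.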
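To make the two-stage effect concrete, here is a minimal plain-Java sketch of what the two window stages compute, with no Flink dependency. It assumes tumbling windows with offset 0. It sums values per key, then keeps the k largest sums of each window. The class `TwoStageTopK`, its `Event` type and `topKPerWindow` are hypothetical names. This models only the result of the pipeline, not how Flink executes it.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
 * Plain-Java sketch (hypothetical, no Flink): stage 1 ~ keyBy(key).window(tumbling).reduce(sum),
 * stage 2 ~ windowAll(same tumbling window).process(top-k).
 */
final class TwoStageTopK {

    /** A keyed record with an event-time timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestamp;
        final int value;

        Event(String key, long timestamp, int value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Start of the tumbling window (offset 0) that contains ts. */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Per window start, the k keys with the largest sums (ties broken by key). */
    static Map<Long, List<String>> topKPerWindow(List<Event> events, long size, int k) {
        // Stage 1: sum values per (window, key)
        Map<Long, Map<String, Integer>> sums = new TreeMap<>();
        for (Event e : events) {
            sums.computeIfAbsent(windowStart(e.timestamp, size), w -> new TreeMap<>())
                .merge(e.key, e.value, Integer::sum);
        }
        // Stage 2: for each window, rank all per-key sums of that same window
        Map<Long, List<String>> result = new TreeMap<>();
        sums.forEach((window, perKey) -> result.put(window,
                perKey.entrySet().stream()
                        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()
                                .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
                        .limit(k)
                        .map(Map.Entry::getKey)
                        .collect(Collectors.toList())));
        return result;
    }
}
```

Take a 5000 ms window and k = 2, with events a@1000=3, b@2000=5, a@4000=4 and c@4500=1. The sums for window 0 are a=7, b=5 and c=1, so that window's result is [a, b]. The global ranking only ever sees per-key results that belong to the same window.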

TimestampsAndPeriodicWatermarksOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java

public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    private transient long watermarkInterval;

    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();

        currentWatermark = Long.MIN_VALUE;
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        if (watermarkInterval > 0) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp));
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }

        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
     * watermarks from here).
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();

        // emit a final watermark
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }
    }
}
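  • If assignTimestampsAndWatermarks is passed an AssignerWithPeriodicWatermarks, it creates a TimestampsAndPeriodicWatermarksOperator. In open(), this operator reads watermarkInterval. If the value is greater than 0, it registers a delayed task (a processing-time timer) that fires watermarkInterval milliseconds from now.
  • When that timer fires, it calls back onProcessingTime. This method calls AssignerWithPeriodicWatermarks.getCurrentWatermark to obtain the watermark. It then registers a new timer for getProcessingTimeService().getCurrentProcessingTime() + watermarkInterval. Here watermarkInterval is the value set via env.getConfig().setAutoWatermarkInterval.
  • onProcessingTime therefore does two things. Re-registering the timer on every call keeps it firing periodically. It also emits the watermark, but only when the new watermark is greater than currentWatermark.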
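The periodic emission logic can be sketched in plain Java as follows. This is a hypothetical, single-threaded model, and `PeriodicWatermarkSketch` is not a Flink class. `onTimer()` stands in for one onProcessingTime callback. The candidate watermark assumes a bounded-out-of-orderness rule (largest timestamp seen minus a fixed allowance), which is only one possible getCurrentWatermark implementation. As in the operator, a watermark is emitted only when it is strictly greater than currentWatermark.

```java
import java.util.OptionalLong;

/** Hypothetical plain-Java model of periodic watermark emission (no Flink). */
final class PeriodicWatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxSeenTimestamp = Long.MIN_VALUE;
    // mirrors the operator's currentWatermark, initialised to Long.MIN_VALUE in open()
    private long currentWatermark = Long.MIN_VALUE;

    PeriodicWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Like extractTimestamp: remember the largest event time seen so far. */
    long onElement(long eventTime) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTime);
        return eventTime;
    }

    /** Like getCurrentWatermark under the assumed rule: max seen minus the allowance. */
    private long candidateWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - maxOutOfOrderness;
    }

    /**
     * Like one onProcessingTime call (every watermarkInterval): returns the watermark
     * to emit, or empty if it would not advance currentWatermark.
     */
    OptionalLong onTimer() {
        long candidate = candidateWatermark();
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
            return OptionalLong.of(candidate);
        }
        return OptionalLong.empty();
    }
}
```

An out-of-order element (for example 3000 arriving after 5000) never lowers the watermark. Repeated timer callbacks with no new data emit nothing.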

SystemProcessingTimeService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

public class SystemProcessingTimeService extends ProcessingTimeService {

    private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

    private static final int STATUS_ALIVE = 0;
    private static final int STATUS_QUIESCED = 1;
    private static final int STATUS_SHUTDOWN = 2;

    // ------------------------------------------------------------------------

    /** The containing task that owns this time service provider. */
    private final AsyncExceptionHandler task;

    /** The lock that timers acquire upon triggering. */
    private final Object checkpointLock;

    /** The executor service that schedules and calls the triggers of this task. */
    private final ScheduledThreadPoolExecutor timerService;

    private final AtomicInteger status;

    public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
        this(failureHandler, checkpointLock, null);
    }

    public SystemProcessingTimeService(
            AsyncExceptionHandler task,
            Object checkpointLock,
            ThreadFactory threadFactory) {

        this.task = checkNotNull(task);
        this.checkpointLock = checkNotNull(checkpointLock);

        this.status = new AtomicInteger(STATUS_ALIVE);

        if (threadFactory == null) {
            this.timerService = new ScheduledThreadPoolExecutor(1);
        } else {
            this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
        }

        // tasks should be removed if the future is canceled
        this.timerService.setRemoveOnCancelPolicy(true);

        // make sure shutdown removes all pending tasks
        this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

        // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
        // T says we won't see elements in the future with a timestamp smaller or equal to T.
        // With processing time, we therefore need to delay firing the timer by one ms.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.schedule(
                    new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    //......
}
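  • SystemProcessingTimeService.registerTimer schedules a delayed TriggerTask for the given timestamp. The delay is max(timestamp - now, 0) + 1 ms, and timerService is the JDK's ScheduledThreadPoolExecutor. When a TriggerTask runs, it takes checkpointLock. If the service status is STATUS_ALIVE, it then invokes onProcessingTime on the ProcessingTimeCallback, which here is TimestampsAndPeriodicWatermarksOperator.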
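A stripped-down sketch of the same scheduling pattern, using only java.util.concurrent, is shown below. `MiniTimerService` and `TimerCallback` are hypothetical stand-ins, not Flink classes. The sketch keeps two details from the excerpt: the `+ 1` ms delay, and the status check under a lock before the callback runs. It omits the RejectedExecutionException handling and the SHUTDOWN status.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical callback type, modeled loosely on ProcessingTimeCallback. */
interface TimerCallback {
    void onProcessingTime(long timestamp) throws Exception;
}

/** Hypothetical, minimal timer service; not Flink's SystemProcessingTimeService. */
final class MiniTimerService {
    static final int STATUS_ALIVE = 0;
    static final int STATUS_QUIESCED = 1;

    private final Object lock = new Object(); // plays the role of checkpointLock
    private final AtomicInteger status = new AtomicInteger(STATUS_ALIVE);
    private final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);

    MiniTimerService() {
        // same executor policies as in the excerpt
        timerService.setRemoveOnCancelPolicy(true);
        timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    /** Delay as in registerTimer: 1 ms after the target timestamp, never negative. */
    static long delayFor(long timestamp, long now) {
        return Math.max(timestamp - now, 0) + 1;
    }

    ScheduledFuture<?> registerTimer(long timestamp, TimerCallback target) {
        long delay = delayFor(timestamp, System.currentTimeMillis());
        return timerService.schedule(() -> {
            synchronized (lock) {
                // only invoke the callback while the service is still alive
                if (status.get() == STATUS_ALIVE) {
                    try {
                        target.onProcessingTime(timestamp);
                    } catch (Exception e) {
                        // a real service would report this to its owner instead
                        e.printStackTrace();
                    }
                }
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /** Stop invoking callbacks; already scheduled tasks still run but do nothing. */
    void quiesce() {
        status.set(STATUS_QUIESCED);
    }

    void shutdown() {
        timerService.shutdownNow();
    }
}
```

Because the status is read inside the task, quiescing takes effect even for timers that were scheduled earlier.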

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

    //......

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            //......
        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Emits the contents of the given window using the {@link InternalWindowFunction}.
     */
    @SuppressWarnings("unchecked")
    private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

    //......
}
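  • WindowOperator.processElement adds the element to windowState. With the heap state backend this is an in-memory state such as HeapAggregatingState, so the contents accumulate in memory. It then calls triggerContext.onElement, which delegates to trigger.onElement (here the trigger is EventTimeTrigger), to obtain a TriggerResult. If the result says fire, it calls emitWindowContents; if it says purge, it clears windowState. emitWindowContents sets the output timestamp to window.maxTimestamp() and calls userFunction.process to run the user-defined window function. Because each result carries its window's maxTimestamp, the results of the first window operation fall into the same time window of a downstream windowAll that uses the same WindowAssigner.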
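The timestamp detail can be checked with a little arithmetic. The sketch below assumes tumbling windows. It uses the window-start formula `ts - (ts - offset + size) % size`, which is the formula in Flink's TimeWindow.getWindowStartWithOffset, together with maxTimestamp = end - 1. `WindowTimestamps` is a hypothetical helper class. A result stamped with maxTimestamp maps back to the window it came from, whereas a timestamp one millisecond later would already belong to the next window.

```java
/** Hypothetical helper illustrating tumbling-window timestamp arithmetic (no Flink). */
final class WindowTimestamps {

    /** Start of the tumbling window of the given size and offset that contains ts. */
    static long windowStart(long ts, long offset, long size) {
        return ts - (ts - offset + size) % size;
    }

    /** Last timestamp inside [start, start + size), i.e. window end - 1. */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }
}
```

For example, an element at 7300 ms in 5000 ms windows belongs to the window starting at 5000. The window's result is stamped 9999, and windowStart(9999, 0, 5000) is again 5000.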

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
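  • EventTimeTrigger.onElement checks whether window.maxTimestamp() <= ctx.getCurrentWatermark(). If so, it returns TriggerResult.FIRE, which tells WindowOperator it can call emitWindowContents. Otherwise, it registers an event-time timer at window.maxTimestamp() and returns TriggerResult.CONTINUE. When that timer later fires, onEventTime returns FIRE because the timer time equals window.maxTimestamp().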
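The decision logic of onElement and onEventTime can be modeled in plain Java as below. `EventTimeTriggerSketch`, its `Result` enum and the `timers` list are hypothetical stand-ins for TriggerResult and TriggerContext. The sketch ignores allowed lateness. In the real WindowOperator, lateness is handled by isWindowLate before the trigger is consulted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of EventTimeTrigger's decisions (no Flink). */
final class EventTimeTriggerSketch {
    enum Result { CONTINUE, FIRE }

    /** Event-time timers registered so far (stand-in for ctx.registerEventTimeTimer). */
    final List<Long> timers = new ArrayList<>();

    /** Mirrors onElement: fire at once if the watermark already passed the window end. */
    Result onElement(long windowMaxTimestamp, long currentWatermark) {
        if (windowMaxTimestamp <= currentWatermark) {
            return Result.FIRE;
        }
        timers.add(windowMaxTimestamp);
        return Result.CONTINUE;
    }

    /** Mirrors onEventTime: only the timer for the window's own end fires it. */
    Result onEventTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
    }
}
```

Suppose the window ends at 4999 and the watermark is 3000. The first element only registers a timer at 4999. Once the watermark reaches 4999, the timer fires and the window is evaluated.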

Summary

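  • Flink supports consecutive windowed operations. For example, you can first partition the stream by key and aggregate each key within a given window, then apply windowAll to the resulting DataStream using the same time-based WindowAssigner. Within the same time window, data is then aggregated first per partition and then globally, which helps with problems such as finding the top-k elements.
  • AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks each serve two purposes: they extract a timestamp from each element to use as its event time, and they emit watermarks. Elements do not necessarily arrive in strict event-time order and may be out of order. A watermark bounds how long a window waits for late data: it tells the window that every element with an event time less than or equal to the watermark can be considered to have arrived. The window therefore does not wait indefinitely for stragglers that might still belong to it. Based on its own time range, each window uses its trigger to decide when it can close and start processing its data. In consecutive windowed operations, upstream watermarks are forwarded to the downstream operations.
  • A Trigger tells WindowOperator when it can fire a window and run the window function over its data, which happens when the trigger returns TriggerResult.FIRE. For EventTimeTrigger, the decision in onElement depends on the watermark: if window.maxTimestamp() <= ctx.getCurrentWatermark(), it returns TriggerResult.FIRE.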
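How a forwarded watermark takes effect downstream can be sketched as follows. This assumes the usual Flink rule that an operator with several input channels advances its event time only to the minimum watermark across those channels. An example of such an operator is the single windowAll instance fed by the parallel keyed window instances. `MinWatermarkCombiner` is a hypothetical class, and idle channels and watermark status are ignored.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical sketch: combine per-channel watermarks by taking their minimum. */
final class MinWatermarkCombiner {
    private final long[] channelWatermarks;
    private long combined = Long.MIN_VALUE;

    MinWatermarkCombiner(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; returns the new combined watermark if it advanced. */
    OptionalLong onWatermark(int channel, long watermark) {
        // a channel's watermark never moves backwards
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > combined) {
            combined = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

Under this model, the downstream window can only fire once every upstream instance has reported a watermark past the window's end. At that point every upstream instance has already emitted its per-key results for that window.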

doc

  • Consecutive windowed operations
