1. Overview

Reposted from: Flink 源码之时间处理 ("Flink source code: time handling")

2. Time characteristics supported by Flink

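  • EventTime: every record carries a timestamp, and all time-dependent operations in an operator are based on that timestamp. Out-of-order data can be handled. The timestamp can be assigned when the source produces the data (by calling collectWithTimestamp on the context inside a SourceFunction), or later with DataStream's assignTimestampsAndWatermarks. Usually one field of each record stores the timestamp.
  • ProcessingTime: records carry no timestamp. Operators use the current system time as the processing time of each record. If the data arrives out of order, Flink cannot detect it. ProcessingTime is the default in the Flink versions this article covers.
  • IngestionTime: similar to EventTime, except that Flink attaches the system time to each record as its timestamp (the current system time at the moment the record enters Flink). This avoids disorder introduced while Flink processes the data internally, but cannot fix disorder that happened before the data reached Flink. For that case, EventTime is recommended.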
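To make the three definitions concrete, here is a minimal sketch in plain Java. It is not Flink API: the `TimeMode` enum and the `effectiveTimestamp` helper are hypothetical and only show which clock supplies a record's timestamp under each characteristic. Under ProcessingTime no timestamp travels with the record, which is modeled as `null`.

```java
import java.util.function.LongSupplier;

// Hypothetical enum mirroring the three characteristics (not Flink's TimeCharacteristic).
enum TimeMode { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME }

class TimeModes {
    /**
     * Returns the timestamp attached to a record under the given mode.
     * eventTimestamp: the timestamp field carried by the record itself.
     * sourceClock:    the wall clock read when the record enters the source.
     * Returns null when no timestamp is attached (ProcessingTime).
     */
    static Long effectiveTimestamp(TimeMode mode, long eventTimestamp, LongSupplier sourceClock) {
        switch (mode) {
            case EVENT_TIME:
                return eventTimestamp;            // use the record's own timestamp
            case INGESTION_TIME:
                return sourceClock.getAsLong();   // stamp with the system time at ingestion
            case PROCESSING_TIME:
            default:
                return null;                      // operators read their own clock later
        }
    }
}
```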

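3. Setting the time characteristic used by Flink

Use the environment's setStreamTimeCharacteristic method to specify the time characteristic. Its parameter is a TimeCharacteristic.

TimeCharacteristic is an enum, defined as follows.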

@PublicEvolving
public enum TimeCharacteristic {
    ProcessingTime,
    IngestionTime,
    EventTime
}

Its values correspond one-to-one to the time types described above.

The source of StreamExecutionEnvironment's setStreamTimeCharacteristic method:

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

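Here we see that if the TimeCharacteristic is EventTime or IngestionTime, a default auto watermark interval of 200 ms is set. This parameter is used to align the watermarks of all machines in the cluster: every watermark sent downstream is an integer multiple of the auto watermark interval. Source analysis shows that this alignment happens only for IngestionTime (the interval itself is also read by periodic watermark assigners, see section 7). The detailed logic is analyzed in the StreamSourceContexts section below.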

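4. StreamSourceContexts

The StreamSourceContexts class decides, based on the system's TimeCharacteristic, which type of SourceContext to create. The SourceContext is used inside a SourceFunction (see Flink 使用之数据源, "Using Flink: data sources"), and different SourceContexts treat record timestamps differently.

The SourceContext used by a SourceFunction is determined by the getSourceContext method.

The call chain leading to getSourceContext is: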

  • LegacySourceFunctionThread.run in SourceStreamTask: headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain); this line passes in the StreamStatusMaintainer. Tracing StreamTask's getStreamStatusMaintainer method shows that it returns an OperatorChain.
  • StreamSource.run: this.ctx = StreamSourceContexts.getSourceContext

The source of getSourceContext:

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
        TimeCharacteristic timeCharacteristic,
        ProcessingTimeService processingTimeService,
        Object checkpointLock,
        StreamStatusMaintainer streamStatusMaintainer,
        Output<StreamRecord<OUT>> output,
        long watermarkInterval,
        long idleTimeout) {

    final SourceFunction.SourceContext<OUT> ctx;
    switch (timeCharacteristic) {
        case EventTime:
            ctx = new ManualWatermarkContext<>(
                output, processingTimeService, checkpointLock, streamStatusMaintainer, idleTimeout);
            break;
        case IngestionTime:
            ctx = new AutomaticWatermarkContext<>(
                output, watermarkInterval, processingTimeService, checkpointLock,
                streamStatusMaintainer, idleTimeout);
            break;
        case ProcessingTime:
            ctx = new NonTimestampContext<>(checkpointLock, output);
            break;
        default:
            throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
    }
    return ctx;
}

The source shows that there are three kinds of SourceContext:

  • EventTime uses ManualWatermarkContext
  • ProcessingTime uses NonTimestampContext
  • IngestionTime uses AutomaticWatermarkContext

ManualWatermarkContext and AutomaticWatermarkContext share the same parent class, WatermarkContext.

Let us go through the methods of WatermarkContext one by one.

4.1 The WatermarkContext class

@Override
public void collect(T element) {
    // Prevents running concurrently with a checkpoint
    synchronized (checkpointLock) {
        // Switch the stream status to ACTIVE
        streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);

        if (nextCheck != null) {
            // failOnNextCheck: set to true when the next idleness check is scheduled,
            // and reset to false once an element has been collected.
            this.failOnNextCheck = false;
        } else {
            scheduleNextIdleDetectionTask();
        }

        processAndCollect(element);
    }
}

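The streamStatusMaintainer of WatermarkContext has only one implementation, OperatorChain. It is passed in from StreamTask's operatorChain.

nextCheck is of type ScheduledFuture.

failOnNextCheck: set to true when the next idleness check is scheduled; it must be set back to false after an element is collected.

If no next idleness check has been scheduled, scheduleNextIdleDetectionTask is called. Its code is analyzed shortly.

Finally, processAndCollect is called; it contains the actual processing and collection logic. It is an abstract method, analyzed later.

The code of scheduleNextIdleDetectionTask: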

private void scheduleNextIdleDetectionTask() {
    if (idleTimeout != -1) {
        // reset flag; if it remains true when task fires, we have detected idleness
        failOnNextCheck = true;
        // Schedule an idleness detection task that runs after idleTimeout.
        // getCurrentProcessingTime() returns the current system time.
        nextCheck = this.timeService.registerTimer(
            this.timeService.getCurrentProcessingTime() + idleTimeout,
            new IdlenessDetectionTask());
    }
}

The source of IdlenessDetectionTask:

private class IdlenessDetectionTask implements ProcessingTimeCallback {
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        synchronized (checkpointLock) {
            // set this to null now;
            // the next idleness detection will be scheduled again
            // depending on the below failOnNextCheck condition
            // With nextCheck set to null, the next call to collect
            // schedules a new idleness detection task.
            nextCheck = null;

            if (failOnNextCheck) {
                // Mark the source as idle
                markAsTemporarilyIdle();
            } else {
                // Schedule another idleness detection task
                scheduleNextIdleDetectionTask();
            }
        }
    }
}

The markAsTemporarilyIdle method:

@Override
public void markAsTemporarilyIdle() {
    synchronized (checkpointLock) {
        // Set the status of the operatorChain to IDLE
        streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
    }
}

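From this analysis it is clear that collect performs automatic idleness detection. When an element is collected, the stream is set to ACTIVE and, if no check is pending, an idleness check is scheduled to fire after idleTimeout. If the source collects no element in the meantime, it is marked idle. If an element does arrive, failOnNextCheck is set to false, so when the check runs it does not mark the source idle but schedules another check instead.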
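The idleness-detection protocol can be simulated without threads. The sketch below is a hypothetical, simplified model, not Flink code: `IdleDetectionSim` keeps only `nextCheck`, `failOnNextCheck` and an idle flag, and the processing-time timer is replaced by explicit calls to `fireTimer(now)`.

```java
// Hypothetical simulation of WatermarkContext's idleness detection (not Flink code).
// Timers are driven manually by calling fireTimer(); no threads are involved.
class IdleDetectionSim {
    private final long idleTimeout;
    private Long nextCheck;            // time at which the pending idle check fires, null if none
    private boolean failOnNextCheck;   // stays true if no element arrives before the check fires
    private boolean idle;

    IdleDetectionSim(long idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /** Mirrors collect(): mark ACTIVE, then either clear the flag or schedule a check. */
    void collect(long now) {
        idle = false;
        if (nextCheck != null) {
            failOnNextCheck = false;
        } else {
            scheduleNextIdleDetectionTask(now);
        }
    }

    private void scheduleNextIdleDetectionTask(long now) {
        if (idleTimeout != -1) {
            failOnNextCheck = true;
            nextCheck = now + idleTimeout;
        }
    }

    /** Mirrors IdlenessDetectionTask.onProcessingTime(); does nothing if no check is due. */
    void fireTimer(long now) {
        if (nextCheck == null || now < nextCheck) {
            return;
        }
        nextCheck = null;
        if (failOnNextCheck) {
            idle = true;                          // markAsTemporarilyIdle()
        } else {
            scheduleNextIdleDetectionTask(now);   // data arrived: check again later
        }
    }

    boolean isIdle() { return idle; }

    Long nextCheck() { return nextCheck; }
}
```

With `idleTimeout = 100`, an element at t=50 clears the flag, so the check at t=100 only reschedules; with no element before t=200, the following check marks the source idle.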

collectWithTimestamp collects an element and attaches a timestamp to it. The code:

@Override
public void collectWithTimestamp(T element, long timestamp) {
    synchronized (checkpointLock) {
        streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);

        if (nextCheck != null) {
            this.failOnNextCheck = false;
        } else {
            scheduleNextIdleDetectionTask();
        }

        processAndCollectWithTimestamp(element, timestamp);
    }
}

Its logic is identical to collect, including the periodic idleness check. At the end it calls the subclass's processAndCollectWithTimestamp method.

emitWatermark sends a watermark downstream. The code:

@Override
public void emitWatermark(Watermark mark) {
    // One extra check: the call proceeds only if watermarks are allowed
    if (allowWatermark(mark)) {
        synchronized (checkpointLock) {
            streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);

            if (nextCheck != null) {
                this.failOnNextCheck = false;
            } else {
                scheduleNextIdleDetectionTask();
            }

            processAndEmitWatermark(mark);
        }
    }
}

Apart from that check, the logic is the same as collect, so we will not repeat it.

close closes the SourceContext and cancels the pending idleness detection task. The code:

@Override
public void close() {
    cancelNextIdleDetectionTask();
}

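4.2 The ManualWatermarkContext class

The EventTime characteristic uses ManualWatermarkContext. Compared with its parent, ManualWatermarkContext has two extra member fields:

  • output: emits the stream's elements. For StreamSource, output is a RecordWriterOutput wrapped by AbstractStreamOperator$CountingOutput.
  • reuse: the wrapper object for a stream element. It is reused here so that it does not have to be created again for every element.

ManualWatermarkContext implements the parent's abstract methods as follows: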

@Override
protected void processAndCollect(T element) {
    output.collect(reuse.replace(element));
}

@Override
protected void processAndCollectWithTimestamp(T element, long timestamp) {
    output.collect(reuse.replace(element, timestamp));
}

@Override
protected void processAndEmitWatermark(Watermark mark) {
    output.emitWatermark(mark);
}

@Override
protected boolean allowWatermark(Watermark mark) {
    // Watermarks are always allowed, so this returns true
    return true;
}

4.3 The AutomaticWatermarkContext class

The IngestionTime characteristic uses AutomaticWatermarkContext.
Its constructor:

private AutomaticWatermarkContext(
        final Output<StreamRecord<T>> output,
        final long watermarkInterval,
        final ProcessingTimeService timeService,
        final Object checkpointLock,
        final StreamStatusMaintainer streamStatusMaintainer,
        final long idleTimeout) {

    super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);

    this.output = Preconditions.checkNotNull(output, "The output cannot be null.");

    Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
    // Configured through the auto watermark interval
    this.watermarkInterval = watermarkInterval;

    this.reuse = new StreamRecord<>(null);

    this.lastRecordTime = Long.MIN_VALUE;

    // Current system time
    long now = this.timeService.getCurrentProcessingTime();
    // Register a watermark emission timer that fires after watermarkInterval
    this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
        new WatermarkEmittingTask(this.timeService, checkpointLock, output));
}

The main logic of WatermarkEmittingTask:

@Override
public void onProcessingTime(long timestamp) {
    // Current system time
    final long currentTime = timeService.getCurrentProcessingTime();

    // Take the lock; must not run concurrently with a checkpoint
    synchronized (lock) {
        // we should continue to automatically emit watermarks if we are active
        // The OperatorChain status must be ACTIVE
        if (streamStatusMaintainer.getStreamStatus().isActive()) {
            // idleTimeout != -1 means an idle timeout is configured for the source,
            // so idleness is also checked when emitting watermarks
            if (idleTimeout != -1 && currentTime - lastRecordTime > idleTimeout) {
                // if we are configured to detect idleness, piggy-back the idle detection check on the
                // watermark interval, so that we may possibly discover idle sources faster before waiting
                // for the next idle check to fire
                markAsTemporarilyIdle();

                // no need to finish the next check, as we are now idle.
                cancelNextIdleDetectionTask();
            } else if (currentTime > nextWatermarkTime) {
                // align the watermarks across all machines. this will ensure that we
                // don't have watermarks that creep along at different intervals because
                // the machine clocks are out of sync
                // watermarkTime is the largest multiple of watermarkInterval not exceeding currentTime.
                // This is watermark alignment, needed because cluster clocks are not synchronized.
                final long watermarkTime = currentTime - (currentTime % watermarkInterval);

                // Emit the watermark
                output.emitWatermark(new Watermark(watermarkTime));
                // Timestamp of the next watermark to emit; note that this differs from
                // the time at which the emission task runs next
                nextWatermarkTime = watermarkTime + watermarkInterval;
            }
        }
    }

    // Schedule the next watermark emission task
    long nextWatermark = currentTime + watermarkInterval;
    nextWatermarkTimer = this.timeService.registerTimer(
        nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output));
}

So AutomaticWatermarkContext emits watermarks downstream automatically on a timer, with a period of watermarkInterval.
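The alignment step can be checked with a few numbers. This is a hypothetical sketch, not Flink code: `AutoWatermarkSim` reproduces only the ACTIVE, non-idle branch of `WatermarkEmittingTask.onProcessingTime`, with the timer replaced by direct calls to `onTimer(currentTime)`. It assumes non-negative timestamps, for which `currentTime - currentTime % interval` is the largest multiple of the interval not exceeding `currentTime`.

```java
// Hypothetical model of the ACTIVE, non-idle branch of WatermarkEmittingTask (not Flink code).
class AutoWatermarkSim {
    private final long watermarkInterval;
    private long nextWatermarkTime = Long.MIN_VALUE;

    AutoWatermarkSim(long watermarkInterval) {
        this.watermarkInterval = watermarkInterval;
    }

    /** Largest multiple of interval that is <= currentTime (assumes currentTime >= 0). */
    static long alignedWatermark(long currentTime, long interval) {
        return currentTime - (currentTime % interval);
    }

    /** Called when the emission timer fires; returns the emitted watermark, or null if none. */
    Long onTimer(long currentTime) {
        if (currentTime > nextWatermarkTime) {
            long watermarkTime = alignedWatermark(currentTime, watermarkInterval);
            nextWatermarkTime = watermarkTime + watermarkInterval;
            return watermarkTime;
        }
        return null; // still inside the slot that was already emitted
    }

    public static void main(String[] args) {
        AutoWatermarkSim sim = new AutoWatermarkSim(200);
        System.out.println(sim.onTimer(1_234)); // 1200
        System.out.println(sim.onTimer(1_399)); // null: 1399 is not past nextWatermarkTime (1400)
        System.out.println(sim.onTimer(1_434)); // 1400
    }
}
```

Because every machine rounds down to the same grid, two sources whose timers fire at 1234 ms and 1399 ms would both emit 1200; within one source, `nextWatermarkTime` keeps the same slot from being emitted twice.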

The processAndCollect method and its logic:

@Override
protected void processAndCollect(T element) {
    lastRecordTime = this.timeService.getCurrentProcessingTime();
    output.collect(reuse.replace(element, lastRecordTime));

    // this is to avoid lock contention in the lockingObject by
    // sending the watermark before the firing of the watermark
    // emission task.
    // If lastRecordTime has passed nextWatermarkTime, emit a watermark right away
    // (the checkpoint lock is already held here) instead of waiting for the emission task.
    // nextWatermarkTime is the timestamp of the next watermark to emit, which differs
    // from the time at which the emission task runs next.
    // Without this check, the watermark would only be emitted when the emission task
    // fires next, so downstream would see it later.
    if (lastRecordTime > nextWatermarkTime) {
        // in case we jumped some watermarks, recompute the next watermark time
        final long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval);
        // nextWatermarkTime is now greater than lastRecordTime
        nextWatermarkTime = watermarkTime + watermarkInterval;
        output.emitWatermark(new Watermark(watermarkTime));

        // we do not need to register another timer here
        // because the emitting task will do so.
    }
}
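The eager path in `processAndCollect` can be modeled the same way. In this hypothetical sketch (not Flink code), `EagerWatermarkSim` starts from a given `nextWatermarkTime`, and `onRecord(lastRecordTime)` returns the watermark emitted immediately, or `null` when the record does not pass `nextWatermarkTime`.

```java
// Hypothetical model of the watermark check at the end of processAndCollect (not Flink code).
class EagerWatermarkSim {
    private final long watermarkInterval;
    private long nextWatermarkTime;

    EagerWatermarkSim(long watermarkInterval, long nextWatermarkTime) {
        this.watermarkInterval = watermarkInterval;
        this.nextWatermarkTime = nextWatermarkTime;
    }

    /** Returns the watermark emitted right away for a record stamped at lastRecordTime, or null. */
    Long onRecord(long lastRecordTime) {
        if (lastRecordTime > nextWatermarkTime) {
            // several slots may have been skipped, so recompute from the record time
            long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval);
            nextWatermarkTime = watermarkTime + watermarkInterval;
            return watermarkTime;
        }
        return null; // the periodic emission task will handle this slot
    }

    long nextWatermarkTime() {
        return nextWatermarkTime;
    }
}
```

For example, with an interval of 200 and `nextWatermarkTime = 1400`, a record at 2050 jumps over several slots and emits 2000 immediately, moving `nextWatermarkTime` to 2200.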

processAndCollectWithTimestamp is shown below. The second parameter, timestamp, is ignored: IngestionTime attaches the system time to each element.

@Override
protected void processAndCollectWithTimestamp(T element, long timestamp) {
    processAndCollect(element);
}

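Finally, let us look at allowWatermark and processAndEmitWatermark. AutomaticWatermarkContext does not let callers emit watermarks explicitly; watermarks are sent only by the timer task. The single exception is a watermark whose timestamp is Long.MAX_VALUE, which is accepted as long as nextWatermarkTime is not yet Long.MAX_VALUE. After this special watermark has been emitted, the periodic emission task is shut down. The code: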

@Override
protected boolean allowWatermark(Watermark mark) {
    // allow Long.MAX_VALUE since this is the special end-watermark that for example the Kafka source emits
    return mark.getTimestamp() == Long.MAX_VALUE && nextWatermarkTime != Long.MAX_VALUE;
}

/** This will only be called if allowWatermark returned {@code true}. */
@Override
protected void processAndEmitWatermark(Watermark mark) {
    nextWatermarkTime = Long.MAX_VALUE;
    output.emitWatermark(mark);

    // we can shutdown the watermark timer now, no watermarks will be needed any more.
    // Note that this procedure actually doesn't need to be synchronized with the lock,
    // but since it's only a one-time thing, doesn't hurt either
    final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
    if (nextWatermarkTimer != null) {
        nextWatermarkTimer.cancel(true);
    }
}

4.4 The NonTimestampContext class

This class is simple: it does not handle timestamps at all and never emits watermarks, so we will not analyze it further.

5. ProcessingTime call chain

InternalTimeServiceImpl.registerProcessingTimeTimer
SystemProcessingTimeService.registerTimer
SystemProcessingTimeService.wrapOnTimerCallback
ScheduledTask.run
TimerInvocationContext.invoke
InternalTimeServiceImpl.onProcessingTime(): triggerTarget.onProcessingTime(timer);

5.1 InternalTimeServiceImpl.registerProcessingTimeTimer

registerProcessingTimeTimer registers a processing-time timer:

// This method is mainly called from WindowOperator and SimpleTimerService.
// When called from WindowOperator, namespace is the current window;
// when called from SimpleTimerService, namespace is VoidNamespace.INSTANCE.
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
    // processingTimeTimersQueue is a priority queue; peek() returns the timer with the smallest timestamp
    InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    // add() returns true if the new timer became the head of the queue (smallest timestamp)
    if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
        long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
        // check if we need to re-schedule our timer to earlier
        // If the new timer's timestamp is the smallest in the queue (it fires first),
        // cancel the existing timer and register a new one for the new timestamp
        if (time < nextTriggerTime) {
            if (nextTimer != null) {
                nextTimer.cancel(false);
            }
            nextTimer = processingTimeService.registerTimer(time, this);
        }
    }
}

InternalTimeServiceImpl maintains processingTimeTimersQueue, an ordered queue holding the timers.

When InternalTimeServiceManager obtains an InternalTimeServiceImpl, it calls its startTimerService method. That method registers the first timer (the earliest one) with a ScheduledThreadPoolExecutor, so when that timer is due, InternalTimeServiceImpl's onProcessingTime method is called.

InternalTimeServiceImpl's onProcessingTime method:

@Override
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;

    InternalTimer<K, N> timer;

    // Keep taking every timer whose timestamp is <= time and call triggerTarget.onProcessingTime.
    // For the internalTimerService of WindowOperator, for example, triggerTarget is the WindowOperator itself.
    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onProcessingTime(timer);
    }

    // At this point the head timer's timestamp is greater than time,
    // so schedule it as the next timer
    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}

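It follows that, of the timers in processingTimeTimersQueue, the one with the smallest timestamp is always registered as a scheduled task. Each time a timer fires, all due timers are processed, and then the earliest remaining timer in processingTimeTimersQueue (the first one with a timestamp greater than the firing time) is scheduled next.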
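The scheduling invariant (only the earliest timer is handed to the executor) can be sketched with `java.util.PriorityQueue`. This is a simplified, hypothetical model, not Flink code: timers are plain `Long` timestamps without keys or namespaces, the `ScheduledFuture` is replaced by a `scheduledTime` field, and the "`add()` returns true if the head changed" check is folded into `time < nextTriggerTime`, since `PriorityQueue.add` always returns `true`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of InternalTimeServiceImpl's processing-time timers (not Flink code).
class ProcessingTimerQueueSim {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    private Long scheduledTime;                 // stands in for the ScheduledFuture nextTimer
    final List<Long> fired = new ArrayList<>(); // stands in for triggerTarget.onProcessingTime

    void registerProcessingTimeTimer(long time) {
        Long oldHead = queue.peek();
        queue.add(time);
        long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
        if (time < nextTriggerTime) {
            scheduledTime = time;               // "cancel" the old timer, schedule the earlier one
        }
    }

    /** Mirrors onProcessingTime(time): fire all timers due at or before time, then reschedule. */
    void onProcessingTime(long time) {
        scheduledTime = null;
        Long head;
        while ((head = queue.peek()) != null && head <= time) {
            queue.poll();
            fired.add(head);
        }
        if (head != null && scheduledTime == null) {
            scheduledTime = head;               // only the new earliest timer is scheduled
        }
    }

    Long scheduledTime() {
        return scheduledTime;
    }
}
```

Registering 300, 100 and 200 leaves only 100 scheduled; firing at 250 runs 100 and 200 and then schedules 300.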

5.2 SystemProcessingTimeService.registerTimer

InternalTimeServiceImpl.registerProcessingTimeTimer from the previous part calls
SystemProcessingTimeService.registerTimer. Its source:

@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {

    // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
    // T says we won't see elements in the future with a timestamp smaller or equal to T.
    // With processing time, we therefore need to delay firing the timer by one ms.
    // Compute the delay; as the comment above says, the extra 1 ms keeps the semantics
    // consistent with watermarks
    long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

    // we directly try to register the timer and only react to the status on exception
    // that way we save unnecessary volatile accesses for each timer
    try {
        // Schedule the timer. wrapOnTimerCallback returns a ScheduledTask object
        // that wraps the target timestamp and the callback logic.
        return timerService.schedule(
            wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException e) {
        final int status = this.status.get();
        if (status == STATUS_QUIESCED) {
            return new NeverCompleteFuture(delay);
        }
        else if (status == STATUS_SHUTDOWN) {
            throw new IllegalStateException("Timer service is shut down");
        }
        else {
            // something else happened, so propagate the exception
            throw e;
        }
    }
}
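The delay arithmetic is easy to isolate. The sketch below is an illustration under stated assumptions, not Flink code: `TimerDelay.delayMillis` copies the formula from `registerTimer`, and `main` schedules a callback on a plain `ScheduledExecutorService` to show the timer firing roughly 1 ms after its target timestamp.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class TimerDelay {
    /** Delay used by registerTimer: time left until timestamp (never negative) plus one extra ms. */
    static long delayMillis(long timestamp, long currentProcessingTime) {
        return Math.max(timestamp - currentProcessingTime, 0) + 1;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        long now = System.currentTimeMillis();
        long timestamp = now + 50;
        Callable<Long> task = System::currentTimeMillis;
        ScheduledFuture<Long> future =
            timerService.schedule(task, delayMillis(timestamp, now), TimeUnit.MILLISECONDS);
        // Expected to fire about 1 ms after the timestamp; scheduling jitter may add more.
        System.out.println("fired " + (future.get() - timestamp) + " ms after the timer timestamp");
        timerService.shutdown();
    }
}
```

Note that a timer registered for a timestamp already in the past still gets a delay of 1 ms, rather than a negative or zero delay.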

5.3 How InternalTimeServiceImpl is created

Each operator holds one InternalTimeServiceImpl instance. The call chain:

  • AbstractStreamOperator.getInternalTimerService
  • InternalTimeServiceManager.registerOrGetTimerService

In addition, SystemProcessingTimeService is created in StreamTask's invoke method.

6. EventTime call logic

The call chain from a task receiving a watermark to reacting to it:

StreamTaskNetworkInput.processElement
StatusWatermarkValve.inputWatermark
StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels
OneInputStreamTask.emitWatermark
AbstractStreamOperator.processWatermark
InternalTimeServiceManager.advanceWatermark
InternalTimeServiceImpl.advanceWatermark: triggerTarget.onEventTime(timer);

Take WindowOperator as an example. If the system's TimeCharacteristic is EventTime, an event-time timer is registered each time an element arrives, set to the end time of its window.

6.1 InternalTimeServiceImpl.registerEventTimeTimer

@Override
public void registerEventTimeTimer(N namespace, long time) {
    eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

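Registering an event-time timer simply adds a timer to eventTimeTimersQueue. eventTimeTimersQueue has exactly the same structure as processingTimeTimersQueue; it is just dedicated to event-time timers. The next question is: when does Flink use these timers to trigger computation?

6.2 InternalTimeServiceImpl.advanceWatermark

This method is called when a watermark is received. Its main logic is to take, one by one, every timer in eventTimeTimersQueue whose trigger time is less than or equal to the parameter time, and call triggerTarget.onEventTime for it. triggerTarget.onEventTime contains the operator's event-time computation logic.

The code of advanceWatermark: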

public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;

    InternalTimer<K, N> timer;

    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
}
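Event-time timers differ from processing-time timers only in what drives them: nothing is scheduled with an executor, and each watermark drains the due timers. Here is a hypothetical, simplified model (not Flink code), with `String` keys standing in for the key context and a list of fired timers standing in for `triggerTarget.onEventTime`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of eventTimeTimersQueue + advanceWatermark (not Flink code).
class EventTimerSim {
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final PriorityQueue<Timer> eventTimeTimersQueue =
        new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    private long currentWatermark = Long.MIN_VALUE;
    private String currentKey;                  // stands in for keyContext.setCurrentKey()
    final List<String> fired = new ArrayList<>();

    /** Registration only enqueues; no executor is involved. */
    void registerEventTimeTimer(String key, long time) {
        eventTimeTimersQueue.add(new Timer(time, key));
    }

    /** Fires every timer whose timestamp is <= the new watermark, in timestamp order. */
    void advanceWatermark(long time) {
        currentWatermark = time;
        Timer timer;
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
            eventTimeTimersQueue.poll();
            currentKey = timer.key;
            fired.add(currentKey + "@" + timer.timestamp);
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

For example, timers at 999 and 1999 both fire once a watermark of 1999 arrives, while a timer at 2999 waits for a later watermark.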

This method is called from InternalTimeServiceManager, whose advanceWatermark method loops over all its InternalTimerService instances and calls their advanceWatermark method.

public void advanceWatermark(Watermark watermark) throws Exception {
    for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
    }
}

That method in turn is called from AbstractStreamOperator's processWatermark:

public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
    }
    // Forward the watermark downstream
    output.emitWatermark(mark);
}

Following the call chain, we arrive at OneInputStreamTask's emitWatermark method:

@Override
public void emitWatermark(Watermark watermark) throws Exception {
    synchronized (lock) {
        watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
        operator.processWatermark(watermark);
    }
}

Next is StatusWatermarkValve's findAndOutputNewMinWatermarkAcrossAlignedChannels method:

private void findAndOutputNewMinWatermarkAcrossAlignedChannels() throws Exception {
    long newMinWatermark = Long.MAX_VALUE;
    boolean hasAlignedChannels = false;

    // determine new overall watermark by considering only watermark-aligned channels across all channels
    for (InputChannelStatus channelStatus : channelStatuses) {
        // As inputStreamStatus shows, a channel's aligned flag becomes false when the channel turns idle.
        // Take the minimum watermark over all aligned channels.
        if (channelStatus.isWatermarkAligned) {
            hasAlignedChannels = true;
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
    }

    // we acknowledge and output the new overall watermark if it really is aggregated
    // from some remaining aligned channel, and is also larger than the last output watermark
    // Emit the watermark
    if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
        lastOutputWatermark = newMinWatermark;
        output.emitWatermark(new Watermark(lastOutputWatermark));
    }
}

Next, the inputWatermark method:

public void inputWatermark(Watermark watermark, int channelIndex) throws Exception {
    // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
    if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
        long watermarkMillis = watermark.getTimestamp();

        // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
        if (watermarkMillis > channelStatuses[channelIndex].watermark) {
            // Update the channel's watermark
            channelStatuses[channelIndex].watermark = watermarkMillis;

            // previously unaligned input channels are now aligned if its watermark has caught up
            // The channel was unaligned (for example, it had been idle) and its watermark has now
            // caught up with the last output watermark, so mark it as aligned again
            if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
                channelStatuses[channelIndex].isWatermarkAligned = true;
            }

            // now, attempt to find a new min watermark across all aligned channels
            // This calls the method from the previous snippet
            findAndOutputNewMinWatermarkAcrossAlignedChannels();
        }
    }
}
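The valve logic is easier to follow with concrete channel values. The sketch below is a simplified, hypothetical model of StatusWatermarkValve, not Flink code: stream status is reduced to a per-channel `active` flag, the valve-level status check is omitted, and `markIdle`/`markActive` are assumed stand-ins for the idle and active branches of `inputStreamStatus`.

```java
import java.util.Arrays;

// Hypothetical, simplified model of StatusWatermarkValve (not Flink code).
class WatermarkValveSim {
    private final long[] watermarks;
    private final boolean[] aligned;
    private final boolean[] active;
    private long lastOutputWatermark = Long.MIN_VALUE;

    WatermarkValveSim(int numChannels) {
        watermarks = new long[numChannels];
        aligned = new boolean[numChannels];
        active = new boolean[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        Arrays.fill(aligned, true);
        Arrays.fill(active, true);
    }

    /** Mirrors inputWatermark(); returns the newly emitted watermark, or null. */
    Long inputWatermark(int channel, long watermark) {
        if (!active[channel] || watermark <= watermarks[channel]) {
            return null;                              // idle channel or stale watermark: ignore
        }
        watermarks[channel] = watermark;
        if (!aligned[channel] && watermark >= lastOutputWatermark) {
            aligned[channel] = true;                  // a lagging channel has caught up
        }
        return findAndOutputNewMinWatermarkAcrossAlignedChannels();
    }

    /** Assumed stand-in for a channel turning IDLE: it stops holding back the minimum. */
    Long markIdle(int channel) {
        active[channel] = false;
        aligned[channel] = false;
        return findAndOutputNewMinWatermarkAcrossAlignedChannels();
    }

    /** Assumed stand-in for a channel turning ACTIVE: it stays unaligned until it catches up. */
    void markActive(int channel) {
        active[channel] = true;
    }

    private Long findAndOutputNewMinWatermarkAcrossAlignedChannels() {
        long newMin = Long.MAX_VALUE;
        boolean hasAligned = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (aligned[i]) {
                hasAligned = true;
                newMin = Math.min(newMin, watermarks[i]);
            }
        }
        if (hasAligned && newMin > lastOutputWatermark) {
            lastOutputWatermark = newMin;
            return newMin;
        }
        return null;
    }
}
```

With two channels, the output follows the slower channel. Once the slow channel goes idle, the faster one alone drives the watermark, and a channel that becomes active again only rejoins the minimum after its watermark catches up with the last output.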

Finally, inputWatermark is called from StreamTaskNetworkInput's processElement method:

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}

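Clearly, this method dispatches on the type of the received element. Further up the chain is the logic for transferring data between tasks, which will be analyzed in a later post.

7. TimestampAssigner

The analysis above showed how operators pass on and react to the watermarks they receive. One question remains: how are watermarks generated?

Watermarks can be generated in two places:

  • The source calls emitWatermark. Its source code was analyzed in the StreamSourceContexts section at the beginning of this post, so we will not repeat it here.
  • DataStream's assignTimestampsAndWatermarks method is called.

assignTimestampsAndWatermarks has two overloads: one takes an AssignerWithPeriodicWatermarks, the other an AssignerWithPunctuatedWatermarks. Let us look at the source first and discuss the differences afterwards.

The AssignerWithPeriodicWatermarks overload: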

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

    // match parallelism to input, otherwise dop=1 sources could lead to some strange
    // behaviour: the watermark will creep along very slowly because the elements
    // from the source go to each extraction operator round robin.
    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPeriodicWatermarksOperator<T> operator =
        new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
        .setParallelism(inputParallelism);
}

The AssignerWithPunctuatedWatermarks overload:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {

    // match parallelism to input, otherwise dop=1 sources could lead to some strange
    // behaviour: the watermark will creep along very slowly because the elements
    // from the source go to each extraction operator round robin.
    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPunctuatedWatermarksOperator<T> operator =
        new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
        .setParallelism(inputParallelism);
}

The two overloads are almost identical; only the operator differs.

TimestampsAndPeriodicWatermarksOperator

First, the source of TimestampsAndPeriodicWatermarksOperator:

public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    private transient long watermarkInterval;

    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner);
        // Allow this operator to be chained with the operators before and after it
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();

        currentWatermark = Long.MIN_VALUE;
        // Read the auto watermark interval configured on the environment
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        if (watermarkInterval > 0) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            // Register a processing-time timer that fires after watermarkInterval
            // and calls this class's onProcessingTime method
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // Call extractTimestamp on the user's TimestampAssigner to get the timestamp
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        // Collect the element with its timestamp and send it downstream
        output.collect(element.replace(element.getValue(), newTimestamp));
    }

    // Runs when the timer registered in open() fires
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // register next timer
        // Ask the user function for the current watermark
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }

        // Schedule the next processing-time timer
        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
     * watermarks from here).
     */
    // Ignores all upstream watermarks, with one exception: a watermark with timestamp
    // Long.MAX_VALUE means the input has ended and must be forwarded downstream.
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();

        // emit a final watermark
        // When the operator closes, try to emit a watermark one last time
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }
    }
}

TimestampsAndPunctuatedWatermarksOperator

Its source, with annotations:

public class TimestampsAndPunctuatedWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPunctuatedWatermarks<T>>
        implements OneInputStreamOperator<T, T> {

    private static final long serialVersionUID = 1L;

    private long currentWatermark = Long.MIN_VALUE;

    public TimestampsAndPunctuatedWatermarksOperator(AssignerWithPunctuatedWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final T value = element.getValue();
        // Call the user function to get the timestamp
        final long newTimestamp = userFunction.extractTimestamp(value,
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        // Collect the element
        output.collect(element.replace(element.getValue(), newTimestamp));

        // Ask the user function for a watermark and send it downstream if it advances
        final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
        if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = nextWatermark.getTimestamp();
            output.emitWatermark(nextWatermark);
        }
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPunctuatedWatermarks} to emit
     * watermarks from here).
     */
    // Same as in TimestampsAndPeriodicWatermarksOperator; not repeated here
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }
}

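The analysis shows that the main difference between the two operators is that TimestampsAndPeriodicWatermarksOperator asks the assigner for a watermark periodically, even when no data arrives, but only emits it if it is greater than the last emitted watermark; TimestampsAndPunctuatedWatermarksOperator has no timer and only considers emitting a watermark when an element arrives.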
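The difference in emission rules fits in a few lines. These two hypothetical classes (not Flink code) keep only the `currentWatermark` comparison from each operator: the periodic variant is called on every timer tick with the assigner's current watermark, the punctuated variant once per element with whatever `checkAndGetNextWatermark` returned.

```java
// Hypothetical model of TimestampsAndPeriodicWatermarksOperator.onProcessingTime (not Flink code).
class PeriodicEmitterSim {
    private long currentWatermark = Long.MIN_VALUE;

    /** Called on every timer tick, with or without new data; returns the emitted watermark or null. */
    Long onTimer(long candidate) {
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
            return candidate;
        }
        return null; // polled but not advanced: nothing is sent
    }
}

// Hypothetical model of TimestampsAndPunctuatedWatermarksOperator.processElement (not Flink code).
class PunctuatedEmitterSim {
    private long currentWatermark = Long.MIN_VALUE;

    /** Called once per element; candidate is what the assigner returned (null means no watermark). */
    Long onElement(Long candidate) {
        if (candidate != null && candidate > currentWatermark) {
            currentWatermark = candidate;
            return candidate;
        }
        return null;
    }
}
```

A periodic tick that sees the same watermark twice emits nothing the second time, so an idle stream does not flood downstream with duplicate watermarks.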

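8. AscendingTimestampExtractor

This timestamp extractor is meant for streams in which the timestamps of successive elements are ascending (non-decreasing: as the >= check below shows, equal timestamps are allowed).

Below is the source of its extractTimestamp method. It adds one check: if the timestamp extracted from a new element is smaller than currentTimestamp, the timestamps are not ascending, and violationHandler's handleViolation is called. handleViolation is the callback for such violations. Users can implement their own callback or use one of the three built-in handlers: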

  • IgnoringHandler: ignores the violation and does nothing.
  • FailingHandler: throws a RuntimeException.
  • LoggingHandler: logs the violation.

@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
    final long newTimestamp = extractAscendingTimestamp(element);
    if (newTimestamp >= this.currentTimestamp) {
        this.currentTimestamp = newTimestamp;
        return newTimestamp;
    } else {
        violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
        return newTimestamp;
    }
}
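The violation check can be exercised with custom handlers. In this sketch, `ViolationHandler` and `AscendingCheck` are hypothetical stand-ins, not the Flink types, and the handlers are lambdas rather than the built-in IgnoringHandler, FailingHandler and LoggingHandler.

```java
// Hypothetical stand-in for the extractor's violation callback (not the Flink interface).
interface ViolationHandler {
    void handleViolation(long elementTimestamp, long lastTimestamp);
}

// Hypothetical model of AscendingTimestampExtractor.extractTimestamp (not the Flink class).
class AscendingCheck {
    private long currentTimestamp = Long.MIN_VALUE;
    private final ViolationHandler violationHandler;

    AscendingCheck(ViolationHandler violationHandler) {
        this.violationHandler = violationHandler;
    }

    /** Equal timestamps are accepted; only a decrease counts as a violation. */
    long extract(long newTimestamp) {
        if (newTimestamp >= currentTimestamp) {
            currentTimestamp = newTimestamp;
        } else {
            violationHandler.handleViolation(newTimestamp, currentTimestamp);
        }
        return newTimestamp; // the out-of-order timestamp is still returned
    }
}
```

A counting lambda behaves like a logging handler, while a lambda that throws behaves like a failing handler.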

BoundedOutOfOrdernessTimestampExtractor

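The most common watermark scenario is tolerating a bounded amount of out-of-order data: there is a maximum allowed delay, and data arriving later than that is not included in the computation (window operators drop it, or route it to a side output if one is configured). Flink provides a ready-made timestamp extractor for this scenario. Its key field is maxOutOfOrderness, the maximum tolerated delay just described. The extractor is an abstract class; to use it, subclass it and implement extractTimestamp(T element) with the logic that derives a timestamp from an element.

The extractor's extractTimestamp(T element, long previousElementTimestamp) method, annotated: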

@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
    // Call the user-implemented method to get the timestamp from the element
    long timestamp = extractTimestamp(element);
    // currentMaxTimestamp holds the largest timestamp seen so far;
    // its initial value is Long.MIN_VALUE + maxOutOfOrderness
    if (timestamp > currentMaxTimestamp) {
        currentMaxTimestamp = timestamp;
    }
    return timestamp;
}

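This method is called by the two operators discussed earlier. Users do not need to implement it; they only implement the extractTimestamp(T element) method that it delegates to.

The getCurrentWatermark method, which returns the current watermark: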

@Override
public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    // Core logic: the watermark is the largest timestamp seen minus maxOutOfOrderness,
    // meaning all data older than that is assumed to have arrived,
    // so only that data is considered complete for computation
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    // Prevent the watermark from moving backwards
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}
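A quick trace of the bounded-out-of-orderness rule, as a hypothetical plain-Java model (not the Flink class): `BoundedOutOfOrdernessSim` keeps `currentMaxTimestamp` and `lastEmittedWatermark` as described above, but works on raw `long` timestamps instead of elements.

```java
// Hypothetical model of BoundedOutOfOrdernessTimestampExtractor's bookkeeping (not the Flink class).
class BoundedOutOfOrdernessSim {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSim(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start here so that currentMaxTimestamp - maxOutOfOrderness does not underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Track the largest timestamp seen; the timestamp itself is passed through unchanged. */
    long extractTimestamp(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Watermark = max timestamp - maxOutOfOrderness, never moving backwards. */
    long getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }
}
```

With `maxOutOfOrderness = 5000`, timestamps 10000 then 8000 give a watermark of 5000; a later 12000 raises it to 7000, and a late 9000 does not move it.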

IngestionTimeExtractor

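Its watermark logic is essentially the same as AutomaticWatermarkContext's, except that there is no watermark alignment. It sends the current system time minus 1 ms downstream as the watermark timestamp.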

public class IngestionTimeExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
    private static final long serialVersionUID = -4072216356049069301L;

    private long maxTimestamp;

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // make sure timestamps are monotonously increasing, even when the system clock re-syncs
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // make sure timestamps are monotonously increasing, even when the system clock re-syncs
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return new Watermark(now - 1);
    }
}
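The monotonicity guard is clearest when the clock steps backwards. This sketch rests on an assumption: `IngestionTimeSim` copies the logic of IngestionTimeExtractor but takes an injectable `LongSupplier` clock (the real class calls `System.currentTimeMillis()` directly), so that a clock re-sync can be simulated.

```java
import java.util.function.LongSupplier;

// Hypothetical copy of IngestionTimeExtractor's logic with an injectable clock (not the Flink class).
class IngestionTimeSim {
    private final LongSupplier clock;
    private long maxTimestamp;

    IngestionTimeSim(LongSupplier clock) {
        this.clock = clock;
    }

    /** Timestamp for a new element: never smaller than any timestamp handed out before. */
    long extractTimestamp() {
        final long now = Math.max(clock.getAsLong(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    /** Watermark: the same monotonic clock value minus 1 ms. */
    long currentWatermark() {
        final long now = Math.max(clock.getAsLong(), maxTimestamp);
        maxTimestamp = now;
        return now - 1;
    }
}
```

If the clock jumps back from 1000 to 900, elements keep receiving 1000 and the watermark stays at 999 until the clock passes 1000 again.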
