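In Flink, a watermark is a mechanism for measuring the progress of event time. It exists to handle out-of-order data in real-time streams and is usually used together with windows.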

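After a device generates a real-time event, the event passes through Flink's source and then through several operators. Along the way, network latency, backpressure and other factors can make events arrive out of order. A window cannot wait indefinitely for delayed data. So once a particular watermark arrives, Flink assumes that all data with timestamps before that watermark has arrived, even though some delayed data may still show up later, and it can trigger the window computation. This mechanism is the watermark, as shown in the figure below.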

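The moment a window is created depends on the wall clock (processing time), not on event time: Flink creates a window when the first element belonging to it arrives. Which window an element belongs to is still decided by the element's event time.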
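The window object comes into existence when data arrives, but its boundaries are computed from event time alone. Below is a minimal sketch of that assignment. It assumes epoch-aligned tumbling windows with no offset, and `TumblingWindowSketch` and `windowStart` are hypothetical names, not Flink API.

```java
import java.util.List;

// Sketch: which tumbling event-time window an element falls into.
// Assumes epoch-aligned windows with zero offset; hypothetical names, no Flink dependency.
final class TumblingWindowSketch {

    /** Start of the tumbling window of size sizeMs that contains eventTime. */
    static long windowStart(long eventTime, long sizeMs) {
        // Math.floorMod keeps the result correct for negative timestamps too
        return eventTime - Math.floorMod(eventTime, sizeMs);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5 s windows, chosen only for the example
        // Arrival order is not event-time order; assignment looks only at the event time
        for (long ts : List.of(7_300L, 1_200L, 4_999L, 5_000L)) {
            long start = windowStart(ts, size);
            System.out.println("event " + ts + " -> window [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Elements arriving late in wall-clock terms still land in the window their timestamp dictates. The watermark decides whether that window is still open.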



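watermark = maxEventTime - t

where maxEventTime is the largest event time that has entered the Flink window so far and t is the specified delay (the maximum allowed out-of-orderness).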
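Here is a small worked example of the formula on an out-of-order stream. It is plain Java with no Flink dependency. The event times, the delay t = 2000 ms, and the names `WatermarkFormulaSketch` and `watermarks` are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of watermark = maxEventTime - t over an out-of-order stream (hypothetical names).
final class WatermarkFormulaSketch {

    /** Watermark after each event, given the allowed delay t in milliseconds. */
    static List<Long> watermarks(long[] eventTimes, long t) {
        List<Long> result = new ArrayList<>();
        long maxEventTime = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            maxEventTime = Math.max(maxEventTime, ts); // a late event never lowers the maximum
            result.add(maxEventTime - t);
        }
        return result;
    }

    public static void main(String[] args) {
        // Event times in arrival order: 3000 and 4000 arrive after larger timestamps
        long[] events = {1_000L, 5_000L, 3_000L, 8_000L, 4_000L};
        System.out.println(watermarks(events, 2_000L)); // [-1000, 3000, 3000, 6000, 6000]
    }
}
```

The late events at 3000 and 4000 never move the watermark backwards. The event at 4000, however, arrives after the watermark has already reached 6000. For a window [0, 5000) it would therefore count as late data.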


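The watermark itself is an event-time timestamp. In essence, it maps the event times seen so far to the wall-clock (processing-time) moment at which a window computation is triggered. A window fires once the watermark reaches the window's last timestamp (its end minus 1 ms).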
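The firing decision can be sketched in a few lines. The sketch uses Flink's convention that a window [start, end) has a maximum timestamp of end - 1, and that an event-time trigger fires once the current watermark is at least that value. `WindowFiringSketch` and `shouldFire` are hypothetical names; this is a standalone illustration, not Flink code.

```java
// Sketch of the event-time firing rule for a window [start, end).
// Hypothetical names; mirrors the rule "fire when end - 1 <= current watermark".
final class WindowFiringSketch {

    static boolean shouldFire(long windowEnd, long watermark) {
        return windowEnd - 1 <= watermark;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // window [5000, 10000)
        for (long wm : new long[] {6_000L, 9_998L, 9_999L, 12_000L}) {
            System.out.println("watermark " + wm + " -> fire? " + shouldFire(windowEnd, wm));
        }
    }
}
```

The wall-clock moment at which the window fires is simply the moment a large enough watermark reaches the window operator.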


3.1 When watermarks are generated


3.2 Watermark assigners

  • Periodic Watermarks

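Periodic watermarks are the more commonly used option: the system is told to emit a watermark at a fixed time interval. When the time characteristic is set to event time, this interval defaults to 200 ms, and it can be changed if needed. The example below sets it manually so that a watermark is emitted every 1 s.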

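final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Manually set the watermark interval to 1 s (the value is in milliseconds)
env.getConfig().setAutoWatermarkInterval(1000L);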

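import javax.annotation.Nullable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TestPeriodWatermark implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

    private long currentMaxTimestamp = 0L;
    private final long maxOutOfOrderness = 1000L; // allowed delay: 1 s

    // Called by Flink once per auto-watermark interval
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    // Called for every element: f1 is the event time; remember the largest one seen so far
    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        long timestamp = element.f1;
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}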
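To show what "periodic" means in practice, the sketch below replays the TestPeriodWatermark logic by hand against wall-clock ticks. Elements only update the running maximum, and a watermark is produced only on a tick. It is forwarded only if it is larger than the previous one. The event data, the tick loop and the names `PeriodicWatermarkSimulation` and `emittedWatermarks` are hypothetical; in a real job, Flink drives these calls itself.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of a periodic assigner over processing time (hypothetical names, no Flink).
final class PeriodicWatermarkSimulation {

    /** An element: when it arrives (wall clock, ms) and its own event time (ms). */
    static final class Event {
        final long arrivalMs;
        final long eventTime;

        Event(long arrivalMs, long eventTime) {
            this.arrivalMs = arrivalMs;
            this.eventTime = eventTime;
        }
    }

    /** Watermarks forwarded at each tick; events must be sorted by arrivalMs. */
    static List<Long> emittedWatermarks(List<Event> events, long intervalMs,
                                        long maxOutOfOrderness, long endMs) {
        List<Long> emitted = new ArrayList<>();
        long currentMax = 0L;              // same start value as TestPeriodWatermark
        long lastEmitted = Long.MIN_VALUE;
        int next = 0;
        for (long tick = intervalMs; tick <= endMs; tick += intervalMs) {
            // extractTimestamp(...) for every element that arrived before this tick
            while (next < events.size() && events.get(next).arrivalMs < tick) {
                currentMax = Math.max(currentMax, events.get(next).eventTime);
                next++;
            }
            long watermark = currentMax - maxOutOfOrderness; // getCurrentWatermark()
            if (watermark > lastEmitted) {   // a non-advancing watermark is not forwarded
                emitted.add(watermark);
                lastEmitted = watermark;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(100, 2_000), new Event(400, 5_000),
                new Event(1_500, 3_000), new Event(2_200, 9_000));
        // 1 s tick interval and 1 s out-of-orderness, as in the example above
        System.out.println(emittedWatermarks(events, 1_000, 1_000, 3_000)); // [4000, 8000]
    }
}
```

At the 2 s tick the late element (event time 3000) does not raise the maximum. The candidate watermark stays at 4000, so nothing new is forwarded.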
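  • Punctuated Watermarks

With punctuated watermarks, Flink asks the assigner after every element whether to emit a watermark. Watermarks are therefore driven by the data itself (for example, by special marker records) rather than by a timer. The example below emits each element's timestamp as a watermark.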



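import javax.annotation.Nullable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TestPunctuateWatermark implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {

    // Called after extractTimestamp for every element; here each element's timestamp becomes
    // the watermark (a null watermark, or one not larger than the previous, is not emitted)
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }

    // Use f1 of the tuple as the event time
    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        return element.f1;
    }
}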
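The sketch below replays the TestPunctuateWatermark behaviour on a short out-of-order sequence. A watermark is proposed per element and forwarded only when it advances. The names `PunctuatedWatermarkSimulation` and `emittedWatermarks` and the data are hypothetical; this is a plain-Java simulation, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of punctuated watermarks: one candidate per element, forwarded only if it advances.
final class PunctuatedWatermarkSimulation {

    static List<Long> emittedWatermarks(long[] eventTimes) {
        List<Long> emitted = new ArrayList<>();
        long lastEmitted = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            long candidate = ts; // like checkAndGetNextWatermark returning the element's timestamp
            if (candidate > lastEmitted) {
                emitted.add(candidate);
                lastEmitted = candidate;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emittedWatermarks(new long[] {1_000L, 4_000L, 2_000L, 6_000L})); // [1000, 4000, 6000]
    }
}
```

This assigner uses no delay, so the element at 2000 arrives after a watermark of 4000. It is late for any window ending at or before 4000. This is why punctuated assigners usually subtract some delay, just like the periodic one.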







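Starting with Flink 1.11, the watermark interfaces were redesigned; the underlying principle is unchanged. The new API splits the work of the old assigners into two interfaces: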


-- TimestampAssigner: extracts the event-time timestamp from each element
-- WatermarkGenerator: decides when and which watermarks to emit


/**
 * A {@code TimestampAssigner} assigns event time timestamps to elements. These timestamps are used
 * by all functions that operate on event time, for example event time windows.
 *
 * <p>Timestamps can be an arbitrary {@code long} value, but all built-in implementations represent
 * it as the milliseconds since the Epoch (midnight, January 1, 1970 UTC), the same way as {@link
 * System#currentTimeMillis()} does it.
 *
 * @param <T> The type of the elements to which this assigner assigns timestamps.
 */
public interface TimestampAssigner<T> {

    /**
     * The value that is passed to {@link #extractTimestamp} when there is no previous timestamp
     * attached to the record.
     */
    long NO_TIMESTAMP = Long.MIN_VALUE;

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of
     * any particular time zone or calendar.
     *
     * <p>The method is passed the previously assigned timestamp of the element. That previous
     * timestamp may have been assigned from a previous assigner. If the element did not carry a
     * timestamp before, this value is {@link #NO_TIMESTAMP} (= {@code Long.MIN_VALUE}: {@value
     * Long#MIN_VALUE}).
     *
     * @param element The element that the timestamp will be assigned to.
     * @param recordTimestamp The current internal timestamp of the element, or a negative value, if
     *     no timestamp has been assigned yet.
     * @return The new timestamp.
     */
    long extractTimestamp(T element, long recordTimestamp);
}

Default implementation: RecordTimestampAssigner (used by WatermarkStrategy when no timestamp assigner is configured)

/**
 * A {@link TimestampAssigner} that forwards the already-assigned timestamp. This is for use when
 * records come out of a source with valid timestamps, for example from the Kafka Metadata.
 */
public final class RecordTimestampAssigner<E> implements TimestampAssigner<E> {

    @Override
    public long extractTimestamp(E element, long recordTimestamp) {
        return recordTimestamp;
    }
}


/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
 * fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
 * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the event
     * timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated depends on {@link
     * ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}




