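Table of Contents

  • Preface
  • 1.Trigger
  • 2.Handling late data
    • 2.1 Setting a watermark delay
    • 2.2 Allowing windows to process late data
    • 2.3 Sending late data to a side output
  • 3.Hands-on
    • 3.1 Code example
    • 3.2 Exceptions encountered along the way
    • 3.3 Results
  • 4. Handling duplicate results from late firings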

Preface

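  Late data is data that arrives after the watermark has already passed it, i.e. its event timestamp is earlier than the current watermark. Handling late data is therefore only meaningful under event-time semantics. Flink offers three mechanisms:
  • For out-of-order streams, set a watermark delay.
  • For window computations, set the window's allowed lateness.
  • Route the remaining late data to side outputs.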


1.Trigger

  A Trigger determines when a window invokes its window function. The abstract class Trigger has the following methods:

onElement() called for each element that is added to a window.

onEventTime() called when a registered event-time timer fires.

onProcessingTime() called when a registered processing-time timer fires.

onMerge() relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.

clear() performs any action needed upon removal of the corresponding window.


The first three methods return a TriggerResult. Its enum values, listed below, determine what happens to the window:

    /** No action is taken on the window. */
    CONTINUE(false, false),

    /** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
    FIRE_AND_PURGE(true, true),

    /**
     * On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,
     * though, all elements are retained.
     */
    FIRE(true, false),

    /**
     * All elements in the window are cleared and the window is discarded, without evaluating the
     * window function or emitting any elements.
     */
    PURGE(false, true);
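The following standalone Java sketch makes these results concrete. It has no Flink dependency and models the decisions of Flink's built-in EventTimeTrigger, the default trigger of event-time windows.

When an element arrives after the watermark has already reached the window's last timestamp (end - 1), the trigger fires immediately. Otherwise the real trigger registers an event-time timer for that timestamp and returns CONTINUE. TriggerDecision and EventTimeTriggerModel are hypothetical names that only mirror TriggerResult and EventTimeTrigger; they are not the Flink API.

```java
// Hypothetical mirror of TriggerResult: each value says whether to fire and whether to purge.
enum TriggerDecision {
    CONTINUE(false, false),
    FIRE_AND_PURGE(true, true),
    FIRE(true, false),
    PURGE(false, true);

    final boolean fire;
    final boolean purge;

    TriggerDecision(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

// Hypothetical model of EventTimeTrigger's decisions for a window [start, end).
final class EventTimeTriggerModel {
    // The last timestamp that still belongs to the window.
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // onElement: fire at once if the watermark has already reached the window's last timestamp;
    // otherwise the real trigger registers an event-time timer at maxTimestamp and continues.
    static TriggerDecision onElement(long windowEnd, long currentWatermark) {
        return maxTimestamp(windowEnd) <= currentWatermark
                ? TriggerDecision.FIRE
                : TriggerDecision.CONTINUE;
    }

    // onEventTime: only the timer registered at the window's last timestamp fires the window.
    static TriggerDecision onEventTime(long timerTime, long windowEnd) {
        return timerTime == maxTimestamp(windowEnd)
                ? TriggerDecision.FIRE
                : TriggerDecision.CONTINUE;
    }

    public static void main(String[] args) {
        // Window [0, 5000): watermark 3000 has not reached 4999 yet.
        System.out.println(onElement(5000, 3000));
        // Watermark 4999 has reached the window's last timestamp.
        System.out.println(onElement(5000, 4999));
        System.out.println(onEventTime(4999, 5000));
    }
}
```

EventTimeTrigger returns FIRE rather than FIRE_AND_PURGE, so the window contents are retained after a firing. This is what lets a later late firing recompute over all of the window's elements.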

2.Handling late data

2.1 Setting a watermark delay

In order to work with event time, Flink needs to know the events' timestamps, meaning each element in the stream needs to have its event timestamp assigned. This is usually done by accessing/extracting the timestamp from some field in the element by using a TimestampAssigner. Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about progress in event time. You can configure this by specifying a WatermarkGenerator.

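  The watermark marks the progress of event time and acts as the global logical clock of the whole application. Watermarks flow between tasks along with the data, telling each task the current event time. Generating watermarks requires a TimestampAssigner, which assigns each record its event timestamp. It also requires a WatermarkGenerator, which generates watermarks either on events or periodically.
  Once a watermark delay is set, all timers fire according to the delayed watermark. In general the delay should not be too large, otherwise it significantly reduces the timeliness of stream processing. Depending on requirements, it is usually set between milliseconds and seconds.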

For example, a common strategy for out-of-order streams:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.f0);

Source code of BoundedOutOfOrdernessWatermarks:

public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}
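The watermark arithmetic of the source above can be checked in plain Java without a cluster. BoundedWatermarkSim is a hypothetical re-implementation of just the onEvent/onPeriodicEmit logic, not the Flink class. The timestamps fed to it are made-up values that use the same 2-second bound as the example in section 3.

```java
// Hypothetical plain-Java copy of the BoundedOutOfOrdernessWatermarks arithmetic.
final class BoundedWatermarkSim {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkSim(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Same initial value as the Flink source, so the first watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // onEvent: only remember the largest timestamp seen so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // onPeriodicEmit: watermark = largest timestamp - bound - 1.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSim sim = new BoundedWatermarkSim(2000); // 2 s bound
        System.out.println("before any event -> " + sim.currentWatermark());
        for (long ts : new long[] {1000, 7000, 3000}) {
            sim.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + sim.currentWatermark());
        }
    }
}
```

Feeding 7000 yields a watermark of 4999. The out-of-order record with timestamp 3000 does not move the watermark backwards.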

2.2 Allowing windows to process late data

When working with event-time windowing, it can happen that elements arrive late, i.e. the watermark that Flink uses to keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See event time and especially late elements for a more thorough discussion of how Flink deals with event time.
By default, late elements are dropped when the watermark is past the end of the window. However, Flink allows you to specify a maximum allowed lateness for window operators. Allowed lateness specifies by how much time elements can be late before they are dropped, and its default value is 0. Elements that arrive after the watermark has passed the end of the window but before it passes the end of the window plus the allowed lateness, are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again. This is the case for the EventTimeTrigger.
In order to make this work, Flink keeps the state of windows until their allowed lateness expires. Once this happens, Flink removes the window and deletes its state, as also described in the Window Lifecycle section.
By default, the allowed lateness is set to 0. That is, elements that arrive behind the watermark will be dropped.

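  For window computations, once the watermark reaches the end of a window, the window closes by default and any late data for it is dropped. Setting an allowed lateness lets the window keep processing late data. The allowed lateness defaults to 0. Once it is set, a late record is still added to the window and triggers a computation if it arrives after the watermark has passed the window's end timestamp but before it passes the end timestamp plus the allowed lateness.
  The process can be viewed as follows. When the watermark reaches the end of the window, the window first emits a quick, approximately correct result. It then stays open to wait for late data, and every late record makes it recompute and emit an updated result. The result is corrected step by step until an accurate final value is obtained.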

Note: GlobalWindows never have late data, because the end timestamp of the global window is Long.MAX_VALUE.

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);
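The rules above can be condensed into a small model of one tumbling event-time window. This is a simplified sketch, assuming the default EventTimeTrigger. It also assumes the window state is cleared once the watermark reaches the window's last timestamp plus the allowed lateness. Arrival and AllowedLatenessModel are hypothetical names, not Flink classes.

```java
// Hypothetical outcome for an element that belongs to the window [start, end).
enum Arrival {
    ON_TIME,     // watermark before the window's last timestamp: buffered until the main firing
    LATE_FIRING, // window already fired but is still kept: element is added and fires again
    DROPPED      // allowed lateness expired: dropped, or sent to a side output if configured
}

final class AllowedLatenessModel {
    static Arrival classify(long windowEnd, long allowedLatenessMillis, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;                       // last timestamp in the window
        long cleanupTime = maxTimestamp + allowedLatenessMillis; // window state removed here
        if (cleanupTime <= currentWatermark) {
            return Arrival.DROPPED;
        }
        if (maxTimestamp <= currentWatermark) {
            return Arrival.LATE_FIRING;
        }
        return Arrival.ON_TIME;
    }

    public static void main(String[] args) {
        long end = 5000;        // window [0, 5000)
        long lateness = 60_000; // allowedLateness of 1 minute
        for (long wm : new long[] {3000, 4999, 30_000, 64_999}) {
            System.out.println("watermark " + wm + " -> " + classify(end, lateness, wm));
        }
    }
}
```

With one minute of allowed lateness, the window [0, 5000) keeps producing late firings until the watermark reaches 64999. With the default lateness of 0, any element for it that arrives once the watermark has reached 4999 is dropped.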

2.3 Sending late data to a side output

Using Flink’s side output feature you can get a stream of the data that was discarded as late.
You first need to specify that you want to get late data using sideOutputLateData(OutputTag) on the windowed stream. Then, you can get the side-output stream on the result of the windowed operation.

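  If late data still arrives after the window has finally closed, a side output collects it so that no data is lost. By then the window is really closed, so the only option is to keep the earlier window results and read the late records from the side output. For each record, determine which window it belongs to and merge it into that window's result manually.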
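Below is a minimal sketch of that manual merge in plain Java. It assumes the earlier results were saved as counts keyed by url and window start; LateMerge, the key format, and the sample urls are hypothetical. The window start follows the alignment of tumbling windows with zero offset, where windows begin at multiples of the window size.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that folds late side-output records into previously saved window counts.
final class LateMerge {
    private final long windowSizeMillis;
    // key "url@windowStart" -> count saved for that window
    private final Map<String, Long> counts = new HashMap<>();

    LateMerge(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    // Tumbling windows with zero offset start at multiples of the window size.
    long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, windowSizeMillis);
    }

    // Save a result emitted by the window operator before the window closed.
    void saveResult(String url, long windowStart, long count) {
        counts.put(url + "@" + windowStart, count);
    }

    // Add one late record from the side output to the saved count of its window.
    long mergeLate(String url, long timestamp) {
        return counts.merge(url + "@" + windowStart(timestamp), 1L, Long::sum);
    }

    public static void main(String[] args) {
        LateMerge merger = new LateMerge(5000);
        merger.saveResult("./home", 0, 3);               // window [0, 5000) closed with count 3
        long updated = merger.mergeLate("./home", 4000); // late record with timestamp 4000
        System.out.println("window [0, 5000) for ./home -> " + updated);
    }
}
```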

3.Hands-on

3.1 Code example

POJO class

import java.sql.Timestamp;

public class Event {
    public String user;
    public String url;
    public long timestamp;

    // Flink POJOs need a public no-argument constructor
    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

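Core code
(1) Read data from Kafka and convert it into an Event stream
(2) Assign timestamps and watermarks
(3) Apply the window operation
(4) Print the results to the console
The result type UrlViewCount is not included here. It is a simple POJO built from a label string, the count, and the window start and end timestamps.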

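import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Properties;

public class LaterDataTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set parallelism to 1 so that a single watermark drives the windows
        env.setParallelism(1);

        // read from Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.42.102:9092,192.168.42.103:9092,192.168.42.104:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStream<Event> stream = env
                .addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties))
                .flatMap(new FlatMapFunction<String, Event>() {
                    @Override
                    public void flatMap(String s, Collector<Event> collector) throws Exception {
                        // each Kafka record is "user,url,timestamp"
                        String[] split = s.split(",");
                        collector.collect(new Event(split[0], split[1], Long.parseLong(split[2])));
                    }
                });

        // watermarks for an out-of-order stream: needs a Duration and a TimestampAssigner
        SingleOutputStreamOperator<Event> watermarks = stream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        watermarks.print("input");

        // output tag for late data; the trailing {} makes it an anonymous subclass
        OutputTag<Event> late = new OutputTag<Event>("late") {};

        // 5 s tumbling windows per url, 1 min allowed lateness, later data to the side output
        SingleOutputStreamOperator<UrlViewCount> aggregate = watermarks
                .keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event value) throws Exception {
                        return value.url;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(late)
                .aggregate(new AggregateFunction<Event, Long, Long>() {
                    @Override
                    public Long createAccumulator() {
                        return 0L;
                    }

                    @Override
                    public Long add(Event value, Long accumulator) {
                        return accumulator + 1;
                    }

                    @Override
                    public Long getResult(Long accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Long merge(Long a, Long b) {
                        // only called for merging windows; summing keeps the count correct
                        return a + b;
                    }
                }, new ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Long> elements,
                                        Collector<UrlViewCount> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        // the pre-aggregated count is the only element
                        Long next = elements.iterator().next();
                        long currentWatermark = context.currentWatermark();
                        out.collect(new UrlViewCount(s + " watermark " + currentWatermark, next, start, end));
                    }
                });

        // output
        aggregate.print("result");
        aggregate.getSideOutput(late).print("late");

        env.execute();
    }
}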
The generic form of the side-output pattern, from the Flink documentation:

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

3.2 Exceptions encountered along the way

(1) Generic type erasure

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.util.OutputTag could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point

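Fix:

Declare the OutputTag as an anonymous subclass by appending {}. That is, change new OutputTag<Event>("late") to new OutputTag<Event>("late") {}, as the code above does. The anonymous subclass keeps the generic type argument available at runtime, so Flink can determine the type of the side output.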
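The effect of the {} can be reproduced with plain Java reflection. Tag below is a hypothetical stand-in for a generic class like OutputTag. An ordinary instance cannot recover its type argument at runtime. An anonymous subclass, by contrast, records Tag<String> as its generic superclass, which is the Java mechanism the {} form relies on.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a generic tag class such as OutputTag<T>.
class Tag<T> {
    final String id;

    Tag(String id) {
        this.id = id;
    }

    // Returns the type argument T if it survived erasure, or null if it did not.
    Type typeArgument() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return null; // plain instance: the superclass is Object, T is erased
    }
}

final class ErasureDemo {
    public static void main(String[] args) {
        Tag<String> plain = new Tag<String>("late");        // T erased at runtime
        Tag<String> anonymous = new Tag<String>("late") {}; // subclass of Tag<String>
        System.out.println("plain: " + plain.typeArgument());
        System.out.println("anonymous: " + anonymous.typeArgument());
    }
}
```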

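(2) Parallelism not set to 1, so the window output did not match expectations

Fix: set the parallelism to 1 with env.setParallelism(1), as the code above does.

Cause:
  Watermark propagation follows the barrel principle. A downstream task's watermark is the minimum of the watermarks from all of its upstream parallel tasks, just as the shortest stave determines a barrel's water level. With a parallelism greater than 1, an upstream subtask that receives little or no data holds this minimum back. The windows then fire later than expected or not at all.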
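Here is a simplified model of this rule in plain Java. It assumes three upstream channels and ignores Flink's handling of idle sources; MinWatermark is a hypothetical class, not Flink's internal implementation. A channel that never receives data stays at Long.MIN_VALUE and holds the task's watermark back.

```java
import java.util.Arrays;

// Hypothetical model of a downstream task combining watermarks from parallel upstream channels.
final class MinWatermark {
    private final long[] channelWatermarks;

    MinWatermark(int upstreamParallelism) {
        channelWatermarks = new long[upstreamParallelism];
        // Until a channel sends a watermark, it counts as Long.MIN_VALUE.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel; smaller values than the current one are ignored.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The task's event-time clock is the minimum over all channels (the shortest stave).
    long current() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermark task = new MinWatermark(3);
        task.onWatermark(0, 4999);
        task.onWatermark(1, 64_999);
        // Channel 2 has received no data, so the task's watermark has not advanced.
        System.out.println("stuck at MIN_VALUE: " + (task.current() == Long.MIN_VALUE));
        task.onWatermark(2, 4999);
        System.out.println("after channel 2 catches up: " + task.current());
    }
}
```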

3.3 Results

Produce data with the Kafka console producer:

./kafka-console-producer.sh  --broker-list 192.168.42.102:9092,192.168.42.103:9092,192.168.42.104:9092 --topic clicks

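The watermark allows a 2-second delay and the window allows 1 minute of lateness. Late data arriving after that goes to the side output tagged late.
(1) After the record with timestamp 7000 is consumed, the watermark is 4999 (7000 - 2000 - 1), so window [0, 5000) fires for the first time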


(2) Late records arriving after the record with timestamp 7000 still trigger window computations (late firings)


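(3) When the record with timestamp 67000 arrives, the watermark reaches 64999, the window's last timestamp 4999 plus the 60000 ms allowed lateness. Window [0, 5000) is then closed, so any later late records for it are written to the side output and must be merged manually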


4. Handling duplicate results from late firings

When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes the end of the window. In these cases, when a late but not dropped element arrives, it could trigger another firing for the window. These firings are called late firings, as they are triggered by late events and in contrast to the main firing which is the first firing of the window. In case of session windows, late firings can further lead to merging of windows, as they may “bridge” the gap between two pre-existing, unmerged windows.
The elements emitted by a late firing should be treated as updated results of a previous computation, i.e., your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicated results into account or deduplicate them.

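  Because late firings emit updated results, the data stream will contain multiple results for the same window computation. Downstream, these duplicates must be deduplicated or otherwise taken into account.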
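One way to handle the duplicates is to treat every firing as an update of the same row, keeping only the latest result per key and window. The sketch below assumes each result carries its url and window start (UrlViewCount in the example above includes the window start and end). LatestResultStore is a hypothetical in-memory stand-in for an upsert-capable sink, and the sample urls and counts are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink that treats late firings as updates (upserts) rather than new rows.
final class LatestResultStore {
    // key "url@windowStart" -> latest count emitted for that window
    private final Map<String, Long> latest = new HashMap<>();

    // Each firing for the same (url, window) overwrites the previous result.
    void upsert(String url, long windowStart, long count) {
        latest.put(url + "@" + windowStart, count);
    }

    Long get(String url, long windowStart) {
        return latest.get(url + "@" + windowStart);
    }

    int size() {
        return latest.size();
    }

    public static void main(String[] args) {
        LatestResultStore store = new LatestResultStore();
        store.upsert("./home", 0, 3); // main firing of window [0, 5000)
        store.upsert("./home", 0, 4); // late firing: updated result for the same window
        store.upsert("./cart", 0, 1);
        System.out.println(store.size() + " windows, ./home -> " + store.get("./home", 0));
    }
}
```

Overwriting is safe here because EventTimeTrigger keeps the window contents, so each late firing emits the full updated count rather than an increment.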
