2019独角兽企业重金招聘Python工程师标准>>>

官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
翻译:https://www.jianshu.com/p/68ab40c7f347

1. 几个重要的概念简述:

  • Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算

  • start_time,end_time:当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间

  • event-time: 事件发生时间,是事件发生所在设备的当地时间,比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间

  • Watermarks:可以把他理解为一个水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。

2.如何使用Watermarks处理乱序的数据流

什么是乱序呢?可以理解为数据到达的顺序和他的event-time排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等

因为Watermarks是用来触发window窗口计算的,我们可以根据事件的event-time,计算出Watermarks,并且设置一些延迟,给迟到的数据一些机会。

假如我们设置10s的时间窗口(window),那么0~10s,10~20s都是一个窗口,以0~10s为例,0位start-time,10为end-time。假如有4个数据的event-time分别是8(A),12.5(B),9(C),13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒

当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算
当B到达的时候,Watermarks为max(12.8,5)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算
触发计算的时候,会将AC(因为他们都小于10)都计算进去

通过上面这种方式,我们就将迟到的C计算进去了

这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算的,加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。

3.看一个代码的实际例子

下面代码中的BoundedOutOfOrdernessGenerator就是一个典型的Watermarks实例

package xuwei.tech;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.meituan.flink.common.conf.FlinkConf;
import com.meituan.flink.common.kafka.MTKafkaConsumer08;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;/*** Created by smile on 14/11/2017. 统计每 10 秒内每种操作有多少个*/
public class EventTimeWindowCount {private static final Logger logger = LoggerFactory.getLogger(EventTimeWindowCount.class);public static void main(String[] args) throws Exception { // 获取作业名String jobName = FlinkConf.getJobName(args); // 获取执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置使用 EventTime// 作为时间戳(默认是// ProcessingTime)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 开启 Checkpoint(每 10 秒保存一次检查点,模式为 Exactly Once)env.enableCheckpointing(10000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 设置从 Kafka 的 topic// "log.orderlog" 中读取数据MYKafkaConsumer08 consumer = new MYKafkaConsumer08(jobName);DataStream<String> stream = env.addSource(consumer.getInstance("log.orderlog", new SimpleStringSchema())); // 默认接上次开始消费,以下的写法(setStartFromLatest)可以从最新开始消费,相应的还有(setStartFromEarliest// 从最旧开始消费)// DataStream<String> stream =// env.addSource(consumer.getInstance("log.orderlog", new// SimpleStringSchema()).setStartFromLatest());DataStream<String> orderAmount = // 将读入的字符串转化为 OrderRecord 对象stream.map(new ParseOrderRecord()) // 设置从 OrderRecord 对象中提取时间戳的方式,下文 BoundedOutOfOrdernessGenerator// 类中具体实现该方法.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator()) // 用 OrderRecord 对象的 action// 字段进行分流(相同 action// 的进入相同流,不同 action// 的进入不同流).keyBy("action") // 触发 10s 的滚动窗口,即每十秒的数据进入同一个窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 将同一窗口的每个 OrderRecord 对象的 count// 字段加起来(其余字段只保留第一个进入该窗口的,后进去的丢弃).sum("count") // 将结果从 OrderRecord 对象转换为 String,每十万条输出一条.flatMap(new ParseResult()); // 如果想每条都输出来,那就输得慢一点,每 10 秒输出一条数据(请将上一行的 flatMap 换成下一行的 map)// .map(new ParseResultSleep());// 输出结果(然后就可以去 Task Manage 的 Stdout 里面看)// 小数据量测试的时候可以这么写,正式上线的时候不要这么写!数据量大建议还是写到 Kafka Topic 或者其他的下游里面去orderAmount.print();env.execute(jobName);}public static class ParseOrderRecord implements MapFunction<String, OrderRecord> {@Overridepublic OrderRecord map(String s) throws Exception {JSONObject jsonObject = JSON.parseObject(s);long id = jsonObject.getLong("id");int dealId = jsonObject.getInteger("dealid");String action = jsonObject.getString("_mt_action");double amount = jsonObject.getDouble("amount");String timestampString = jsonObject.getString("_mt_datetime"); // 将字符串格式的时间戳解析为 long 类型,单位毫秒SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date timestampDate = simpleDateFormat.parse(timestampString);long timestamp = timestampDate.getTime();return new OrderRecord(id, dealId, action, amount, timestamp);}}public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<OrderRecord> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;@Overridepublic long extractTimestamp(OrderRecord record, long previousElementTimestamp) { // 将数据中的时间戳字段(long 类型,精确到毫秒)赋给// timestamp 变量,此处是// OrderRecord 的 timestamp// 字段long timestamp = record.timestamp;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}@Overridepublic Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the// out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}}public static class ParseResult implements FlatMapFunction<OrderRecord, String> {private static long msgCount = 0;@Overridepublic void flatMap(OrderRecord record, Collector<String> out) throws Exception { // 每十万条输出一条,防止输出太多在 Task// Manage 的 Stdout 里面刷新不出来if (msgCount == 0) {out.collect("Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp)+ " action: " + record.action + " count = " + record.count);msgCount = 0;}msgCount++;msgCount %= 100000;}}public static class ParseResultSleep implements MapFunction<OrderRecord, String> {@Overridepublic String map(OrderRecord record) throws Exception { // 每 10 秒输出一条数据,防止输出太多在 Task Manage 的 Stdout 里面刷新不出来// 正式上线的时候不要这么写!Thread.sleep(10000);return "Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: "+ record.action + " count = " + record.count;}}public static class OrderRecord {        public long id;        public int dealId;        public String action;        public double amount;        public long timestamp;        public long count;        public OrderRecord() {}        public OrderRecord(long id, int dealId, String action, double amount, long timestamp) {            this.id = id;            this.dealId = dealId;            this.action = action;            this.amount = amount;            this.timestamp = timestamp;            this.count = 1;}}
}

转载于:https://my.oschina.net/xiaominmin/blog/3057628

初学Flink,对Watermarks的一些理解和感悟(透彻2)相关推荐

  1. 初学Java对某些问题的理解

    1下面的代码中每个词表示什么意思? publicy①   class②   Hello③ { public④  static⑤  void⑥  main⑦ (String[] args)⑧{ } } ...

  2. Flink 原理与实现:理解 Flink 中的计算资源

    Operator Chains 为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task.每个task在一个线程中执行.将operators链 ...

  3. 性能测试初学_对loadrunner脚本的理解

    例子:loadrunner自带的飞机订票系统 录制登陆操作脚本 对照F1帮助及火狐抓包的理解,整理loadrunner脚本与抓包的关联.此为个人理解,有些未证实,权当记录. Action() { we ...

  4. Flink批流一体的理解

    在大数据处理领域,批处理任务与流处理任务一般被认为是两种不同的任务,一个大数据框架一般会被设计为只能处理其中一种任务: MapReduce只支持批处理任务: Storm只支持流处理任务: Spark ...

  5. Flink Parallelism 和 Slot 深度理解

    相信使用过 Flink 的你或多或少遇到过下面这个问题(笔者自己的项目曾经也出现过这样的问题),错误信息如下: Caused by: akka.pattern.AskTimeoutException: ...

  6. Flink Kafka两阶段提交理解

    参考     https://zhuanlan.zhihu.com/p/111304281 https://developer.aliyun.com/article/720793 flink官网文档链 ...

  7. python index函数时间复杂度_初学python之以时间复杂度去理解列表常见使用方法

    列表list,一个有序的队列 列表内的个体为元素,由若干个元素按照顺序进行排列,列表是可变化的,也就是说可以增删 list定义 常用的列表定义方式: 使用[] 或者 a = list() 取数列表可以 ...

  8. flink的operator state简单理解

    每个子任务都有自己的状态,子任务之间的状态互相隔离,互不影响 每个子任务的状态一般都是列表的形势,相当于子列表 持久化存储的时候会把每个子任务的子列表组合起来,组成一个大的列表的结构 当任务失败重启的 ...

  9. Flink Interval Join源码理解

    参考: https://www.jianshu.com/p/179beca9f307 interval join :两条数据流+between边界+过期数据清理 demo: 下面看下源码实现 inte ...

最新文章

  1. SAP MM 物料库存转固定资产,报错:You cannot post to asset in company code 1900 fiscal year 2021
  2. python命令解析_python解析命令行
  3. signature=89b7a6bcfac55abae5ac369dafee29f4,Capecitabine
  4. 化腐朽为神奇:推荐一个让算法动起来更好理解的学习项目!
  5. python怎么安装包-怎么安装python包
  6. Angular JS - 9 - SeaJS加载js模块
  7. crm——stark组件核心原理
  8. Nginx -静态资源Web服务
  9. 【网易游戏面试题】.NET中强引用和弱引用是什么
  10. Electron的学习笔记
  11. 领英·影响力2020:职场人再定位,千里马伯乐新论
  12. 实时数据库中的二级压缩技术
  13. CodeForces - 721E
  14. iOS底层探索之类的结构—cache分析(下)
  15. Camera 花屏、分屏、卡屏,黑屏问题分析
  16. 密码战争,区块链技术之路
  17. python支付宝自动转账_如何使用python实现支付宝转账接口
  18. 什么是ESP/MSR 分区,如何建立ESP/MSR 分区
  19. 安卓模拟器最优选择+抖音检测全绕过简述
  20. 老年人智能手机APP开发界面设计具体策略

热门文章

  1. HDU 1269 移动城堡 联通分量 Tarjan
  2. 项目经理问:为什么总是只有我在加班 – 挂包袱现象
  3. dedecms织梦模板修改专题路径的方法
  4. 使用Response.ContentType 来控制下载文件的类型
  5. 使用 .NET 实现 Ajax 长连接 (Part 1 - Comet Web Service)
  6. 移动互联网漫谈(4)-移动通信网络
  7. 3.2-3.3 Hive中常见的数据压缩
  8. DOM操作之属性和样式操作
  9. Java利用POI生成Excel强制换行
  10. ACM-线段树扫描线总结