• Event Time (事件时间):事件创建的时间(必须包含在数据源中的元素⾥⾯)
• Ingestion Time (摄⼊时间):数据进⼊ Flink 的 source 算⼦的时间,与机器相关
• Processing Time (处理时间):执⾏操作算⼦的本地系统时间,与机器相关


⽔位线:系统认为时间戳⼩于⽔位线的事件都已经到达了
因此, Window 的执⾏也是由 Watermark 触发的(⽔位线 >= 窗⼝结束时间)

⽔位线产⽣的公式:⽔位线 = 系统观察到的最⼤事件时间 - 最⼤延迟时间
最⼤延迟时间由程序员⾃⼰设定

实验

1.下载netcat
1)下载netcat,地址:https://eternallybored.org/misc/netcat/(两个版本下载哪个都可以),默认32位的,但是也可以在64位运行
2)将解压后的单个文件全部拷贝到C:\Windows\System32的文件夹下。注意:不是拷贝整个文件夹,而是文件夹里面的全部文件。
3)执行下面命令 nc -l -p 9999(这个命令就是linux下的nc -lk 9999)

2.开启flink从socket中读取数据

package com.claroja;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;public class WaterMark {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);
//        env.getConfig().setAutoWatermarkInterval(60 * 1000L); //默认200ms更新水位线,每进来一个event都记录最新的时间,但200ms才会更新DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);stream.map(new MapFunction<String, Tuple2<String, Long>>() {// Tuple2<key, timestamp>,事件事件必须是毫秒时间戳@Overridepublic Tuple2<String, Long> map(String s) throws Exception {String[] arr = s.split(" ");return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);}}).assignTimestampsAndWatermarks(// 水位线必须在keyby之前WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))// 最大延迟时间是5s.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {// 设置event_time@Overridepublic long extractTimestamp(Tuple2<String, Long> r, long l) {return r.f1; // 告诉系统元素的事件时间戳是r.f1字段}})).keyBy(r -> r.f0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {long count = 0L;for (Tuple2<String, Long> i : iterable) {count += 1;}collector.collect("窗口中共有 " + count + " 条元素");}}).print();env.execute();}
}

Flink watermark相关推荐

  1. Flink WaterMark 详解

    摘录仅供学习使用,原文来自: Flink详解系列之五--水位线(watermark) - 简书 1.概念 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的 ...

  2. 一文理解Flink 水位线(Flink Watermark)

    文章目录 Flink 中的时间语义 `处理时间` `事件时间` 水位线(Watermark) `事件时间和窗口` `什么是水位线` 有序流中的水位线 乱序流中的水位线 `水位线的特性` `如何生成水位 ...

  3. Flink Watermark相关概念(窗口、水位线、迟到事件)

    Flink 为实时计算提供了三种时间,即事件时间(event time).摄入时间(ingestion time)和处理时间(processing time).在进行 window 计算时,使用摄入时 ...

  4. Flink Watermark 源码分析

    随着 flink 的快速发展与 API 的迭代导致新老版本差别巨大遂重拾 flink,在回顾到时间语义时对 watermark 有了不一样的理解. 一.如何生成 在 flink 1.12(第一次学习的 ...

  5. flink中的WaterMark调研和具体实例

    一些基本概念介绍: Event Time 事件时间是每个事件在其生产设备上发生的时间 Ingestion Time 摄取时间是数据进入Flink的时间 Processing Time 处理时间是是指正 ...

  6. 这一次,你能彻底搞懂 Flink!

    近年来,AI 场景发展得如火如荼,同时其计算规模也越来越大.这也让专注于数据处理的 Flink 有了较大的发展空间.Flink作为在大数据生态里实时处理的一个新框架,在一定程度上也有一定的难度. Fl ...

  7. 为什么阿里如此钟爱Flink?

    近年来,AI 场景发展得如火如荼,同时其计算规模也越来越大.这也让专注于数据处理的 Flink 有了较大的发展空间.Flink作为在大数据生态里实时处理的一个新框架,在一定程度上也有一定的难度. Fl ...

  8. Flink再度霸榜,还不上车?

    近些年,随着5G和移动互联网的发展,企业业务不断升级对数据实时计算处理能力要求越来越高,如故障定位监测.电商促销等.Flink 作为新一代开源大数据计算引擎,以高吞吐.低延迟的优异实时计算能力,使企业 ...

  9. 【Flink】Flink 的输出 Output CountingOutput

    文章目录 1.概述 1.1 RecordWriterOutput 1.2 BroadcastingOutputCollector 2. RecordWriterOutput 3.Broadcastin ...

最新文章

  1. AppBaseJs 类库 网上常用的javascript函数及其他js类库写的
  2. 卧槽!成就了Java,开发框架排第一,你还是不够了解它!
  3. java 4d_GitHub - wm3445/Java-concurrency at 4d10ae51a9deec37340fc40d03f205cfbe8de43b
  4. 最近发现了好多好资源,赶紧收藏一下!【粒子特效】
  5. 谷歌新App观妙中国发布:AR传承文化艺术,小米vivo应用宝可体验
  6. Ubuntu/CentOS下使用脚本自动安装 Docker
  7. java 正则 空格_Java中关于空格的正则表达式
  8. Spring Security OAuth2源码解析(二)
  9. 同花顺2020年净利润17亿元增长近一倍,DAU超1400万
  10. oracle按时间点还原数据
  11. UITableView 界面小实例
  12. JAVA编写的一个简单的Socket实现的HTTP响应服务器
  13. html中加载页面时调用函数,js页面加载时调用函数方法
  14. oracle rman备份和恢复数据库,Oracle rman备份和还原恢复数据库
  15. Android信息处理机制
  16. 服务器防护是什么?为什么需要防护?
  17. 路由器刷openwrt
  18. 论文阅读:基于多模态词向量的语句距离计算方法
  19. 五大靠谱的婚恋相亲APP详细特点缺点分析!
  20. MyEclipse启动加速与优化

热门文章

  1. 什么是哲学为基础的设计模式?
  2. 宽依赖和窄依赖_Kardemir开始生产窄钢板,进入板材市场
  3. 使用遇到的问题_水性漆在使用过程中遇到的问题以及解决方案
  4. python wmi 显卡型号_python - wmi模块学习(windwos硬件信息获取)
  5. python建立一个字符串_python字符串基本方法
  6. Windows下如何安装MySQL服务
  7. C语言 判断一个数是否为素数
  8. 关于C++中的pow()函数
  9. 什么是软件测试资产,观点:当前是国内推广软件资产管理的最佳时期
  10. 377. 组合总和 Ⅳ(JavaScript)