窗口的分类

按照时间生成Window,为TimeWindow,根据窗口实现原理可分为三类:

  1. 滚动窗口(Tumbling Window):将数据依据固定的窗口长度对数据进行分片。滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会重叠。适用于做每个时间段的聚合计算
  2. 滑动窗口(Sliding Window):由固定的窗口长度和滑动间隔组成,适用于最近一个时间段内统计,窗口长度固定,可有重叠
  3. 会话窗口(Session Window):由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新的数据就会生成新的窗口。会话窗口是指一段用户持续活跃周期,由非活跃的间隙分隔开,时间不对齐

按照指定的数据条数生成一个Window,与时间无关,为CountWindow:

  1. 滚动窗口(Tumbling Window):默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行
  2. 滑动窗口(Sliding Window):和滚动窗口的函数名完全一致,只是传入的参数不同,滑动窗口需要传入两个参数,一个是window_size,一个是sliding_size

注意事项:所有窗口是左闭右开

代码实际案例

关于TimeWindow的算子的应用

def time_window_func(sEnv: StreamExecutionEnvironment) = {sEnv.setParallelism(1)val keyByDS: KeyedStream[(String, Int), String] = sEnv.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1)).keyBy(_._1)/**滚动窗口*/val tumblingDS = keyByDS.timeWindow(Time.seconds(3)).sum(1)tumblingDS.print()/**滑动窗口*/val slidingDS = keyByDS.timeWindow(Time.seconds(3), Time.seconds(1))slidingDS.sum(1).print()/**会话窗口:固定时间间隔为10s的事件时间会话窗口*/val sessionDS = keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))sessionDS.sum(1).print()sEnv.execute()}

关于CountWindow的算子应用

  def count_window_func(sEnv: StreamExecutionEnvironment) = {val keyByDS = sEnv.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1)).keyBy(_._1)/**滚动函数*/keyByDS.countWindow(3).reduce((t1, t2) =>{(t1._1, t2._2 + t2._2)}).print()/**滑动窗口 这里sliding_size为2,表示每收到两个相同的key的数据就计算一次,每一次计算的window范围是3个元素*/keyByDS.countWindow(3, 2).reduce((t1, t2) =>{(t1._1, t1._2 + t2._2)}).print()sEnv.execute()}

但是在实际生产过程中,会用到很多会话窗口的算子进行处理数据,会话窗口包括固定时间窗口和动态时间窗口的操作,,这里我简述几个简单的关于会话窗口的算子操作

def session_windows(sEnv: StreamExecutionEnvironment) = {sEnv.setParallelism(1)val keyByDS: KeyedStream[(String, Int), String] = sEnv.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1)).keyBy(_._1)/**固定时间间隔为10s的事件时间会话窗口*/keyByDS.window(EventTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print()/**动态时间间隔的事件时间会话窗口*/keyByDS.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[(String, Int)] {override def extract(eles: (String, Int)): Long = {//动态指定并返回Session Gapeles._2 + 10}})).allowedLateness(Time.seconds(10))  //迟到生存期,默认是0,即事件时间窗口窗口在水印到来后结束,无需考虑事件迟到的情况.sum(1).print()/**固定时间间隔为10s的处理时间会话窗口*/keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1).print()/**动态时间间隔的处理时间会话窗口*/keyByDS.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[(String, Int)] {override def extract(t: (String, Int)): Long = {//根据事件特征确定会话窗口间隔t._1.length}})).sum(1).print()sEnv.execute()}

全局窗口案例

    /**全窗口函数(full window functions): ProcessWindowFunction 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据*//**全局窗口:将相同key的所有元素聚在一个, 但是这种窗口没有起点和终点,因此必须自定义触发器*/val processDS: DataStream[String] = keyByDS.countWindow(3).process(new ProcessWindowFunction[(String, Int), String, String, GlobalWindow] {override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {out.collect(elements.mkString(","))}})

Flink的窗口计算案例相关推荐

  1. flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程...

    本文阐述 Flink 的事件时间和 Watermark 机制,剖析 Watermark 产生和传递的流程. 1 Event time 和 Watermark 的关系 1.1 Event time 和 ...

  2. flink 处理迟到数据(Trigger、设置水位线延迟时间、允许窗口处理迟到数据、将迟到数据放入侧输出流、代码示例、迟到数据触发窗口计算重复结果处理)

    文章目录 前言 1.Trigger 2.处理迟到数据 2.1 设置水位线延迟时间 2.2 允许窗口处理迟到数据 2.3 将迟到数据放入侧输出流 3.实操 3.1 代码示例 3.2 中间遇到的异常 3. ...

  3. flink 自定义 窗口_《从0到1学习Flink》—— Flink Data transformation(转换)

    前言 在第一篇介绍 Flink 的文章 <<从0到1学习Flink>-- Apache Flink 介绍> 中就说过 Flink 程序的结构 Flink 应用程序结构就是如上图 ...

  4. Flink专题四:Flink DataStream 窗口介绍及使用

    由于工作需要最近学习flink 现记录下Flink介绍和实际使用过程 这是flink系列的第四篇文章 Flink DataStream 窗口介绍及使用 窗口介绍 时间窗口 翻滚窗口(数据以一个时间断为 ...

  5. Flink流式计算从入门到实战 一

    文章目录 一.理解Flink与流计算 1.初识Flink 2.Flink的适用场景 3.流式计算梳理 二.Flink安装部署 1.Flink的部署方式 2.获取Flink 3.实验环境与前置软件 4. ...

  6. 日均百亿级日志处理:微博基于Flink的实时计算平台建设

    来自:DBAplus社群 作者介绍 吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人. 黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发.实时数据关联平台.实时算法特征数据计算.实 ...

  7. 基于Flink秒级计算时CPU监控图表数据中断问题

    基于Flink进行秒级计算时,发现监控图表中CPU有数据中断现象,通过一段时间的跟踪定位,该问题目前已得到有效解决,以下是解决思路: 一.问题现象 以SQL02为例,发现本来10秒一个点的数据,有时会 ...

  8. Apache Flink,流计算?不仅仅是流计算!

    阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行.Flink Forward是由Apache软件基金会授权的全球范围内的Flink技术 ...

  9. Apache Flink,流计算?不仅仅是流计算! 1

    阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行.Flink Forward是由Apache软件基金会授权的全球范围内的Flink技术 ...

最新文章

  1. O - Layout POJ - 3169(差分约束)
  2. 第一篇:Entity Framework 简介
  3. 做系统ghost步骤图解_用好这工具,小孩都能会重装系统!
  4. poj3280 Cheapest Palindrome(回文串区间dp)
  5. 【论文分享】ACL 2020 信息抽取任务中的新动向
  6. 王道考研操作系统笔记(第二章)附:王道考研408所有PPT和思维导图
  7. Java8 实战系列-02-lambda 表达式简介
  8. [ 英语 ] 语法重塑 之 英语学习的核心框架 —— 英语兔学习笔记(1)
  9. BIP与Siebel系统集成
  10. 爬虫——获取页面源代码
  11. 小程序「Github开源社区」
  12. MSP430F5529之捕获模式下的HCSR04超声测距(粗略)
  13. 考研html模板素材,2020考研英语作文模板素材:品质类模板
  14. 微信小程序分享盆友圈(onShareTimeline)
  15. C++里string转换为int
  16. 直播真的适合金融行业吗?
  17. c++初级(本人scdn)
  18. 【ES6】阮一峰ES6学习之Module的语法
  19. Task 05 Python 爬虫入门
  20. IEEE 国际自动化科学与工程会议 (CASE2023)

热门文章

  1. Composer的实用部分
  2. 前端React教程第二课 React生命周期设计思想
  3. 操作系统中任务调度的实现
  4. 域名注册流程:如何选择购买合适的域名?
  5. android 性格测试 csdn,华为加密状态码查询脚本
  6. 如何选择良好的视觉引导定位技术?
  7. pystrich生成code128、ean13、qrcode二维码
  8. Spring MVC 防止XSS注入
  9. db2 日期英式写法_db2 日期时间格式
  10. vue 父子组件 组件挂载 组件通信 slot插槽