目录

案例一 基于时间的滚动和滑动窗口

需求

代码实现


案例一 基于时间的滚动和滑动窗口

需求

nc -lk 9999

有如下数据表示:

信号灯编号和通过该信号灯的车的数量

9,3

9,2

9,7

4,9

2,6

1,5

2,3

5,7

5,4

需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口

需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口

代码实现

package cn.it.window;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** Author lanson* Desc* nc -lk 9999* 有如下数据表示:* 信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4* 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口* 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口*/
public class WindowDemo01_TimeWindow {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceDataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);//3.Transformation//将9,3转为CartInfo(9,3)SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {@Overridepublic CartInfo map(String value) throws Exception {String[] arr = value.split(",");return new CartInfo(arr[0], Integer.parseInt(arr[1]));}});//分组//KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");// * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口//timeWindow(Time size窗口大小, Time slide滑动间隔)SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS.keyBy(CartInfo::getSensorId)//.timeWindow(Time.seconds(5))//当size==slide,可以只写一个//.timeWindow(Time.seconds(5), Time.seconds(5)).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count");// * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS.keyBy(CartInfo::getSensorId)//.timeWindow(Time.seconds(10), Time.seconds(5)).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum("count");//4.Sink
/*
1,5
2,5
3,5
4,5
*///result1.print();result2.print();//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class CartInfo {private String sensorId;//信号灯idprivate Integer count;//通过该信号灯的车的数量}
}

2021年大数据Flink(十九):案例一 基于时间的滚动和滑动窗口相关推荐

  1. 2021年大数据Flink(二十):案例二 基于数量的滚动和滑动窗口

    目录 案例二 基于数量的滚动和滑动窗口 需求 代码实现 案例二 基于数量的滚动和滑动窗口 需求 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗 ...

  2. 2021年大数据Flink(九):Flink原理初探

    Flink原理初探 Flink角色分工 在实际生产中,Flink 都是以集群在运行,在运行的过程中包含了两类进程. JobManager: 它扮演的是集群管理者的角色,负责调度任务.协调 checkp ...

  3. 2021年大数据Hive(九):Hive的数据压缩

    全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive的数据压缩 一.MR支持的压缩编码 二.压缩配置 ...

  4. 2021年大数据Kafka(九):kafka消息存储及查询机制原理

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 kafka消息存储及查询机制原理 一.Kafka数据存储机制 ...

  5. 2021年大数据HBase(九):Apache Phoenix的安装

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 前言 系列历史文章 安装Phoenix 一.下载 二.安装 1.上传安装包 ...

  6. 2021年大数据Flink(三十六):​​​​​​​Table与SQL ​​​​​​案例三

    目录 案例三 需求 编码步骤 代码实现-方式1 代码实现-方式2 案例三 需求 使用Flink SQL来统计5秒内 每个用户的 订单总数.订单的最大金额.订单的最小金额 也就是每隔5秒统计最近5秒的每 ...

  7. 2021年大数据Flink(二十四):​​​​​​​Allowed Lateness案例演示

    Allowed Lateness案例演示 需求 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) 要求每隔5s,计算5秒内,每个用户的订单总金额 并添加Watermaker来解 ...

  8. 2021年大数据Flink(二十三):​​​​​​​Watermaker案例演示

    目录 Watermaker案例演示 需求 API 代码实现-1-开发版-掌握 代码实现-2-验证版-了解 Watermaker案例演示 需求 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时 ...

  9. 2021年大数据Flink(十八):Flink Window操作

    目录 ​​​​​​​Flink-Window操作 为什么需要Window Window的分类 按照time和count分类 ​​​​​​​按照slide和size分类 ​​​​​​​总结 Window ...

最新文章

  1. 被Python「苦虐」的日子太惨了!
  2. Linux下SSH命令使用方法详解
  3. 服务器加根网线用不用修改路由器,安装设置无线路由器需要用几根网线?
  4. 13.C++ vector 操作
  5. 微信抢红包的方案_微信社群运营应该怎么运作?
  6. python花瓣飘零_Python 爬虫: 抓取花瓣网图片
  7. java udp转发_【Java】UDP发包的简单实现
  8. b站学python哪个好学_B站上那个不用钱的学习python的资源
  9. 会议论文参考文献格式(待更新)
  10. Python:正则表达式re.compile()
  11. android 打电话流程,Android 打电话 流程
  12. 由浅入深理解区块链技术
  13. 【Qt网络编程】实现TCP协议通信
  14. UDAF和UDF的介绍
  15. kubernetes系列之五:IPVS概览
  16. 《权威指南》笔记 -- 8.4 作为值的函数
  17. background-position什么意思
  18. 基于ViewFlipper实现图片浏览组件
  19. 未来10年什么最赚钱 未来十大热门行业盘点
  20. 加拿大安省欢ajax,加拿大安省EOI系统上线,开始接受申请!

热门文章

  1. Kubernetes 中 设置pod不部署在同一台节点上
  2. Postman使用Date数据类型,Postman发送Date类型数据,Postman模拟前端调用
  3. 2017 年已读书单总结
  4. Anaconda3+python3.7.10+TensorFlow2.3.0+PyQt5环境搭建
  5. 人生的路上,要懂得坚强
  6. win10系统的“管理员”
  7. 正确“假期休息模式”
  8. pycharm设置编写的脚本页面长行实现自动换行(windows版)
  9. Lidar激光雷达市场
  10. MindSpore静态图语法支持