2021年大数据Flink(二十一):案例三 会话窗口
目录
案例三 会话窗口
需求
代码实现
案例三 会话窗口
需求
设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
代码实现
package cn.it.window;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** Author lanson* Desc* nc -lk 9999* 有如下数据表示:* 信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4* 需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!)*/
public class WindowDemo03_SessionWindow {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceDataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);//3.Transformation//将9,3转为CartInfo(9,3)SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {@Overridepublic CartInfo map(String value) throws Exception {String[] arr = value.split(",");return new CartInfo(arr[0], Integer.parseInt(arr[1]));}});//需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!)SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum("count");//4.Sinkresult.print();//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class CartInfo {private String sensorId;//信号灯idprivate Integer count;//通过该信号灯的车的数量}
}
2021年大数据Flink(二十一):案例三 会话窗口相关推荐
- 2021年大数据Flink(十一):流批一体API Source
目录 Source 预定义Source 基于集合的Source 基于文件的Source 基于Socket的Source 自定义Source 随机生成数据 MySQL Sou ...
- 2021年大数据HBase(十一):Apache Phoenix的视图操作
全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix的视图操作 一.应用场景 ...
- 2021年大数据Hive(十一):Hive调优
全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive调优 一.本地模式 1.空key处理 二.SQL ...
- 2021年大数据Kafka(十一):❤️Kafka的消费者负载均衡机制和数据积压问题❤️
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的消费者负载均衡机制和数据积压问题 一.kafka ...
- 2021年大数据Flink(三十九):Table与SQL 总结 Flink-SQL常用算子
目录 总结 Flink-SQL常用算子 SELECT WHERE DISTINCT GROUP BY UNION 和 UNION ALL JOI ...
- 2021年大数据Flink(三十六):Table与SQL 案例三
目录 案例三 需求 编码步骤 代码实现-方式1 代码实现-方式2 案例三 需求 使用Flink SQL来统计5秒内 每个用户的 订单总数.订单的最大金额.订单的最小金额 也就是每隔5秒统计最近5秒的每 ...
- 2021年大数据Hadoop(十一):HDFS的元数据辅助管理
2021大数据领域优质创作博客,带你从入门到精通,该博客每天更新,逐渐完善大数据各个知识体系的文章,帮助大家更高效学习. 有对大数据感兴趣的可以关注微信公众号:三帮大数据 目录 HDFS的元数据辅助管 ...
- 2021年大数据Flink(四十八):扩展阅读 Streaming File Sink
目录 扩展阅读 Streaming File Sink 介绍 场景描述 Bucket和SubTask.PartFile 案例演示 扩展阅读 配置详解 PartFile PartFile序列化编码 ...
- 2021年大数据Flink(四十五):扩展阅读 双流Join
目录 扩展阅读 双流Join 介绍 Window Join Interval Join 代码演示1 代码演示2 重点注意 扩展阅读 双流Join 介绍 https:// ...
最新文章
- 【spring】spel表达式
- linux系统主机信任,Linux信任主机(SSH)
- Spark与Flink:对比与分析
- mysql改原始密码mac_MAC版修改MySQL初始密码的方法
- 小米MIX4不会采用四曲面屏:结果未必是坏事
- iText in Action 2nd5.2节(Events for basic building blocks)读书笔记
- excel制作跨职能流程图_用Excel规划求解工具,实现组合投资优化
- Linux对象文件是个啥东东
- 信创终端违规外联案例分析及防控措施
- 【imessage软件群推送】 “CMCC“ | grep password #待补充 重置后撤销暂存的变更
- 【LaTeX 教程】03. LaTeX 字体字号设置
- 英雄联盟LOL JAVA版
- 3轴码垛机械臂运动学逆解
- [SSM框架]—Mybatis入门
- 结巴(jieba)分词的使用-Java实现
- collections
- C/C++程序内存布局(data段,bss段,text段)以及static关键字详解
- 微信小程序开发之倒计时定时器
- Win系统 - 关于 CPU C-States 省电模式,你需要知道的事情
- 模型评价指标之ROC、AUC和GAUC