目录

案例三 会话窗口

需求

代码实现


案例三 会话窗口

需求

设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

代码实现


package cn.it.window;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** Author lanson* Desc* nc -lk 9999* 有如下数据表示:* 信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4* 需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!)*/
public class WindowDemo03_SessionWindow {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceDataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);//3.Transformation//将9,3转为CartInfo(9,3)SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {@Overridepublic CartInfo map(String value) throws Exception {String[] arr = value.split(",");return new CartInfo(arr[0], Integer.parseInt(arr[1]));}});//需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!)SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum("count");//4.Sinkresult.print();//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class CartInfo {private String sensorId;//信号灯idprivate Integer count;//通过该信号灯的车的数量}
}

2021年大数据Flink(二十一):​​​​​​​案例三 会话窗口相关推荐

  1. 2021年大数据Flink(十一):流批一体API Source

    目录 Source 预定义Source 基于集合的Source 基于文件的Source ​​​​​​​基于Socket的Source 自定义Source 随机生成数据 ​​​​​​​MySQL Sou ...

  2. 2021年大数据HBase(十一):Apache Phoenix的视图操作

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix的视图操作 一.应用场景 ...

  3. 2021年大数据Hive(十一):Hive调优

    全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive调优 一.本地模式 1.空key处理 二.SQL ...

  4. 2021年大数据Kafka(十一):❤️Kafka的消费者负载均衡机制和数据积压问题❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的消费者负载均衡机制和数据积压问题 一.kafka ...

  5. 2021年大数据Flink(三十九):​​​​​​​Table与SQL ​​​​​​总结 Flink-SQL常用算子

    目录 总结 Flink-SQL常用算子 SELECT WHERE ​​​​​​​DISTINCT ​​​​​​​GROUP BY ​​​​​​​UNION 和 UNION ALL ​​​​​​​JOI ...

  6. 2021年大数据Flink(三十六):​​​​​​​Table与SQL ​​​​​​案例三

    目录 案例三 需求 编码步骤 代码实现-方式1 代码实现-方式2 案例三 需求 使用Flink SQL来统计5秒内 每个用户的 订单总数.订单的最大金额.订单的最小金额 也就是每隔5秒统计最近5秒的每 ...

  7. 2021年大数据Hadoop(十一):HDFS的元数据辅助管理

    2021大数据领域优质创作博客,带你从入门到精通,该博客每天更新,逐渐完善大数据各个知识体系的文章,帮助大家更高效学习. 有对大数据感兴趣的可以关注微信公众号:三帮大数据 目录 HDFS的元数据辅助管 ...

  8. 2021年大数据Flink(四十八):扩展阅读  Streaming File Sink

    目录 扩展阅读  Streaming File Sink 介绍 场景描述 Bucket和SubTask.PartFile 案例演示 扩展阅读  配置详解 PartFile PartFile序列化编码 ...

  9. 2021年大数据Flink(四十五):​​​​​​扩展阅读 双流Join

    目录 扩展阅读  双流Join 介绍 Window Join Interval Join ​​​​​​​代码演示1 ​​​​​​​代码演示2 重点注意 扩展阅读  双流Join 介绍 https:// ...

最新文章

  1. 【spring】spel表达式
  2. linux系统主机信任,Linux信任主机(SSH)
  3. Spark与Flink:对比与分析
  4. mysql改原始密码mac_MAC版修改MySQL初始密码的方法
  5. 小米MIX4不会采用四曲面屏:结果未必是坏事
  6. iText in Action 2nd5.2节(Events for basic building blocks)读书笔记
  7. excel制作跨职能流程图_用Excel规划求解工具,实现组合投资优化
  8. Linux对象文件是个啥东东
  9. 信创终端违规外联案例分析及防控措施
  10. 【imessage软件群推送】 “CMCC“ | grep password #待补充 重置后撤销暂存的变更
  11. 【LaTeX 教程】03. LaTeX 字体字号设置
  12. 英雄联盟LOL JAVA版
  13. 3轴码垛机械臂运动学逆解
  14. [SSM框架]—Mybatis入门
  15. 结巴(jieba)分词的使用-Java实现
  16. collections
  17. C/C++程序内存布局(data段,bss段,text段)以及static关键字详解
  18. 微信小程序开发之倒计时定时器
  19. Win系统 - 关于 CPU C-States 省电模式,你需要知道的事情
  20. 模型评价指标之ROC、AUC和GAUC

热门文章

  1. kotlin Bean加载失败lateinit property has not been initialized
  2. Go 知识点(03)— 非缓冲 channel 的长度始终为 0
  3. Python 标准库之 datetime
  4. 字符常量和仅包含一个字符的字符串之间的区别
  5. 【牛腩新闻发布系统】整合前台04
  6. 论文阅读工具ReadPaper
  7. Python中re的match、search、findall、finditer区别正则
  8. 用gensim学习word2vec
  9. 电子设计搜索引擎引入分析和见解
  10. Android AnimationUtils (动画)的使用