Flink程序的基本构建块是转换(请注意,Flink的DataSet API中使用的DataSet也是内部流 )。

1、实时需求

每隔5秒,统计最近10秒的窗口数据

2、开发环境部署

1. 官网建议使用IDEA,IDEA默认集成了Java和Maven,使用起来方便
2. 本次使用了Flink-1.12版本

3 、实时代码开发

public class StreamWordCount {public static void main(String[] args) throws Exception {//创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度env.setParallelism(1);//DataSource操作DataStreamSource<String> sourceStream = env.socketTextStream("192.168.153.10", 6666);//通过匿名内部类的方式实现flatMap算子final SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapStream = sourceStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {final String[] words = line.split(" ");for (String word : words) {collector.collect(new Tuple2<>(word, 1));}}});//keyBy分组操作final KeyedStream<Tuple2<String, Integer>, String> keyedStream = flatMapStream.keyBy(value -> value.f0);//每隔5秒,统计最近10秒的窗口数据WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//sum求和操作final SingleOutputStreamOperator<Tuple2<String, Integer>> result = window.sum(1);//输出结果result.print();//执行程序env.execute("StreamWordCount");}
}

实时代码开发需连接集群,具体集群搭建方式参考Flink集群部署

4、离线需求

对文件中的单词内容进行统计计数

5、离线代码开发

class BatchWordCount {public static void main(String[] args) throws Exception {//创建执行环境final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//创建数据源final DataSource<String> source = env.fromElements("Flink flink sqoop hadoop", "flume hadoop MapReduce flink");//Transformation操作final FlatMapOperator<String, Tuple2<String, Integer>> flatMap = source.flatMap(new FlatMapClass());final AggregateOperator<Tuple2<String, Integer>> result = flatMap.groupBy(0).sum(1);//输出结果result.print();}private static class FlatMapClass implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {final String[] words = line.split(" ");for (String word : words) {collector.collect(new Tuple2<String,Integer>(word, 1));}}}
}

Flink实战——每隔5秒,统计最近10秒的窗口数据相关推荐

  1. TypeScript 3.9 正式发布!平均编译时长从 26 秒缩短至 10 秒

    作者 | 微软官方博客 译者 | 核子可乐 策划 | 小智 稿源 | 前端之巅 今天,微软在其官方博客宣布:TypeScript 3.9 版本已经正式发布,详情见下文. 有些朋友可能对 TypeScr ...

  2. 微信朋友圈视频变长从6秒增加为10秒

    微信iOS版本发布更新了,"大视频"允许拍摄更长时间的视频:从原来的6秒增加为10秒.新版微信还支持从相册里分享视频到朋友圈,不再强制要求只有直接拍摄的"小视频" ...

  3. 2021-1-22 potplayer 快进如何精准到1秒而不是10秒、9秒、3秒这么混乱

    原因:因为看视频的细节,然后前进和后退跳过了好多秒,只能重看的那种,非常难受.potplayer 快进如何精准到1秒而不是10秒.9秒.3秒这么混乱 解决方法:

  4. c语言倒计时10秒linux,单片机10秒倒计时c语言汇编语言程序

    (2)数码管动态显示(循环显示0-9,时间间隔为1秒,1秒的时间间隔用定时器T0实现) ①汇编语言: ORG 0000H AJMP MAIN ORG 000BH AJMP INTT0 ORG 0030 ...

  5. Flink入门第十二课:DataStream api/Flink sql实现每隔5分钟统计最近一小时热门商品小案例

    用到的数据文件 用到的数据文件 链接:https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w 提取码:2hmu 1.需求 & 数据 用户行为数据不断写入 ...

  6. Flink实战 - 统计每个店铺每日GMV

    Flink实战-统计每个店铺每日GMV 接到一个实时指标的需求,计算每个店铺每日的实时GMV.这个实时指标不难,对每个店铺.每日分组累计销售额就OK. 就是想的这么简单,结果在上面踩了坑. 问题 计算 ...

  7. Apache Flink 实战教程:CEP 实战(转载)

    文章目录 原文链接: 一:Flink CEP 概念以及使用场景 1.什么是 CEP 2.Flink CEP 应用场景 3.Flink CEP 原理 二:Flink CEP 程序开发 1.Flink C ...

  8. pandas使用resample进行不同粒度下的时间特征重构实战:构建时间维度统计特征

    pandas使用resample进行不同粒度下的时间特征重构实战:构建时间维度统计特征 Pandas中的resample,重新采样,是对原样本重新处理的一个方法,是一个对常规时间序列数据重新采样和频率 ...

  9. 基于Flink秒级计算时CPU监控图表数据中断问题

    基于Flink进行秒级计算时,发现监控图表中CPU有数据中断现象,通过一段时间的跟踪定位,该问题目前已得到有效解决,以下是解决思路: 一.问题现象 以SQL02为例,发现本来10秒一个点的数据,有时会 ...

最新文章

  1. 超nb的网页标签弹窗js代码!
  2. Ubuntu16 安装Jira
  3. 使用JS实现2048小游戏
  4. 能源利用率逼近理论极限 阿里巴巴展示液冷黑科技
  5. 14/100. Merge Two Sorted Lists
  6. 声明式事务基于注解@Transactional的理解
  7. CF1034E Little C Loves 3 III(神仙构造+FWT_OR卷积)
  8. 魅蓝x android 7,魅蓝x2什么时候发布 魅蓝x2发布时间最新消息
  9. 【Python】处理 selenium.common.exceptions.WebDriverException 报错问题
  10. Spring基于注解的方式二
  11. 82-Spark的StandLone模式调试
  12. 【Java】Java速成
  13. 修改系统时区 /etc/localtime
  14. 一次CSDN客户体验经历
  15. 友元函数实现复数加减法
  16. centos安装mysql并设置代理
  17. BZOJ 1924 [Sdoi2010]所驼门王的宝藏 tarjan缩点+拓扑DP
  18. 使用CCS调试CC3200芯片GPIO接口应用----流水灯程序
  19. 【目录】全志F1C100S/F1C200S学习笔记
  20. 天邑ty1208z海思3798刷版本_[高安]天邑ty1208z晶晨s905lb免拆机强刷固件下载

热门文章

  1. 辞旧送吉虎,迎新接玉兔,祝大家新年快乐!
  2. [面试专题]前端需要知道的web安全知识
  3. Web前端jQuery实现监控大屏数字滚动
  4. [Rotation Transform] 旋转变换
  5. 解决服务器挖矿漏洞 crypto
  6. python给一个不多于 5 位的正整数,要求:一、求它是几位数,二、逆序打印出各位数字
  7. 【升级版】python全自动定时,循环发消息(微信、QQ),零基础应用,
  8. 电子传真文档怎样加盖印章
  9. 鉴客 iPad横屏(Landscape)尺寸规格说明
  10. 【19/04/18 膜赛】土豪聪要请客(stol)