2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
目录
SparkStreaming实战案例四 窗口函数
需求
代码实现
SparkStreaming实战案例四 窗口函数
需求
使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据!
回顾窗口:
窗口长度:要计算多久的数据
滑动间隔:每隔多久计算一次
窗口长度10s > 滑动间隔5s:每隔5s计算最近10s的数据--滑动窗口
窗口长度10s = 滑动间隔10s:每隔10s计算最近10s的数据--滚动窗口
窗口长度10s < 滑动间隔15s:每隔15s计算最近10s的数据--会丢失数据,开发不用
代码实现
package cn.itcast.streamingimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** 使用SparkStreaming接收Socket数据,node01:9999* 窗口长度:要计算多久的数据* 滑动间隔:每隔多久计算一次* 窗口长度10s > 滑动间隔5s:每隔5s计算最近10s的数据--滑动窗口* 窗口长度10s = 滑动间隔10s:每隔10s计算最近10s的数据--滚动窗口* 窗口长度10s < 滑动间隔15s:每隔15s计算最近10s的数据--会丢失数据,开发不用* 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据!*/
object SparkStreamingDemo04_Window {def main(args: Array[String]): Unit = {//1.创建环境val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//batchDuration the time interval at which streaming data will be divided into batches//流数据将被划分为批的时间间隔,就是每隔多久对流数据进行一次微批划分!val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))// The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()//注意:因为涉及到历史数据/历史状态,也就是需要将历史数据/状态和当前数据进行合并,作为新的Value!//那么新的Value要作为下一次的历史数据/历史状态,那么应该搞一个地方存起来!//所以需要设置一个Checkpoint目录!ssc.checkpoint("./ckp")//2.接收socket数据val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)//3.做WordCountval resultDS: DStream[(String, Int)] = linesDS.flatMap(_.split(" ")).map((_, 1))//windowDuration:窗口长度:就算最近多久的数据,必须都是微批间隔的整数倍//slideDuration :滑动间隔:就是每隔多久计算一次,,必须都是微批间隔的整数倍//使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据!.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Seconds(10),Seconds(5))//总结:实际开发中需要学会的是如何设置windowDuration:窗口长度和slideDuration :滑动间隔//如进行如下需求://每隔30分钟(slideDuration :滑动间隔),计算最近24小时(windowDuration:窗口长度)的各个广告点击量,应该进行如下设置://.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Minutes(24*60),Minutes(30))//每隔10分钟(slideDuration :滑动间隔),更新最近1小时(windowDuration:窗口长度)热搜排行榜//.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Minutes(60),Minutes(10))//4.输出resultDS.print()//5.启动并等待程序停止ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)}
}
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数相关推荐
- 2021年大数据Spark(十九):Spark Core的共享变量
目录 共享变量 广播变量 累加器 案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...
- 2021年大数据Flink(十九):案例一 基于时间的滚动和滑动窗口
目录 案例一 基于时间的滚动和滑动窗口 需求 代码实现 案例一 基于时间的滚动和滑动窗口 需求 nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4 ...
- 2021年大数据Spark(十二):Spark Core的RDD详解
目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...
- 2021年大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 使用FileBeat采集Kafka日志到Elasticsearch 一.需求分 ...
- 大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
文章目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求
- 2021年大数据Spark(十五):Spark Core的RDD常用算子
目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 聚合函数算子 Scala集合中的聚合函数 ...
- 2021年大数据Spark(十四):Spark Core的RDD操作
目录 RDD的操作 函数(算子)分类 Transformation函数 Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...
- 2021年大数据Spark(十八):Spark Core的RDD Checkpoint
目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...
- 2021年大数据Spark(十六):Spark Core的RDD算子练习
目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集.并集.差集.笛卡尔积 distinct 算子 first.take.top 算子 ...
最新文章
- SAP WM LRFMD中Variant参数的影响初探
- mysql 乱字符_JDBC ODBC MYSQL中文出现乱字符 解决
- Spring xml 注入静态变量
- 两者相差百分比怎么算_不知道烘焙百分比的全拖出来打屁股!
- Java黑皮书课后题第1章:1.3(显示图案)编写程序,显示下面的图案 Java
- 遇到Visual Studio 当前不会命中断点.还没有为该文档加载任何符号的情况
- 使用Axios拦截器打印前端请求日志和后端后返回日志
- Spring Boot————Spring Data JPA简介
- linux整个文件夹下全部文件的属性,C/C++遍历目录下的所有文件(Windows/Linux篇,超详细)...
- makefile 文件 (​ http://blog.csdn.net/ruglcc/article/details/7814546/ )
- 【报告分享】2020年中国企业直播服务市场研究报告.pdf(附下载链接)
- 魔方与科学和计算机表现李世春,科学网—魔方 - 李世春的博文
- VS2015彻底卸载干净
- 代理内网穿透-Lcx.exe-venom-proxychains
- et al和etc区别
- java求100以内的素数
- 计算机科学相关的期刊,计算机类期刊汇总(核心期刊,国家级期刊)
- 一个最简单的自定义锁屏应用实现
- 量子计算机五条原则,量子信息科学:量子计算机、隐形传物与人脑量子运算
- Vue3生命周期函数的那些事