目录

SparkStreaming实战案例四 窗口函数

需求

代码实现


SparkStreaming实战案例四 窗口函数

需求

使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据!

回顾窗口:

窗口长度:要计算多久的数据

滑动间隔:每隔多久计算一次

窗口长度10s > 滑动间隔5s:每隔5s计算最近10s的数据--滑动窗口

窗口长度10s = 滑动间隔10s:每隔10s计算最近10s的数据--滚动窗口

窗口长度10s < 滑动间隔15s:每隔15s计算最近10s的数据--会丢失数据,开发不用

​​​​​​​代码实现

package cn.itcast.streamingimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** 使用SparkStreaming接收Socket数据,node01:9999* 窗口长度:要计算多久的数据* 滑动间隔:每隔多久计算一次* 窗口长度10s > 滑动间隔5s:每隔5s计算最近10s的数据--滑动窗口* 窗口长度10s = 滑动间隔10s:每隔10s计算最近10s的数据--滚动窗口* 窗口长度10s < 滑动间隔15s:每隔15s计算最近10s的数据--会丢失数据,开发不用* 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据!*/
object SparkStreamingDemo04_Window {def main(args: Array[String]): Unit = {//1.创建环境val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//batchDuration the time interval at which streaming data will be divided into batches//流数据将被划分为批的时间间隔,就是每隔多久对流数据进行一次微批划分!val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))// The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()//注意:因为涉及到历史数据/历史状态,也就是需要将历史数据/状态和当前数据进行合并,作为新的Value!//那么新的Value要作为下一次的历史数据/历史状态,那么应该搞一个地方存起来!//所以需要设置一个Checkpoint目录!ssc.checkpoint("./ckp")//2.接收socket数据val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)//3.做WordCountval resultDS: DStream[(String, Int)] = linesDS.flatMap(_.split(" ")).map((_, 1))//windowDuration:窗口长度:就算最近多久的数据,必须都是微批间隔的整数倍//slideDuration :滑动间隔:就是每隔多久计算一次,,必须都是微批间隔的整数倍//使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据!.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Seconds(10),Seconds(5))//总结:实际开发中需要学会的是如何设置windowDuration:窗口长度和slideDuration :滑动间隔//如进行如下需求://每隔30分钟(slideDuration :滑动间隔),计算最近24小时(windowDuration:窗口长度)的各个广告点击量,应该进行如下设置://.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Minutes(24*60),Minutes(30))//每隔10分钟(slideDuration :滑动间隔),更新最近1小时(windowDuration:窗口长度)热搜排行榜//.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Minutes(60),Minutes(10))//4.输出resultDS.print()//5.启动并等待程序停止ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)}
}

2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数相关推荐

  1. 2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量

    目录 共享变量 广播变量 累加器 ​​​​​​​案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...

  2. 2021年大数据Flink(十九):案例一 基于时间的滚动和滑动窗口

    目录 案例一 基于时间的滚动和滑动窗口 需求 代码实现 案例一 基于时间的滚动和滑动窗口 需求 nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4 ...

  3. 2021年大数据Spark(十二):Spark Core的RDD详解

    目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...

  4. 2021年大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 使用FileBeat采集Kafka日志到Elasticsearch 一.需求分 ...

  5. 大数据Spark(三十九):SparkStreaming实战案例四 窗口函数

    文章目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求

  6. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  7. 2021年大数据Spark(十四):Spark Core的RDD操作

    目录 RDD的操作 函数(算子)分类 Transformation函数 ​​​​​​​Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...

  8. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  9. 2021年大数据Spark(十六):Spark Core的RDD算子练习

    目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集.并集.差集.笛卡尔积 distinct 算子 ​​​​​​​​​​​​​​first.take.top 算子 ​​​ ...

最新文章

  1. SAP WM LRFMD中Variant参数的影响初探
  2. mysql 乱字符_JDBC ODBC MYSQL中文出现乱字符 解决
  3. Spring xml 注入静态变量
  4. 两者相差百分比怎么算_不知道烘焙百分比的全拖出来打屁股!
  5. Java黑皮书课后题第1章:1.3(显示图案)编写程序,显示下面的图案 Java
  6. 遇到Visual Studio 当前不会命中断点.还没有为该文档加载任何符号的情况
  7. 使用Axios拦截器打印前端请求日志和后端后返回日志
  8. Spring Boot————Spring Data JPA简介
  9. linux整个文件夹下全部文件的属性,C/C++遍历目录下的所有文件(Windows/Linux篇,超详细)...
  10. makefile 文件 (​ http://blog.csdn.net/ruglcc/article/details/7814546/ )
  11. 【报告分享】2020年中国企业直播服务市场研究报告.pdf(附下载链接)
  12. 魔方与科学和计算机表现李世春,科学网—魔方 - 李世春的博文
  13. VS2015彻底卸载干净
  14. 代理内网穿透-Lcx.exe-venom-proxychains
  15. et al和etc区别
  16. java求100以内的素数
  17. 计算机科学相关的期刊,计算机类期刊汇总(核心期刊,国家级期刊)
  18. 一个最简单的自定义锁屏应用实现
  19. 量子计算机五条原则,量子信息科学:量子计算机、隐形传物与人脑量子运算
  20. Vue3生命周期函数的那些事

热门文章

  1. Centos配置yum为阿里源
  2. oracle自动备份
  3. 协方差矩阵有什么意义?
  4. LeetCode简单题之删除排序链表中的重复元素
  5. LeetCode简单题之最小操作次数使数组元素相等
  6. 如何使用TVM Pass Relay
  7. 双圆弧插值算法(三,代码实现)
  8. 自动驾驶系统关系与自动泊车原理
  9. 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
  10. [JAVA EE]Spring Boot 控制层:参数传递方法