spark streaming 不同于sotm,是一种准实时处理系统。storm 中,把批处理看错是时间教程的实时处理。而在spark streaming中,则反过来,把实时处理看作为时间极小的批处理。

1、三个时间参数

spark streaming 中有三个关于时间的参数,分别如下:

窗口时间windowDuration​:当前窗口要统计多长时间的数据,是批量时间的整数倍

滑动时间slideDuration​:要多长时间更新一次结果,是批量时间的整数倍

批量时间batchDuration​:多长时间创建一个批次,与实际业务无关,只与数据量有关,数据量大则可以设置短一些,数据量小则设置长一些,但必须小于其他两个时间,

2、该怎么设置?

为方便理解,就拿咱们最常见的日启、日活、周启、周活作为示例

注:1、实际中日启、日活、周启、周活更多是用批处理,此处只是拿来方便大家理解

2、此处不是严格意义上的日启、周启。此处的日:最近24小时,周:最近7天

案例1:每隔一小时,统计产品的日启、日活,

窗口时间:1日,滑动时间:1小时,批量时间:1小时、半小时、15分钟、10分钟、5分钟、2分钟均可,视数据量大小而定

案例2:每天统计最近七天累计启动、活跃

窗口时间:7日,滑动时间:1日 批量时间:一小时、半小时、10分钟、5分钟

3、实战

为了理解上边参数是怎么设置的,我们对假定现在有个需求,需要对输入的字母进行计数。

使用nc -lk 9999 模拟生产者,发送数据,streaming 通过socket接收数据

实战1:每10秒统计当前输入的字符

适用:彻底非累加业务

  import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}import org.apache.spark.SparkConfval sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]")//10秒创建一个批次val ssc = new StreamingContext(sparkConf, Seconds(10))val lines = ssc.socketTextStream("localhost", 9999)val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1l))val current_stream = wordCounts.reduceByKey(_ + _)current_stream.print()current_stream.repartition(1).saveAsTextFiles("/data/socket_wordcount_current.")ssc.start()ssc.awaitTermination()

启动生产者 nc -lk 9999

在spark-shell中输入上边代码

在nc 的终端下,

输入字符操作1、第一个10秒,输入a,第二个10秒输入b,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件,期望结果   【    (a,1) (b,1) 】  原因:我们当前仅输入了a、b

输入字符操作2、第四个10秒,输入c,第五个10秒输入d,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件   期望 【 (c,1) (d,1)】 原因:我们当前输入了c、d

输入字符操作3、这时,不需要操作,等待30秒,在spark-shell中确认第三次计算完成后,查看新产生文件 期望 【 】 原因:当前我们没有输入, 所以没有任何字符可以统计

实战2、每10秒统计历史所有输入的字符。

适用范围:计算历史(包含窗口之外)累计数据,经常用于统计“总装机量”之类

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]")//10秒创建一个批次val ssc = new StreamingContext(sparkConf, Seconds(10))
//累加所有经过的数据val updateFunc = (values: Seq[Long], state: Option[Long]) => {val currentCount = values.foldLeft(0l)(_ + _)val previousCount = state.getOrElse(0l)Some(currentCount + previousCount)}ssc.checkpoint("socket_wordcount_history")val lines = ssc.socketTextStream("localhost", 9999)val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1l))val history_stream = wordCounts.updateStateByKey[Long](updateFunc)//合并当前数据和历史数据history_stream.print()history_stream.repartition(1).saveAsTextFiles("/data/socket_wordcount_history.")ssc.start()ssc.awaitTermination()

启动生产者 nc -lk 9999

在spark-shell中输入上边代码

在nc 的终端下,

输入字符操作1、第一个10秒,输入a,第二个10秒输入b,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件,期望结果   【    (a,1) (b,1) 】  原因:我们当前输入了a、b

输入字符操作2、第四个10秒,输入c,第五个10秒输入d,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件   期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:我们当前输入了c、d,历史输入过 a、b

输入字符操作3、这时,不需要操作,等待30秒,在spark-shell中确认第三次计算完成后,查看新产生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:当前我们没有输入,但是,历史曾经输入过a、b、c、d

输入字符操作4、这时,仍不需要操作,等待30秒,在spark-shell中确认第四次计算完成后,查看新产生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】原因: 当前我们没有输入,但是,历史曾经输入过a、b、c、d

之后,即使没有输入abcd,统计结果仍包含abcd这四个字符各1次

实战3、每隔30秒,统计最近1分钟输入的字母。窗口内历史累加

(适用范围:非累加业务,这里的累加指的是超出window范围)

sc.stopimport org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}import org.apache.spark.SparkConfval updateFunc = (values: Seq[Long], state: Option[Long]) => {
val currentCount = values.foldLeft(0l)(_ + _)
val previousCount = state.getOrElse(0l)
Some(currentCount + previousCount)
}
val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))   //10秒创建一个批次
ssc.checkpoint("socket-kafka-wordcount_recent")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l))
val stateDstream = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Minutes(1), Seconds(30))  //每30秒算一次,数据范围为最近一分钟内收到的数据  另外,使用window时,需要设置checkpointstateDstream.print()
stateDstream.repartition(1).saveAsTextFiles("/data/socket-streaming-wordcount.log")ssc.start()
ssc.awaitTermination()

启动生产者 nc -lk 9999

在spark-shell中输入上边代码

在nc 的终端下,

输入字符操作1、第一个10秒,输入a,第二个10秒输入b,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件,期望结果   【    (a,1) (b,1) 】  原因:最近1分钟,我们只输入了a、b

输入字符操作2、第四个10秒,输入c,第五个10秒输入d,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件   期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:最近1分钟,我们只输入了a、b、c、d

输入字符操作3、这时,不需要操作,等待30秒,在spark-shell中确认第三次计算完成后,查看新产生文件 期望 【 (a,0) (b,0) (c,1) (d,1)】 原因:最近1分钟,我们只输入了c、d ,1分钟之前输入的a、b将不再在统计范围之内

输入字符操作4、这时,仍不需要操作,等待30秒,在spark-shell中确认第四次计算完成后,查看新产生文件 期望 【 (a,0) (b,0) (c,0) (d,0)】原因:最近1分钟,我们没有任何输入,1 分钟之前输入的a、b、c、d将不再在统计范围之内

转载于:https://www.cnblogs.com/piaolingzxh/p/5468780.html

spark streaming之 windowDuration、slideDuration、batchDuration​相关推荐

  1. Spark Streaming的Word Count

    Spark Streaming的Word Count 需求&准备 图解 首先在linux服务器上安装nc工具 nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 ...

  2. Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制

    主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...

  3. Spark学习之Spark Streaming

    一.简介 许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用.训练机器学习模型的应用,还有自动检测异常的应用.Spark Streaming 是 Spark 为这些应用而设计的模型.它 ...

  4. 使用Spark Streaming SQL基于时间窗口进行数据统计

    1.背景介绍 流式计算一个很常见的场景是基于事件时间进行处理,常用于检测.监控.根据时间进行统计等系统中.比如埋点日志中每条日志记录了埋点处操作的时间,或者业务系统中记录了用户操作时间,用于统计各种操 ...

  5. Spark Streaming源码分析 – DStream

    A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence o ...

  6. 通过Spark Streaming的window操作实战模拟热点搜索词案例实战

    本博文主要内容包括: 1.在线热点搜索词实现解析 2.SparkStreaming 利用reduceByKeyAndWindow实现在线热点搜索词实战 一:在线热点搜索词实现解析 背景描述:在社交网络 ...

  7. 【Spark分布式内存计算框架——Spark Streaming】11. 应用案例:百度搜索风云榜(下)实时窗口统计

    5.5 实时窗口统计 SparkStreaming中提供一些列窗口函数,方便对窗口数据进行分析,文档: http://spark.apache.org/docs/2.4.5/streaming-pro ...

  8. Spark Streaming Backpressure分析

    转载自:http://www.cnblogs.com/barrenlake/p/5349949.html# 1.为什么引入Backpressure 默认情况下,Spark Streaming通过Rec ...

  9. sparkstreaming监听hdfs目录如何终止_Spark笔试题:Spark Streaming 反压机制

    Spark Streaming 反压机制是1.5版本推出的特性,用来解决处理速度比摄入速度慢的情况,简单来讲就是做流量控制.当批处理时间(Batch Processing Time)大于批次间隔(Ba ...

  10. 通过案例对 spark streaming 透彻理解三板斧之三:spark streaming运行机制与架构

    本期内容: 1. Spark Streaming Job架构与运行机制 2. Spark Streaming 容错架构与运行机制 事实上时间是不存在的,是由人的感官系统感觉时间的存在而已,是一种虚幻的 ...

最新文章

  1. 100G内存下,MySQL查询200G大表会OOM么?
  2. 混合云扛起云存储领军大旗
  3. 截图截取各种右键菜单的方法
  4. zip在python中的用法_Python中zip()函数用法实例教程
  5. c primer plus--运算符、表达式和语句(第5章)--习题
  6. 大数据平台的搭建思路是怎样的
  7. eclipse订制快捷键
  8. 医院耗材管理系统开发_15
  9. 尚硅谷大数据课程flink1.13代码实现与笔记记录
  10. 视频教程-产品原型图设计Axure教程-Axure
  11. 如何修复word文档损坏的?
  12. 《东周列国志》第三回 犬戎主大闹镐京 周平王东迁洛邑
  13. nagios 总结_caci 与 nagios 一些总结 【一】
  14. 凌动z3735f运行64位linux,iwork8平板电脑安装ubuntu,Z3735d/f系列CPU通用
  15. 使用Pytorch框架
  16. CSS系列之修改滚动条的样式
  17. 北理工通报方岱宁院士处理结果
  18. python里import as什么意思_import as和 from import 区别
  19. fastadmin 使用switch 点击修改无反应 提示“未更新任何行”
  20. 短视频开发要注意哪些问题?

热门文章

  1. docker stats 监控资源使用情况
  2. 删除win10linux系统,在装了win10和Ubuntu双系统的电脑里删除win10
  3. javaIO流-IO基础知识指南
  4. 李炎恢老师的php源码以及附带一个php手册
  5. java 序列化,流,二进制的区别和联系
  6. window 下 go lang 环境变量一键批处理设置
  7. ubuntu 安装mysql 5.5.28 编译安装 innodb 配置
  8. LayaAir textInput 单行输入多行输入
  9. idea快捷键自动生成序列化id
  10. 阶段3 3.SpringMVC·_03.SpringMVC常用注解_6 CookieValue注解