Spark Streaming

  • 引入
  • 1. Spark Streaming 是什么
    • 基本思想:
    • Dstream:
    • 时间区间:
    • Dstream Graph:
    • 持久化:
    • 离散化:
    • SparkStreaming与Storm的区别
  • 2. 基本架构
  • 3. 运行流程
  • 4. 运行架构

引入

实时和离线 数据处理 ?
指的是数据处理延迟的长短,
实时数据处理是毫秒级别 !离线处理级别的延迟在小时、天;

流式和批量 处理 ?
数据处理的方式角度;
流式是来一条处理一条,批量是一次积攒一批再做处理;

1. Spark Streaming 是什么

SparkStreaming就是将连续的数据 持久化离散化,然后进行批量处理的框架;

SparkStreaming准实时(秒级别)、微批次的数据处理框架;

  • 为什么是微批次 ?
    如果来一条处理一条,资源浪费;数据量太大,延迟越大,吞吐量小;
    所以SparkStreaming使用微批次,介于【离线和实时之间】!
    微批次使得吞吐量更大

基本思想:

和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作DStream

DStream 是随时间推移而收到的数据的序列。

DStream是Spark Streaming特有的数据类型;

Spark Streaming将接收到的实时流数据,按照一定时间间隔对数据进行拆分,交给 Spark Engine引擎进行处理,最终得到一批批的结果。

Dstream:

Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream;

假如外部数据不断涌入,按照一分钟切片,每个一分钟内部的数据是连续的(连续数据流),而一分钟与一分钟的切片却是相互独立的(离散流)。

Spark Streaming将产生高度分离的数据流叫DStream(discretized Stream);
DStream既可以从输入数据源创建得来(如Kafka、Flume),也可以从其他DStream经过一些算子操作得来;

在内部,每个【时间区间】收到的数据都作为 RDD 存在,而 DStream 是由这些RDD 所组成的序列 (因此得名“离散化”);

Dstream可以看做一组RDDs, 即对 RDD 在【实时数据处理场景】的一种封装;

对DStream应用的算子,比如map,其实在底层会被翻译为对DStream中每个RDD的操作。比如对一个DStream执行一个map操作,会产生一个新的DStream;
在底层,其实其原理为 对输入DStream中每个时间段的RDD,都应用一遍map操作,然后生成的新的RDD,即作为新的DStream中的那个时间段的一个RDD。
底层的RDD的transformation操作 还是由Spark Core的计算引擎来实现的。

任何对DStream的操作都会转变为对底层RDD的操作(通过算子):

Dstream 和 RDD:
DStream可以说是逻辑级别的,RDD就是物理级别的,DStream所表达的最终都是通过RDD的转化实现的。前者是更高级别的抽象,后者是底层的实现。DStream实际上就是在时间维度上对RDD集合的封装,DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。

数据流向

receiver task是一直在执行的,一直在接受数据,将【一段时间内】接收到的数据保存到batch中(默认为5秒)那么会将接受来的数据每隔5秒(可设置)封装到一个batch中,batch没有分布式计算的特性,这一个batch的数据又被封装到一个RDD中最终封装到一个DStream中;

如:
假设批处理间隔(batchInterval)为5秒,每隔5秒通过SparkStreaming将得到的一个DStream,在第6秒的时候开始计算这个DStream;
假设执行任务的时间是3秒,那么第6-9秒一边接受数据,一边在计算任务,9~10秒只是在接收数据。然后在第11秒的时候重复上面的操作
如果job的执行的时间 > 批处理间隔,接收到的数据会越积越多,最后可能导致OOM;

Spark与Spark Streaming区别:

Spark -> RDD:transformation action + RDD DAG

Spark Streaming -> Dstream:transformation output(它不能让数据在中间激活,必须保证数据有输入有输出) + DStreamGraph

时间区间:

一个采集数据的周期 ,类似滑动窗口;

一个时间周期内,由数据采集器将数据采集完成后作为RDD序列,封装成DStream;
最终将RDD发给Driver,形成Task:

Dstream Graph:

一系列transformation操作的抽象,Dstream之间的转换所形成的的依赖关系全部保存在DStreamGraph中,DStreamGraph对于后期生成RDD Graph至关重要;

持久化:

即接收到的数据暂存
为什么持久化?做容错的,当数据流出错了,因为没有得到计算,需要把数据从源头进行回溯,暂存的数据可以进行恢复。

离散化:

按时间分片,形成处理单元

SparkStreaming与Storm的区别

  1. Storm是纯实时式处理框架,SparkStreaming是准实时的处理框架(微批次);因为微批处理,SparkStreaming的吞吐量比Storm要高;
  2. Storm的事务机制要比SparkStreaming的要完善;
  3. Storm支持的动态资源调度(Spark1.2及以后也支持)
  4. SparkStreaming擅长复杂的业务处理,Storm不擅长复杂的业务处理,擅长简单的汇总计算;

2. 基本架构

Master:记录Dstream之间的依赖关系或者血缘关系,并负责任务调度以生成新的RDD
Worker(Executor):从网络接收数据,存储并执行RDD计算
Client:负责向Spark Streaming中灌入数据
调度:按照时间触发。

Worker(Executor)里面有个重要的角色:receiver
receiver:接收器,接收不同的数据源,进行针对性的获取,Spark Streaming 也提供了不同的接收器分布在不同的节点上,每个接收器都是一个特定的进程,每个节点接收一部分作为输入。
receiver接受完不马上做计算,先存储到它的内部缓存区
因为Streaming 是按照时间不断的分片,所以需要等待,【一旦定时器到时间了】,缓冲区就会把数据转换成数据块block,(缓冲区的作用:按照用户定义的时间间隔切割),然后把数据块放到一个队列里面去,然后Block manager从队列中把数据块拿出来,把数据块转换成一个spark能处理的数据块。

为什么 receiver 是一个进程?
container -> Executor 是一个JVM进程, 所以receiver 是进程级别的 ;

Spark Streaming 作业提交:

  • Network Input Tracker:跟踪每一个网络received数据,并且将其映射到相应的input Dstream上
  • Job Scheduler:周期性的访问DStreamGraph并生成Spark Job,将其交给JobManager执行
  • Job Manager:获取任务队列,并执行Spark任务

3. 运行流程

以wordcount为例

object StreamWordCount {def main(args: Array[String]): Unit = {//1.初始化 Spark 配置信息,至少是2 个线程,一个同于接收数据,一个用于处理数据;val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//2.初始化 StreamingContext,如果是RDD则是 new SparkContext(sparkConf)val ssc = new StreamingContext(sparkConf, Seconds(3))      // 采集周期为3秒//3.通过监控端口创建 DStream,拿到的就是读进来的一行行的数据val datas= ssc.socketTextStream("linux1", 9999)         //  ip+ 端口号//对Dstream中的每一行数据做切分,形成一个个单词val wordStreams = datas.flatMap(_.split(" "))//将单词映射成元组(word,1)val wordAndOneStreams = wordStreams.map((_, 1))//将相同的单词次数做统计val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)//打印wordAndCountStreams.print()        // print方法会打印 // 启动采集器 ssc.start() // 等待采集器的执行ssc.awaitTermination()}
}

注意:

  1. 通过 socketTextStream("linux1", 9999) 创建Dstream
  2. 通过 awaitTermination() 等待计算完成,防止应用退出;

启动程序并使用 netcat 工具发送数据;
输出:

设置 batch interval 采集周期为3秒,则 前5个a在一个周期内,后面1个a在周期外;

宏观流程:

  1. StreamingContext 会在底层创建出SparkContext,用来处理数据
    其构造函数可以指定处理一次数据的间隔(batch interval),这里指定为3秒,即3秒 采集一次数据;
  2. socketTextStream() 创建Dstream, 然后指定Dstream计算的步骤;
  3. 到此时只是设定好了要进行的计算步骤,系统收到数据时计算就会开始。要开始接收数据,必须显式调用StreamingContext 的start() 方法;
  4. 这样,Spark Streaming 就会开始把Spark 作业不断交给下面的SparkContext 去调度执行。执行会在另一个线程中进行,所以需要调用awaitTermination 来等待流计算完成,来防止应用退出。

详细流程
(1)初始化StreamingContext + 输出 DStream

  1. 【在Driver端】,StreamingContext初始化时会创建DStreamGraph、ReceiverTracker、JobGenerator、JobScheduler 等;
    同时,在Spark集群中的某个【Worker节点上】的Executor,启动输入DStream的Receiver接收器 ;

  2. Receiver接收器 负责从外部数据源接收数据
    Receiver接收到数据之后,会启动一个BlockGenerator,其会每隔一段时间(可配置,默认是200ms)将Receiver接收到的数据,打包成一个block
    每个block除了会保存到所运行的Executor关联的BlockManager中之外,还会发送一份block信息如blockId到Driver端的ReceiverTracker上,其会将一个一个的block信息存入一个HashMap中,key就是时间;

  3. JobGenerator会每隔我们定义的batch时间间隔,就会去ReceiverTracker中获取经过这个【batch时间间隔内的】数据信息blocks,将这些block聚合成一个batch,然后这个batch会被创建为一个RDD

    这样每隔一个batch interval时间间隔,就会将这个时间间隔内的数据形成一个RDD,然后形成一个RDD序列,每个RDD代表数据流中一个时间间隔内的数据;
    正是这个RDD序列,形成SparkStreaming应用的输入DStream。

(2)DStream的转化 + 输出
在底层,其实其原理为 对输入DStream中每个时间段的RDD,都应用一遍map操作,然后生成的新的RDD,即作为新的DStream中的那个时间段的一个RDD。

Spark Streaming中作业的生成与Spark核心类似,对DStream进行的各种操作让它们之间构建起依赖关系(DStreamGraph);

【当遇到DStream使用输出操作时】,这些DStream之间的依赖关系以及它们之间的操作会被记录到名为DStreamGraph的对象中,触发一个job
这些job注册到DStreamGraph并不会立即运行,而是等到【Spark Streaming启动后】,【到达批处理时间时】,才根据DSteamGraph生成job处理该批处理时间内接收的数据。
在Spark Streaming如果应用程序中存在多个输出操作,那么在批处理中会产生多个job。

与RDD中的惰性操作类似,RDD遇到行动算子触发job,StreamingContext中遇到输出操作,才执行 job;


常见的Spark Streaming输出:

注意
尽管这些函数看起来像作用在整个Dstream上一样,但事实上每个DStream 在内部是由许多RDD(批次)组成,且这些转化操作是分别应用到每个RDD 上的;

4. 运行架构

总体来说,Spark Streaming是将流式计算分解成一系列短小的批处理作业;

这里的批处理引擎是Spark Core,也就是Spark Streaming将输入数据按照batch interval分成一段一段的数据(Dstream),每一段数据都转换成Spark中的RDD,然后将Spark Streaming中对DStream的Transformation操作变为针对DSteam内各个RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中;

Driver端:
StreamingContext实例。该实例包括DStreamGraphJobScheduler(包括ReceiverTrackerJobGenerator)等;

Client端:
ReceiverSupervisorReceiver等;

大致流程分为:
启动流处理引擎、接收及存储流数据、处理流数据、输出处理结果

1) 初始化
初始化StreamingContext实例,在该对象启动过程中实例化DStreamGraphJobScheduler(ReceiverTracker+JobGenerator)

DStreamGraph用于存放DStream以及DStream之间的依赖关系等信息,
JobScheduler中包括ReceiverTracker和JobGenerator。
其中ReceiverTracker为Driver端流数据接收器(Receiver)的管理者,JobGenerator为批处理作业生成器。

ReceiverTracker启动过程中,根据流数据接收器分发策略通知对应的Executor中的流数据接收管理器(ReceiverSupervisor)启动,再由ReceiverSupervisor启动流数据接收器Receiver

2) 接收及存储流数据
当流数据接收器Receiver启动后,持续不断地接收实时流数据,根据传过来数据的大小进行判断,如果数据量很小,则攒多条数据成一块,然后再进行block块存储;如果数据量大,则直接进行块存储。
对于这些数据Receiver直接交给ReceiverSupervisor,由其进行数据转储操作。
数据存储完毕后,ReceiverSupervisor会把数据存储的元信息上报给ReceiverTracker

3) 处理流数据
在StreamingContext的JobGenerator中维护一个定时器,该定时器在【批处理时间到来时】会进行生成作业的操作。在该操作中进行如下操作:

  1. 通知ReceiverTracker将接收到的数据进行提交,在提交时采用synchronized关键字进行处理,保证每条数据被划入一个且只被划入一个批次中;
  2. 要求DStreamGraph根据DSream依赖关系生成作业序列Seq[Job];
  3. 从第一步中ReceiverTracker获取本批次数据的元数据;
  4. 把批处理时间time、作业序列Seq[Job]和本批次数据的元数据包装为JobSet,调用JobScheduler.submitJobSet(JobSet)提交给JobScheduler,JobScheduler将把这些作业发送给SparkCore进行处理,由于该执行为异步,因此本步执行速度将非常快;
  5. 只要提交结束(不管作业是否被执行),SparkStreaming对整个系统做一个检查点(Checkpoint);

(4)输出处理结果
在Spark核心的作业队数据进行处理,处理完毕后输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用;
由于实时流数据的数据源源不断地流入,Spark会周而复始地进行数据处理,相应地也会持续不断地输出结果。

注意事项

  1. local模式下需要启动至少两个线程,因为只开启了一条线程(这里只有接收数据的线程,却没有处理数据的线程)
    new SparkConf().setMaster("local[2]").setAppName("SimpleDemo");

  2. Durations时间(batch intervals)的设置–接收数据划分批次的时间间隔,即多久触发一次job;
    new StreamingContext(conf, Seconds(1))

  3. 业务逻辑完成后,需要有一个输出操作,将SparkStreaming处理后的数据输出到外部存储系统;

  4. 关于 StreamingContext 的 start()和 stop()
    StreamingContext.start() // Spark Streaming应用启动之后是不能再添加业务逻辑
    StreamingContext.stop() // 无参的stop方法会将SparkContext一同关闭,解决办法:stop(false)
    StreamingContext.stop() // 停止之后是不能在调用start()

  5. DStreams(Discretized Streams–离散的流),应用在每个DStream的算子操作,会应用在DStream内的各个RDD,进而应用在RDD的各个Partition,应用在Partition中的一条条数据,最终应用到每一条记录上

参考:
https://blog.csdn.net/weixin_44735572/article/details/102831434
https://www.cnblogs.com/fishperson/p/10447033.html
https://blog.csdn.net/u012369535/article/details/93042905
https://www.imooc.com/article/268318

Spark Streaming 总结相关推荐

  1. Apache 流框架 Flink,Spark Streaming,Storm对比分析(一)

    https://bigdata.163.com/product/article/5 Apache 流框架 Flink,Spark Streaming,Storm对比分析(一) 转载于:https:// ...

  2. Spark Streaming笔记整理(二):案例、SSC、数据源与自定义Receiver

    [TOC] 实时WordCount案例 主要是监听网络端口中的数据,并实时进行wc的计算. Java版 测试代码如下: package cn.xpleaf.bigdata.spark.java.str ...

  3. Spark Streaming实践和优化

    2019独角兽企业重金招聘Python工程师标准>>> Spark Streaming实践和优化 博客分类: spark 在流式计算领域,Spark Streaming和Storm时 ...

  4. 基于大数据的Uber数据实时监控(Part 4:Spark Streaming、DataFrames和HBase)

    导言 根据Gartner的数据,到2020年全球将有208亿个连接产品投入使用.Uber机器学习负责人Danny Lange的目标是将机器学习待到Uber业务的每个角落.连接事物的示例包括联网汽车和设 ...

  5. 基于大数据的Uber数据实时监控(Part 2:Kafka和Spark Streaming)

    导言 本文是系列文章的第二篇,我们将建立一个分析和监控Uber汽车GPS旅行数据的实时示例.在第一篇文章中讨论了使用Apache Spark的K-means算法创建机器学习模型,以根据位置聚类Uber ...

  6. Spark Streaming与Kafka Streaming对比

    ♚ 叙述 对流处理的需求每天都在增加.原因是,处理大量数据通常是不够的. 必须快速处理数据,以便公司能够对不断变化的业务条件作出实时反应. 流处理是对数据进行连续.并行的实时处理. 流式处理是处理数据 ...

  7. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  8. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

  9. Spark Streaming高级特性在NDCG计算实践

    从storm到spark streaming,再到flink,流式计算得到长足发展, 依托于spark平台的spark streaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方 ...

  10. Spark Streaming揭秘 Day9 从Receiver的设计到Spark框架的扩展

    Spark Streaming揭秘 Day9 从Receiver的设计到Spark框架的扩展 Receiver是SparkStreaming的输入数据来源,从对Receiver整个生命周期的设计,我们 ...

最新文章

  1. Nginx(二) 配置与调试
  2. 分享一个C#读取计算机信息的类
  3. 浅谈JVM的GC策略
  4. (0064)iOS开发之枚举NS_ENUM和NS_OPTIONS的区别
  5. 网易云IM云服务的稳定原来是这样实现的
  6. practical python and opencv_Practical Python and OpenCV + Case Studies
  7. jenkins maven没有使用全局设置文件地址_Jenkins手把手图文教程「基于Jenkins 2.164.1」...
  8. junit 参数化测试用例_JUnit:在参数化测试中命名单个测试用例
  9. 做柜员还是程序员_应届生放弃互联网大厂回家乡银行:程序员五万比不上柜员五千...
  10. jquery复选框组清空选中的值_jQuery选取所有复选框被选中的值并用Ajax异步提交数据...
  11. 选定内容没有属性页_宣传单页的效果和作用
  12. ES6新特性 - const let var
  13. [转载] 令牌桶算法和漏桶算法python_排序算法(七):Bucket Sort 桶排序
  14. Java项目部署到云服务器最简单的方法
  15. 卢菲菲数字编码表_学技树
  16. springboot集成ureport2
  17. 非线性光纤光学_多模光纤中的非线性光学
  18. 台式计算机键盘快捷键怎么设置,电脑快捷键设置修改 电脑键盘快捷键怎么更改...
  19. java正则表达式 ascii_Java正则表达式
  20. python中倒计时_python实现倒计时的示例

热门文章

  1. 关于MySql插入表情Emoji报错问题解决
  2. php向上取整_PHP直接取整、四舍五入取整、向上取整、向下取整函数介绍
  3. 有机农产品稀缺昂贵-农业大健康·万祥军:靠谱的功能农业
  4. 大数运算(加、减、乘、除)--C++
  5. android标记 pdf,PDF标记篇二:安卓手机怎样在pdf文件中做标记
  6. 【ARKit、RealityKit】把图片转为3D模型
  7. winform无法屏蔽回车事件、KeyPress事件不执行问题解决
  8. 专业实训---滴滴打车
  9. nvm安装和使用问题的解决
  10. CentOS发行版号和RHEL版本对照表