执行流程

数据的接收

StreamingContext实例化的时候,需要传入一个SparkContext,然后指定要连接的spark matser url,即连接一个spark engine,用于获得executor。

实例化之后,首先,要指定一个接收数据的方式,如

val lines = ssc.socketTextStream("localhost", 9999)

  • 1

    这样从socket接收文本数据。这个步骤返回的是一个ReceiverInputDStream的实现,内含Receiver,可接收数据并转化为RDD放内存里。

    ReceiverInputDStream有一个需要子类实现的方法

    def getReceiver(): Receiver[T]

  • 1

    子类实现这个方法,worker节点调用后能得到Receiver,使得数据接收的工作能分布到worker上。

    如果是local跑,由于Receiver接收数据在本地,所以在启动streaming application的时候,要注意分配的core数目要大于Receiver数目,才能腾出cpu做计算任务的调度。

    Receiver需要子类实现

    def onStart()def onStop()

  • 1
  • 2

    来定义一个数据接收器的初始化、接收到数据后如何存、如何在结束的时候释放资源。

    Receiver提供了一系列store()接口,如store(ByteBuffer),store(Iterator)等等。这些store接口是实现好了的,会由worker节点上初始化的ReceiverSupervisor来完成这些存储功能。ReceiverSupervisor还会对Receiver做监控,如监控是否启动了、是否停止了、是否要重启、汇报error等等。

    ReceiverSupervisor的存储接口的实现,借助的是BlockManager,数据会以RDD的形式被存放,根据StorageLevel选择不同存放策略。默认是序列化后存内存,放不下的话写磁盘(executor)。被计算出来的RDD中间结果,默认存放策略是序列化后只存内存。

    ReceiverSupervisor在做putBlock操作的时候,会首先借助BlockManager存好数据,然后往ReceiverTracker发送一个AddBlock的消息。ReceiverTracker内部的ReceivedBlockTracker用于维护一个receiver接收到的所有block信息,即BlockInfo,所以AddBlock会把信息存放在ReceivedBlockTracker里。未来需要计算的时候,ReceiverTracker根据streamId,从ReceivedBlockTracker取出对应的block列表。

    RateLimiter帮助控制Receiver速度,spark.streaming.receiver.maxRate参数。

    数据源方面,普通的数据源为file, socket, akka, RDDs。高级数据源为Twitter, Kafka, Flume等。开发者也可以自己定制数据源。

    任务调度

    JobScheduler在context里初始化。当context start的时候,触发scheduler的start。

    scheduler的start触发了ReceiverTracker和JobGenerator的start。这两个类是任务调度的重点。前者在worker上启动Receiver接收数据,并且暴露接口能够根据streamId获得对应的一批Block地址。后者基于数据和时间来生成任务描述。

    JobScheduler内含一个线程池,用于调度任务执行。spark.streaming.concurrentJobs可以控制job并发度,默认是1,即它只能一个一个提job。

    job来自JobGenerator生成的JobSet。JobGenerator根据时间,生成job并且执行cp。

    JobGenerator的生成job逻辑:

    - 调用ReceiverTracker的allocateBlocksToBatch方法,为本批数据分配好block,即准备好数据

    - 间接调用DStream的generateJob(time)方法,制造可执行的RDD

    DStream切分RDD和生成可执行的RDD,即getOrCompute(time):

    - 如果这个时间点的RDD已经生成好了,那么从内存hashmap里拿出来,否则下一步

    - 如果时间是批次间隔的整数倍,则下一步,否则这个时间点不切

    - 调用DStream的子类的compute方法,得到RDD。可能是一个RDD,也可以是个RDD列表

    - 对每个RDD,调用persist方法,制定默认的存储策略。如果时间点合适,同时调用RDD的checkpoint方法,制定好cp策略

    - 得到这些RDD后,调用SparkContext.runJob(rdd, emptyFunction)。把这整个变成一个function,生成Job类。未来会在executor上触发其runJob

    JobGenerator成功生成job后,调用JobScheduler.submitJobSet(JobSet),JobScheduler会使用线程池提交JobSet中的所有job。该方法调用结束后,JobGenerator发送一个DoCheckpoint的消息,注意这里的cp是driver端元数据的cp,而不是RDD本身的cp。如果time合适,会触发cp操作,内部的CheckpointWriter类会完成write(streamingContext, time)。

    JobScheduler提交job的线程里,触发了job的run()方法,同时,job跑完后,JobScheduler处理JobCompleted(job)。如果job跑成功了,调用JobSet的handleJobCompletion(Job),做些计时和数数工作,如果整个JobSet完成了,调用JobGenerator的onBatchCompletion(time)方法,JobGenerator接着会做clearMetadata的工作,然后JobScheduler打印输出;如果job跑失败了,JobScheduler汇报error,最后会在context里抛异常。

    更多说明

    特殊操作

  • transform:可以与外部RDD交互,比如做维表的join
  • updateStateByKey:生成StateDStream,比如做增量计算。WordCount例子
  • 每一批都需要与增量RDD进行一次cogroup之后,然后执行update function。两个RDD做cogroup过程有些开销:RDD[K, V]和RDD[K, U]合成RDD[K, List[V], List[U]],List[U]一般size是1,理解为oldvalue,即RDD[K, batchValueList, Option[oldValue]]。然后update function处理完,变成RDD[K, newValue]。
  • 批与批之间严格有序,即增量合并操作,是有序的,批之间没发并发
  • 增量RDD的分区数可以开大,即这步增量的计算可以调大并发
  • window:batch size,window length, sliding interval三个参数组成的滑窗操作。把多个批次的RDD合并成一个UnionRDD进行计算。
  • foreachRDD: 这个操作是一个输出操作,比较特殊。

    /**

    * Apply a function to each RDD in this DStream. This is an output operator, so

    * 'this' DStream will be registered as an output stream and therefore materialized.

    */

    def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()

    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

    DStream.foreachRDD()操作使开发者可以直接控制RDD的计算逻辑,而不是通过DStream映射过去。所以借助这个方法,可以实现MLlib, Spark SQL与Streaming的集合,如:结合Spark SQL、DataFrame做Wordcount。

    Cache

    如果是window操作,默认接收的数据都persist在内存里。

    如果是flume, kafka源头,默认接收的数据replicate成两份存起来。

    Checkpoint

    与state有关的流计算,计算出来的结果RDD,会被cp到HDFS上,原文如下:

    Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depends on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increase in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

    cp的时间间隔也可以设定,可以多批做一次cp。

    cp的操作是同步的。

    简单的不带state操作的流任务,可以不开启cp。

    driver端的metadata也有cp策略。driver cp的时候是将整个StreamingContext对象写到了可靠存储里

转载于:https://www.cnblogs.com/breg/p/4794780.html

Spark Streaming原理简析相关推荐

  1. Webpack模块化原理简析

    webpack模块化原理简析 1.webpack的核心原理 一切皆模块:在webpack中,css,html.js,静态资源文件等都可以视作模块:便于管理,利于重复利用: 按需加载:进行代码分割,实现 ...

  2. Android Handler与Looper原理简析

    一直感觉自己简直就是一个弱智,最近越来越感觉是这样了,真的希望自己有一天能够认同自己,认同自己. 本文转载于:https://juejin.im/post/59083d7fda2f60005d14ef ...

  3. grpc通信原理_gRPC原理简析

    gRPC原理简析 gRPC是由谷歌提出并开发的RPC协议,gRPC提供了一套机制,使得应用程序之间可以进行通信. 降级开发者的使用门槛,屏蔽网络协议,调用对端的接口就像是调用本地的函数一样.而gRPC ...

  4. .Spark Streaming(上)--实时流计算Spark Streaming原理介

    Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/474 ...

  5. Android V1及V2签名原理简析

    Android为了保证系统及应用的安全性,在安装APK的时候需要校验包的完整性,同时,对于覆盖安装的场景还要校验新旧是否匹配,这两者都是通过Android签名机制来进行保证的,本文就简单看下Andro ...

  6. CRC原理简析——史上最清新脱俗简单易懂的CRC解析

    CRC原理简析 1. CRC校验原理 CRC校验原理根本思想就是先在要发送的帧后面附加一个数(这个就是用来校验的校验码,但要注意,这里的数也是二进制序列的,下同),生成一个新帧发送给接收端.当然,这个 ...

  7. Java的定时器Timer和定时任务TimerTask应用以及原理简析

    记录:272 场景:Java JDK自带的定时器Timer和定时任务TimerTask应用以及原理简析.在JDK工具包:java.util中可以找到源码,即java.util.Timer和java.u ...

  8. 转子接地保护原理_发变组转子接地保护原理简析

    发变组转子接地保护原理简析 发电机转子接地故障是常见的故障之一, 发生一点接地, 对发电机本身并不直接构成危 害,此时可通过转移负荷,平稳停机后,再查故障点:若在此基础上又发生另外一点接地, 将会严重 ...

  9. Mysql锁机制及原理简析

    Mysql锁机制及原理简析 一.前言 1.什么是锁? 锁是计算机协调多个进程或线程并发访问某一资源的机制. 锁保证数据并发访问的一致性.有效性: 锁冲突也是影响数据库并发访问性能的一个重要因素. 锁是 ...

最新文章

  1. SM04在线用户管理
  2. MySQL远程连接丢失问题解决方法(Lost connection to MySQL server)
  3. PAT_B_1057_Java(20分)
  4. 工厂电子产品工艺文件_建智能工厂,人机如何达到最佳组合?
  5. PHP企业网站源码-稻草人PHP系统源码v1.0.3
  6. 三星Galaxy Note 10系列价格曝光:顶配售价要破万
  7. PHP 7 的五大新特性
  8. Gstreamer之gst_omx_video_enc_set_format(二十七)
  9. 备战数学建模12-模糊综合评价模型
  10. 【内网安全】基础知识:工作组、域和权限分配
  11. 苹果6plus几核处理器_iOS 13.4 Beta3发布:苹果在布局,越狱软件也更新!
  12. java医疗报销_医疗保险报销流程图(修改后)
  13. tdk怎么设置_SEO优化建议:如何正确设置TDK
  14. zoho邮箱收费和免费区别_集成MS Office和您的Zoho在线帐户
  15. 计算机三种校验方式,三种校验码
  16. Oracle spatial 空间修正函数(SDO_UTIL.RECTIFY_GEOMETRY)
  17. 万达电商CEO董策离职
  18. 主板上集成显卡的计算机在进行显示工作,如何在主板集成显卡和独立显卡之间切换?计算机技术...
  19. Aop 自动装配Autowired时,不装配接口而是实现类而报错 切面配置 satisfiedDependencyException|BeanNotOfRequiredTypeException
  20. 【Android安全】vdex、odex文件

热门文章

  1. 版权监控中心怎么关闭_防火门监控系统让消防通道疏散更安全
  2. 传递对象_看懂Xlua实现原理——从宏观到微观(1)传递c#对象到Lua
  3. python下载邮箱附件_基于Python3 下载邮箱附件,并解压到指定文件夹
  4. python中封装一个枚举_JavaScript可扩展枚举封装
  5. html5 js选择器,使用HTML5的JS选择器操作页面中的元素
  6. 哈工大威海c语言实验报告 第八章 无法运行程序,哈工大威海c语言实验报告.doc...
  7. java methodtype_java基于MethodHandle调用方法
  8. 查询mysql视图_MySQL数据库简介及常用命令
  9. u9系统的使用方法仓库_新风系统如何使用 新风系统使用方法介绍【图文】
  10. for ie无效 in js_关于js中for in的缺陷浅析