Apache Spark在2016年的时候启动了Structured Streaming项目,一个基于Spark SQL的全新流计算引擎Structured Streaming,让用户像编写批处理程序一样简单地编写高性能的流处理程序。

Structured Streaming并不是对Spark Streaming的简单改进,而是吸取了在开发Spark SQL和Spark Streaming过程中的经验教训,以及Spark社区和Databricks众多客户的反馈,重新开发的全新流式引擎,致力于为批处理和流处理提供统一的高性能API。同时,在这个新的引擎中,也很容易实现之前在Spark Streaming中很难实现的一些功能,比如Event Time(事件时间)的支持,Stream-Stream Join(2.3.0 新增的功能),毫秒级延迟(2.3.0 即将加入的 Continuous Processing)。

Structured Streaming概述

Spark Streaming是Apache Spark早期基于RDD开发的流式系统,用户使用DStream API来编写代码,支持高吞吐和良好的容错。其背后的主要模型是Micro Batch(微批处理),也就是将数据流切成等时间间隔(BatchInterval)的小批量任务来执行。

Structured Streaming则是在Spark 2.0加入的,经过重新设计的全新流式引擎。它的模型十分简洁,易于理解。一个流的数据源从逻辑上来说就是一个不断增长的动态表格,随着时间的推移,新数据被持续不断地添加到表格的末尾,用户可以使用Dataset/DataFrame 或者 SQL 来对这个动态数据源进行实时查询。

文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html

Spark Streaming 不足

Spark Streaming 会接收实时数据源的数据,并切分成很多小的batches,然后被Spark Engine执行,产出同样由很多小的batchs组成的结果流

本质上,这是一种micro-batch(微批处理)的方式处理,用批的思想去处理流数据。这种设计让Spark Streaming面对复杂的流式处理场景时捉襟见肘。

Spark Streaming 存在哪些不足,总结一下主要有下面几点:

1:使用 Processing Time 而不是 Event Time

Processing Time 是数据到达 Spark 被处理的时间,而 Event Time 是数据自带的属性,一般表示数据产生于数据源的时间。

比如 IoT 中,传感器在 12:00:00 产生一条数据,然后在 12:00:05 数据传送到 Spark,那么 Event Time 就是 12:00:00,而 Processing Time 就是 12:00:05。

Spark Streaming是基于DStream模型的micro-batch模式,简单来说就是将一个微小时间段(比如说 1s)的流数据当前批数据来处理。如果要统计某个时间段的一些数据统计,毫无疑问应该使用 Event Time,但是因为 Spark Streaming 的数据切割是基于Processing Time,这样就导致使用 Event Time 特别的困难。

2:Complex, low-level api

DStream(Spark Streaming 的数据模型)提供的API类似RDD的API,非常的low level;

当编写Spark Streaming程序的时候,本质上就是要去构造RDD的DAG执行图,然后通过Spark Engine运行。这样导致一个问题是,DAG 可能会因为开发者的水平参差不齐而导致执行效率上的天壤之别;

3:reason about end-to-end application

end-to-end指的是直接input到out,如Kafka接入Spark Streaming然后再导出到HDFS中;

DStream 只能保证自己的一致性语义是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Straming 输出到外部存储的语义往往需要用户自己来保证;

4:批流代码不统一

尽管批流本是两套系统,但是这两套系统统一起来确实很有必要,有时候确实需要将的流处理逻辑运行到批数据上面;

Streaming尽管是对RDD的封装,但是要将DStream代码完全转换成RDD还是有一点工作量的,更何况现在Spark的批处理都用DataSet/DataFrameAPI;

总结

流式计算一直没有一套标准化、能应对各种场景的模型,直到2015年Google发表了The Dataflow Model的论文( https://yq.aliyun.com/articles/73255 )

Google开源Apache Beam项目,基本上就是对Dataflow模型的实现,目前已经成为Apache的顶级项目,但是在国内使用不多。

国内使用的更多的是Apache Flink,因为阿里大力推广Flink,甚至把花7亿元把Flink母公司收购。

使用Yahoo的流基准平台,要求系统读取广告点击事件,并按照活动ID加入到一个广告活动的静态表中,并在10秒的event-time窗口中输出活动计数

比较了Kafka Streams 0.10.2、Apache Flink 1.2.1和Spark 2.3.0,在一个拥有5个c3.2*2大型Amazon EC2 工作节点和一个master节点的集群上(硬件条件为8个虚拟核心和15GB的内存)。

上图(a)展示了每个系统最大稳定吞吐量(积压前的吞吐量),Flink可以达到3300万,而Structured Streaming可以达到6500万,近乎两倍于Flink。这个性能完全来自于Spark SQL的内置执行优化,包括将数据存储在紧凑的二进制文件格式以及代码生成。

附录:【Streaming System系统】设计文章:

Streaming System 第一章【Streaming 101】

网址:https://blog.csdn.net/xxscj/article/details/84990301

Streaming System 第二章【The What- Where- When- and How of Data Processing】

网址:https://blog.csdn.net/xxscj/article/details/84989879

Structured Streaming 介绍

或许是对Dataflow模型的借鉴,也许是英雄所见略同,Spark在2.0版本中发布了新的流计算的API:Structured Streaming结构化流。Structured Streaming是一个基于Spark SQL引擎的可扩展、容错的流处理引擎。

Structured Streaming统一了流、批的编程模型,可以使用静态数据批处理一样的方式来编写流式计算操作,并且支持基于event_time的时间窗口的处理逻辑。随着数据不断地到达,Spark 引擎会以一种增量的方式来执行这些操作,并且持续更新结算结果。

模块介绍

Structured Streaming 在 Spark 2.0 版本于 2016 年引入,设计思想参考很多其他系统的思想,比如区分 processing time 和 event time,使用 relational 执行引擎提高性能等。同时也考虑了和 Spark 其他组件更好的集成。

Structured Streaming 和其他系统的显著区别主要如下:

1:Incremental query model(增量查询模型)

Structured Streaming 将会在新增的流式数据上不断执行增量查询,同时代码的写法和批处理 API(基于Dataframe和Dataset API)完全一样,而且这些API非常的简单。

2:Support for end-to-end application(支持端到端应用)

Structured Streaming 和内置的 connector 使的 end-to-end 程序写起来非常的简单,而且 "correct by default"。数据源和sink满足 "exactly-once" 语义,这样我们就可以在此基础上更好地和外部系统集成。

 3:复用 Spark SQL 执行引擎

Spark SQL 执行引擎做了非常多的优化工作,比如执行计划优化、codegen、内存管理等。这也是Structured Streaming取得高性能和高吞吐的一个原因。

​​​​​​​核心设计

2016年,Spark在2.0版本中推出了结构化流处理的模块Structured Streaming,核心设计如下:

 1:Input and Output(输入和输出)

Structured Streaming 内置了很多 connector 来保证 input 数据源和 output sink 保证 exactly-once 语义。

实现 exactly-once 语义的前提:

Input 数据源必须是可以replay的,比如Kafka,这样节点crash的时候就可以重新读取input数据,常见的数据源包括 Amazon Kinesis, Apache Kafka 和文件系统。

Output sink 必须要支持写入是幂等的,这个很好理解,如果 output 不支持幂等写入,那么一致性语义就是 at-least-once 了。另外对于某些 sink, Structured Streaming 还提供了原子写入来保证 exactly-once 语义。

补充:幂等性:在HTTP/1.1中对幂等性的定义:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。幂等性是系统服务对外一种承诺(而不是实现),承诺只要调用接口成功,外部多次调用对系统的影响是一致的。声明为幂等的服务会认为外部调用失败是常态,并且失败之后必然会有重试。

2:Program API(编程 API)

Structured Streaming 代码编写完全复用 Spark SQL 的 batch API,也就是对一个或者多个 stream 或者 table 进行 query。

query 的结果是 result table,可以以多种不同的模式(追加:append, 更新:update, 完全:complete)输出到外部存储中。

另外,Structured Streaming 还提供了一些 Streaming 处理特有的 API:Trigger, watermark, stateful operator。

3:Execution Engine(执行引擎)

复用 Spark SQL 的执行引擎;

Structured Streaming 默认使用类似 Spark Streaming 的 micro-batch 模式,有很多好处,比如动态负载均衡、再扩展、错误恢复以及 straggler (straggler 指的是哪些执行明显慢于其他 task 的 task)重试;

提供了基于传统的 long-running operator 的 continuous(持续) 处理模式;

4:Operational Features(操作特性)

利用 wal 和状态State存储,开发者可以做到集中形式的 rollback 和错误恢复FailOver。

​​​​​​​编程模型

Structured Streaming将流式数据当成一个不断增长的table,然后使用和批处理同一套API,都是基于DataSet/DataFrame的。如下图所示,通过将流式数据理解成一张不断增长的表,从而就可以像操作批的静态数据一样来操作流数据了。

在这个模型中,主要存在下面几个组成部分:

1:Input Table(Unbounded Table),流式数据的抽象表示,没有限制边界的,表的数据源源不断增加;

2:Query(查询),对 Input Table 的增量式查询,只要Input Table中有数据,立即(默认情况)执行查询分析操作,然后进行输出(类似SparkStreaming中微批处理);

3:Result Table,Query 产生的结果表;

4:Output,Result Table 的输出,依据设置的输出模式OutputMode输出结果;

核心思想

Structured Streaming最核心的思想就是将实时到达的数据看作是一个不断追加的unbound table无界表,到达流的每个数据项就像是表中的一个新行被附加到无边界的表中,用静态结构化数据的批处理查询方式进行流计算。

以词频统计WordCount案例,Structured Streaming实时处理数据的示意图如下,各行含义:

第一行、表示从TCP Socket不断接收数据,使用【nc -lk 9999】;

第二行、表示时间轴,每隔1秒进行一次数据处理;

第三行、可以看成是“input unbound table",当有新数据到达时追加到表中;

第四行、最终的wordCounts是结果表,新数据到达后触发查询Query,输出的结果;

第五行、当有新的数据到达时,Spark会执行“增量"查询,并更新结果集;该示例设置为Complete Mode,因此每次都将所有数据输出到控制台;

上图中数据实时处理说明:

第一、在第1秒时,此时到达的数据为"cat dog"和"dog dog",因此可以得到第1秒时的结果集cat=1 dog=3,并输出到控制台;

第二、当第2秒时,到达的数据为"owl cat",此时"unbound table"增加了一行数据"owl cat",执行word count查询并更新结果集,可得第2秒时的结果集为cat=2 dog=3 owl=1,并输出到控制台;

第三、当第3秒时,到达的数据为"dog"和"owl",此时"unbound table"增加两行数据"dog"和"owl",执行word count查询并更新结果集,可得第3秒时的结果集为cat=2 dog=4 owl=2;

使用Structured Streaming处理实时数据时,会负责将新到达的数据与历史数据进行整合,并完成正确的计算操作,同时更新Result Table。

2021年大数据Spark(四十四):Structured Streaming概述相关推荐

  1. 2021年大数据Spark(十四):Spark Core的RDD操作

    目录 RDD的操作 函数(算子)分类 Transformation函数 ​​​​​​​Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...

  2. 2021年大数据HBase(十四):HBase的原理及其相关的工作机制

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase的原理及其相关的工作机制 一.HBase的flus ...

  3. 2021年大数据Hadoop(十四):HDFS的高可用机制

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 HDFS的高可用机制 HDFS高可用介绍 组件介绍 Nam ...

  4. 2021年大数据Spark(一):框架概述

    目录 Spark框架概述 Spark 是什么 分布式内存迭代计算框架 官方定义: Spark框架概述 Spark 是加州大学伯克利分校AMP实验室(Algorithms Machines and Pe ...

  5. 2021年大数据Flink(十四):流批一体API Connectors JDBC

    目录 Connectors JDBC 代码演示 Connectors JDBC Apache Flink 1.12 Documentation: JDBC Connector 代码演示 package ...

  6. 2021年大数据Spark(十二):Spark Core的RDD详解

    目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...

  7. 2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量

    目录 共享变量 广播变量 累加器 ​​​​​​​案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...

  8. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  9. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  10. 2021年大数据Spark(十六):Spark Core的RDD算子练习

    目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集.并集.差集.笛卡尔积 distinct 算子 ​​​​​​​​​​​​​​first.take.top 算子 ​​​ ...

最新文章

  1. BZOJ4539: [Hnoi2016]树
  2. 夺命雷公狗---node.js---3commonJs 与 nodeJs的简介
  3. python的django框架与springboot_Python系统教学|为什么Django框架在Python开发很重要?...
  4. Go语言实现FastDFS分布式存储系统WebAPI网关
  5. cad多段线画圆弧方向_CAD箭头怎么画
  6. 《LINUX与UNIX SHELL编程指南》学习笔记
  7. 第五章应用系统安全基础备考要点及真题分布
  8. java View转换类型_java强制类型转换.
  9. abaqus利用python实现部件合并_python脚本实现abaqus前处理2D多晶粒建模(附完整源码)-Voronoi多边形的生成...
  10. \t\t使用Google APP Engine 完成个人代理服务器架设
  11. 19-离线词典生成原理、图像描述子用BoW转化为BoW向量和FeatureVe
  12. 大数据学习之大数据概述
  13. 推导抛物线插值的拉格朗日插值公式
  14. Unity学习笔记–无限地图
  15. 如何卸载CAD 2020 ?怎么把AutoCAD 2020彻底卸载删除干净重新安装的方法【转载】
  16. [转]ASP.NET 安全认证(三): 用Form 表单认证实现单点登录
  17. 如何避免PayPal、Fb、谷歌账户被封,又如何解封?
  18. 浏览器缓存之http缓存和service worker
  19. 装箱与拆箱(TDB)
  20. 为什么选择高防DNS云解析?

热门文章

  1. Struts2 Cannot create a session after the response has been committed 一个不起眼的错误
  2. 语义网所谓的“本体”的具体例子是什么?人工智能
  3. UTF-8与UTF-8 BOM
  4. 全文翻译(三) TVM An Automated End-to-End Optimizing Compiler
  5. ONNX 实时graph优化方法
  6. 每秒能捕捉万亿帧的相机
  7. 自动驾驶关键技术分解和流程
  8. 客快物流大数据项目(五十六): 编写SparkSession对象工具类
  9. DCN-2655 同异步端口PPP (chap)认证
  10. Windows 系统执行Shell 脚本的方法