大数据之Spark:Structured Streaming
目录
- 1. API
- 2. 核心思想
- 3. 应用场景
- 4.Structured Streaming 实战
- 1) 读取 Socket 数据
- 2) 读取目录下文本数据
- 3) 计算操作
- 4) 输出
在 2.0 之前,Spark Streaming 作为核心 API 的扩展,针对实时数据流,提供了一套可扩展、高吞吐、可容错的流式计算模型。Spark Streaming 会接收实时数据源的数据,并切分成很多小的 batches,然后被 Spark Engine 执行,产出同样由很多小的 batchs 组成的结果流。本质上,这是一种 micro-batch(微批处理)的方式处理,用批的思想去处理流数据.这种设计让Spark Streaming 面对复杂的流式处理场景时捉襟见肘。
spark streaming 这种构建在微批处理上的流计算引擎,比较突出的问题就是处理延时较高(无法优化到秒以下的数量级),以及无法支持基于 event_time 的时间窗口做聚合逻辑。
spark 在 2.0 版本中发布了新的流计算的 API,Structured Streaming/结构化流。
Structured Streaming 是一个基于 Spark SQL 引擎的可扩展、容错的流处理引擎。统一了流、批的编程模型,你可以使用静态数据批处理一样的方式来编写流式计算操作。并且支持基于 event_time 的时间窗口的处理逻辑。
随着数据不断地到达,Spark 引擎会以一种增量的方式来执行这些操作,并且持续更新结算结果。可以使用 Scala、Java、Python 或 R 中的 DataSet/DataFrame API 来表示流聚合、事件时间窗口、流到批连接等。此外,Structured Streaming 会通过 checkpoint 和预写日志等机制来实现 Exactly-Once 语义。
简单来说,对于开发人员来说,根本不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可,Structured Streaming 提供了快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑更多细节。
默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流作为一系列小批处理作业进行处理,从而实现端到端的延迟,最短可达 100 毫秒,并且完全可以保证一次容错。自 Spark 2.3 以来,引入了一种新的低延迟处理模式,称为连续处理,它可以在至少一次保证的情况下实现低至 1 毫秒的端到端延迟。也就是类似于 Flink 那样的实时流,而不是小批量处理。实际开发可以根据应用程序要求选择处理模式,但是连续处理在使用的时候仍然有很多限制,目前大部分情况还是应该采用小批量模式。
1. API
Spark Streaming 时代 -DStream-RDD
Spark Streaming 采用的数据抽象是 DStream,而本质上就是时间上连续的 RDD,对数据流的操作就是针对 RDD 的操作。
Structured Streaming 时代 - DataSet/DataFrame -RDD
Structured Streaming 是 Spark2.0 新增的可扩展和高容错性的实时计算框架,它构建于 Spark SQL 引擎,把流式计算也统一到 DataFrame/Dataset 里去了。
Structured Streaming 相比于 Spark Streaming 的进步就类似于 Dataset 相比于 RDD 的进步。
2. 核心思想
Structured Streaming 最核心的思想就是将实时到达的数据看作是一个不断追加的 unbound table 无界表,到达流的每个数据项(RDD)就像是表中的一个新行被附加到无边界的表中.这样用户就可以用静态结构化数据的批处理查询方式进行流计算,如可以使用 SQL 对到来的每一行数据进行实时查询处理。
3. 应用场景
Structured Streaming 将数据源映射为类似于关系数据库中的表,然后将经过计算得到的结果映射为另一张表,完全以结构化的方式去操作流式数据,这种编程模型非常有利于处理分析结构化的实时数据;
4.Structured Streaming 实战
1) 读取 Socket 数据
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object WordCount {def main(args: Array[String]): Unit = {//1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSetval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.接收数据val dataDF: DataFrame = spark.readStream.option("host", "node01").option("port", 9999).format("socket").load()//3.处理数据import spark.implicits._val dataDS: Dataset[String] = dataDF.as[String]val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)//result.show()//Queries with streaming sources must be executed with writeStream.start();result.writeStream.format("console")//往控制台写.outputMode("complete")//每次将所有的数据写出.trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快//.option("checkpointLocation","./ckp")//设置checkpoint目录,socket不支持数据恢复,所以第二次启动会报错,需要注掉.start()//开启.awaitTermination()//等待停止}
}
2) 读取目录下文本数据
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/*** {"name":"json","age":23,"hobby":"running"}* {"name":"charles","age":32,"hobby":"basketball"}* {"name":"tom","age":28,"hobby":"football"}* {"name":"lili","age":24,"hobby":"running"}* {"name":"bob","age":20,"hobby":"swimming"}* 统计年龄小于25岁的人群的爱好排行榜*/
object WordCount2 {def main(args: Array[String]): Unit = {//1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSetval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val Schema: StructType = new StructType().add("name","string").add("age","integer").add("hobby","string")//2.接收数据import spark.implicits._// Schema must be specified when creating a streaming source DataFrame.val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\\data\\spark\\data")//3.处理数据val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)//4.输出结果result.writeStream.format("console").outputMode("complete").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()}
}
3) 计算操作
获得到 Source 之后的基本数据处理方式和之前学习的 DataFrame、DataSet 一致,不再赘述。
官网示例代码:
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("deviceType").count() // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
4) 输出
计算结果可以选择输出到多种设备并进行如下设定:
1、output mode:以哪种方式将 result table 的数据写入 sink,即是全部输出 complete 还是只输出新增数据;
2、format/output sink 的一些细节:数据格式、位置等。如 console;
3、query name:指定查询的标识。类似 tempview 的名字;
4、trigger interval:触发间隔,如果不指定,默认会尽可能快速地处理数据;
5、checkpointLocation:一般是 hdfs 上的目录。注意:Socket 不支持数据恢复,如果设置了,第二次启动会报错,Kafka 支持。
output mode:
每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。
这里有三种输出模型:
1、Append mode:默认模式,新增的行才输出,每次更新结果集时,只将新添加到结果集的结果行输出到接收器。仅支持那些添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。例如,仅查询 select,where,map,flatMap,filter,join 等会支持追加模式。不支持聚合
2、Complete mode:所有内容都输出,每次触发后,整个结果表将输出到接收器。聚合查询支持此功能。仅适用于包含聚合操作的查询。
3、Update mode:更新的行才输出,每次更新结果集时,仅将被更新的结果行输出到接收器(自 Spark 2.1.1 起可用),不支持排序
说明:
File sink:输出存储到一个目录中。支持 parquet 文件,以及 append 模式。
writeStream.format("parquet") // can be "orc", "json", "csv", etc..option("path", "path/to/destination/dir").start()
Kafka sink:将输出存储到 Kafka 中的一个或多个 topics 中。
writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "updates").start()
Foreach sink:对输出中的记录运行任意计算
writeStream.foreach(...).start()
Console sink:将输出打印到控制台
writeStream.format("console").start()
大数据之Spark:Structured Streaming相关推荐
- 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理
文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...
- Spark2.3(三十五)Spark Structured Streaming源代码剖析(从CSDN和Github中看到别人分析的源代码的文章值得收藏)...
从CSDN中读取到关于spark structured streaming源代码分析不错的几篇文章 spark源码分析--事件总线LiveListenerBus spark事件总线的核心是LiveLi ...
- 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】
视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...
- 尚硅谷大数据技术Spark教程-笔记09【SparkStreaming(概念、入门、DStream入门、案例实操、总结)】
尚硅谷大数据技术-教程-学习路线-笔记汇总表[课程资料下载] 视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[SparkCore ...
- 大数据篇--Spark常见面试题总结一
文章目录 一.Spark 概念.模块 1.相关概念: 2.基本模块: 二.Spark作业提交流程是怎么样的 三.Spark on YARN两种方式的区别以及工作流程 1.Yarn组件简介: 2.Spa ...
- Spark Structured Streaming概述
Spark Structured Streaming概述 结构化流(Structured Streaming)是基于Spark SQL引擎的流处理引擎,它具有可扩展和容错性.可以使用类似批数据处理的表 ...
- kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V
简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...
- 大数据课程——Spark SQL
大数据课程--Spark SQL 实验内容以及要求 现有一份汽车销售记录(文件名:Cars.csv),销售记录包括时间.地点.邮政编码.车辆类型等信息,每条记录信息包含39项数据项.按步骤完成如下 ...
- 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy)】
视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...
最新文章
- 线性表List的基本创建
- ii 第七单元 访问网络共享文件系统
- goaccess在定时任务中不执行的问题
- vue组件的基本使用,以及组件之间的基本传值方式
- 一个小技巧,让您的ABAP OPEN SQL具有自描述效果
- 户外lisp导向牌如何安装_聚焦热点、难点,持续开展户外广告(招牌)专项整治...
- RTP/RTCP协议介绍
- 算法与数据结构c语言版PPT,C语言算法与数据结构.ppt
- 微服务精华问答:什么是微服务架构中的DRY?| 技术头条
- 前端笔记-vue cli中v-bind动态数据实时更新
- 可视化滤波器fvtool
- Markdown中的二级标题去掉默认的下划线
- java库存_java实现超市库存管理系统
- Javascript技巧大集合(转自http://www.mscto.com/JavaScript/041043806.html)
- 如何打造个人IP品牌?_云媒体软文营销
- jpa blob mysql_Spring让BLOB 和Clob数据操作变得简单易行
- JSP的四大作用域及属性范围
- Java数组,集合,列表的使用与区别
- c语言程序设计教程61页,谭浩强C语言程序设计课后习题答案所有的程序都有(61页)-原创力文档...
- ‘CollectReport‘ object has no attribute ‘description‘
热门文章
- (简单搞懂)from abc import ABC,abstractmethod是什么意思
- iOS 视图透明度与视图颜色透明度
- 在Vue框架下使用Fullcalendar
- Hive 观看时长秒数、毫秒数转化为时分秒格式
- Workbench云图中怎样使其它零部件半透明(以静态结构分析为例)
- java中布局管理器flowlayout_JAVA基础:FlowLayout布局管理器
- uni-app小程序引入iconfont的三种方式详解(无需下载文件到项目)
- 普乐蛙7d影院设备报价7d动感餐厅设备6d电影体验馆
- [解锁新姿势] 分享 7 个优化代码的技巧
- McAfee Endpoint Security for Mac(防病毒软件)