目录

  • 1. API
  • 2. 核心思想
  • 3. 应用场景
  • 4.Structured Streaming 实战
    • 1) 读取 Socket 数据
    • 2) 读取目录下文本数据
    • 3) 计算操作
    • 4) 输出

在 2.0 之前,Spark Streaming 作为核心 API 的扩展,针对实时数据流,提供了一套可扩展、高吞吐、可容错的流式计算模型。Spark Streaming 会接收实时数据源的数据,并切分成很多小的 batches,然后被 Spark Engine 执行,产出同样由很多小的 batchs 组成的结果流。本质上,这是一种 micro-batch(微批处理)的方式处理,用批的思想去处理流数据.这种设计让Spark Streaming 面对复杂的流式处理场景时捉襟见肘。

spark streaming 这种构建在微批处理上的流计算引擎,比较突出的问题就是处理延时较高(无法优化到秒以下的数量级),以及无法支持基于 event_time 的时间窗口做聚合逻辑。

spark 在 2.0 版本中发布了新的流计算的 API,Structured Streaming/结构化流。

Structured Streaming 是一个基于 Spark SQL 引擎的可扩展、容错的流处理引擎。统一了流、批的编程模型,你可以使用静态数据批处理一样的方式来编写流式计算操作。并且支持基于 event_time 的时间窗口的处理逻辑。

随着数据不断地到达,Spark 引擎会以一种增量的方式来执行这些操作,并且持续更新结算结果。可以使用 Scala、Java、Python 或 R 中的 DataSet/DataFrame API 来表示流聚合、事件时间窗口、流到批连接等。此外,Structured Streaming 会通过 checkpoint 和预写日志等机制来实现 Exactly-Once 语义。

简单来说,对于开发人员来说,根本不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可,Structured Streaming 提供了快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑更多细节。

默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流作为一系列小批处理作业进行处理,从而实现端到端的延迟,最短可达 100 毫秒,并且完全可以保证一次容错。自 Spark 2.3 以来,引入了一种新的低延迟处理模式,称为连续处理,它可以在至少一次保证的情况下实现低至 1 毫秒的端到端延迟。也就是类似于 Flink 那样的实时流,而不是小批量处理。实际开发可以根据应用程序要求选择处理模式,但是连续处理在使用的时候仍然有很多限制,目前大部分情况还是应该采用小批量模式。

1. API

Spark Streaming 时代 -DStream-RDD

Spark Streaming 采用的数据抽象是 DStream,而本质上就是时间上连续的 RDD,对数据流的操作就是针对 RDD 的操作。

Structured Streaming 时代 - DataSet/DataFrame -RDD

Structured Streaming 是 Spark2.0 新增的可扩展和高容错性的实时计算框架,它构建于 Spark SQL 引擎,把流式计算也统一到 DataFrame/Dataset 里去了。

Structured Streaming 相比于 Spark Streaming 的进步就类似于 Dataset 相比于 RDD 的进步。

2. 核心思想

Structured Streaming 最核心的思想就是将实时到达的数据看作是一个不断追加的 unbound table 无界表,到达流的每个数据项(RDD)就像是表中的一个新行被附加到无边界的表中.这样用户就可以用静态结构化数据的批处理查询方式进行流计算,如可以使用 SQL 对到来的每一行数据进行实时查询处理。

3. 应用场景

Structured Streaming 将数据源映射为类似于关系数据库中的表,然后将经过计算得到的结果映射为另一张表,完全以结构化的方式去操作流式数据,这种编程模型非常有利于处理分析结构化的实时数据;

4.Structured Streaming 实战

1) 读取 Socket 数据

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object WordCount {def main(args: Array[String]): Unit = {//1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSetval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.接收数据val dataDF: DataFrame = spark.readStream.option("host", "node01").option("port", 9999).format("socket").load()//3.处理数据import spark.implicits._val dataDS: Dataset[String] = dataDF.as[String]val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)//result.show()//Queries with streaming sources must be executed with writeStream.start();result.writeStream.format("console")//往控制台写.outputMode("complete")//每次将所有的数据写出.trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快//.option("checkpointLocation","./ckp")//设置checkpoint目录,socket不支持数据恢复,所以第二次启动会报错,需要注掉.start()//开启.awaitTermination()//等待停止}
}

2) 读取目录下文本数据

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/*** {"name":"json","age":23,"hobby":"running"}* {"name":"charles","age":32,"hobby":"basketball"}* {"name":"tom","age":28,"hobby":"football"}* {"name":"lili","age":24,"hobby":"running"}* {"name":"bob","age":20,"hobby":"swimming"}* 统计年龄小于25岁的人群的爱好排行榜*/
object WordCount2 {def main(args: Array[String]): Unit = {//1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSetval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val Schema: StructType = new StructType().add("name","string").add("age","integer").add("hobby","string")//2.接收数据import spark.implicits._// Schema must be specified when creating a streaming source DataFrame.val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\\data\\spark\\data")//3.处理数据val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)//4.输出结果result.writeStream.format("console").outputMode("complete").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()}
}

3) 计算操作

获得到 Source 之后的基本数据处理方式和之前学习的 DataFrame、DataSet 一致,不再赘述。

官网示例代码:

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                 // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

4) 输出

计算结果可以选择输出到多种设备并进行如下设定:

1、output mode:以哪种方式将 result table 的数据写入 sink,即是全部输出 complete 还是只输出新增数据;

2、format/output sink 的一些细节:数据格式、位置等。如 console;

3、query name:指定查询的标识。类似 tempview 的名字;

4、trigger interval:触发间隔,如果不指定,默认会尽可能快速地处理数据;

5、checkpointLocation:一般是 hdfs 上的目录。注意:Socket 不支持数据恢复,如果设置了,第二次启动会报错,Kafka 支持。
output mode:

每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。

这里有三种输出模型:

1、Append mode:默认模式,新增的行才输出,每次更新结果集时,只将新添加到结果集的结果行输出到接收器。仅支持那些添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。例如,仅查询 select,where,map,flatMap,filter,join 等会支持追加模式。不支持聚合

2、Complete mode:所有内容都输出,每次触发后,整个结果表将输出到接收器。聚合查询支持此功能。仅适用于包含聚合操作的查询。

3、Update mode:更新的行才输出,每次更新结果集时,仅将被更新的结果行输出到接收器(自 Spark 2.1.1 起可用),不支持排序

说明:
File sink:输出存储到一个目录中。支持 parquet 文件,以及 append 模式。

writeStream.format("parquet")        // can be "orc", "json", "csv", etc..option("path", "path/to/destination/dir").start()

Kafka sink:将输出存储到 Kafka 中的一个或多个 topics 中。

writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "updates").start()

Foreach sink:对输出中的记录运行任意计算

writeStream.foreach(...).start()

Console sink:将输出打印到控制台

writeStream.format("console").start()

大数据之Spark:Structured Streaming相关推荐

  1. 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理

    文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...

  2. Spark2.3(三十五)Spark Structured Streaming源代码剖析(从CSDN和Github中看到别人分析的源代码的文章值得收藏)...

    从CSDN中读取到关于spark structured streaming源代码分析不错的几篇文章 spark源码分析--事件总线LiveListenerBus spark事件总线的核心是LiveLi ...

  3. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

  4. 尚硅谷大数据技术Spark教程-笔记09【SparkStreaming(概念、入门、DStream入门、案例实操、总结)】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表[课程资料下载] 视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[SparkCore ...

  5. 大数据篇--Spark常见面试题总结一

    文章目录 一.Spark 概念.模块 1.相关概念: 2.基本模块: 二.Spark作业提交流程是怎么样的 三.Spark on YARN两种方式的区别以及工作流程 1.Yarn组件简介: 2.Spa ...

  6. Spark Structured Streaming概述

    Spark Structured Streaming概述 结构化流(Structured Streaming)是基于Spark SQL引擎的流处理引擎,它具有可扩展和容错性.可以使用类似批数据处理的表 ...

  7. kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V

    简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...

  8. 大数据课程——Spark SQL

    大数据课程--Spark SQL   实验内容以及要求 现有一份汽车销售记录(文件名:Cars.csv),销售记录包括时间.地点.邮政编码.车辆类型等信息,每条记录信息包含39项数据项.按步骤完成如下 ...

  9. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[Spark(概述.快速上手.运行环境.运行架构)] 尚硅谷大数据技术Spark教 ...

最新文章

  1. 线性表List的基本创建
  2. ii 第七单元 访问网络共享文件系统
  3. goaccess在定时任务中不执行的问题
  4. vue组件的基本使用,以及组件之间的基本传值方式
  5. 一个小技巧,让您的ABAP OPEN SQL具有自描述效果
  6. 户外lisp导向牌如何安装_聚焦热点、难点,持续开展户外广告(招牌)专项整治...
  7. RTP/RTCP协议介绍
  8. 算法与数据结构c语言版PPT,C语言算法与数据结构.ppt
  9. 微服务精华问答:什么是微服务架构中的DRY?| 技术头条
  10. 前端笔记-vue cli中v-bind动态数据实时更新
  11. 可视化滤波器fvtool
  12. Markdown中的二级标题去掉默认的下划线
  13. java库存_java实现超市库存管理系统
  14. Javascript技巧大集合(转自http://www.mscto.com/JavaScript/041043806.html)
  15. 如何打造个人IP品牌?_云媒体软文营销
  16. jpa blob mysql_Spring让BLOB 和Clob数据操作变得简单易行
  17. JSP的四大作用域及属性范围
  18. Java数组,集合,列表的使用与区别
  19. c语言程序设计教程61页,谭浩强C语言程序设计课后习题答案所有的程序都有(61页)-原创力文档...
  20. ‘CollectReport‘ object has no attribute ‘description‘

热门文章

  1. (简单搞懂)from abc import ABC,abstractmethod是什么意思
  2. iOS 视图透明度与视图颜色透明度
  3. 在Vue框架下使用Fullcalendar
  4. Hive 观看时长秒数、毫秒数转化为时分秒格式
  5. Workbench云图中怎样使其它零部件半透明(以静态结构分析为例)
  6. java中布局管理器flowlayout_JAVA基础:FlowLayout布局管理器
  7. uni-app小程序引入iconfont的三种方式详解(无需下载文件到项目)
  8. 普乐蛙7d影院设备报价7d动感餐厅设备6d电影体验馆
  9. [解锁新姿势] 分享 7 个优化代码的技巧
  10. McAfee Endpoint Security for Mac(防病毒软件)