Spark Structured Steaming

Spark Structured Streaming 简介

什么是 Spark Structured Streaming

Structured Streaming (结构化流)是一种基于 Spark SQL 引擎构建的可扩展且容错的 stream processing engine (流处理引擎)。您可以以静态数据表示批量计算的方式来表达 streaming computation (流式计算)。 Spark SQL 引擎将随着 streaming data 持续到达而增量地持续地运行,并更新最终结果。您可以使用 Scala , Java , Python 或 R 中的 Dataset/DataFrame API 来表示 streaming aggregations (流聚合), event-time windows (事件时间窗口), stream-to-batch joins (流到批处理连接) 等。在同一个 optimized Spark SQL engine (优化的 Spark SQL 引擎)上执行计算。最后,系统通过 checkpointing (检查点) 和 Write Ahead Logs (预写日志)来确保 end-to-end exactly-once (端到端的完全一次性) 容错保证。简而言之,Structured Streaming 提供快速,可扩展,容错,end-to-end exactly-once stream processing (端到端的完全一次性流处理),而无需用户理解 streaming 。

Structured Streaming 编程模型

Structured Streaming 的关键思想是将 live data stream (实时数据流)视为一种正在不断 appended (附加)的表。这形成了一个与 batch processing model (批处理模型)非常相似的新的 stream processing model (流处理模型)。您会将您的 streaming computation (流式计算)表示为在一个静态表上的 standard batch-like query (标准类批次查询),并且 Spark 在 unbounded(无界) 输入表上运行它作为 incremental(增量) 查询。让我们更加详细地了解这个模型。

基本概念

将 input data stream (输入数据流) 视为 “Input Table”(输入表)。每个在 stream 上到达的 data item (数据项)就像是一个被 appended 到 Input Table 的新的 row 。

  • Stream as a Table

对输入的查询将生成 “Result Table” (结果表)。每个 trigger interval (触发间隔)(例如,每 1 秒),新 row (行)将附加到 Input Table ,最终更新 Result Table 。无论何时更新 result table ,我们都希望将 changed result rows (更改的结果行)写入 external sink (外部接收器)。

  • Model

“Output(输出)” 被定义为写入 external storage (外部存储器)的内容。可以以不同的模式定义 output :

Complete Mode(完全模式) - 整个更新的 Result Table 将被写入外部存储。由 storage connector (存储连接器)决定如何处理整个表的写入。

Append Mode(附加模式) - 只有 Result Table 中自上次触发后附加的新 rows(行) 将被写入 external storage (外部存储)。这仅适用于不期望更改 Result Table 中现有行的查询。

Update Mode(更新模式) - 只有自上次触发后 Result Table 中更新的 rows (行)将被写入 external storage (外部存储)(从 Spark 2.1.1 之后可用)。请注意,这与 Complete Mode (完全模式),因为此模式仅输出自上次触发以来更改的 rows (行)。如果查询不包含 aggregations (聚合),它将等同于 Append mode 。

为了说明这个模型的使用,我们来了解一下上面章节的 快速示例 。第一个 lines DataFrame 是 input table ,并且最后的 wordCounts DataFrame 是 result table 。请注意,streaming lines DataFrame 上的查询生成 wordCounts 是 exactly the same(完全一样的) 因为它将是一个 static DataFrame (静态 DataFrame )。但是,当这个查询启动时, Spark 将从 socket 连接中持续检查新数据。如果有新数据,Spark 将运行一个 “incremental(增量)” 查询,它会结合以前的 running counts (运行计数)与新数据计算更新的 counts ,如下所示。

  • Model

这种模式与许多其他 stream processing engines (流处理引擎)有着显著不同。许多 streaming systems (流系统)要求用户本身保持运行 aggregations (聚合),因此必须要考虑容错,和数据一致性(at-least-once(至少一次), at-most-once (最多一次),exactly-once (完全一次))。在这个模型中,当有新数据时, Spark 负责更新 Result Table ,从而减轻用户对它的考虑。举个例子,我们来看一下这个模型如何处理对于基于 event-time 的处理和 late arriving (迟到)的数据。

Spark Structured Streaming 实战

使用 scala 开发 Structured Streaming 程序

  • StructuredNcWordCount.scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode/*** <p>** @author leone* @since 2019-03-28**/
object StructuredNcWordCount {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("structured").master("local[*]").getOrCreate()import spark.implicits._val line = spark.readStream.format("socket").option("host", "ip").option("port", "8000").load()// 由于控制台日志打印太多方便调试spark.sparkContext.setLogLevel("WARN")val words = line.as[String].flatMap(_.split(" "))val wordsCount = words.groupBy("value").count()// 注意:Append模式不支持基于数据流上的聚合操作(Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets)// 三种模式1:complete 所有内容都输出  2:append 新增的行才输出  3:update 更新的行才输出val query = wordsCount.writeStream.outputMode(OutputMode.Complete()).format("console").start()query.awaitTermination()}}
  • StructuredWithKafka.java
package com.andy.spark.structuredimport org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, SparkSession}/*** <p>structured 整合kafka** @author leone* @since 2019-03-29**/
object StructuredWithKafka {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("structured").master("local[*]").getOrCreate()spark.sparkContext.setLogLevel("WARN")import spark.implicits._val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node-2:9092,node-3:9092,node-4:9092").option("subscribe", "structured-topic")//.option("startingOffsets", "earliest").load()val kafkaDS: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]println(kafkaDS)kafkaDS.printSchema()val words = kafkaDS.flatMap(_._2.split(" "))val wordCounts = words.groupBy("value").count()val query = wordCounts.writeStream.outputMode(OutputMode.Complete()).format("console").start()query.awaitTermination()}}

使用 java 开发 Structured Streaming 程序

  • StructuredStreamingWithNc.java

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;import java.util.Arrays;/*** <p>** @author leone* @since 2019-03-29**/
public class StructuredStreamingWithNc {public static void main(String[] args) throws StreamingQueryException {SparkSession spark = SparkSession.builder().appName("structured").master("local[*]").getOrCreate();spark.sparkContext().setLogLevel("warn");Dataset<Row> line = spark.readStream().format("socket").option("host", "ip").option("port", "8000").load();Dataset<String> words = line.as(Encoders.STRING()).flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator(), Encoders.STRING());Dataset<Row> wordCount = words.groupBy("value").count();StreamingQuery query = wordCount.writeStream().outputMode(OutputMode.Complete()).format("console").start();query.awaitTermination();}}
  • StructuredStreamingWithKafka.java

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;import java.util.Arrays;/*** <p>** @author leone* @since 2019-04-02**/
public class StructuredStreamingWithKafka {public static void main(String[] args) throws StreamingQueryException {SparkSession spark = SparkSession.builder().master("local[*]").appName("structured").getOrCreate();spark.sparkContext().setLogLevel("warn");// Create DataSet representing the stream of input lines from kafkaDataset<String> line = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "node-2:9092,node-3:9092,node-4:9092").option("subscribe", "structured-topic").load().selectExpr("CAST(value AS STRING)").as(Encoders.STRING());// val kafkaDS: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]line.printSchema();Dataset<Row> wordCounts = line.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),Encoders.STRING()).groupBy("value").count();StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();}}

Spark Structured Steaming实战相关推荐

  1. 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理

    文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...

  2. 大数据之Spark:Structured Streaming

    目录 1. API 2. 核心思想 3. 应用场景 4.Structured Streaming 实战 1) 读取 Socket 数据 2) 读取目录下文本数据 3) 计算操作 4) 输出 在 2.0 ...

  3. Spark ShuffleManager内存缓冲器SortShuffleWriter设计思路剖析-Spark商业环境实战

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客.版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习. Sp ...

  4. 《Spark大数据分析实战》——1.4节弹性分布式数据集

    本节书摘来自华章社区<Spark大数据分析实战>一书中的第1章,第1.4节弹性分布式数据集,作者高彦杰 倪亚宇,更多章节内容可以访问云栖社区"华章社区"公众号查看 1. ...

  5. Spark PersistenceEngine持久化引擎与领导选举代理机制内核原理深入剖析-Spark商业环境实战...

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark源码解读及商业实战指导,请持续关注本套博客.版权声明:本套Spark源码解读及商业实战归作者(秦凯新)所有,禁止转载,欢迎学习. Sp ...

  6. Spark2.3(三十五)Spark Structured Streaming源代码剖析(从CSDN和Github中看到别人分析的源代码的文章值得收藏)...

    从CSDN中读取到关于spark structured streaming源代码分析不错的几篇文章 spark源码分析--事件总线LiveListenerBus spark事件总线的核心是LiveLi ...

  7. Spark Structured : HIve jdbc方式访问待下划线的表,找不到表的错误

    1.背景 Spark Structured : HIve jdbc方式访问待下划线的表,找不到表的错误 > select * from default._xd_after limit 1; &g ...

  8. Spark: Structured JDBC 方式访问远程的高可用HA的HIVE

    1. 背景 Spark: Structured JDBC 方式访问远程的高可用HA的HIVE,hive是看高可用的,连接信息如下 jdbc:hive2://xx.cdh1.test.dtwave.in ...

  9. Spark: Structured + hive(Jdbc方式)卡死

    1.背景 Spark: Structured + hive(Jdbc方式)卡死,运行入下 19/11/04 15:50:48 INFO metadata.Hive: Registering funct ...

最新文章

  1. 重型车辆盲区行为检查Behaviours – Heavy Vehicle Blind Spots
  2. NB-IoT模组低功耗设计:DRX、eDRX和PSM(NB-IoT专栏—拓展篇2)
  3. linux孤立cpu,Linux 抛弃旧款 CPU,一下子少 50 万行代码
  4. 错把一加当苹果,美国海关侵权大案糗大了:被网友无情嘲讽
  5. java stringbuffer详解_Java常用类StringBuffer详解
  6. Netty之Channel的继承关系
  7. 前端学习(781):格式化日期年月日星期
  8. Linux 启动流程即init程序分析--1
  9. L1 loss L2 loss
  10. SAP License:2021年:传统ERP丧钟响起
  11. 安卓 App 库存系统开发 基础版本
  12. git checkout远程分支_Git检出远程分支
  13. jquery mysql实现加入购物车_jquery-实现加入购物车效果
  14. 读一封汶川地震的信:亲爱的,我要和别人结婚了
  15. db是哪个城市的缩写_全国所有城市拼音及缩写
  16. 2022年天猫、淘宝春节打烊还能继续下单吗?发货时间需要多久?
  17. opencv 显示无边框
  18. 一个简单的acm竞赛题
  19. 副业项目:今日头条音频项目,日入200+
  20. 橙瓜码字多端同步、十份云储存本地实时备份,最放心的码字软件

热门文章

  1. 半加器设计(结构描述法)
  2. 数学在计算机科学上的应用文献,计算机科学技术在数学中应用浅析
  3. 知名游戏设计师的 GitHub 仓库被删,CEO 道歉;工信部向四家公司发放 5G 牌照
  4. 小技巧 CSR蓝牙连接问题
  5. 美团运筹优化实战——智能配送系统阅读笔记
  6. 好好说话之Tcache Attack(2):tcache dup与tcache house of spirit
  7. xlrd,xlwt操作excel个人总结 自动化办公
  8. 美式口语发音技巧:《英美发音区别》
  9. 《鬼谷子的局1》 —— 读后总结
  10. 那些让我印象深刻的bug--03