目录

  • 1 连续处理概述
  • 2 编程实现
  • 3 支持查询

1 连续处理概述

连续处理(Continuous Processing)是Spark 2.3中引入的一种新的实验性流执行模式,可实现低的(~1 ms)端到端延迟,并且至少具有一次容错保证。 将其与默认的微批处理(micro-batchprocessing)引擎相比较,该引擎可以实现一次性保证,但最多可实现~100ms的延迟。

在实时流式应用中,最典型的应用场景:网站UV统计。

  • 业务需求一:实时统计网站UV,比如每日网站UV;
  • 业务需求二:统计最近一段时间(比如一个小时)网站UV,可以设置水位Watermark;
    在SparkStreaming或Flink框架中要想实现【网站UV统计】需要借助于外部存储系统,比如Redis内存数据库或者HBase列式存储数据库,存储UserId,利用数据库特性去重,最后进行count。Structured Streaming可以使用deduplication对有无Watermark的流式数据进行去重操作:
  • 第一、无 Watermark:对重复记录到达的时间没有限制。查询会保留所有的过去记录作为状态用于去重;
  • 第二、有 Watermark:对重复记录到达的时间有限制。查询会根据水印删除旧的状态数据;
    官方提供示例代码如下:

    演示范例:对网站用户日志数据,按照userId和eventType去重统计,网站代码如下。
package cn.oldlu.spark.deduplicateimport org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}object StructuredDeduplication {def main(args: Array[String]): Unit = {// 构建SparkSession实例对象val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]")// 设置Shuffle分区数目.config("spark.sql.shuffle.partitions", "2").getOrCreate()// 导入隐式转换和函数库import spark.implicits._import org.apache.spark.sql.functions._// 1. 从TCP Socket 读取数据val inputTable: DataFrame = spark.readStream.format("socket").option("host", "node1.oldlu.cn").option("port", 9999).load()// 2. 数据处理分析val resultTable: DataFrame = inputTable.as[String].filter(line => null != line && line.trim.length > 0)// 样本数据:{“eventTime”: “2016-01-10 10:01:50”,“eventType”: “browse”,“userID”:“1”}.select(get_json_object($"value", "$.eventTime").as("event_time"), //get_json_object($"value", "$.eventType").as("event_type"), //get_json_object($"value", "$.userID").as("user_id") //)// 按照UserId和EventType去重.dropDuplicates("user_id", "event_type").groupBy($"user_id", $"event_type").count()// 3. 设置Streaming应用输出及启动val query: StreamingQuery = resultTable.writeStream.outputMode(OutputMode.Complete()).format("console").option("numRows", "10").option("truncate", "false").start()query.awaitTermination() // 流式查询等待流式应用终止// 等待所有任务运行完成才停止运行query.stop()}
}

测试数据如下:

{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:55","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:55","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:02:00","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}
{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"3"}
{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"2"}

运行应用结果如下:

连续处理(Continuous Processing)是“真正”的流处理,之所以说“真正”是因为 continuousmode是传统的流处理模式,通过运行一个long-running的operator用来处理数据。之前SparkStreaming是基于 micro-batch 模式的,就被很多人诟病不是“真正的”流式处理。continuous mode处理模式只要一有数据可用就会进行处理,如下图所示:
epoch是input event stream中数据被发送给operator处理的最小单位,在处理过程中,epoch的offset会被记录到WAL中。另外continuous模式下的snapshot存储使用的一致性算法是Chandy-Lamport算法。
与micro-batch模式缺点和优点都很明显,缺点是不容易做扩展,优点是延迟更低。为什么延迟更低,下面两幅图目了然:

  • 微批处理(Micro-batch Processing)
  • 连续处理(Continue Processing)
    在一台4核服务器上对Structured Streaming的连续处理模式进行基准测试,该测试展示了延迟
    -吞吐量的权衡(因为分区是独立运行的,希望延迟与节点数量保持一致)。
    上图展示了一个map任务的结果,这个map任务从Kafka中读取数据,虚线展示了微批模式能达到的最大吞吐量。可以看到,在连续模式下,吞吐量不会大幅下降,但是延迟会更低。(小于10毫秒的延迟,只有微批处理模式最大吞吐量的一半)。它的最大稳定吞吐量也略高,因为微批处理模式由于任务调度而导致延迟。

2 编程实现

要在连续处理模式下运行支持的查询,只需指定一个continuous trigger,并将所需的检查点
间隔作为参数,示例代码:

// 从Kafka消费数据
val streamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").load()
// 数据ETL后保存Kafka
streamDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "topic1").trigger(Trigger.Continuous("1 second")) // only change in query.start()

检查点间隔为1秒意味着连续处理引擎将每秒记录查询的进度,生成的检查点采用与微批处理
引擎兼容的格式,因此可以使用任何触发器重新启动任何查询。 例如以微批处理模式启动的支持
查询可以以连续模式(continuous mode)重新启动,反之亦然。
范例演示:从Kafka实时消费数据,经过ETL处理后,将数据发送至Kafka Topic。

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** 从Spark 2.3版本开始,StructuredStreaming结构化流中添加新流式数据处理方式:Continuous processing* 持续流数据处理:当数据一产生就立即处理,类似Storm、Flink框架,延迟性达到100ms以下,目前属于实验开发阶段*/
object StructuredContinuous {def main(args: Array[String]): Unit = {// 构建SparkSession实例对象val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[3]").config("spark.sql.shuffle.partitions", "3").getOrCreate()import spark.implicits._// 1. 从KAFKA读取数据val kafkaStreamDF: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node1.oldlu.cn:9092").option("subscribe", "stationTopic").load()// 2. 对基站日志数据进行ETL操作// station_0,18600004405,18900009049,success,1589711564033,9000val etlStreamDF: Dataset[String] = kafkaStreamDF// 获取value字段的值,转换为String类型.selectExpr("CAST(value AS STRING)")// 转换为Dataset类型.as[String]// 过滤数据:通话状态为success.filter { log =>null != log && log.trim.split(",").length == 6 && "success".equals(log.trim.split(",")(3))}// 3. 针对流式应用来说,输出的是流val query: StreamingQuery = etlStreamDF.writeStream.outputMode(OutputMode.Append()).format("kafka").option("kafka.bootstrap.servers", "node1.oldlu.cn:9092").option("topic", "etlTopic")// 设置检查点目录.option("checkpointLocation", s"datas/structured/etl-100002")// TODO: 设置持续流处理 Continuous Processing, 指定CKPT时间间隔/*the continuous processing engine will records the progress of the query every second持续流处理引擎,将每1秒中记录当前查询Query进度状态*/.trigger(Trigger.Continuous("1 second")).start() // 流式应用,需要启动start// 查询器等待流式应用终止query.awaitTermination()query.stop() // 等待所有任务运行完成才停止运行}
}

运行应用程序,观察数据生成与数据处理发送Kafka时间,实时性很快,延迟性在毫秒级别。

3 支持查询

从Spark 2.3开始,连续处理模式才出现,目前仅支持以下类型的查询:

  • 第一、数据源Sources:Kafka source(支持所有选项)及Rate source(仅仅适合测试)。
  • 第二、接收器Sinks:Kafka sink(支持所有选项)、Console sink(适合调试)。
  • 第三、DataFrame Operations操作:在连续模式下仅支持类似 map 的 Dataset/DataFrame 操
    作,即仅投影(select,map,flatMap,mapPartitions等)和选择(where,filter等)。
    连续处理引擎启动多个长时间运行的任务,这些任务不断从源中读取数据,处理数据并连续写
    入接收器。 查询所需的任务数取决于查询可以并行从源读取的分区数。 因此,在开始连续处理查
    询之前,必须确保群集中有足够的核数并行执行所有任务。例如,如果正在读取具有10个分区的
    Kafka主题,则群集必须至少具有10个核数才能使查询取得进展。

大数据Spark Continuous Processing相关推荐

  1. 大数据Spark企业级实战 PDF 下载 和目录

    大数据Spark企业级实战  PDF完整版 下载地址 http://download.csdn.net/detail/laoge/9504794 基本信息 书名:大数据Spark企业级实战 定价:12 ...

  2. 2016年大数据Spark“蘑菇云”行动代码学习之AdClickedStreamingStats模块分析

    2016年大数据Spark"蘑菇云"行动代码学习之AdClickedStreamingStats模块分析     系统背景:用户使用终端设备(IPAD.手机.浏览器)等登录系统,系 ...

  3. 光环大数据spark文档_推荐大数据Spark必读书目

    我有一个非常要好的同事,无数次帮我解决了业务上的痛.技术能力很强,业务方面也精通.而且更耐得住加班,并且是自愿加班,毫无怨言.不像我,6点到准时走人了.但就是这么一位兢兢业业的技术人,却一直没有升职加 ...

  4. 大数据|Spark技术在京东智能供应链预测的应用案例深度剖析(一)

    大数据|Spark技术在京东智能供应链预测的应用案例深度剖析(一) 2017-03-27 11:58  浏览次数:148 1. 背景 前段时间京东公开了面向第二个十二年的战略规划,表示京东将全面走向技 ...

  5. 推荐大数据Spark必读书目

    点击蓝色"有关SQL"关注我哟 加个"星标",天天与10000人一起快乐成长 我有一个非常要好的同事,无数次帮我解决了业务上的痛.技术能力很强,业务方面也精通. ...

  6. 大数据Spark超经典视频链接全集

    论坛贴吧等信息发布参考模板 Scala.Spark史上最全面.最详细.最彻底的一整套视频全集(特别是机器学习.Spark Core解密.Spark性能优化.Spark面试宝典.Spark项目案例等). ...

  7. 大数据Spark入门案例5–统计广告点击数量排行Top3(scala版本)

    大数据Spark入门案例5–统计每广告点击数量排行Top3(scala版本) 1 数据准备 链接:https://pan.baidu.com/s/1afzmL-hNsAJl1_gx_dH2ag 提取码 ...

  8. 大数据Spark企业级实战与Hadoop实战PDF和PPT

    今天给大家分享的是<大数据Spark企业级实战>与<Hadoop实战><大数据处理系统·Hadoop源代码情景分析><50个大厂大数据算法教程>等销量排 ...

  9. 大数据Spark实战视频教程-张长志-专题视频课程

    大数据Spark实战视频教程-33364人已学习 课程介绍         大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装.Spark表配置.平台搭建.快学Scala入门.Sp ...

  10. 大数据Spark “蘑菇云”行动第103课:Hive源码大师之路第一步:Hive源码思考和解析初体验

    大数据Spark "蘑菇云"行动第103课:Hive源码大师之路第一步:Hive源码思考和解析初体验 老师上课使用的Hive源码下载地址:http://www-eu.apache. ...

最新文章

  1. static和global的区别
  2. vs2013突然没有代码提示功能了。
  3. r k-means 分类结果_机器学习-Kmeans均值聚类算法(贪心学院)
  4. 电子商务的安全机制及商务模式
  5. datefromstring 转换不准确_免费的在线OCR工具,将图片内容转换为文本内容
  6. Docker学习总结(18)——阿里超大规模Docker化之路
  7. 【C++ Primer】第十四章 C++中的代码重用
  8. fn:startsWith()函数
  9. 从金钱社会向财富第三极的过渡方案
  10. [MATLAB]数值计算
  11. 架构师害怕程序员知道的十项技能
  12. golang 修改全局默认时区的方法
  13. 【NOIP2015模拟10.28B组】终章-剑之魂
  14. Excel如何快速评定考核成绩等级
  15. LWN: 名为 Sequoia 的 seq_file 漏洞!
  16. Gstreamer基础教程13:Playback Speed
  17. 互联网日报 | 7月1日 星期四 | 滴滴正式登陆纽交所;奈雪的茶上市首日破发;2021年铁路暑运今日正式启动...
  18. LSP标识符(LSP ID)
  19. 计算加、减、乘、除的函数
  20. 计算机音乐春分秋分,春分和秋分的古诗词

热门文章

  1. SEO入门知识2:不同角度看seo
  2. 杭州女程序员自述:疫情之下被迫离职,仲裁说理被公司索赔百万
  3. Expressive JavaScript
  4. python编程从入门到实战16章x轴刻度与书不一样,2020-10-05 Python编程从入门到实践 第16章 下载数据 动手试...
  5. h5 js 打开微信客户端
  6. mac系统通过ADB与scrcpy实现手机投屏
  7. 2020版无人机组装与维修(芯片级)
  8. python del用法_python中del函数的用法详解
  9. 粗柳簸箕细柳斗,谁嫌爬虫男人丑 之 异步协程半秒扒光一本小说
  10. 3.22全局参数的保存_补作业来啦~~