目录

输出终端/位置

文件接收器

​​​​​​​Memory Sink

Foreach和ForeachBatch Sink

Foreach

​​​​​​​ForeachBatch

​​​​​​​代码演示


输出终端/位置

Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的3个组件,并且在每个组件显式地做到fault-tolerant(容错),由此得到整个streaming程序的 end-to-end exactly-once guarantees

目前Structured Streaming内置FileSink、Console Sink、Foreach Sink(ForeachBatch Sink)、Memory Sink及Kafka Sink,其中测试最为方便的是Console Sink。

​​​​​​​文件接收器

将输出存储到目录文件中,支持文件格式:parquet、orc、json、csv等,示例如下:

相关注意事项如下:

  • 支持OutputMode为:Append追加模式;
  • 必须指定输出目录参数【path】,必选参数,其中格式有parquet、orc、json、csv等等;
  • 容灾恢复支持精确一次性语义exactly-once;
  • 此外支持写入分区表,实际项目中常常按时间划分;

​​​​​​​Memory Sink

此种接收器作为调试使用,输出作为内存表存储在内存中, 支持Append和Complete输出模式。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中,因此,请谨慎使用,示例如下:

ForeachForeachBatch Sink

Foreach

Structured Streaming提供接口foreach和foreachBatch,允许用户在流式查询的输出上应用任意操作和编写逻辑,比如输出到MySQL表、Redis数据库等外部存系统。其中foreach允许每行自定义写入逻辑,foreachBatch允许在每个微批量的输出上进行任意操作和自定义逻辑,建议使用foreachBatch操作。

foreach表达自定义编写器逻辑具体来说,需要编写类class继承ForeachWriter,其中包含三个方法来表达数据写入逻辑:打开,处理和关闭。

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

streamingDatasetOfString.writeStream.foreach(new ForeachWriter[String] {def open(partitionId: Long, version: Long): Boolean = {// Open connection}def process(record: String): Unit = {// Write string to connection}def close(errorOrNull: Throwable): Unit = {// Close the connection}}).start()

​​​​​​​ForeachBatch

方法foreachBatch允许指定在流式查询的每个微批次的输出数据上执行的函数,需要两个参数:微批次的输出数据DataFrame或Dataset、微批次的唯一ID。

使用foreachBatch函数输出时,以下几个注意事项:

1.重用现有的批处理数据源,可以在每个微批次的输出上使用批处理数据输出Output;

2.写入多个位置,如果要将流式查询的输出写入多个位置,则可以简单地多次写入输出 DataFrame/Dataset 。但是,每次写入尝试都会导致重新计算输出数据(包括可能重新读取输入数据)。要避免重新计算,您应该缓存cache输出 DataFrame/Dataset,将其写入多个位置,然后 uncache 。

3.应用其他DataFrame操作,流式DataFrame中不支持许多DataFrame和Dataset操作,使用foreachBatch可以在每个微批输出上应用其中一些操作,但是,必须自己解释执行该操作的端到端语义。

4.默认情况下,foreachBatch仅提供至少一次写保证。 但是,可以使用提供给该函数的batchId作为重复数据删除输出并获得一次性保证的方法。

5.foreachBatch不适用于连续处理模式,因为它从根本上依赖于流式查询的微批量执行。 如果以连续模式写入数据,请改用foreach。

​​​​​​​代码演示

使用foreachBatch将词频统计结果输出到MySQL表中,代码如下:

package cn.itcast.structedstreamingimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果存储到MySQL数据库表中*/
object StructuredForeachBatch {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "2").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._import org.apache.spark.sql.functions._val inputStreamDF: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()val resultStreamDF: DataFrame = inputStreamDF.as[String].filter(StringUtils.isNotBlank(_)).flatMap(_.trim.split("\\s+")).groupBy($"value").count()val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Complete()).foreachBatch((batchDF: DataFrame, batchId: Long) => {println(s"BatchId = ${batchId}")if (!batchDF.isEmpty) {batchDF.coalesce(1).write.mode(SaveMode.Overwrite).format("jdbc")//.option("driver", "com.mysql.cj.jdbc.Driver")//MySQL-8//.option("url", "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")//MySQL-8.option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8").option("user", "root").option("password", "root").option("dbtable", "bigdata.t_struct_words").save()}}).start()query.awaitTermination()query.stop()}
}

2021年大数据Spark(四十八):Structured Streaming 输出终端/位置相关推荐

  1. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  2. 2021年大数据ELK(十八):Beats 简单介绍和FileBeat工作原理

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Beats 简单介绍和FileBeat工作原理 一.Beats 二.FileB ...

  3. 2021年大数据Spark(十四):Spark Core的RDD操作

    目录 RDD的操作 函数(算子)分类 Transformation函数 ​​​​​​​Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...

  4. 客快物流大数据项目(四十八):Spark操作Kudu 修改表

    Spark操作Kudu 修改表 代码示例 /*** 添加列* @param kuduContext*/ def addColumn(kuduContext: KuduContext): Unit ={ ...

  5. 2021年大数据Spark(十二):Spark Core的RDD详解

    目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...

  6. 2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量

    目录 共享变量 广播变量 累加器 ​​​​​​​案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...

  7. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  8. 2021年大数据Flink(十八):Flink Window操作

    目录 ​​​​​​​Flink-Window操作 为什么需要Window Window的分类 按照time和count分类 ​​​​​​​按照slide和size分类 ​​​​​​​总结 Window ...

  9. 2021年大数据Spark(十六):Spark Core的RDD算子练习

    目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集.并集.差集.笛卡尔积 distinct 算子 ​​​​​​​​​​​​​​first.take.top 算子 ​​​ ...

  10. 2021年大数据Hadoop(十四):HDFS的高可用机制

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 HDFS的高可用机制 HDFS高可用介绍 组件介绍 Nam ...

最新文章

  1. Java 如何设计 API 接口,实现统一格式返回?
  2. 时间就是金钱!Windows 上必装的 10 款高效软件
  3. Java 洛谷 P1161 开灯
  4. 编译C程序提示之'for' loop initial declaration used outside C99 mode
  5. [SPOJ - FTOUR2] Free tour II(点分治 + 背包dp + 启发式合并)
  6. 今日头条Java后台Java研发三面题目
  7. Leetcode--238. 除自身以外数组的乘积
  8. flask mysql 1366_2017-11-17 Python Flask Script+mysql环境设置
  9. 基于mapreducer的图算法
  10. 用python把相同名称的放在一起,python实现将具有相同名称的文件放入相应的文件夹中,把,对应,内...
  11. GitHub上最全中华古诗词数据库又火了
  12. 《电脑音乐制作实战指南:伴奏、录歌、MTV全攻略》——第1篇 获取伴奏篇 第1章 MIDI音乐伴奏的获取与制作 1.1 电脑MIDI音乐与设备的介绍...
  13. 软件测试笔记2-目的
  14. 【古代文学论文】酒文化传播中唐代文学的作用分析(节选)
  15. c语言59秒倒计时程序,59分59秒倒计时程序及仿真显示
  16. 【有利可图网】多图如何排版?分享几个方案
  17. mysql 页分裂_InnoDB中的页合并与分裂
  18. docker之卷10
  19. Photoshop Cs5上经常使用的快捷键汇总
  20. 老板让我做研发负责人,谈谈我的想法和认知

热门文章

  1. php recordarray,Array 数组 - [ php中文手册 ] - 在线原生手册 - php中文网
  2. 求字符串全排列 python实现
  3. 解决:sql中将日期字符串当做日期类型处理
  4. 【微服务架构】SpringCloud之路由网关(zuul)
  5. xgboost重要参数1
  6. tf.cast()数据类型转换
  7. SLAM图优化g2o
  8. 用NVIDIA A100 GPUs提高计算机视觉
  9. Ubuntu 系统 Pycharm中无法使用中文输入法问题
  10. Eclipse工具栏上android的机器人小图标和adt图片的显示