2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
目录
输出终端/位置
文件接收器
Memory Sink
Foreach和ForeachBatch Sink
Foreach
ForeachBatch
代码演示
输出终端/位置
Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的3个组件,并且在每个组件显式地做到fault-tolerant(容错),由此得到整个streaming程序的 end-to-end exactly-once guarantees。
目前Structured Streaming内置FileSink、Console Sink、Foreach Sink(ForeachBatch Sink)、Memory Sink及Kafka Sink,其中测试最为方便的是Console Sink。
文件接收器
将输出存储到目录文件中,支持文件格式:parquet、orc、json、csv等,示例如下:
相关注意事项如下:
- 支持OutputMode为:Append追加模式;
- 必须指定输出目录参数【path】,必选参数,其中格式有parquet、orc、json、csv等等;
- 容灾恢复支持精确一次性语义exactly-once;
- 此外支持写入分区表,实际项目中常常按时间划分;
Memory Sink
此种接收器作为调试使用,输出作为内存表存储在内存中, 支持Append和Complete输出模式。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中,因此,请谨慎使用,示例如下:
Foreach和ForeachBatch Sink
Foreach
Structured Streaming提供接口foreach和foreachBatch,允许用户在流式查询的输出上应用任意操作和编写逻辑,比如输出到MySQL表、Redis数据库等外部存系统。其中foreach允许每行自定义写入逻辑,foreachBatch允许在每个微批量的输出上进行任意操作和自定义逻辑,建议使用foreachBatch操作。
foreach表达自定义编写器逻辑具体来说,需要编写类class继承ForeachWriter,其中包含三个方法来表达数据写入逻辑:打开,处理和关闭。
https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html
streamingDatasetOfString.writeStream.foreach(new ForeachWriter[String] {def open(partitionId: Long, version: Long): Boolean = {// Open connection}def process(record: String): Unit = {// Write string to connection}def close(errorOrNull: Throwable): Unit = {// Close the connection}}).start()
ForeachBatch
方法foreachBatch允许指定在流式查询的每个微批次的输出数据上执行的函数,需要两个参数:微批次的输出数据DataFrame或Dataset、微批次的唯一ID。
使用foreachBatch函数输出时,以下几个注意事项:
1.重用现有的批处理数据源,可以在每个微批次的输出上使用批处理数据输出Output;
2.写入多个位置,如果要将流式查询的输出写入多个位置,则可以简单地多次写入输出 DataFrame/Dataset 。但是,每次写入尝试都会导致重新计算输出数据(包括可能重新读取输入数据)。要避免重新计算,您应该缓存cache输出 DataFrame/Dataset,将其写入多个位置,然后 uncache 。
3.应用其他DataFrame操作,流式DataFrame中不支持许多DataFrame和Dataset操作,使用foreachBatch可以在每个微批输出上应用其中一些操作,但是,必须自己解释执行该操作的端到端语义。
4.默认情况下,foreachBatch仅提供至少一次写保证。 但是,可以使用提供给该函数的batchId作为重复数据删除输出并获得一次性保证的方法。
5.foreachBatch不适用于连续处理模式,因为它从根本上依赖于流式查询的微批量执行。 如果以连续模式写入数据,请改用foreach。
代码演示
使用foreachBatch将词频统计结果输出到MySQL表中,代码如下:
package cn.itcast.structedstreamingimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果存储到MySQL数据库表中*/
object StructuredForeachBatch {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "2").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._import org.apache.spark.sql.functions._val inputStreamDF: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()val resultStreamDF: DataFrame = inputStreamDF.as[String].filter(StringUtils.isNotBlank(_)).flatMap(_.trim.split("\\s+")).groupBy($"value").count()val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Complete()).foreachBatch((batchDF: DataFrame, batchId: Long) => {println(s"BatchId = ${batchId}")if (!batchDF.isEmpty) {batchDF.coalesce(1).write.mode(SaveMode.Overwrite).format("jdbc")//.option("driver", "com.mysql.cj.jdbc.Driver")//MySQL-8//.option("url", "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")//MySQL-8.option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8").option("user", "root").option("password", "root").option("dbtable", "bigdata.t_struct_words").save()}}).start()query.awaitTermination()query.stop()}
}
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置相关推荐
- 2021年大数据Spark(十八):Spark Core的RDD Checkpoint
目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...
- 2021年大数据ELK(十八):Beats 简单介绍和FileBeat工作原理
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Beats 简单介绍和FileBeat工作原理 一.Beats 二.FileB ...
- 2021年大数据Spark(十四):Spark Core的RDD操作
目录 RDD的操作 函数(算子)分类 Transformation函数 Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...
- 客快物流大数据项目(四十八):Spark操作Kudu 修改表
Spark操作Kudu 修改表 代码示例 /*** 添加列* @param kuduContext*/ def addColumn(kuduContext: KuduContext): Unit ={ ...
- 2021年大数据Spark(十二):Spark Core的RDD详解
目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...
- 2021年大数据Spark(十九):Spark Core的共享变量
目录 共享变量 广播变量 累加器 案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...
- 2021年大数据Spark(十五):Spark Core的RDD常用算子
目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 聚合函数算子 Scala集合中的聚合函数 ...
- 2021年大数据Flink(十八):Flink Window操作
目录 Flink-Window操作 为什么需要Window Window的分类 按照time和count分类 按照slide和size分类 总结 Window ...
- 2021年大数据Spark(十六):Spark Core的RDD算子练习
目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集.并集.差集.笛卡尔积 distinct 算子 first.take.top 算子 ...
- 2021年大数据Hadoop(十四):HDFS的高可用机制
全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 HDFS的高可用机制 HDFS高可用介绍 组件介绍 Nam ...
最新文章
- Java 如何设计 API 接口,实现统一格式返回?
- 时间就是金钱!Windows 上必装的 10 款高效软件
- Java 洛谷 P1161 开灯
- 编译C程序提示之'for' loop initial declaration used outside C99 mode
- [SPOJ - FTOUR2] Free tour II(点分治 + 背包dp + 启发式合并)
- 今日头条Java后台Java研发三面题目
- Leetcode--238. 除自身以外数组的乘积
- flask mysql 1366_2017-11-17 Python Flask Script+mysql环境设置
- 基于mapreducer的图算法
- 用python把相同名称的放在一起,python实现将具有相同名称的文件放入相应的文件夹中,把,对应,内...
- GitHub上最全中华古诗词数据库又火了
- 《电脑音乐制作实战指南:伴奏、录歌、MTV全攻略》——第1篇 获取伴奏篇 第1章 MIDI音乐伴奏的获取与制作 1.1 电脑MIDI音乐与设备的介绍...
- 软件测试笔记2-目的
- 【古代文学论文】酒文化传播中唐代文学的作用分析(节选)
- c语言59秒倒计时程序,59分59秒倒计时程序及仿真显示
- 【有利可图网】多图如何排版?分享几个方案
- mysql 页分裂_InnoDB中的页合并与分裂
- docker之卷10
- Photoshop Cs5上经常使用的快捷键汇总
- 老板让我做研发负责人,谈谈我的想法和认知
热门文章
- php recordarray,Array 数组 - [ php中文手册 ] - 在线原生手册 - php中文网
- 求字符串全排列 python实现
- 解决:sql中将日期字符串当做日期类型处理
- 【微服务架构】SpringCloud之路由网关(zuul)
- xgboost重要参数1
- tf.cast()数据类型转换
- SLAM图优化g2o
- 用NVIDIA A100 GPUs提高计算机视觉
- Ubuntu 系统 Pycharm中无法使用中文输入法问题
- Eclipse工具栏上android的机器人小图标和adt图片的显示