Table of Contents

  • 1 Sources
    • 1.1 Socket Source (for testing)
    • 1.2 File Source
      • 1.2.1 Reading CSV files
      • 1.2.2 Reading files from auto-partitioned directories
    • 1.3 Rate Source (for testing)
    • 1.4 Kafka Source
      • 1.4.1 Creating a Kafka job in Streaming mode
      • 1.4.2 Creating a Kafka job in Batch mode
  • 2 Operating on Streaming DataFrames/Streaming Datasets
    • 2.1 SQL queries
    • 2.2 Window operations on event-time
      • 2.2.1 Receiving data from a socket stream and computing by event time
      • 2.2.2 Window computation rules
    • 2.3 Handling late data with watermarks
      • 2.3.1 Watermarks in update mode
      • 2.3.2 Watermarks in append mode
      • 2.3.3 Summary
    • 2.4 Streaming deduplication with dropDuplicates
    • 2.5 Join operations
      • 2.5.1 Stream-static joins
      • 2.5.2 Stream-stream joins
    • 2.6 Triggers
    • 2.7 Output sinks
      • 2.7.1 File sink
      • 2.7.2 Console sink
      • 2.7.3 Kafka sink
      • 2.7.4 Memory sink
      • 2.7.5 Foreach sink
      • 2.7.6 ForeachBatch sink

1 Sources

From the official documentation:

File source
  path: path to the input directory, and common to all file formats.
  maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max)
  latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
  fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to true, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same: "file:///dataset.txt", "s3://a/dataset.txt", "s3n://a/b/dataset.txt", "s3a://a/b/c/dataset.txt"
  For file-format-specific options, see the related methods in DataStreamReader (Scala/Java/Python/R). E.g. for "parquet" format options see DataStreamReader.parquet(). In addition, there are session configurations that affect certain file-formats. See the SQL Programming Guide for more details. E.g., for "parquet", see the Parquet configuration section.
  Fault-tolerant: Yes. Supports glob paths, but does not support multiple comma-separated paths/globs.
Socket Source
  host: host to connect to, must be specified
  port: port to connect to, must be specified
  Fault-tolerant: No
Rate Source
  rowsPerSecond (e.g. 100, default: 1): How many rows should be generated per second.
  rampUpTime (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds.
  numPartitions (e.g. 10, default: Spark's default parallelism): The partition number for the generated rows. The source will try its best to reach rowsPerSecond, but the query may be resource constrained, and numPartitions can be tweaked to help reach the desired speed.
  Fault-tolerant: Yes
Kafka Source
  See the Kafka Integration Guide.
  Fault-tolerant: Yes
  1. File source: reads files placed in a directory as a stream of data. Supported formats: text, csv, json, orc, parquet. Note that files must be placed atomically in the given directory, which in most file systems can be done with a move operation.
  2. Kafka source: reads data from Kafka. Currently compatible with Kafka 0.10.0+.
  3. Socket source (for testing): reads UTF-8 text data from a socket connection. The listening socket lives on the driver. Note that this source is intended for testing only.
  4. Rate source (for testing): generates data at the specified number of rows per second. Each output row contains a timestamp and a value, where timestamp is a Timestamp (the time the row was generated) and value is a Long counting the messages. Intended for testing and benchmarking.

1.1 Socket Source (for testing)

import org.apache.spark.sql.{DataFrame, SparkSession}

// Receive a stream from a socket port, compute on it, and continuously update the aggregated results
object WordCount {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("WordCount")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Build the stream
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", "9999")
      .load()

    val rsDf = df.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count()

    rsDf.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}

1.2 File Source

1.2.1 Reading CSV files

package com.gc.structured.streaming.day01

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object FileSourceDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("FileSourceDemo")
      .master("local[2]")
      .getOrCreate()

    // Schema of the CSV files
    val schema: StructType = new StructType()
      .add("name", StringType)
      .add("age", IntegerType)
      .add("sex", StringType)

    val dataFrame: DataFrame = spark.readStream
      .format("csv")
      .schema(schema)
      .load("D:\\sparkSql") // must be a directory, otherwise an error is thrown

    // Print the results to the console
    dataFrame.writeStream
      .format("console")
      // .outputMode(OutputMode.Complete()) // "Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets"
      .outputMode(OutputMode.Append())
      .start()
      .awaitTermination()
  }
}

The query keeps reading files newly added to the directory. When a file does not match the expected format, all of its rows are read as null. Modifying a file's contents does not trigger a new read, but renaming a file causes it to be read again.
Result:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+---+-----+
|    name|age|  sex|
+--------+---+-----+
|zhangsan| 12|  man|
|    lisi| 14|woman|
|  wangwu| 12|  man|
|zhaoqian| 12|  man|
+--------+---+-----+

1.2.2 Reading files from auto-partitioned directories

When directories are named in the form "key=value", Structured Streaming automatically and recursively traverses all subdirectories under the given directory and derives partition columns from the directory names.
If the directory names do not follow the "key=value" pattern, no automatic partitioning takes place. In addition, directories at the same level must follow the same naming convention.
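For example, a layout like the following (hypothetical file names; the year and month values match the output below) is partitioned automatically into year and month columns:

D:\sparkSql\year=2019\month=11\person.csv
D:\sparkSql\year=2019\month=12\person.csv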


Running the code above again yields the following structure: the partition columns are added automatically based on the directory names.

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+---+-----+----+-----+
|    name|age|  sex|year|month|
+--------+---+-----+----+-----+
|zhangsan| 12|  man|2019|   12|
|    lisi| 14|woman|2019|   12|
|  wangwu| 12|  man|2019|   12|
|zhaoqian| 12|  man|2019|   12|
|zhangsan| 12|  man|2019|   11|
|    lisi| 14|woman|2019|   11|
|  wangwu| 12|  man|2019|   11|
|zhaoqian| 12|  man|2019|   11|
+--------+---+-----+----+-----+

1.3 Rate Source (for testing)

Generates data in a fixed format at a fixed rate; used to test the performance of Structured Streaming.

package com.gc.structured.streaming.day01

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

object RateSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("RateSource")
      .getOrCreate()

    val df: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 100) // rows generated per second, default 1
      .option("rampUpTime", 1)      // seconds to ramp up to the target rate, default 0
      .option("numPartitions", 2)   // number of partitions, default: Spark's default parallelism
      .load

    df.writeStream
      .outputMode("append")
      .trigger(Trigger.Continuous(1000))
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}

Output:

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2019-09-25 20:04:18.336|139  |
|2019-09-25 20:04:18.356|141  |
|2019-09-25 20:04:18.376|143  |
|2019-09-25 20:04:18.396|145  |
|2019-09-25 20:04:18.416|147  |
|2019-09-25 20:04:18.436|149  |
|2019-09-25 20:04:18.456|151  |
|2019-09-25 20:04:18.476|153  |
|2019-09-25 20:04:18.496|155  |
|2019-09-25 20:04:18.516|157  |
|2019-09-25 20:04:18.536|159  |
|2019-09-25 20:04:18.556|161  |
|2019-09-25 20:04:18.576|163  |
|2019-09-25 20:04:18.596|165  |
|2019-09-25 20:04:18.616|167  |
|2019-09-25 20:04:18.636|169  |
|2019-09-25 20:04:18.656|171  |
|2019-09-25 20:04:18.676|173  |
|2019-09-25 20:04:18.696|175  |
|2019-09-25 20:04:18.716|177  |
+-----------------------+-----+
only showing top 20 rows

1.4 Kafka Source

Reference documentation: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

1.4.1 Creating a Kafka job in Streaming mode

Requirement: consume integers from Kafka and keep a running sum.

package com.gc.structured.streaming.day01

import org.apache.spark.sql.SparkSession

/**
 * Read a Kafka stream and compute a running sum in real time.
 */
object KafkaSumDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("KafkaSumDemo")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
      .option("subscribe", "first")        // topic
      .load()                              // returns key, value and partition metadata
      .selectExpr("CAST(value AS String)") // keep only the Kafka value
      .as[String]

    df.map(row => row.toLong).toDF("value").createOrReplaceTempView("tmp")
    val frame = spark.sql("select sum(if(value is null,0,value)) from tmp") // SQL does the heavy lifting

    frame.writeStream
      .format("console")
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}

1.4.2 Creating a Kafka job in Batch mode

In this mode you normally set the start and end offsets for consumption; if no checkpoint is configured, the start offset defaults to earliest and the end offset to latest.
This mode runs as a one-off (batch) job rather than processing data continuously.
To make the running sum easy to verify, create a topic named second:

[guochao@hadoop102 kafka_0.11]$ ./bin/kafka-topics.sh --zookeeper hadoop102:2181 --topic second --create --partitions 3 --replication-factor 3

Produce some data to the topic:

[guochao@hadoop102 kafka_0.11]$ ./bin/kafka-console-producer.sh --broker-list hadoop102:9092  --topic second
>1
>2
>3
>4
>5
>6
>7
>
object KafkaSumBatchDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("KafkaSumBatchDemo")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
      .option("subscribe", "second")         // topic
      .option("startingOffsets", "earliest") // start consuming from the beginning
      .option("endingOffsets", "latest")     // stop at the latest offset
      .load()                                // returns key, value and partition metadata
      .selectExpr("CAST(value AS String)")   // keep only the Kafka value
      .as[String]

    df.map(row => row.toLong).toDF("value").createOrReplaceTempView("tmp")
    val frame = spark.sql("select sum(if(value is null,0,value)) from tmp")

    frame.write
      .format("console")
      .save()
  }
}

Result:

+----------------------------------------------------+
|sum((IF((value IS NULL), CAST(0 AS BIGINT), value)))|
+----------------------------------------------------+
|                                                  28|
+----------------------------------------------------+

This approach pulls one batch of data between the specified start and end offsets, computes once, and then the application exits.

2 Operating on Streaming DataFrames/Streaming Datasets

2.1 SQL queries

Consume data from Kafka in the following format:

zhangsan,12,man
lisi,14,woman
wangwu,12,man
zhaoqian,12,man

Query the data produced to Kafka in real time:

package com.gc.structured.streaming.day01

import org.apache.spark.sql.SparkSession

// Read a stream from Kafka and query it in real time
object KafkaSqlOptionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("KafkaSqlOptionDemo")
      .getOrCreate()
    import spark.implicits._

    // Read the stream
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
      .option("subscribe", "second")
      .load()
      .selectExpr("cast(value as string)")
      .as[String]
      .map(line => {
        if (line != null) {
          val splits = line.split(",")
          Person(splits(0), splits(1).toInt, splits(2)) // wrap in the case class
        } else {
          Person("default", 0, "default")
        }
      })

    df.createOrReplaceTempView("person") // create a temporary view

    spark.sql("select * from person")
      .writeStream
      .format("console")
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}

case class Person(name: String, age: Int, sex: String)

Result:

-------------------------------------------
Batch: 2
-------------------------------------------
+--------+---+---+
|    name|age|sex|
+--------+---+---+
|zhaoqian| 12|man|
+--------+---+---+
-------------------------------------------
Batch: 3
-------------------------------------------
+--------+---+---+
|    name|age|sex|
+--------+---+---+
|wangba| 24|man|
+--------+---+---+

2.2 Window operations on event-time

http://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#window-operations-on-event-time

2.2.1 Receiving data from a socket stream and computing by event time

package com.gc.structured.streaming.day02

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object WordCountByWindow1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("WordCountByWindow1")
      .master("local[2]")
      .getOrCreate()

    // Read the stream
    import spark.implicits._
    val socketDstream: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", 9999)
      .load()
      .as[String]

    import org.apache.spark.sql.functions._

    // Process the data: input lines look like "2019-08-14 10:55:00_dog,hello,word"
    val dFrame = socketDstream.flatMap(line => {
      val splits = line.split("_")
      splits(1).split(",").map(word => (word, splits(0)))
    }).toDF("word", "tm")

    dFrame
      .groupBy(
        window($"tm", "4 minutes", "2 minutes"), // group by event time and word; window length 4 min, slide 2 min
        $"word")
      .count()
      .orderBy("window")
      .writeStream
      .format("console")
      .outputMode(OutputMode.Complete())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}

Input data:

2019-08-14 10:55:00_dog,hello,word
2019-08-14 10:56:00_dog,hello,word
2019-08-14 10:55:00_dog,hello,word

Result:

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |word |count|
+------------------------------------------+-----+-----+
|[2019-08-14 10:52:00, 2019-08-14 10:56:00]|hello|1    |
|[2019-08-14 10:52:00, 2019-08-14 10:56:00]|word |1    |
|[2019-08-14 10:52:00, 2019-08-14 10:56:00]|dog  |1    |
|[2019-08-14 10:54:00, 2019-08-14 10:58:00]|word |1    |
|[2019-08-14 10:54:00, 2019-08-14 10:58:00]|dog  |1    |
|[2019-08-14 10:54:00, 2019-08-14 10:58:00]|hello|1    |
+------------------------------------------+-----+-----+
-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |word |count|
+------------------------------------------+-----+-----+
|[2019-08-14 10:52:00, 2019-08-14 10:56:00]|hello|1    |
|[2019-08-14 10:52:00, 2019-08-14 10:56:00]|word |1    |
|[2019-08-14 10:52:00, 2019-08-14 10:56:00]|dog  |1    |
|[2019-08-14 10:54:00, 2019-08-14 10:58:00]|word |2    |
|[2019-08-14 10:54:00, 2019-08-14 10:58:00]|dog  |2    |
|[2019-08-14 10:54:00, 2019-08-14 10:58:00]|hello|2    |
|[2019-08-14 10:56:00, 2019-08-14 11:00:00]|dog  |1    |
|[2019-08-14 10:56:00, 2019-08-14 11:00:00]|word |1    |
|[2019-08-14 10:56:00, 2019-08-14 11:00:00]|hello|1    |
+------------------------------------------+-----+-----+
-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+-----+
|window                                    |word |count|
+------------------------------------------+-----+-----+
|[2019-08-14 10:52:00, 2019-08-14 10:56:00]|hello|2    |
|[2019-08-14 10:52:00, 2019-08-14 10:56:00]|word |2    |
|[2019-08-14 10:52:00, 2019-08-14 10:56:00]|dog  |2    |
|[2019-08-14 10:54:00, 2019-08-14 10:58:00]|word |3    |
|[2019-08-14 10:54:00, 2019-08-14 10:58:00]|dog  |3    |
|[2019-08-14 10:54:00, 2019-08-14 10:58:00]|hello|3    |
|[2019-08-14 10:56:00, 2019-08-14 11:00:00]|dog  |1    |
|[2019-08-14 10:56:00, 2019-08-14 11:00:00]|word |1    |
|[2019-08-14 10:56:00, 2019-08-14 11:00:00]|hello|1    |
+------------------------------------------+-----+-----+

As the results show, the counts do not depend on when the data arrives; they are computed in the valid windows determined by the time the event occurred.

2.2.2 Window computation rules

org.apache.spark.sql.catalyst.analysis.TimeWindowing
/**
 * Generates the logical plan for generating window ranges on a timestamp column. Without
 * knowing what the timestamp value is, it's non-trivial to figure out deterministically how many
 * window ranges a timestamp will map to given all possible combinations of a window duration,
 * slide duration and start time (offset). Therefore, we express and over-estimate the number of
 * windows there may be, and filter the valid windows. We use last Project operator to group
 * the window columns into a struct so they can be accessed as `window.start` and `window.end`.
 *
 * The windows are calculated as below:
 * maxNumOverlapping <- ceil(windowDuration / slideDuration)
 * for (i <- 0 until maxNumOverlapping)
 *   windowId <- ceil((timestamp - startTime) / slideDuration)
 *   windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
 *   windowEnd <- windowStart + windowDuration
 *   return windowStart, windowEnd
 *
 * This behaves as follows for the given parameters for the time: 12:05. The valid windows are
 * marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the
 * Filter operator.
 * window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
 *     11:55 - 12:07 +                      11:52 - 12:04 x
 *     12:00 - 12:12 +                      11:57 - 12:09 +
 *     12:05 - 12:17 +                      12:02 - 12:14 +
 *
 * @param plan The logical plan
 * @return the logical plan that will generate the time windows using the Expand operator, with
 *         the Filter operator for correctness and Project for usability.
 */

The core logic is:

maxNumOverlapping <- ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)
  windowId <- ceil((timestamp - startTime) / slideDuration)
  windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
  windowEnd <- windowStart + windowDuration
  return windowStart, windowEnd

Applying the rule to the input data above:

Input data: 2019-08-14 10:55:00_dog,hello,word
Window spec: window($"tm", "4 minutes", "2 minutes")   // startTime not given, defaults to 0
windowDuration = 4, slideDuration = 2
maxNumOverlapping = ceil(4 / 2) = 2                    // at most 2 overlapping windows
windowId    = ceil((timestamp - startTime) / slideDuration) = ceil(55 / 2) = 28
windowStart = windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
            = 56 + (0 - 2) * 2 = 52  ->  windowStart: 2019-08-14 10:52:00
windowEnd   = windowStart + windowDuration             ->  windowEnd:   2019-08-14 10:56:00
The resulting windows are:
[2019-08-14 10:52:00, 2019-08-14 10:56:00]
[2019-08-14 10:54:00, 2019-08-14 10:58:00]
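A minimal Scala sketch of the same calculation, with times expressed as minutes past the hour (the object and method names here are mine, for illustration only):

object WindowCalc {
  // Reproduces the rule above: enumerate all candidate windows for a timestamp,
  // then keep only the windows that actually contain it.
  def windows(timestamp: Long, windowDuration: Long, slideDuration: Long, startTime: Long = 0L): Seq[(Long, Long)] = {
    val maxNumOverlapping = math.ceil(windowDuration.toDouble / slideDuration).toInt
    val windowId = math.ceil((timestamp - startTime).toDouble / slideDuration).toLong
    (0 until maxNumOverlapping)
      .map { i =>
        val windowStart = windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
        (windowStart, windowStart + windowDuration)
      }
      .filter { case (start, end) => timestamp >= start && timestamp < end }
  }

  def main(args: Array[String]): Unit = {
    // 10:55 expressed as minute 55, with a 4-minute window sliding every 2 minutes
    println(windows(timestamp = 55, windowDuration = 4, slideDuration = 2)) // Vector((52,56), (54,58))
  }
}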

2.3 Handling late data with watermarks

In a data analysis system, Structured Streaming can continuously aggregate data by event-time, but there is no guarantee that data arrives in the order it was produced. For example, the event-time of a newly received record may be far earlier than event-times that have already been processed. When this happens, you usually need to filter late data according to the business requirements.
Now consider the impact of late events. Suppose a word is generated at 12:04 (event-time) but only reaches the application at 12:11. The application should use 12:04, not 12:11, to update the count for the window 12:00 - 12:10. This arises naturally in window-based aggregations, because Structured Streaming can maintain the intermediate state of partial aggregates for a long time.


From the official documentation:

Conditions for watermarking to clean aggregation state It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries (as of Spark 2.1.1, subject to change in the future).

  • Output mode must be Append or Update. Complete mode requires all aggregate data to be preserved, and hence cannot use watermarking to drop intermediate state. See the Output Modes section for detailed explanation of the semantics of each output mode.
  • The aggregation must have either the event-time column, or a window on the event-time column.
  • withWatermark must be called on the same column as the timestamp column used in the aggregate. For example, df.withWatermark(“time”, “1 min”).groupBy(“time2”).count() is invalid in Append output mode, as watermark is defined on a different column from the aggregation column.
  • withWatermark must be called before the aggregation for the watermark details to be used. For example, df.groupBy(“time”).count().withWatermark(“time”, “1 min”) is invalid in Append output mode.

Watermark formula:
watermark = MaxEventTime - Threshold
Moreover, the watermark can only increase; it never decreases.
The initial watermark is 0.
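A quick sanity check of the formula against the first test record used below (a sketch; the variable names are mine):

// MaxEventTime = 2019-08-14 10:55:00, threshold = 2 minutes
val maxEventTime = java.sql.Timestamp.valueOf("2019-08-14 10:55:00")
val thresholdMs  = 2 * 60 * 1000L
val watermark    = new java.sql.Timestamp(maxEventTime.getTime - thresholdMs)
println(watermark) // 2019-08-14 10:53:00.0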

2.3.1 Watermarks in update mode

package com.gc.structured.streaming.day02

import java.sql.Timestamp

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, SparkSession}

object WordCountByWindowWaterMaker {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("WordCountByWindowWaterMaker")
      .master("local[2]")
      .getOrCreate()

    // Read the stream
    import spark.implicits._
    val socketDstream: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", 9999)
      .load()
      .as[String]

    import org.apache.spark.sql.functions._

    // Process the data: input lines look like "2019-08-14 10:55:00_dog"
    val dFrame = socketDstream.flatMap(line => {
      val splits = line.split("_")
      splits(1).split(",").map(word => (word, Timestamp.valueOf(splits(0))))
    }).toDF("word", "tm")

    dFrame
      .withWatermark(eventTime = "tm", delayThreshold = "2 minutes")
      .groupBy(
        window($"tm", "10 minutes", "2 minutes"),
        $"word")
      .count()
      .writeStream
      .format("console")
      .outputMode(OutputMode.Update())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}

① Input one test record: 2019-08-14 10:55:00_dog

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |
|[2019-08-14 10:52:00, 2019-08-14 11:02:00]|dog |1    |
|[2019-08-14 10:50:00, 2019-08-14 11:00:00]|dog |1    |
|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
|[2019-08-14 10:54:00, 2019-08-14 11:04:00]|dog |1    |
+------------------------------------------+----+-----+

The watermark is updated to 2019-08-14 10:53:00.
② Input a second record: 2019-08-14 11:00:00_dog

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 11:00:00, 2019-08-14 11:10:00]|dog |1    |
|[2019-08-14 10:52:00, 2019-08-14 11:02:00]|dog |2    |
|[2019-08-14 10:58:00, 2019-08-14 11:08:00]|dog |1    |
|[2019-08-14 10:54:00, 2019-08-14 11:04:00]|dog |2    |
|[2019-08-14 10:56:00, 2019-08-14 11:06:00]|dog |1    |
+------------------------------------------+----+-----+

Since the current watermark is 2019-08-14 10:53:00, every window is still valid; in update mode only the updated rows are shown.
The watermark is updated to 2019-08-14 10:58:00.
③ Input a record that is effectively late: 2019-08-14 10:55:00_dog

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 10:52:00, 2019-08-14 11:02:00]|dog |3    |
|[2019-08-14 10:50:00, 2019-08-14 11:00:00]|dog |2    |
|[2019-08-14 10:54:00, 2019-08-14 11:04:00]|dog |3    |
+------------------------------------------+----+-----+

The current watermark is 2019-08-14 10:58:00. The following two windows

|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |
|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |

both end before the current watermark, so they are filtered out.
The newly computed watermark would be 2019-08-14 10:53:00; since it is smaller than the current watermark it is not applied (the watermark can only increase, never decrease).

2.3.2 Watermarks in append mode

Change outputMode(OutputMode.Update()) in the code above to .outputMode(OutputMode.Append()).

In append mode, only newly finalized rows are emitted, and once emitted a row can never change.
Continue the test above:
① Input one test record: 2019-08-14 10:55:00_dog

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

window($"tm", "10 minutes", "2 minutes") produces 5 windows, but since the initial watermark is 0, every window's end time is greater than the watermark, so nothing is output yet (once output, the rows could never be corrected). A window is output only when its end time drops below the watermark, i.e. once it is certain that no later data can change that window's aggregate, and at that point the window's aggregation state is also removed from memory.
The computed watermark is 2019-08-14 10:53:00.
② Input a test record: 2019-08-14 11:00:00_dog

-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

This record forms the second batch and again maps to 5 windows. The watermark is now 10:53, and every window's end time is still greater than the watermark, so still nothing is output.
The computed watermark is 2019-08-14 10:58:00.
③ Input a test record: 2019-08-14 10:55:00_dog (effectively late data)

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |
|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
+------------------------------------------+----+-----+

The in-memory watermark is now 2019-08-14 10:58:00. Two windows end before the watermark, which means their data can no longer change, so the aggregates of these two windows are output and their state is cleared from memory.
The watermark is recomputed, but the new value is smaller than 2019-08-14 10:58:00, so the watermark is not updated (it can only increase, never decrease).

2.3.3 Summary

  • "Event time must be defined on a window or a timestamp": the event-time column must be a timestamp (or a window over one).
  • "Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode": sorting is not supported on streams, except after an aggregation in complete mode.
  • The output mode must be append or update. Complete mode (which requires an aggregation) has to emit all aggregate results on every trigger; since the purpose of a watermark is to drop stale aggregation state, using a watermark in complete mode has no effect and makes no sense.
  • In append mode, a watermark must be set before an aggregation can be used. In fact, the watermark defines when the aggregate results (state) are emitted in append mode and when expired state is cleaned up.
  • In update mode, the watermark is mainly used to filter out late data and to clean up expired state promptly.
  • The watermark is updated while the current batch is processed and takes effect when the next batch is processed. If a node fails, it may take several batches before the new watermark takes effect.
  • withWatermark must be called on the same column as the timestamp column used in the aggregation: df.withWatermark("time", "1 min").groupBy("time2").count() is invalid.
  • withWatermark must be called before the aggregation: df.groupBy("time").count().withWatermark("time", "1 min") is invalid. A valid combination is sketched below.
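A minimal sketch of a valid combination, assuming a streaming DataFrame df with a timestamp column named time (the function and column names are illustrative, not from the original post):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

// Valid in append mode: the watermark is declared before the aggregation and
// on the same column ("time") that the window aggregation groups by.
def windowedCounts(df: DataFrame): DataFrame =
  df.withWatermark("time", "1 min")
    .groupBy(window(col("time"), "10 minutes", "5 minutes"))
    .count()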

2.4 Streaming deduplication with dropDuplicates

Requirement: deduplicate records by user name and sex.

package com.gc.structured.streaming.day02

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object DropDuplicatesDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("DropDuplicatesDemo")
      .getOrCreate()

    // Read the socket stream
    import spark.implicits._
    spark.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", "9999")
      .load()
      .as[String]
      // input lines look like: zhangsan,10,man,2019-08-24 10:55:00
      .map(line => {
        val splits = line.split(",")
        User(splits(0), splits(1).toInt, splits(2), Timestamp.valueOf(splits(3)))
      })
      .toDF
      .withWatermark("event_time", "2 minutes")
      .dropDuplicates("name", "sex")
      .writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .start()
      .awaitTermination()
  }
}

case class User(name: String, age: Int, sex: String, event_time: Timestamp)

Notes:

  • dropDuplicates cannot be used after an aggregation, i.e. a DataFrame/Dataset produced by an aggregation cannot call dropDuplicates.
  • With a watermark: if there is an upper bound on how late a duplicate record may arrive, you can define a watermark on the event-time column and deduplicate on a guid together with the event-time column. The query then uses the watermark to drop the state of old records that can no longer receive duplicates, which bounds the amount of state the query has to maintain (a sketch follows this list).
  • Without a watermark: since duplicates may arrive with no bound on their lateness, the query has to store the data of all past records as state.
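A minimal sketch of the guid-based variant, assuming a streaming DataFrame events with columns guid and event_time (illustrative names, not from the code above):

import org.apache.spark.sql.DataFrame

// Deduplicate on guid + event_time; the 2-minute watermark lets Spark drop
// state for records that are too old to receive duplicates any more.
def dedupByGuid(events: DataFrame): DataFrame =
  events
    .withWatermark("event_time", "2 minutes")
    .dropDuplicates("guid", "event_time")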

2.5 Join operations

2.5.1 Stream-static joins

package com.gc.structured.streaming.day02.join

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Join a stream with static data
object StreamJoinStatic {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("StreamJoinStatic")
      .master("local[4]")
      .getOrCreate()

    // Read the static data from a file
    import sparkSession.implicits._
    val personCsv: DataFrame = sparkSession.read.csv("D:\\sparkDemo")
    val ds = personCsv.toDF("id", "name", "age")

    // Read the streaming data
    val df: DataFrame = sparkSession.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", "9999")
      .load()
      .as[String]
      .map(line => {
        val splits = line.split(",")
        (splits(0), splits(1))
      })
      .toDF("id", "sex")

    // def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
    // the third parameter is the join type (inner, left_outer, right_outer, ...); the default is an inner join
    val joinDframe: DataFrame = df.join(ds, "id")

    joinDframe.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}

File data:

1,李四,20
2,张三,25
3,王五,30

Data typed into the socket in real time:

1,man
2,wm

Result:

+---+---+----+---+
| id|sex|name|age|
+---+---+----+---+
|  1|man|李四| 20|
+---+---+----+---+
+---+---+----+---+
| id|sex|name|age|
+---+---+----+---+
|  2| wm|张三| 25|
+---+---+----+---+

2.5.2 Stream-stream joins

package com.gc.structured.streaming.day02.StreamJoinStream

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Join two streams
object StreamJoinStream {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("StreamJoinStream")
      .master("local[4]")
      .getOrCreate()
    import sparkSession.implicits._

    // Read the first stream
    val df: DataFrame = sparkSession.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", "9999")
      .load()
      .as[String]
      .map(line => {
        val splits = line.split(",")
        (splits(0), splits(1))
      })
      .toDF("id", "sex")

    // Read the second stream
    val df2: DataFrame = sparkSession.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", "10000")
      .load()
      .as[String]
      .map(line => {
        val splits = line.split(",")
        (splits(0), splits(1))
      })
      .toDF("id", "name")

    // Join the two streams
    df.join(df2, "id")
      .writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

Send data to port 9999:

[guochao@hadoop102 ~]$ nc -lt 9999
1,man

Send data to port 10000:

[guochao@hadoop102 ~]$ nc -lt 10000
1,zhangsan

Result of joining the two streams:

+---+---+--------+
| id|sex|    name|
+---+---+--------+
|  1|man|zhangsan|
+---+---+--------+

Unsupported operations:
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

  • Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

  • Limit and take first N rows are not supported on streaming Datasets.

  • Distinct operations on streaming Datasets are not supported.

  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

  • Outer joins between a streaming and a static Datasets are conditionally supported.

  • Full outer join with a streaming Dataset is not supported

  • Left outer join with a streaming Dataset on the right is not supported

  • Right outer join with a streaming Dataset on the left is not supported

  • Any kind of joins between two streaming Datasets is not yet supported.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

  • count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

  • foreach() - Instead use ds.writeStream.foreach(…) (see next section).

  • show() - Instead use the console sink (see next section).

If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.

2.6 Triggers

Continuous processing, introduced in Spark 2.3, achieves latencies as low as 1 ms with at-least-once semantics.
Micro-batch processing provides exactly-once semantics, but with a latency of at least about 100 ms.
For some types of queries you can switch to continuous mode without changing the application logic (no changes to the DataFrame/Dataset operations).
To switch to continuous processing, you only need to change the trigger:

import org.apache.spark.sql.streaming.Trigger

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second")) // only change in query
  .start()

Queries supported in continuous processing mode (a minimal rate-source sketch follows the list):

  1. Operations: only map-like operations are supported, i.e. projections (select, map, flatMap, mapPartitions, etc.) and selections (where, filter, etc.). Aggregations are not supported.
  2. Sources:
    • Kafka source: all options are supported
    • Rate source
  3. Sinks:
    • Kafka sink: all options are supported
    • memory sink
    • console sink
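A minimal sketch of a continuous query that stays within these constraints, using the rate source and the console sink (my example, modeled on the trigger change shown above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousRateDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ContinuousRateDemo")
      .getOrCreate()

    spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10)
      .load()
      .where("value % 2 = 0") // only projections/selections, no aggregations
      .writeStream
      .format("console")
      .outputMode("append")
      .trigger(Trigger.Continuous("1 second")) // checkpoints offsets every second
      .start()
      .awaitTermination()
  }
}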

2.7 Output sinks

2.7.1 File sink

Writes the output to a directory. Only append mode is supported.

package com.gc.structured.streaming.day02.sink

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, SparkSession}

object FileSinkDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("FileSinkDemo")
      .master("local[2]")
      .getOrCreate()
    import sparkSession.implicits._

    val dFrame = sparkSession.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", "9999")
      .load()
      .as[String]
      .map(line => (line, line.reverse))
      .toDF("word", "wordReverse")

    // Write the results to the given directory
    dFrame.writeStream
      .format("csv")
      .outputMode(OutputMode.Append())
      .option("path", "./data")
      .option("checkpointLocation", "./filesink") // a checkpoint location must be specified when writing
      .start
      .awaitTermination()
  }
}

2.7.2 Console sink

package com.gc.structured.streaming.day02.sink

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object ConsoleSinkDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("ConsoleSinkDemo")
      .master("local[2]")
      .getOrCreate()
    import sparkSession.implicits._

    val dFrame = sparkSession.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", "9999")
      .load()
      .as[String]
      .map(line => (line, line.reverse))
      .toDF("word", "wordReverse")

    // Write the results to the console
    dFrame.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start
      .awaitTermination()
  }
}

2.7.3 Kafka sink

Receive data from a socket, compute word counts, and write the results to Kafka.

package com.gc.structured.streaming.day02.sink

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.OutputMode

object KafkaSinkDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("KafkaSinkDemo")
      .master("local[2]")
      .getOrCreate()
    import sparkSession.implicits._

    val dFrame = sparkSession.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", "9999")
      .load()
      .as[String]
      .flatMap(_.split("\\W+"))
      .map((_, 1))
      .toDF("word", "count")
      .groupBy("word")
      .count()

    // The Kafka sink expects a column named "value"
    val df: DataFrame = dFrame.as[(String, Long)].map(t => t._1 + t._2).toDF("value")

    df.writeStream
      .format("kafka")
      .outputMode(OutputMode.Update()) // Complete mode would rewrite unchanged rows and duplicate data
      .option("kafka.bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092") // Kafka config
      .option("topic", "wordTest")           // Kafka topic
      .option("checkpointLocation", "./ck1") // a checkpoint location is mandatory
      .start
      .awaitTermination()
  }
}

2.7.4 Memory sink

package com.gc.structured.streaming.day02.sink

import java.util.{Timer, TimerTask}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object MemorySinkDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("MemorySinkDemo")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // source
      .option("host", "Hadoop102")
      .option("port", 9999)
      .load

    val words: DataFrame = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count()

    val query: StreamingQuery = words.writeStream
      .outputMode("complete")
      .format("memory")  // memory sink
      .queryName("word") // name of the in-memory temporary table
      .start

    // For testing: query the in-memory table periodically with a timer
    val timer = new Timer(true)
    val task: TimerTask = new TimerTask {
      override def run(): Unit = spark.sql("select * from word").show
    }
    timer.scheduleAtFixedRate(task, 0, 2000)

    query.awaitTermination()
  }
}

2.7.5 Foreach sink

The foreach sink iterates over every row of the result table, letting you write the streaming query results out with developer-defined logic.

package com.gc.structured.streaming.day02.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}

object ForeachSinkDemo {
  def main(args: Array[String]): Unit = {
    // Write the word count results to MySQL
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("ForeachSinkDemo")
      .getOrCreate()
    import sparkSession.implicits._

    val df: DataFrame = sparkSession.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", 9999)
      .load()
      .as[String]
      .flatMap(_.split("\\W+"))
      .map((_, 1))
      .toDF("word", "count")
      .groupBy("word")
      .count()

    // Write to MySQL
    df.writeStream
      .outputMode("update")
      .foreach(new ForeachWriter[Row] {
        var conn: Connection = null
        var ps: PreparedStatement = _

        // Open the connection; returning false skips this partition's data
        override def open(partitionId: Long, epochId: Long): Boolean = {
          println("open ..." + partitionId + "  " + epochId)
          Class.forName("com.mysql.jdbc.Driver")
          conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/sparkdemo", "root", "root")
          val sql = "insert into wordcount values(?, ?) on duplicate key update word=?, count=?"
          ps = conn.prepareStatement(sql)
          conn != null && !conn.isClosed && ps != null
        }

        // Insert a row, or update it when the key already exists
        override def process(value: Row): Unit = {
          println(value)
          ps.setString(1, value.getString(0))
          ps.setLong(2, value.getLong(1))
          ps.setString(3, value.getString(0))
          ps.setLong(4, value.getLong(1))
          ps.execute()
        }

        // Release the connection
        override def close(errorOrNull: Throwable): Unit = {
          ps.close()
          conn.close()
        }
      })
      .start()
      .awaitTermination()
  }
}

Result:

mysql> select * from wordcount;
+----------+-------+
| word     | count |
+----------+-------+
| wangwu   |     1 |
| hello    |     1 |
| zhangsan |     1 |
| jiji     |     2 |
| word     |     1 |
| li       |     3 |
| a        |     3 |
| lisi     |     1 |
+----------+-------+

2.7.6 ForeachBatch sink

The foreachBatch sink was added in Spark 2.4; it exposes the output of each micro-batch as a regular (batch) DataFrame.
Here the word count results are written to both local files and MySQL.

package com.gc.structured.streaming.day02.sink

import java.util.Properties

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ForeachBatchSinkDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("ForeachBatchSinkDemo")
      .getOrCreate()
    import sparkSession.implicits._

    val df: DataFrame = sparkSession.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", 9999)
      .load()
      .as[String]
      .flatMap(_.split("\\W+"))
      .map((_, 1))
      .toDF("word", "count")
      .groupBy("word")
      .count()

    // MySQL connection properties
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "root")

    val query: StreamingQuery = df.writeStream
      .outputMode("complete")
      .foreachBatch((dataFrame: DataFrame, batchId) => { // the output DataFrame and the batch id
        if (dataFrame.count() != 0) {
          dataFrame.cache()
          println(batchId)
          dataFrame.write.json(s"./$batchId") // write to files
          dataFrame.write
            .mode(SaveMode.Overwrite)
            .jdbc("jdbc:mysql://hadoop102:3306/sparkdemo", "wordcount", props) // write to MySQL
        }
      })
      .start()

    query.awaitTermination()
  }
}
