
  1. Spark Streaming Caching
  2. Checkpoint Mechanism
  3. Example

1. Spark Streaming Caching


def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration,
    partitioner: Partitioner,
    filterFunc: ((K, V)) => Boolean
  ): DStream[(K, V)] = ssc.withScope {
  val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
  val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
  val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
  new ReducedWindowedDStream[K, V](
    self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
    windowDuration, slideDuration, partitioner
  )
}


class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
    parent: DStream[(K, V)],
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    filterFunc: Option[((K, V)) => Boolean],
    _windowDuration: Duration,
    _slideDuration: Duration,
    partitioner: Partitioner
  ) extends DStream[(K, V)](parent.ssc) {

  // Other non-essential code omitted

  // Persist RDDs to memory by default as these RDDs are going to be reused.
  super.persist(StorageLevel.MEMORY_ONLY_SER)
  reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)

  // ...
}
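Why are these RDDs reused? With an inverse function, each new window value can be derived from the previous one: reduce in the batch that slides into the window, then apply invReduceFunc to the batch that slides out. Both the previous window result and older per-batch results are read again, so keeping them in memory avoids recomputation. The following is a minimal plain-Scala sketch of that incremental update for a single key, assuming integer counts and a slide of one batch; it does not use Spark, and the names `IncrementalWindow`, `windowSums` and `naiveWindowSums` are hypothetical.

```scala
object IncrementalWindow {
  // Windowed sum after each batch, reusing the previous window value.
  // reduce = _ + _ and invReduce = _ - _ play the roles of reduceFunc / invReduceFunc.
  def windowSums(batches: Vector[Int], windowBatches: Int): Vector[Int] = {
    require(windowBatches > 0, "window must span at least one batch")
    val reduce: (Int, Int) => Int = _ + _
    val invReduce: (Int, Int) => Int = _ - _
    var current = 0
    val out = Vector.newBuilder[Int]
    for (i <- batches.indices) {
      // The newest batch slides into the window.
      current = reduce(current, batches(i))
      // Once the window is full, the oldest batch slides out. Its value must still be
      // available at this point, which is why past batch results are worth caching.
      val leaving = i - windowBatches
      if (leaving >= 0) current = invReduce(current, batches(leaving))
      out += current
    }
    out.result()
  }

  // Naive version for comparison: re-reduces every batch inside the window each time.
  def naiveWindowSums(batches: Vector[Int], windowBatches: Int): Vector[Int] =
    batches.indices.toVector.map { i =>
      batches.slice(math.max(0, i - windowBatches + 1), i + 1).sum
    }
}
```

For example, `windowSums(Vector(1, 2, 3, 4, 5), 3)` yields `Vector(1, 3, 6, 9, 12)`, matching the naive version; the incremental form touches two batches per step instead of the whole window, at the cost of keeping earlier results around.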


2. Checkpoint Mechanism

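From our earlier look at Spark Streaming, we know that a Spark Streaming application keeps running until it is stopped manually, and in practice such applications usually run 24/7. Spark Streaming must therefore be highly resilient to failures unrelated to the application logic, such as system errors and JVM crashes. The checkpoint mechanism is designed for exactly this: it checkpoints enough information to a fault-tolerant storage system such as HDFS that the application can recover quickly after a failure. Two kinds of data can be checkpointed: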

(1)Metadata checkpointing
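Saves the information that defines the streaming computation to fault-tolerant storage such as HDFS. Metadata checkpointing is used to recover when the node running the streaming application's driver fails. The metadata includes:
Configuration - the configuration used to create the streaming application
DStream operations - the set of DStream operations defined in the streaming application
Incomplete batches - batches whose jobs are queued but have not yet completed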

(2)Data checkpointing

具体来说,metadata checkpointing主要还是从drvier失败中恢复,而Data Checkpoing用于对有状态的transformation操作进行checkpointing
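To make the need for data checkpointing concrete, here is a toy model in plain Scala (not Spark; `LineageSketch`, `State`, `lineageDepth` and `checkpointEvery` are hypothetical names). Each batch's running word count depends on the previous batch's state, so the number of earlier batches that would have to be replayed to rebuild it grows by one per batch. Persisting the state to reliable storage resets that count, which is, in simplified form, what data checkpointing does to RDD lineage.

```scala
// Hypothetical model: running word counts plus the number of earlier batches
// that would have to be replayed to rebuild them (the "lineage depth").
final case class State(counts: Map[String, Int], lineageDepth: Int)

object LineageSketch {
  // Stateful update for one batch, similar in spirit to updateStateByKey.
  def update(prev: State, batch: Seq[String], checkpointNow: Boolean): State = {
    val merged = batch.foldLeft(prev.counts) { (acc, w) =>
      acc.updated(w, acc.getOrElse(w, 0) + 1)
    }
    // A checkpointed state can be reloaded directly, so no older batch is needed.
    val depth = if (checkpointNow) 0 else prev.lineageDepth + 1
    State(merged, depth)
  }

  // Processes all batches; checkpointEvery <= 0 means "never checkpoint".
  def run(batches: Seq[Seq[String]], checkpointEvery: Int): State =
    batches.zipWithIndex.foldLeft(State(Map.empty, 0)) { case (s, (b, i)) =>
      update(s, b, checkpointEvery > 0 && (i + 1) % checkpointEvery == 0)
    }
}
```

With ten batches and no checkpointing the depth reaches 10; checkpointing every 5 batches brings it back to 0 at batch 10, so recovery cost stays bounded regardless of how long the application runs.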



3. Example


import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}

/**
 * Counts words in text encoded with UTF8 received from the network every second.
 *
 * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
 *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
 *   <output-file> file to which the word counts will be appended
 *
 * <checkpoint-directory> and <output-file> must be absolute paths
 *
 * To run this on your local machine, you need to first run a Netcat server
 *
 *      `$ nc -lk 9999`
 *
 * and run the example as
 *
 *      `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
 *              localhost 9999 ~/checkpoint/ ~/out`
 *
 * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
 * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
 * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
 * the checkpoint data.
 *
 * Refer to the online documentation for more details.
 */
object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
    : StreamingContext = {
    // Printed only when a new context is created. If the application fails and is
    // restarted, the context is recovered from the checkpoint and this line does not run.
    println("Creating new context")
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount").setMaster("local[4]")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDirectory)

    // Use a socket as the data source
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
      val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
      println(counts)
      println("Appending to " + outputFile.getAbsolutePath)
      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
    })
    ssc
  }

  // Extractor that converts a String to an Int
  // (org.apache.spark.util.IntParam is private[spark], so it cannot be imported here)
  private object IntParam {
    def unapply(str: String): Option[Int] = {
      try {
        Some(str.toInt)
      } catch {
        case _: NumberFormatException => None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
          |     <output-file>. <hostname> and <port> describe the TCP server that Spark
          |     Streaming would connect to receive data. <checkpoint-directory> directory to
          |     HDFS-compatible file system which checkpoint data <output-file> file to which the
          |     word counts will be appended
          |
          |In local mode, <master> should be 'local[n]' with n > 1
          |Both <checkpoint-directory> and <output-file> must be absolute paths
        """.stripMargin
      )
      System.exit(1)
    }
    val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
    // getOrCreate rebuilds the StreamingContext from checkpoint data if it exists,
    // otherwise it calls createContext to build a new one
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(ip, port, outputPath, checkpointDirectory)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}




Creating new context
15/11/30 07:20:32 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/30 07:20:33 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Counts at time 1448896840000 ms []
Appending to /root/out2
15/11/30 07:20:47 WARN BlockManager: Block input-0-1448896847000 replicated to only 0 peer(s) instead of 1 peers
Counts at time 1448896850000 ms [(Spark,1), (Context,1)]


Counts at time 1448897070000 ms []
Appending to /root/out2
Counts at time 1448897080000 ms []
Appending to /root/out2
Counts at time 1448897090000 ms []
Appending to /root/out2
15/11/30 07:24:58 WARN BlockManager: Block input-0-1448897098600 replicated to only 0 peer(s) instead of 1 peers
Counts at time 1448897100000 ms [(Spark,1), (Context,1)]
Appending to /root/out2
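The example relies on the getOrCreate pattern: "Creating new context" is printed only when no checkpoint data is found. Below is a simplified plain-Scala model of that control flow, assuming that checkpoint data counts as "present" when the directory exists and contains at least one file. The names `GetOrCreateModel`, `getOrCreateModel`, `restore` and `create` are hypothetical; the real `StreamingContext.getOrCreate` additionally reads and deserializes the saved checkpoint (configuration and DStream graph), which is not modelled here.

```scala
import java.io.File

object GetOrCreateModel {
  // Restores from the checkpoint directory if it holds data, otherwise calls the factory.
  // The factory is only invoked on a fresh start, just like createContext in the example.
  def getOrCreateModel[T](checkpointDir: File, restore: File => T, create: () => T): T = {
    // listFiles() returns null when the directory does not exist.
    val files = Option(checkpointDir.listFiles()).getOrElse(Array.empty[File])
    if (files.nonEmpty) restore(checkpointDir) else create()
  }
}
```

Passing the factory as a function, rather than an already-built context, is what lets the creation logic (and its `println`) be skipped entirely on recovery.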
