主要内容

  1. Spark Stream 缓存
  2. Checkpoint
  3. 案例

1. Spark Stream 缓存

通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一样,也可以将流式数据持久化到内容当中,采用的同样是persisit方法,调用该方法后DStream将持久化所有的RDD数据。这对于一些需要重复计算多次或数据需要反复被使用的DStream特别有效。像reduceByWindow、reduceByKeyAndWindow等基于窗口操作的方法,它们默认都是有persisit操作的。reduceByKeyAndWindow方法源码具体如下:

def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration,partitioner: Partitioner,filterFunc: ((K, V)) => Boolean): DStream[(K, V)] = ssc.withScope {val cleanedReduceFunc = ssc.sc.clean(reduceFunc)val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else Nonenew ReducedWindowedDStream[K, V](self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,windowDuration, slideDuration, partitioner)}

从上面的方法来看,它最返回的是一个ReducedWindowedDStream对象,跳到该类的源码中可以看到在其主构造函数中包含下面两段代码:

private[streaming]
class ReducedWindowedDStream[K: ClassTag, V: ClassTag](parent: DStream[(K, V)],reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,filterFunc: Option[((K, V)) => Boolean],_windowDuration: Duration,_slideDuration: Duration,partitioner: Partitioner) extends DStream[(K, V)](parent.ssc) {//省略其它非关键代码//默认被缓存到内存当中// Persist RDDs to memory by default as these RDDs are going to be reused.super.persist(StorageLevel.MEMORY_ONLY_SER)reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
}

通过上面的代码我们可以看到,通过窗口操作产生的DStream不需要开发人员手动去调用persist方法,Spark会自动帮我们将数据缓存当内存当中。同一般的RDD类似,DStream支持的persisit级别为:

2. Checkpoint机制

通过前期对Spark Streaming的理解,我们知道,Spark Streaming应用程序如果不手动停止,则将一直运行下去,在实际中应用程序一般是24小时*7天不间断运行的,因此Streaming必须对诸如系统错误、JVM出错等与程序逻辑无关的错误(failures )具体很强的弹性,具备一定的非应用程序出错的容错性。Spark Streaming的Checkpoint机制便是为此设计的,它将足够多的信息checkpoint到某些具备容错性的存储系统如HDFS上,以便出错时能够迅速恢复。有两种数据可以chekpoint:

(1)Metadata checkpointing
将流式计算的信息保存到具备容错性的存储上如HDFS,Metadata Checkpointing适用于当streaming应用程序Driver所在的节点出错时能够恢复,元数据包括:
Configuration(配置信息) - 创建streaming应用程序的配置信息
DStream operations - 在streaming应用程序中定义的DStreaming操作
Incomplete batches - 在列队中没有处理完的作业

(2)Data checkpointing
将生成的RDD保存到外部可靠的存储当中,对于一些数据跨度为多个bactch的有状态tranformation操作来说,checkpoint非常有必要,因为在这些transformation操作生成的RDD对前一RDD有依赖,随着时间的增加,依赖链可能会非常长,checkpoint机制能够切断依赖链,将中间的RDD周期性地checkpoint到可靠存储当中,从而在出错时可以直接从checkpoint点恢复。

具体来说,metadata checkpointing主要还是从drvier失败中恢复,而Data Checkpoing用于对有状态的transformation操作进行checkpointing

Checkpointing具体的使用方式时通过下列方法:

//checkpointDirectory为checkpoint文件保存目录
streamingContext.checkpoint(checkpointDirectory)

3. 案例

程序来源:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
进行了适量修改

import java.io.File
import java.nio.charset.Charsetimport com.google.common.io.Filesimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam/*** Counts words in text encoded with UTF8 received from the network every second.** Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>*   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive*   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data*   <output-file> file to which the word counts will be appended** <checkpoint-directory> and <output-file> must be absolute paths** To run this on your local machine, you need to first run a Netcat server**      `$ nc -lk 9999`** and run the example as**      `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \*              localhost 9999 ~/checkpoint/ ~/out`** If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create* a new StreamingContext (will print "Creating new context" to the console). Otherwise, if* checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from* the checkpoint data.** Refer to the online documentation for more details.*/
object RecoverableNetworkWordCount {def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {//程序第一运行时会创建该条语句,如果应用程序失败,则会从checkpoint中恢复,该条语句不会执行println("Creating new context")val outputFile = new File(outputPath)if (outputFile.exists()) outputFile.delete()val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount").setMaster("local[4]")// Create the context with a 1 second batch sizeval ssc = new StreamingContext(sparkConf, Seconds(1))ssc.checkpoint(checkpointDirectory)//将socket作为数据源val lines = ssc.socketTextStream(ip, port)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")println(counts)println("Appending to " + outputFile.getAbsolutePath)Files.append(counts + "\n", outputFile, Charset.defaultCharset())})ssc}//将String转换成Intprivate object IntParam {def unapply(str: String): Option[Int] = {try {Some(str.toInt)} catch {case e: NumberFormatException => None}}
}def main(args: Array[String]) {if (args.length != 4) {System.err.println("You arguments were " + args.mkString("[", ", ", "]"))System.err.println("""|Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>|     <output-file>. <hostname> and <port> describe the TCP server that Spark|     Streaming would connect to receive data. <checkpoint-directory> directory to|     HDFS-compatible file system which checkpoint data <output-file> file to which the|     word counts will be appended||In local mode, <master> should be 'local[n]' with n > 1|Both <checkpoint-directory> and <output-file> must be absolute paths""".stripMargin)System.exit(1)}val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args//getOrCreate方法,从checkpoint中重新创建StreamingContext对象或新创建一个StreamingContext对象val ssc = StreamingContext.getOrCreate(checkpointDirectory,() => {createContext(ip, port, outputPath, checkpointDirectory)})ssc.start()ssc.awaitTermination()}
}

输入参数配置如下:

运行状态图如下:

首次运行时:

//创建新的StreamingContext
Creating new context
15/11/30 07:20:32 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/30 07:20:33 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Counts at time 1448896840000 ms []
Appending to /root/out2
15/11/30 07:20:47 WARN BlockManager: Block input-0-1448896847000 replicated to only 0 peer(s) instead of 1 peers
Counts at time 1448896850000 ms [(Spark,1), (Context,1)]

手动将程序停止,然后重新运行

//这时从checkpoint目录中读取元数据信息,进行StreamingContext的恢复
Counts at time 1448897070000 ms []
Appending to /root/out2
Counts at time 1448897080000 ms []
Appending to /root/out2
Counts at time 1448897090000 ms []
Appending to /root/out2
15/11/30 07:24:58 WARN BlockManager: Block input-0-1448897098600 replicated to only 0 peer(s) instead of 1 peers
[Stage 8:>                                                          (0 + 0) / 4]Counts at time 1448897100000 ms [(Spark,1), (Context,1)]
Appending to /root/out2

Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制相关推荐

  1. 2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数

    目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据! ...

  2. 大数据Spark(三十九):SparkStreaming实战案例四 窗口函数

    文章目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求

  3. Spark Streaming 实战案例(一)

    本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-opera ...

  4. Spark Streaming 实战案例(二) Transformation操作

    本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html DStream Tra ...

  5. Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍

    [注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Sp ...

  6. spark—SQL实战案例

    学习内容 一.sparkSQL在IDEA的使用 1.环境配置 2.快速入门 二.sparkSQL实战案例 1.数据准备 2.案例分析 3.功能实现 4.代码实现 一.sparkSQL在IDEA的使用 ...

  7. Spark入门实战系列--8.Spark MLlib(上)--机器学习及SparkMLlib简介

    [注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.机器学习概念 1.1 机器学习的定义 在维基百科上对机器学习提出以下几种定义: l&qu ...

  8. Spark入门实战系列--1.Spark及其生态圈简介

    1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年 ...

  9. Spark入门实战系列--4.Spark运行架构

    注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1. Spark运行架构 1.1 术语定义 lApplication:Spark Applic ...

最新文章

  1. tomcat+bean例子
  2. strtok_r 和 strsep 使用实例
  3. 牛客15324 用来作弊的药水
  4. 复杂网络环境下的访问控制技术
  5. 正则表达式和Java编程语言1zz
  6. 远程无法连接数据库的问题
  7. linux ftp 150 无响应,FTP遇到150无响应
  8. 区块链 以太坊 智能合约 运行原理和开发实例
  9. vue项目中axios请求网络接口封装
  10. 计算机本地局域网不通,局域网不通解决方法
  11. 目前下载VS2017你可能会遇到这个坑
  12. JS逆向之网易云音乐
  13. win10自带图片出现文件系统错误 (-2147219196)的修复方法
  14. 网站被劫持了怎么办?
  15. Java给PNG透明图片加水印,亲测可用
  16. 数据结构和算法基本介绍和概念
  17. Deep Audio-Visual Speech Recognition翻译
  18. (转)iOS Wow体验 - 第四章 - 为应用的上下文环境而设计
  19. matlab中==、~=、的含义
  20. 世界上排名前100的英文歌详细名单及介绍

热门文章

  1. java获取进程端口_查看进程的端口号
  2. VRRP——虚拟路由器冗余协议
  3. linux怎么打开q7后缀的文件,ZQ7 文件扩展名: 它是什么以及如何打开它?
  4. layui分页limit不显示_layui table分页 page为false时,limit问题
  5. idc机房建设费用_idc机房服务器带宽租用费用
  6. 多核处理器_多核处理器还能走多远?2050年用上1024核CPU
  7. dataframe保存为txt_如何批量查找并修改替换 Word、PPT、Excel、PDF、TXT等文件的内容...
  8. python list map成员排序_python – 同时对多个列表进行排序
  9. 在react里写原生js_小程序原生开发与第三方框架选择
  10. java 仅有类名 构造类_java – 这个设计模式有名字吗? (具有仅调用构造函数的实现的基类)...