文章目录

  • Spark内存计算框架
    • Spark Streaming
      • Checkpoint
        • 1. checkpoint的基本介绍
        • 2. 什么时候需要使用checkpoint
        • 3. 如何使用checkpoint
      • Spark Streaming和Spark SQL整合
      • Spark Streaming容错
        • 1. 节点失败容错
        • 2. 数据丢失如何处理
        • 3. Task执行很慢容错
      • 优雅关闭
      • Spark Streaming整合kafka
        • 1. Spark Streaming整合kafka-0-8
          • 方式一:Receiver-based Approach【不推荐使用】
          • 方式二:Direct Approach(NoReceivers)
        • 2. Spark Streaming与kafka-0-10整合
        • 3. 解决SparkStreaming与kafka-0.8版本正好数据不丢失方案
        • 4. Spark Streaming如何保证exactly-once

Spark内存计算框架

Spark Streaming

Checkpoint

1. checkpoint的基本介绍

  • checkpoint 是 SparkStreaming 当中为了解决流式处理程序意外停止造成的数据丢失问题,checkpoint 的目的是保证长时间运行的任务在意外挂掉之后,能够在拉起来时不丢失数据。
  • checkpoint 中包含两种数据:
    • metadata 元数据信息:用户恢复 Driver 端的数据

      • 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统,用来恢复 Driver,元数据包括:配置(用户创建该 Streaming Application的所有配置)、DStream(一系列的操作)、未完成的batches(那些提交了 job 但尚未执行或未完成的batches)
    • data:保存已生成的 RDDs 至可靠的存储;这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖随时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链。

2. 什么时候需要使用checkpoint

  • 满足以下任一条件:

    • 使用了 stateful 转换:如果 Application 中使用了 updateStateByKey 或 reduceByKeyAndWindow 等 stateful 操作,必须提供 checkpoint 目录来允许定时的 RDD 进行 checkpoint。
    • 希望从意外中恢复 Driver
  • 如果 Streaming App 没有 stateful 操作,也允许 Driver 挂掉之后再次重启的进度丢失,就没有启用 checkpoint 的必要了。

3. 如何使用checkpoint

  • 启用 checkpoint,需要设置一个支持容错的、可靠的文件系统(如HDFS、S3等)目录来保存 checkpoint 数据。
  • 通过调用 streamingContext.checkpoint(checkpointDirectory) 来完成,另外,如果你想让你的 Application 能从 Driver 失败中恢复,你的 Application 要满足:
    • 若 Application 为首次重启,将创建一个新的 StreamContext 实例;
    • 若 Application 是从失败中重启,将会从 checkpoint 目录导入 checkpoint 数据来重新创建 StreamingContext 实例;
    • 通过 StreamingContext.getOrCreate 可以达到目的。
  • checkpoint 不仅仅可以保存运行结果中的数据,还可以存储 Driver 端的信息,通过 checkpoint 可以实现 Driver 端的高可用。代码实现如下:
object Case09_DriverHAWordCount {val checkpointPath = "hdfs://node01:8020/checkpoint"def creatingFunc(): StreamingContext = {Logger.getLogger("org").setLevel(Level.ERROR)val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(2))ssc.checkpoint(checkpointPath)val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc)result.print()ssc}/*** @param currentValues  表示在当前批次每个单词出现的所有的1 (hadoop,1)、(hadoop,1)、...、(hadoop,1)* @param historyValues  表示在之前所有批次中每个单词出现的总次数 (hadoop,100)*/def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {val newValue: Int = currentValues.sum + historyValues.getOrElse(0)Some(newValue)}def main(args: Array[String]): Unit = {val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointPath, creatingFunc _)ssc.start()ssc.awaitTermination()}
}
  • 如果 checkpointDirectory 存在,那么 context 将导入 checkpoint 数据;如果目录不存在,函数 functionToCreateContext 将被调用并创建新的 context。
  • 除了调用 getOrCreate 外,还需要你的集群模式执行 Driver 挂掉之后重启之。
    • 例如,在 yarn 模式下,Driver 是运行在 ApplicationMaster 中,若 ApplicationMaster 挂掉,yarn 会自动在另一个节点启动一个新的 ApplicationMaster。
  • 需要注意的是,随着 Streaming Application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置 checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的 5~10 倍。

Spark Streaming和Spark SQL整合

  • pom.xml 文件添加依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.version}</artifactId><version>${spark.version}</version>
</dependency>
  • 代码开发:
object Case10_WordCountStreamingAndSql {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)// 1. 创建SparkConf对象,注意这里至少给两个线程,一个线程没办法执行val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")// 2. 创建StreamingContext对象val ssc = new StreamingContext(sparkConf, Seconds(1))// 3. 接收Socket数据val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)// 4. 对数据进行处理val words: DStream[String] = socketTextStream.flatMap(_.split(" "))// 5. 对DStream进行处理,将RDD转换成DataFramewords.foreachRDD(rdd => {// 获取 SparkSessionval sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()import sparkSession.implicits._val dataFrame: DataFrame = rdd.toDF("word")// 将dataFrame注册成表dataFrame.createOrReplaceTempView("words")// 统计每个单词出现的次数val result: DataFrame = sparkSession.sql("select word, count(*) as count from words group by word")// 展示结果result.show()})// 6. 开启流式计算ssc.start()ssc.awaitTermination()}
}

Spark Streaming容错

1. 节点失败容错

  • SparkStreaming运行流程:

  • 当一个 Executor 失败时:会将 task 重新发送到备份的数据块所在的 Executor

    • Tasks和Receiver自动的重启,不需要做任何的配置

  • 当 Driver 失败时:使用 checkpoint 机制恢复失败的 Driver

  • 使用 checkpoint 机制,会定期将 Driver 信息写入到 HDFS 中

  • 步骤一:设置自动重启 Driver 程序
# Standalone: 在spark-submit提交任务时,增加两个参数 `--deploy-mode cluster` 和 `--supervise`
spark-submit \
--master spark://node01:7077 \
--deploy-mode cluster \
--supervise \
--class com.yw.spark.example.streaming.cases.Case01_SocketWordCount \
--executor-memory 1g \
--total-executor-cores 2 \
original-spark-demo-1.0.jar# Yarn: 在spark-submit提交任务时,增加参数 `--deploy-mode cluster`,并设置 `yarn.resourcemanager.am.max-attemps`
spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.yw.spark.example.streaming.cases.Case01_SocketWordCount \
--executor-memory 1g \
--total-executor-cores 2 \
original-spark-demo-1.0.jar<property><name>yarn.resourcemanager.am.max-attempts</name><value>4</value><description>The maximum number of application master execution attempts.</description>
</property>
  • 步骤二:设置 HDFS 的 checkpoint 目录
streamingContext.checkpoint(hdfsDirectory)
  • 步骤三:代码实现
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {val ssc = new StreamingContext(...)   // new contextval lines = ssc.socketTextStream(...) // create DStreams...ssc.checkpoint(checkpointDirectory)   // set checkpoint directoryssc
}// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...// Start the context
context.start()
context.awaitTermination()

2. 数据丢失如何处理

  • 可以利用 WAL 机制,将数据写入到 HDFS 中,这样当发生节点宕机时,可以从 WAL 中恢复

  • 步骤一:设置 checkpoint 目录
streamingContext.checkpoint(hdfsDirectory)
  • 步骤二:开启 WAL 日志
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  • 步骤三:需要 reliable receiver

    • 当数据写完 WAL 后,才告诉数据源已经消费,对于没有告诉数据的数据,可以从数据源中重新消费数据。
  • 步骤四:取消备份
    • 使用 StorageLevel.MEMORY_AND_DISK_SER 来存储数据源,不需要后缀为 2 的策略,因为 HDFS 已经是多副本了。
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999, StorageLevel.MEMORY_AND_DISK_SER)

  • Reliable Receiver:当数据接收到,并且已经备份存储后,再发送回执给数据源;
  • Unreliable Receiver:不发送回执给数据源
  • WAL:使用在文件系统和数据库中用于数据操作的持久性,先把数据写到一个持久化的日志中,然后对数据做操作,如果操作过程中系统挂了,恢复的时候可以重新读取日志文件再次进行操作。
    • 对于像kafka和flume这些使用接收器来接收数据的数据源。接收器作为一个长时间的任务运行在executor中,负责从数据源接收数据,如果数据源支持的话,向数据源确认接收到数据,然后把数据存储在executor的内存中,然后在exector上运行任务处理这些数据。
    • 如果wal启用了,所有接收到的数据会保存到一个日志文件中去(HDFS), 这样保存接收数据的持久性。
    • 此外,如果只有在数据写入到log中之后接收器才向数据源确认,这样driver重启后那些保存在内存中但是没有写入到log中的数据将会重新发送,这两点保证的数据的无丢失。

3. Task执行很慢容错

  • 开启推测机制:假设总的 task 有 10 个,成功的 task 数量 > spark.speculation.quantile * 10,正在运行的 task 的运行时间 > spark.speculation.multoplier * 成功运行task的平均时间,则这个正在运行的 task 需要重新等待调度。
# 每隔一段时间来检查有哪些正在运行的 task 需要重新调度
spark.speculation = true
# 推测间隔时间
spark.speculation.interval = 100ms
# 推测数量阈值
spark.speculation.quantile = 0.75
spark.speculation.multoplier = 1.5

  • 在分布式环境中,导致某个 task 执行缓慢的情况有很多:负载不均、程序 bug、资源不均、数据倾斜等,而且这些情况在分布式计算环境中是常态。Speculative Task(推测Task) 这种以空间换时间的思路设计对计算资源是种压榨,同时,如果 Speculative Task 本身也变成了 Slow Task 会导致情况进一步恶化。

优雅关闭

  • 流式任务需要 7 * 24h 执行,但有时涉及到代码升级,需要主动停止程序。但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅关闭就显得至关重要了。
  • 使用外部文件系统来控制内部程序关闭,代码实现如下:
object Case11_GracefullyShutdown {private val HDFS: String = "hdfs://node01:8020"private val CHECKPOINT: String = HDFS + "/checkpoint"def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val ssc: StreamingContext = StreamingContext.getActiveOrCreate(CHECKPOINT, () => createSsc())new Thread(new MonitorStop(ssc)).start()ssc.start()ssc.awaitTermination()}def createSsc(): _root_.org.apache.spark.streaming.StreamingContext = {val updateFunc: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {Some(values.sum + status.getOrElse(0))}val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)// 设置优雅关闭sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")val ssc = new StreamingContext(sparkConf, Seconds(5))ssc.checkpoint(CHECKPOINT)val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)val wordAndCount: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc)wordAndCount.print()ssc}class MonitorStop(ssc: StreamingContext) extends Runnable {override def run(): Unit = {val fs: FileSystem = FileSystem.get(new URI(HDFS), new Configuration(), "hadoop")while (true) {try {TimeUnit.SECONDS.sleep(5)} catch {case e: InterruptedException => e.printStackTrace()}val state: StreamingContextState = ssc.getState()val bool: Boolean = fs.exists(new Path(HDFS + "/stopSpark"))if (bool && state == StreamingContextState.ACTIVE) {ssc.stop(stopSparkContext = true, stopGracefully = true)System.exit(0)}}}}
}

Spark Streaming整合kafka

  • 在消费 kafka 数据时,可以有三种语义保证:

    • at most one 至多一次:数据最多处理一次或没有被处理,有可能造成数据丢失的情况;
    • at least once 至少一次:数据最少被处理一次,有可能存在重复消费的问题;
    • exactly once 精准一次:数据消费一次且仅一次

1. Spark Streaming整合kafka-0-8

  • SparkStreaming整合Kafka官方文档

方式一:Receiver-based Approach【不推荐使用】
  • 此方法使用Receiver接收数据,Receiver是使用Kafka高级消费者API实现的。
  • 与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据。
  • 但是在默认配置下,此方法可能会在失败时丢失数据(请参阅接收器可靠性。)
  • 为确保零数据丢失,必须在Spark Streaming中另外启用Write Ahead Logs(WAL 在Spark 1.2中引入)。
  • 这将同步保存所有收到的Kafka数据写入分布式文件系统(例如HDFS)上的预写日志,以便在发生故障时可以恢复所有数据,但是性能不好。
  • pom.xml 添加依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.4.8</version>
</dependency>
  • 核心代码:
import org.apache.spark.streaming.kafka._val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
  • 代码示例:
/*** sparkStreaming使用kafka 0.8API基于recevier来接受消息*/
object Case12_KafkaReceiver08 {private val zkQuorum = "192.168.254.120:2181"private val groupId = "KafkaReceiver08"private val topics = Map("test" -> 1)def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")// 开启 WAL 机制.set("spark.streaming.receiver.writeAheadLog.enable", "true")val ssc = new StreamingContext(sparkConf, Seconds(2))// 设置checkpoint,将接收到的数据持久化写入到HDFSssc.checkpoint("hdfs://node01:8020/wal")// 接收kafka数据val receiverDstream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)// 获取kafka的topic数据val data: DStream[String] = receiverDstream.map(_._2)// 单词计算val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)result.print()ssc.start()ssc.awaitTermination()}
}
  • 创建 kafka 的 topic 并发送数据
# 创建topic
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper 192.168.254.120:2181
# 生产发送数据
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

如果程序运行过程中,出现错误java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V这是spark-core与kafka-client中lz4版本不一致导致的,可用以下方式在程序中指定其他的压缩算法进行解决
new SparkConf().set(“spark.io.compression.codec”, “snappy”)

方式二:Direct Approach(NoReceivers)
  • 这种新的不基于Receiver的,是直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。
  • 替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。
  • 当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
  • 这种方式有如下优点:
    • 简化并行读取:

      • 如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。
      • Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。
      • 所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
    • 高性能:
      • 如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下。
      • 因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。
      • 而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
    • 一次且仅一次的事务机制:
      • 基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。
      • 这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性。
      • 但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
    • 降低资源:
      • Direct不需要Receivers,其申请的Executors全部参与到计算任务中。
      • 而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。
      • 因此相同的资源申请,Direct 能够支持更大的业务。
    • 降低内存:
      • Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据。
      • 对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。
      • 而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
      • 实际应用中我们可以把原先的10G降至现在的2-4G左右。
    • 鲁棒性更好:
      • Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。
      • Direct 则没有这种顾虑,其Driver在触发batch计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
  • 代码示例:
/*** sparkStreaming使用kafka 0.8API基于Direct直连来接受消息* spark direct API接收kafka消息,从而不需要经过zookeeper,直接从broker上获取信息。*/
object Case13_KafkaDirect08 {private val kafkaCluster = "node01:9092,node02:9092,node03:9092"private val groupId = "KafkaDirect08"private val topics = Set("test")def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")// 开启 WAL 机制.set("spark.streaming.receiver.writeAheadLog.enable", "true")val ssc = new StreamingContext(sparkConf, Seconds(2))// 接收kafka数据val kafkaParams = Map("metadata.broker.list" -> kafkaCluster,"group.id" -> groupId)// 使用direct直连的方式接收数据val kafkaDstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)// 获取kafka的topic数据val data: DStream[String] = kafkaDstream.map(_._2)// 单词计算val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)result.print()ssc.start()ssc.awaitTermination()}
}
  • 要想保证数据不丢失,最简单的就是靠checkpoint的机制;但是checkpoint机制有个特点,如果代码升级了,checkpoint机制就失效了。
  • 所以如果想实现数据不丢失,那么就需要自己管理offset。

2. Spark Streaming与kafka-0-10整合

  • 支持0.10版本,或者更高的版本【推荐使用这个版本】
  • pom.xml 添加依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId><version>${spark.version}</version>
</dependency>
  • 代码示例:
/*** sparkStreaming使用kafka 1.0API基于Direct直连来接受消息*/
object Case14_KafkaDirect10 {private val kafkaCluster = "node01:9092,node02:9092,node03:9092"private val groupId = "KafkaDirect10"private val topics = Set("test")def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)// 1. 创建StreamingContext对象val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(2))// 2. 使用Direct接收kafka数据val kafkaParams = Map("bootstrap.servers" -> kafkaCluster,"group.id" -> groupId,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"enable.auto.commit" -> "false")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,// 数据本地性策略LocationStrategies.PreferConsistent,// 指定要订阅的topicConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 3. 对数据进行处理// 注意:如果你想获取到消息消费的偏移,这里需要拿到最开始的这个DStream进行操作// 如果你对该DStream进行了其他的转换之后,生成了新的DStream,新的DStream不再保存对应的消息的偏移量kafkaDStream.foreachRDD(rdd => {// 获取消息内容val dataRdd: RDD[String] = rdd.map(_.value())// 打印dataRdd.foreach(line => println(line))// 4. 提交偏移量,将偏移量信息添加到kafka中val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangeskafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)})// 5. 开启流式计算ssc.start()ssc.awaitTermination()}
}

3. 解决SparkStreaming与kafka-0.8版本正好数据不丢失方案

  • 一般企业来说无论你是使用哪一套api去消费kafka中的数据,都是设置手动提交偏移量。因为自动提交(默认60s提交一次)偏移量风险比较高,可能会出现数据丢失或者数据被重复处理:

    • 数据处理失败了,自动提交了偏移量,会出现数据的丢失;
    • 数据处理成功了,自动提交偏移量失败,之后消费时会从失败的位置再次消费,导致数据重复处理。
  • 一般来说就手动去提交偏移量,将偏移量的提交通过消费者程序自己去维护,示意图如下:

  • 代码示例,偏移量存入ZK:
/*** sparkStreaming使用kafka 0.8API基于Direct直连来接受消息* 手动将偏移量数据保存到ZK中*/
object Case15_KafkaManageOffset08 {private val kafkaCluster = "node01:9092,node02:9092,node03:9092"private val zkQuorum = "192.168.254.120:2181"private val groupId = "consumer-manager"private val topic = "wordcount"private val topics = Set(topic)def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")// 开启 WAL 机制.set("spark.streaming.receiver.writeAheadLog.enable", "true")val ssc = new StreamingContext(sparkConf, Seconds(2))// 创建一个 ZKGroupTopicDirs 对象,就是用来指定在zk中的存储目录,用来保存数据偏移量val topicDirs = new ZKGroupTopicDirs(groupId, topic)// 获取 ZK 中的路径 "/consumers/consumer-manager/offsets/wordcount"val zkTopicPath = topicDirs.consumerOffsetDir// 构造一个ZK的客户端,用来读写偏移量数据val zkClient = new ZkClient(zkQuorum)// 准备kafka的参数val kafkaParams = Map("metadata.broker.list" -> kafkaCluster,"group.id" -> groupId,"enable.auto.commit" -> "false")// 定义kafkaStream流var kafkaStream: InputDStream[(String, String)] = null// 获取指定的zk节点的子节点个数val childrenNum = zkClient.countChildren(zkTopicPath)// 判断是否保存过数据: 根据子节点的数量是否为0if (childrenNum > 0) {var fromOffsets: Map[TopicAndPartition, Long] = Map()for (i <- 0 until childrenNum) {// 获取子节点val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/$i")val tp = TopicAndPartition(topic, i)// 获取数据偏移量: 将不同分区内的数据偏移量保存到map集合中fromOffsets += (tp -> partitionOffset.toLong)}// 泛型中 key, kafka中的key   value:hello tom hello jerry// 创建函数 解析数据 转换为(topic_name, message)的元组val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())// 利用底层的API创建DStream: 采用直连的方式(若之前已经消费了,则从指定的位置消费)kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)} else {// 利用底层的API创建DStream 采用直连的方式(若之前没有消费,则这是第一次读取数据)// zk中没有子节点数据,就是第一次读取数据,直接创建直连对象kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)}// 直接操作kafkaStream// 依次迭代DStream中的kafkaRDD, 只有kafkaRDD才可以强转为HasOffsetRanges, 从中获取数据偏移量信息// 之后是操作的RDD, 不能够直接操作DStream, 因为调用Transformation方法之后就不是kafkaRDD了获取不了偏移量信息kafkaStream.foreachRDD(kafkaRDD => {// 强转为HasOffsetRanges, 获取offset偏移量数据val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges// 获取数据val lines: RDD[String] = kafkaRDD.map(_._2)// 接下来就是对RDD进行操作 触发actionlines.foreachPartition(partition => partition.foreach(x => println(x)))// 手动提交偏移量到zk集群上for (o <- offsetRanges) {// 拼接zk路径val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"// 将 partition 的偏移量数据 offset 保存到zookeeper中ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)}})ssc.start()ssc.awaitTermination()}
}

4. Spark Streaming如何保证exactly-once

  • 一个流式计算如果要保证 exactly-once,首先要有三点要求:

    • source 支持 reply;
    • 流计算引擎本身处理能保证 exactly-once;
    • sink支持幂等或事务更新
  • 实现数据被处理且仅被处理一次,就需要实现数据结果保存操作与偏移量保存操作再同一个事务中,或者实现幂等操作。
  • 也就是说如果想让 SparkStreaming 的程序保证 exactly-once,需要从以下三个角度出发:
    • 接收数据:从Source中接收数据,保证 exactly-once;
    • 转换数据:用DStream和RDD算子转换,保证 exactly-once;
    • 存储数据: 将结果保存到外部系统,保证 exactly-once。
  • scalikejdbc依赖:
<dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc_${scala.version}</artifactId><version>${scalikejdbc.version}</version>
</dependency>
<dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc-config_${scala.version}</artifactId><version>${scalikejdbc.version}</version>
</dependency>
  • 示例代码:
/*** SparkStreaming EOS:* Input: kafka* Process: SparkStreaming* Output: MySQL* 保证EOS:* 1、偏移量自己管理,即enable.auto.commit=false,这里保存在Mysql中* 2、使用createDirectStream* 3、事务输出: 结果存储与Offset提交在Driver端同一Mysql事务中*/
class Case16_EOSKafkaMysqlAtomic {@transient lazy val log = LoggerFactory.getLogger(this.getClass)private val kafkaCluster = "node01:9092,node02:9092,node03:9092"private val groupId = "consumer-eos"private val topic = "topic_eos"private val mysqlUrl = "jdbc:mysql://node01:3306/test"private val mysqlUsr = "root"private val mysqlPwd = "123456"def main(args: Array[String]): Unit = {// 准备kafka参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> kafkaCluster,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean),"group.id" -> groupId)// 数据库连接池ConnectionPool.singleton(mysqlUrl, mysqlUsr, mysqlPwd)val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(5))// 1、初次启动或重启时,从指定的Partition、Offset构建TopicPartition// 2、运行过程中,每个Partition、Offset保存在内部currentOffsets = Map[TopicPartition, Long]()变量中// 3、后期Kafka Topic分区扩展,在运行过程中不能自动感知val initOffset = DB.readOnly(implicit session => {sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${groupId}".map(item => new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset")).list().apply().toMap})// CreateDirectStream: 从指定的Topic、Partition、Offset开始消费val sourceDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Assign[String, String](initOffset.keys, kafkaParams, initOffset))sourceDStream.foreachRDD(rdd => {if (!rdd.isEmpty()) {val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesoffsetRanges.foreach(offsetRange => {log.info(s"Topic: ${offsetRange.topic}, Group: ${groupId}, Partition: ${offsetRange.partition}, fromOffset: ${offsetRange.fromOffset}, untilOffset: ${offsetRange.untilOffset}")})// 统计分析val sparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()import sparkSession.implicits._val dataFrame = sparkSession.read.json(rdd.map(_.value()).toDS())dataFrame.createOrReplaceTempView("tmpTable")val result = sparkSession.sql("""| select eventTimeMinute, language, count(1) pv, count(distinct(userID)) uv| from (select *, substr(eventTime,0,16) eventTimeMinute from tmpTable) as tmp| group by eventTimeMinute, language""".stripMargin).collect()// 在Driver端存储数据、提交Offset,结果存储与Offset提交在同一事务中原子执行,这里将偏移量保存在Mysql中DB.localTx(implicit session => {result.foreach(row => {sql"""insert into twitter_pv_uv (eventTimeMinute,language,pv,uv) values (${row.getAs[String]("eventTimeMinute")},${row.getAs[String]("language")},${row.getAs[Long]("pv")},${row.getAs[Long]("uv")},) on duplicate key update pv = pv, uv = uv""".update.apply()})// offset 提交offsetRanges.foreach(offsetRange => {val affectedRows =sql"""update kafka_topic_offset set offset = ${offsetRange.untilOffset}where topic = ${topic} and `group` = ${groupId} and `partition` = ${offsetRange.partition} and offset = ${offsetRange.fromOffset}""".update.apply()if (affectedRows != 1) {throw new Exception(s"""Commit Kafka Topic: ${topic} Offset Faild!""")}})})}})ssc.start()ssc.awaitTermination()}
}

大数据高级开发工程师——Spark学习笔记(10)相关推荐

  1. 大数据高级开发工程师——Spark学习笔记(9)

    文章目录 Spark内存计算框架 Spark Streaming Spark Streaming简介 Spark Streaming架构流程 什么是DStream DStream算子操作 1. Tra ...

  2. 大数据高级开发工程师——Spark学习笔记(7)

    文章目录 Spark内存计算框架 Spark SQL SparkSQL架构设计 1. SparkSQL的架构设计实现 2. Catalyst执行过程 SQL 解析阶段 Parser 绑定逻辑计划 An ...

  3. 大数据高级开发工程师——Spark学习笔记(8)

    文章目录 Spark内存计算框架 Spark SQL Spark的动态资源划分 1. Executor动态调整范围? 2. 超时被杀的Executor中持久化数据如何处理? 3. 如何开启Spark的 ...

  4. 大数据高级开发工程师——Spark学习笔记(6)

    文章目录 Spark内存计算框架 Spark SQL SparkSQL概述 1. SparkSQL的前世今生 2. 什么是 SparkSQL SparkSQL的四大特性 1. 易整合 2. 统一的数据 ...

  5. 大数据高级开发工程师——Spark学习笔记(4)

    文章目录 Spark内存计算框架 Spark Core Spark的shuffle过程 1. HashShuffleManager 未经优化的HashShuffleManager 经过优化的HashS ...

  6. 大数据高级开发工程师——HBase学习笔记(2)

    文章目录 大数据数据库之HBase HBase架构原理 HBase的数据存储原理 HBase读数据流程 HBase写数据流程 HBase的flush.compact机制 Flush触发条件 1. Me ...

  7. 大数据高级开发工程师——Hive学习笔记(2)

    文章目录 Hive提高篇 Hive的使用 Hive的分桶表 1. 分桶表的原理 2. 分桶表的作用 3. 案例演示 Hive数据导入 1. 直接向表中插入数据(强烈不推荐使用) 2. 通过load加载 ...

  8. 大数据高级开发工程师——HBase学习笔记(3)

    文章目录 Phoenix Phoenix介绍 什么是Phoenix Phoenix底层原理 安装部署 下载安装 配置环境变量 重启hbase集群 验证是否成功 Phoenix使用 批处理方式 命令行方 ...

  9. 大数据高级开发工程师——Hadoop学习笔记(3)

    文章目录 Hadoop进阶篇 HDFS:Hadoop分布式文件系统 NameNode和SecondaryNameNode功能剖析 1. NameNode和SecondaryNameNode解析 2. ...

最新文章

  1. python爬虫执行scrapy crawl demo出现: import win32api ModuleNotFoundError: No module named 'win32api'错误
  2. 05. 取SQL分组中的某几行数据
  3. 树莓派进阶之路 (019) - 树莓派通过filezilla,samba与PC文件共享(转)
  4. Linux学习之系统编程篇:ps 和 kill 命令以及父子进程间数据共享模式
  5. Windows Phone xml数据的解析与绑定
  6. 游戏数仓分析(一)数据准备阶段
  7. 为什么要学jquery
  8. 扩展WCF的消息分发行为
  9. lodop 小票排版_lodop+art-template实现web端漂亮的小票样式打印
  10. DB2中select top 用法
  11. Linux流量监控工具 – iftop
  12. 树莓派超声波车牌识别系统
  13. nginx下部署showdoc
  14. Django REST framework 的快速入门教程
  15. ecstore 定时任务配置
  16. 【答题卡识别】基于matlab GUI hough变换答题卡判定与成绩统计【含Matlab源码 752期】
  17. python3两个三阶矩阵相乘_矩阵相乘的实现-python
  18. oracle11g跟踪,Oracle 11g DRCP连接跟踪配置
  19. 前缀树——以Gin路由为例
  20. 找出大文档中的所有手机号

热门文章

  1. [附源码]计算机毕业设计小太阳幼儿园学生管理系统Springboot程序
  2. Mendix助力工业数字化 :“智能制造百家讲堂”问题回顾
  3. 基于C++模板类编程数据结构图的操作---注意error:2248的解析
  4. Java面试官推荐的开发面试要点
  5. 「JavaSE」-流程控制和方法
  6. 2021数学建模国赛B题复盘详细解析
  7. 单元格的边框没有被显示出来
  8. C++ 1 之 冲刺期末不挂科的入门
  9. 3D角色模型欣赏:韩国3D设计师 Jiwoong Choi 科幻3d角色
  10. echarts实现各省市地图、中国地图