1、spark streaming获取kafka的数据有两种形式:(现在基本都是用direct方式了)

  • receiver
    通过zookeeper来连接kafka队列来获取数据。如果要做到容错,就要启用WAL机制。但吞吐量不高,效率低,而且可能反复消费

  • direct
    直接连接到kafka的节点上获取数据。kafka会自动维护偏移量在kafka里面,但是为了数据准确性,一般都自己写程序,把kafka的读偏移量写到zk或MySQL或redis中。如果spark挂掉了,会自动从挂掉的时候的偏移量重新消费数据,这样数据就不会丢失,轻易做到容错

    这种方式有如下优点:

    • 1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream,然后对它们进行union操作。Spark会创建跟Kafka partition一样多RDD partition,并且会并行从Kafka中读取数据。

    • 2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复

    • 3、一次且仅一次的事务机制:基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset(kafka自动保存到zk)。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。基于direct的方式,使用kafka的低级api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

4、在sparkstreaming中如何做到数据不丢失呢?
(1)使用checkpoint
(2)自己维护kafka偏移量
checkpoint配合kafka能够在特定环境下保证不丢不重,注意为什么要加上特定环境呢,这里有一些坑,checkpoint是对sparkstreaming运行过程中的元数据和 每次rdds的数据状态保存到一个持久化系统中,当然这里面也包含了offset。如果程序挂了,或者集群挂了,下次启动仍然能够从checkpoint中恢复,从而做到生产环境的7*24高可用。但是checkpoint的最大的弊端在于,一旦你的流式程序代码或配置改变了,重新打jar包,这个时候必须要删除原来的checkpoint文件,否则程序启动报错,但是一旦你删除了旧的checkpoint,新启动的程序,只能从kafka的smallest或者largest的偏移量消费,而不是上一次程序停止的那个偏移量。
所以综上,当我们对代码进行升级了的时候,我们不仅需要在重新启动的时候将上次checkpoint删掉,还需要自己写程序来维护kafka偏移量。
5、RDD的不可变性是指这个RDD经过转换操作后,生成的是新的RDD,而原值不变。并不是指这个RDD的数值一直都存在。而当你对这个RDD重复利用时,每次都会根据这个RDD的依赖从头到尾重新算一次,因为这个RDD的值并不是一直存在的。所以最好是将RDD进行cache
6、spark的transform操作只是会记住计算的逻辑,只有遇到了action操作才会进行真正的计算
7、在Spark Streaming中,累加器(Accumulator)和广播变量(Broadcast)不能从检查点(checkpoint)中恢复。
如果你采用检查点机制(检查点将切断RDD依赖)并且也用了累加器或广播变量,为了在突发异常并重启driver节点之后累加器和广播变量可以被重新实例化,你应该为它们创建惰性单例对象。示例如下:

//创建单例对象,由于spark是惰性计算,即遇到action才真正的计算,所以这里的单例称为惰性单例
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = nulldef getInstance(sc: SparkContext): Broadcast[Seq[String]] = {if (instance == null) {synchronized {if (instance == null) {val wordBlacklist = Seq("a", "b", "c")instance = sc.broadcast(wordBlacklist)}}}instance
}
}object DroppedWordsCounter {@volatile private var instance: Accumulator[Long] = nulldef getInstance(sc: SparkContext): Accumulator[Long] = {if (instance == null) {synchronized {if (instance == null) {instance = sc.accumulator(0L, "WordsInBlacklistCounter")}}}instance
}
}wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// 获取或注册blacklist广播变量
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// 获取或注册droppedWordsCounter累加器。 注意这里是rdd的sparkContext
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// 根据blacklist来移除words,并用droppedWordsCounter来计数
val counts = rdd.filter { case (word, count) =>if (blacklist.value.contains(word)) {droppedWordsCounter += countfalse} else {true}
}.collect()
val output = "Counts at time " + time + " " + counts
})

8、sparkStreaming程序中,一个批次时间间隔只会产生一个RDD,而Dstream由一系列RDD组成,是说DStream由以前的RDD,当前批次的这个RDD,下一批次RDD…等等一系列RDD组成。比如DStream.foreachRDD,其实一个批次内,foreachRDD内只有一个RDD的,并不是很多个RDD
9、spark streaming启动之后,所有代码会执行一遍,但是后面的每一批DSTREAM过来的时候,只会执行DStream相关的代码,如:
val logStream = StreamingUtils.createDirectStream(ssc, kafkaParams, topicSet, caseConf)
即只会执行logStream下所有相关的子DStream的代码(DStream.foreachRDD下所有代码),DStream外的代码都不会再执行了
10、spark streaming保存offset到zk:

offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor (o <- offsetRanges) {ZkUtils.updatePersistentPath(zkClient, s"${zkPath}/${o.topic}/${o.partition}, o.untilOffset.toString)}

11、性能调优

  • 1)、mapWithState 方法较之于updateStateByKey方法,有十倍之多的性能提升。

  • 2)、触发shuffle的常见算子:distinct、groupByKey、reduceByKey、join、repartition等。
    要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,直接在Web UI上看就可以,然后查看运行耗时的task,查看数据是否倾斜了! 根据这个task,根据stage划分原理,推算出数据倾斜发生在哪个shuffle类算子上。
    查看导致数据倾斜的key的数据分布情况 。根据执行操作的不同,可以有很多种查看key分布的方式: 1,如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。 2,如果是Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来各个key出现的次数,collect、take到客户端打印一下,就可以看到key的分布情况。

  • 3)、数据倾斜的解决办法

    • 方案一:使用Hive ETL预处理数据
      适用场景:导致数据倾斜的是Hive表,Hive表中的数据本身很不均匀,业务场景需要频繁使用Spark对Hive表执行某个分析操作。
      实现思路:提前将join等操作执行,进行Hive阶段的ETL。将导致数据倾斜的shuffle前置。
      优缺点:实现简单,Spark作业性能提升,但是Hive ETL还是会发生数据倾斜,导致Hive ETL的速度很慢。
      实践经验:将数据倾斜提前到上游的Hive ETL,每天就执行一次,慢就慢点吧。

    • 方案二:过滤少数导致倾斜的key

    • 解决方案三:提高shuffle操作的并行度
      适用场景:直接面对数据倾斜的简单解决方案。
      实现思路:对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行的shuffle read task的数量。
      优缺点:简单能缓解,缺点是没有根除问题,效果有限

    • 解决方案四:两阶段聚合(局部聚合+全局聚合)
      适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适合这种方案。
      实现思路:先局部聚合,给每个key打一个小范围的随机数,比如10以内的随机数,相当于分成10份,一个task分成10个task。聚合后,去掉key上的随机数前缀,再次进行全局聚合操作。
      优缺点:大幅度缓解数据倾斜,缺点是仅适用于聚合类的shuffle操作。

    • 解决方案五:将reduce join转为map join

    • 解决方案六:reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差

12、spark streaming checkpoint保存的信息:topic,partition,fromOffset和untilOffset。有状态算子:updateStateByKey计算的数据
13、yarn containor是资源的抽象,即内存和core,网络等资源。是由applicationMaster向resourceManager申请的。
14、yarn-cluster和yarn-client

  • yarn-cluster模式(driver运行在applicationMaster的那个节点上)
    spark-submit在提交的时候请求到ResourceManager,请求来启动ApplicationMaster,ResourceManager接收到请求后会分配一个container,随机地在某个NodeManager(worker)上启动ApplicationMaster,driver就运行在这个AM所在的节点上,AM向ResourceManager申请资源(container),并在每台NodeManager上启动相应的executors;这里的NM相当于worker,ResourceManager相当于之前的master。

  • yarn-client模式(driver运行在提交代码的那个客户端上,所以在提交代码客户端可以 看到driver输出,适合调试)
    spark-submit在提交的时候请求RM启动AM,分配一个container,在某个NM上,启动AM,但是这里的AM只是一个ExecutorLauncher,功能是很有限的。AM启动后会找RM申请container,启动executor,AM链接其他的NM,用container的资源来启动executor。

15、spark整合kafka时offset管理
kafka0.10版本前,支持Receiver和direct方式,kafka0.10版本只支持direct方式,direct连接方式也有些变化,比如增加了enable.auto.commit参数,如果为true(默认为true),spark就会自动向kafka提交offset,不需要我们管理。如果为false,需要我们手动提交offset到kafka
16、spark streaming实时监控

1、kafka消费堆积监控:通过在webui url后面加metric/json,可以获得当前消费kafka的堆积数据,将此数据发到kafka,最后展示http://ClusterHostName:8088/proxy/AppID/metrics/jsonhttps://www.jianshu.com/p/f757ac43770e
2、jvm相关进程监控:通过spark metric将相关数据写到kafka

17、sparkstreaming消费kafka,如果enable.auto.commit设置为false,并且不手动提交offset,那么offset将不会提交到kafka。那么重启spark应用时,会根据auto.offset.reset策略,从最新offset或者最早offset开始消费。注意这里的最新或者最早offset是指已经提交到kafka的offset。
createDirectStream中指定的偏移量fromOffsets只有在刚启动spark应用的时候用到,以后rdd用到的偏移量都是从spark自己内存中保存偏移量的数据结构中获得,比如InputInfoTracker类中的map对象metadata。即将偏移量保存到kafka或者zk或者MySQL,只是为了在spark应用挂掉之后,可以从正确的偏移量恢复应用运行。当正常运行后,spark内存中会有一个维护偏移量的数据结构,根据这个数据结构准确获取保存offset,并不依赖外部系统了

val stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
offset的维护粒度是topic-分区

18、除了第一次指定的offset生效外,后续依然会从untilOffset消费,并非自己手动提交的offset位置。通过扒源码发现是自己对offset的理解错误了。streaming只有在第一次创建directStream的时候会使用指定的offset,后续会从每批次的untilOffset去消费。
19、kafka在从这个版本也提供了自动commit的功能,默认enable.auto.commit开启,针对于不同的业务语义,不能容忍数据丢失的场景,应当关闭auto commit, 它在提交offset的时机是周期性的,由auto.commit.interval.ms控制,存在业务操作还未完成,offset就被commit的可能,即存在数据丢失的风险。
20、Kafka提供的有api,可以将offset提交到指定的kafkatopic。默认情况下,新的消费者会周期性的自动提交offset到kafka。但是有些情况下,这也会有些问题,因为消息可能已经被消费者从kafka拉去出来,但是spark还没处理,这种情况下会导致一些错误。这也是为什么例子中stream将enable.auto.commit设置为了false。然而在已经提交spark输出结果之后,你可以手动提交偏移到kafka

spark-streaming从入门到精通相关推荐

  1. 大数据求索(8):Spark Streaming简易入门一

    大数据求索(8):Spark Streaming简易入门一 一.Spark Streaming简单介绍 Spark Streaming是基于Spark Core上的一个应用程序,可伸缩,高吞吐,容错( ...

  2. Spark SQL:从入门到精通(一)[SparkSQL初体验]

    入口-SparkSession 在spark2.0版本之前 SQLContext是创建DataFrame和执行SQL的入口 HiveContext通过hive sql语句操作hive表数据,兼容hiv ...

  3. Spark SQL:从入门到精通(五)[开窗函数]

    概述 https://www.cnblogs.com/qiuting/p/7880500.html 介绍: 开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据.即在每一行的最后一列添加聚合函数 ...

  4. [学习笔记]黑马程序员Spark全套视频教程,4天spark3.2快速入门到精通,基于Python语言的spark教程

    文章目录 视频资料: 思维导图 一.Spark基础入门(环境搭建.入门概念) 第二章:Spark环境搭建-Local 2.1 课程服务器环境 2.2 Local模式基本原理 2.3 安装包下载 2.4 ...

  5. Spark Streaming 编程新手入门指南

    Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理.可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中 ...

  6. spark streaming 入门例子

    spark streaming 入门例子: spark shell import org.apache.spark._ import org.apache.spark.streaming._sc.ge ...

  7. spark从入门到精通spark内存管理详解- 堆内堆外内存管理

    前言 Spark作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色.理解Spark内存管理的基本原理,有助于更好地开发Spark应用程序和进行性能调优.本文将详细介绍两部 ...

  8. Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍

    [注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Sp ...

  9. Spark入门(五)——Spark Streaming

    Spark Streaming(流处理) Spark Streaming(流处理) 什么是流处理? 快速入门 概念介绍 初始化 StreamingContext Discretized Streams ...

  10. 大数据学习15之spark streaming入门

    文章目录 一.概述 二.应用场景 三.集成Spark生态系统的使用 四.发展史 五.从词频统计功能着手入门 1.spark-submit执行 2.spark-shell执行(测试时使用) 六.工作原理 ...

最新文章

  1. 直接内存访问(DMA)
  2. c语言,字符串原地翻转
  3. 【Paper】2017_Limit-Cycle-Based Decoupled Design of Circle Formation Control with Collision Avoidance
  4. 返璞归真的Linux BFS调度器
  5. Koa / Co / Bluebird or Q / Generators / Promises / Thunks 的相互关系
  6. 记录一次uni-app页面跳转无效 来回跳转问题
  7. LeetCode 3. 无重复字符的最长子串(滑动窗口+哈希)
  8. MySQL(8)存储过程和函数
  9. Chrome和Firefox浏览器长截图
  10. 重磅!滴滴全员会宣布过冬:将裁员15% 涉及员工超2000人
  11. 用DrawerLayout(Support Library 4提供)开发侧边栏,有没有什么方法关闭手势控制?...
  12. 20模3c语言中等于多少钱,科三模拟多钱
  13. pandas系列学习(七):数据透视表
  14. Windows.old文件夹恢复系统解决方案
  15. Typec转hdmi+vga+pd+3.5音频方案设计参考电路|CS5266+CS201方案电路图|Typec扩展坞五合一方案设计
  16. Halcon——颜色识别提取
  17. win10家庭中文版安装docker
  18. BUAA全排列数的生成
  19. 浙江大学计算机考研资料汇总
  20. python-调用API接口移除照片背景,分分钟变抠图高手

热门文章

  1. 2022.04.21【日常维护】|服务器存储清理浅谈
  2. GitHub使用教程、注册与安装
  3. 2022-2028年中国数位板行业发展现状调查及市场分析预测报告
  4. 《电感元器件》的特性分析
  5. css动画与渐变案例,使用动画和渐变做一个背景动态网页
  6. html使三角形渐变色,CSS3 简单的三角形渐变效果
  7. 关于PADS 9.5导入CAD图(dxf文件)的说明
  8. AQSW公司OA系统需求分析
  9. 多径效应及其消除方法
  10. (海伦公式)已知三角形三条边长,求面积