spark-streaming从入门到精通
1、spark streaming获取kafka的数据有两种形式:(现在基本都是用direct方式了)
receiver
通过zookeeper来连接kafka队列来获取数据。如果要做到容错,就要启用WAL机制。但吞吐量不高,效率低,而且可能反复消费direct
直接连接到kafka的节点上获取数据。kafka会自动维护偏移量在kafka里面,但是为了数据准确性,一般都自己写程序,把kafka的读偏移量写到zk或MySQL或redis中。如果spark挂掉了,会自动从挂掉的时候的偏移量重新消费数据,这样数据就不会丢失,轻易做到容错这种方式有如下优点:
1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream,然后对它们进行union操作。Spark会创建跟Kafka partition一样多RDD partition,并且会并行从Kafka中读取数据。
2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复
3、一次且仅一次的事务机制:基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset(kafka自动保存到zk)。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。基于direct的方式,使用kafka的低级api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
4、在sparkstreaming中如何做到数据不丢失呢?
(1)使用checkpoint
(2)自己维护kafka偏移量
checkpoint配合kafka能够在特定环境下保证不丢不重,注意为什么要加上特定环境呢,这里有一些坑,checkpoint是对sparkstreaming运行过程中的元数据和 每次rdds的数据状态保存到一个持久化系统中,当然这里面也包含了offset。如果程序挂了,或者集群挂了,下次启动仍然能够从checkpoint中恢复,从而做到生产环境的7*24高可用。但是checkpoint的最大的弊端在于,一旦你的流式程序代码或配置改变了,重新打jar包,这个时候必须要删除原来的checkpoint文件,否则程序启动报错,但是一旦你删除了旧的checkpoint,新启动的程序,只能从kafka的smallest或者largest的偏移量消费,而不是上一次程序停止的那个偏移量。
所以综上,当我们对代码进行升级了的时候,我们不仅需要在重新启动的时候将上次checkpoint删掉,还需要自己写程序来维护kafka偏移量。
5、RDD的不可变性是指这个RDD经过转换操作后,生成的是新的RDD,而原值不变。并不是指这个RDD的数值一直都存在。而当你对这个RDD重复利用时,每次都会根据这个RDD的依赖从头到尾重新算一次,因为这个RDD的值并不是一直存在的。所以最好是将RDD进行cache
6、spark的transform操作只是会记住计算的逻辑,只有遇到了action操作才会进行真正的计算
7、在Spark Streaming中,累加器(Accumulator)和广播变量(Broadcast)不能从检查点(checkpoint)中恢复。
如果你采用检查点机制(检查点将切断RDD依赖)并且也用了累加器或广播变量,为了在突发异常并重启driver节点之后累加器和广播变量可以被重新实例化,你应该为它们创建惰性单例对象。示例如下:
//创建单例对象,由于spark是惰性计算,即遇到action才真正的计算,所以这里的单例称为惰性单例
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = nulldef getInstance(sc: SparkContext): Broadcast[Seq[String]] = {if (instance == null) {synchronized {if (instance == null) {val wordBlacklist = Seq("a", "b", "c")instance = sc.broadcast(wordBlacklist)}}}instance
}
}object DroppedWordsCounter {@volatile private var instance: Accumulator[Long] = nulldef getInstance(sc: SparkContext): Accumulator[Long] = {if (instance == null) {synchronized {if (instance == null) {instance = sc.accumulator(0L, "WordsInBlacklistCounter")}}}instance
}
}wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// 获取或注册blacklist广播变量
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// 获取或注册droppedWordsCounter累加器。 注意这里是rdd的sparkContext
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// 根据blacklist来移除words,并用droppedWordsCounter来计数
val counts = rdd.filter { case (word, count) =>if (blacklist.value.contains(word)) {droppedWordsCounter += countfalse} else {true}
}.collect()
val output = "Counts at time " + time + " " + counts
})
8、sparkStreaming程序中,一个批次时间间隔只会产生一个RDD,而Dstream由一系列RDD组成,是说DStream由以前的RDD,当前批次的这个RDD,下一批次RDD…等等一系列RDD组成。比如DStream.foreachRDD,其实一个批次内,foreachRDD内只有一个RDD的,并不是很多个RDD
9、spark streaming启动之后,所有代码会执行一遍,但是后面的每一批DSTREAM过来的时候,只会执行DStream相关的代码,如:
val logStream = StreamingUtils.createDirectStream(ssc, kafkaParams, topicSet, caseConf)
即只会执行logStream下所有相关的子DStream的代码(DStream.foreachRDD下所有代码),DStream外的代码都不会再执行了
10、spark streaming保存offset到zk:
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor (o <- offsetRanges) {ZkUtils.updatePersistentPath(zkClient, s"${zkPath}/${o.topic}/${o.partition}, o.untilOffset.toString)}
11、性能调优
1)、mapWithState 方法较之于updateStateByKey方法,有十倍之多的性能提升。
2)、触发shuffle的常见算子:distinct、groupByKey、reduceByKey、join、repartition等。
要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,直接在Web UI上看就可以,然后查看运行耗时的task,查看数据是否倾斜了! 根据这个task,根据stage划分原理,推算出数据倾斜发生在哪个shuffle类算子上。
查看导致数据倾斜的key的数据分布情况 。根据执行操作的不同,可以有很多种查看key分布的方式: 1,如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。 2,如果是Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来各个key出现的次数,collect、take到客户端打印一下,就可以看到key的分布情况。3)、数据倾斜的解决办法
方案一:使用Hive ETL预处理数据
适用场景:导致数据倾斜的是Hive表,Hive表中的数据本身很不均匀,业务场景需要频繁使用Spark对Hive表执行某个分析操作。
实现思路:提前将join等操作执行,进行Hive阶段的ETL。将导致数据倾斜的shuffle前置。
优缺点:实现简单,Spark作业性能提升,但是Hive ETL还是会发生数据倾斜,导致Hive ETL的速度很慢。
实践经验:将数据倾斜提前到上游的Hive ETL,每天就执行一次,慢就慢点吧。方案二:过滤少数导致倾斜的key
解决方案三:提高shuffle操作的并行度
适用场景:直接面对数据倾斜的简单解决方案。
实现思路:对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行的shuffle read task的数量。
优缺点:简单能缓解,缺点是没有根除问题,效果有限解决方案四:两阶段聚合(局部聚合+全局聚合)
适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适合这种方案。
实现思路:先局部聚合,给每个key打一个小范围的随机数,比如10以内的随机数,相当于分成10份,一个task分成10个task。聚合后,去掉key上的随机数前缀,再次进行全局聚合操作。
优缺点:大幅度缓解数据倾斜,缺点是仅适用于聚合类的shuffle操作。解决方案五:将reduce join转为map join
解决方案六:reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差
12、spark streaming checkpoint保存的信息:topic,partition,fromOffset和untilOffset。有状态算子:updateStateByKey计算的数据
13、yarn containor是资源的抽象,即内存和core,网络等资源。是由applicationMaster向resourceManager申请的。
14、yarn-cluster和yarn-client
yarn-cluster模式(driver运行在applicationMaster的那个节点上)
spark-submit在提交的时候请求到ResourceManager,请求来启动ApplicationMaster,ResourceManager接收到请求后会分配一个container,随机地在某个NodeManager(worker)上启动ApplicationMaster,driver就运行在这个AM所在的节点上,AM向ResourceManager申请资源(container),并在每台NodeManager上启动相应的executors;这里的NM相当于worker,ResourceManager相当于之前的master。yarn-client模式(driver运行在提交代码的那个客户端上,所以在提交代码客户端可以 看到driver输出,适合调试)
spark-submit在提交的时候请求RM启动AM,分配一个container,在某个NM上,启动AM,但是这里的AM只是一个ExecutorLauncher,功能是很有限的。AM启动后会找RM申请container,启动executor,AM链接其他的NM,用container的资源来启动executor。
15、spark整合kafka时offset管理
kafka0.10版本前,支持Receiver和direct方式,kafka0.10版本只支持direct方式,direct连接方式也有些变化,比如增加了enable.auto.commit参数,如果为true(默认为true),spark就会自动向kafka提交offset,不需要我们管理。如果为false,需要我们手动提交offset到kafka
16、spark streaming实时监控
1、kafka消费堆积监控:通过在webui url后面加metric/json,可以获得当前消费kafka的堆积数据,将此数据发到kafka,最后展示http://ClusterHostName:8088/proxy/AppID/metrics/jsonhttps://www.jianshu.com/p/f757ac43770e
2、jvm相关进程监控:通过spark metric将相关数据写到kafka
17、sparkstreaming消费kafka,如果enable.auto.commit设置为false,并且不手动提交offset,那么offset将不会提交到kafka。那么重启spark应用时,会根据auto.offset.reset策略,从最新offset或者最早offset开始消费。注意这里的最新或者最早offset是指已经提交到kafka的offset。
createDirectStream中指定的偏移量fromOffsets只有在刚启动spark应用的时候用到,以后rdd用到的偏移量都是从spark自己内存中保存偏移量的数据结构中获得,比如InputInfoTracker类中的map对象metadata。即将偏移量保存到kafka或者zk或者MySQL,只是为了在spark应用挂掉之后,可以从正确的偏移量恢复应用运行。当正常运行后,spark内存中会有一个维护偏移量的数据结构,根据这个数据结构准确获取保存offset,并不依赖外部系统了
val stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
offset的维护粒度是topic-分区
18、除了第一次指定的offset生效外,后续依然会从untilOffset消费,并非自己手动提交的offset位置。通过扒源码发现是自己对offset的理解错误了。streaming只有在第一次创建directStream的时候会使用指定的offset,后续会从每批次的untilOffset去消费。
19、kafka在从这个版本也提供了自动commit的功能,默认enable.auto.commit开启,针对于不同的业务语义,不能容忍数据丢失的场景,应当关闭auto commit, 它在提交offset的时机是周期性的,由auto.commit.interval.ms控制,存在业务操作还未完成,offset就被commit的可能,即存在数据丢失的风险。
20、Kafka提供的有api,可以将offset提交到指定的kafkatopic。默认情况下,新的消费者会周期性的自动提交offset到kafka。但是有些情况下,这也会有些问题,因为消息可能已经被消费者从kafka拉去出来,但是spark还没处理,这种情况下会导致一些错误。这也是为什么例子中stream将enable.auto.commit设置为了false。然而在已经提交spark输出结果之后,你可以手动提交偏移到kafka
spark-streaming从入门到精通相关推荐
- 大数据求索(8):Spark Streaming简易入门一
大数据求索(8):Spark Streaming简易入门一 一.Spark Streaming简单介绍 Spark Streaming是基于Spark Core上的一个应用程序,可伸缩,高吞吐,容错( ...
- Spark SQL:从入门到精通(一)[SparkSQL初体验]
入口-SparkSession 在spark2.0版本之前 SQLContext是创建DataFrame和执行SQL的入口 HiveContext通过hive sql语句操作hive表数据,兼容hiv ...
- Spark SQL:从入门到精通(五)[开窗函数]
概述 https://www.cnblogs.com/qiuting/p/7880500.html 介绍: 开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据.即在每一行的最后一列添加聚合函数 ...
- [学习笔记]黑马程序员Spark全套视频教程,4天spark3.2快速入门到精通,基于Python语言的spark教程
文章目录 视频资料: 思维导图 一.Spark基础入门(环境搭建.入门概念) 第二章:Spark环境搭建-Local 2.1 课程服务器环境 2.2 Local模式基本原理 2.3 安装包下载 2.4 ...
- Spark Streaming 编程新手入门指南
Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理.可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中 ...
- spark streaming 入门例子
spark streaming 入门例子: spark shell import org.apache.spark._ import org.apache.spark.streaming._sc.ge ...
- spark从入门到精通spark内存管理详解- 堆内堆外内存管理
前言 Spark作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色.理解Spark内存管理的基本原理,有助于更好地开发Spark应用程序和进行性能调优.本文将详细介绍两部 ...
- Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Sp ...
- Spark入门(五)——Spark Streaming
Spark Streaming(流处理) Spark Streaming(流处理) 什么是流处理? 快速入门 概念介绍 初始化 StreamingContext Discretized Streams ...
- 大数据学习15之spark streaming入门
文章目录 一.概述 二.应用场景 三.集成Spark生态系统的使用 四.发展史 五.从词频统计功能着手入门 1.spark-submit执行 2.spark-shell执行(测试时使用) 六.工作原理 ...
最新文章
- 直接内存访问(DMA)
- c语言,字符串原地翻转
- 【Paper】2017_Limit-Cycle-Based Decoupled Design of Circle Formation Control with Collision Avoidance
- 返璞归真的Linux BFS调度器
- Koa / Co / Bluebird or Q / Generators / Promises / Thunks 的相互关系
- 记录一次uni-app页面跳转无效 来回跳转问题
- LeetCode 3. 无重复字符的最长子串(滑动窗口+哈希)
- MySQL(8)存储过程和函数
- Chrome和Firefox浏览器长截图
- 重磅!滴滴全员会宣布过冬:将裁员15% 涉及员工超2000人
- 用DrawerLayout(Support Library 4提供)开发侧边栏,有没有什么方法关闭手势控制?...
- 20模3c语言中等于多少钱,科三模拟多钱
- pandas系列学习(七):数据透视表
- Windows.old文件夹恢复系统解决方案
- Typec转hdmi+vga+pd+3.5音频方案设计参考电路|CS5266+CS201方案电路图|Typec扩展坞五合一方案设计
- Halcon——颜色识别提取
- win10家庭中文版安装docker
- BUAA全排列数的生成
- 浙江大学计算机考研资料汇总
- python-调用API接口移除照片背景,分分钟变抠图高手