Spark 之 Accumulator 累加器
Spark Accumulator
- 累加器作用
- 源码
- 累加器原理图
- Spark中累加器的执行流程:
- 累加器使用demo
- spark ui
- 使用累加器中可能遇到的坑
- 线程安全问题
累加器作用
- 累加器:分布式只写变量(Executor端的task不能互相访问累加器的值)。
- 累加器对信息进行聚合。
源码
累加器的基类,可以累加 IN 类型的输入,并产生 OUT 类型的输出。
OUT 应该是可以原子读取的类型(例如,Int、Long)或线程安全的(例如,synchronized collections),因为它将从其他线程读取。abstract class AccumulatorV2[IN, OUT] extends Serializable {...
}
累加器原理图
Spark中累加器的执行流程:
- 1.首先序列化 driver 端 accumulator 到 executor ,序列化前调用 reset 重置 value 并使用 isZero 检测是否重置成
- 2.有几个task,spark engine就调用copy方法拷贝几个累加器(不注册的)
- 3.单个 executor 内使用 add 进行累加(注意在此过程中,被最初注册的累加器的值是不变的),
- 4.最终 driver 端对多个 executor 间的 accumulaotr 使用merge 进行合并得到结果。
累加器使用demo
package accumulatorimport org.apache.spark.util.AccumulatorV2
import java.utilclass MyAccumulator extends AccumulatorV2[(String,String),util.Map[String,AnyRef]] {//初始化一个输出的变量var map = new util.HashMap[String, AnyRef]()//1.driver端序列化累加器前重置累加器override def reset(): Unit = map.clear()//2.driver端序列化累加器前重置后检测是否重置成功override def isZero: Boolean = map.isEmpty//3.拷贝累加器到executor的task中 override def copy(): AccumulatorV2[(String, String), util.Map[String, AnyRef]] = {val myAcc = new MyAccumulatormyAcc.map = this.mapmyAcc}//4.单个executor 内使用 add 进行累加override def add(v: (String, String)): Unit = {map.put(v._1,v._2)}//5.driver端对executor中各个task的累加器进行合并override def merge(other: AccumulatorV2[(String, String), util.Map[String, AnyRef]]): Unit = {val map1: util.HashMap[String, AnyRef] = this.mapval map2: util.Map[String, AnyRef] = other.valuemap1.putAll(map2)}//6.driver端累加器的输出格式override def value: util.Map[String, AnyRef] = map
}
package accumulatorimport com.alibaba.fastjson.{JSONArray, JSONObject}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}object AccumulatorTest {def main(args: Array[String]): Unit = {val session: SparkSession = SparkSession.builder().appName("regex").master("local").getOrCreate()val sc: SparkContext = session.sparkContextval rdd2: RDD[(String, String)] = sc.makeRDD(List(("home", "北京"),("name", "小宽"), ("age", "30"), ("tools", "篮球"), ("sex", "男")), 4)//注册累加器val myAcc2 = new MyAccumulatorsc.register(myAcc2,"myAcc2")println(rdd2.getNumPartitions)rdd2.foreachPartition(partition=>{partition.foreach(f=>{println(f)myAcc2.add(f._1,f._2)})})val jSONObject2 = new JSONObject(myAcc2.value)println(jSONObject2.toJSONString)session.close()}
}
spark ui
使用累加器中可能遇到的坑
当我们把累加器的操作放在 map 中执行的时候,后续如果有多个 action 操作共用该累加器的 RDD ,将会导致重复执行。也就意味着累加器会重复累加。为了避免这种错误,我们最好只在 action 算子如 foreach 中使用累加器,如果实在需要在 transformation 中使用,记得使用 cache 操作
1.累加器少加
//注册累加器val myAcc: LongAccumulator = sc.longAccumulator("myAcc")//todo 1.累加器少加 : 累计器是lazy加载的 没action算子 不执行sc.parallelize(1 to 20).map(myAcc.add(_))println(myAcc.value) // 0
2.累加器多加
//todo 2.累加器多加: 多个action算子 重复执行val rdd: RDD[Int] = sc.parallelize(1 to 10).map(f => {myAcc.add(1)f + 1})rdd.count()println("AccumuLator1: " + myAcc.value)rdd.reduce(_+_)println("AccumuLator2: " + myAcc.value)//AccumuLator1: 10//AccumuLator2: 20
map算子实际上被执行了两次,在reduce操作提交作业后累加器又完成了一轮技数,所以最终的累加器的值为20。究其原因是因为count虽然促使numberRDD累计出来,但是由于没有对其进行缓存,所以下次再次需要使用numberRDD这个数据集时,还需要从并行化数据集的部分开始执行计算
2.1避免累加器多加
//2.1 cache
//在count之前调用rdd的cache方法(或persist),这样在count后数据集就会被缓存下来
//reduce操作就会读取缓存的数据集,而无需从头开始计算。val rdd: RDD[Int] = sc.parallelize(1 to 10).map(f => {myAcc.add(1)f + 1})rdd.cache()rdd.count()println("AccumuLator1: " + myAcc.value)rdd.reduce(_+_)println("AccumuLator2: " + myAcc.value)//AccumuLator1: 10//AccumuLator2: 10
//todo 2.2. 如果累计器在actions操作算子里面执行时,只会累加一次val rdd2: RDD[Int] = sc.parallelize(1 to 10)rdd2.foreach(f=>{myAcc.add(1)f+1})println("AccumuLator1: " + myAcc.value)rdd2.count()println("AccumuLator2: " + myAcc.value)//AccumuLator1: 10//AccumuLator2: 10
线程安全问题
一、踩坑经历
自定义的accumulator是线程不安全的,会造成累加结果不正确。如果在累加器内使用集合就要用java的线程安全的集合,这里说一下scala原本是有线程安全的集合的但是在2.11之后全被弃用,看了源码建议使用java的线程安全集合
二、解决方法
创建一个线程安全的集合变量(我用的是Java的ConcurrentHashMap),赋好初始值 ,在重写add方法时,将旧的值取出来累加后再放回去(取与放的动作要加上上锁操作)。这样就可以,测试的时候再也没出现累加少值的情况。
我原本hashSet但是线程不安全,改后用的是Collections.synchronizedList
class MyAccumulator extends AccumulatorV2[String,util.List[String]]{var hashSet = Collections.synchronizedList(new util.ArrayList[String]())override def reset(): Unit = hashSet.clear()override def isZero: Boolean = hashSet.isEmptyoverride def copy(): AccumulatorV2[String, util.List[String]] = {val myAcc = new MyAccumulatormyAcc.hashSet = this.hashSetmyAcc}override def add(v: String): Unit = {if(!hashSet.contains(v)){hashSet.add(v)}}override def merge(other: AccumulatorV2[String, util.List[String]]): Unit ={val hashSet1: util.List[String] = this.hashSetval hashSet2: util.List[String] = other.valuehashSet1.addAll(hashSet2)}override def value: util.List[String] = hashSet
}
Spark 之 Accumulator 累加器相关推荐
- Spark Accumulator累加器
什么是累加器 累加器用来对信息进行聚合 1 算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量) 2 算子使用的其实都是driver里的变量的一个副本 3 如果想要影 ...
- Accumulator累加器(一)
累加器 应用场景:Driver端定义一个共享变量,将数据累加到该变量上,如果直接用foreach或map等迭代算子, 是无法将累加的变量返回到driver端,因为累加的过程发生在Executor端.一 ...
- Spark累加器(Accumulator)陷阱及解决办法
Accumulator简介 Accumulator是spark提供的累加器,顾名思义,该变量只能够增加. 只有driver能获取到Accumulator的值(使用value方法),Task只能对其做增 ...
- Spark一路火花带闪电——Accumulator Broadcast
文章目录 Accumulator累加器 Accumulator简介 陷阱及解决办法 Broadcast广播变量 Accumulator累加器 Accumulator简介 Accumulator是spa ...
- spark 累加器的使用探索
spark 累加器的使用探索 1 spark不能在遍历rdd过程中修改全局map 2 spark 提供的累加器的使用 2.1 了解累加器 2.2 spark 提供的累加器的使用 2.3 完整代码 3 ...
- Spark累加器实现原理及基础编程
Spark累加器实现原理及基础编程 实现原理 累加器用来把 Executor 端变量信息聚合到 Driver 端.在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这 ...
- ❤️Spark的关键技术回顾,持续更新!【推荐收藏加关注】❤️
目录 前言 Spark的关键技术回顾 一.Spark复习题回顾 1.Spark使用的版本 2.Spark几种部署方式? 3.Spark的提交任务的方式? 4.使用Spark-shell的方式也可以交互 ...
- spark RDD官网RDD编程指南
http://spark.apache.org/docs/latest/rdd-programming-guide.html#using-the-shell Overview(概述) 在较高的层次上, ...
- Spark使用总结与分享
背景 使用spark开发已有几个月.相比于python/hive,scala/spark学习门槛较高.尤其记得刚开时,举步维艰,进展十分缓慢.不过谢天谢地,这段苦涩(bi)的日子过去了.忆苦思甜,为了 ...
最新文章
- 沙场秋点兵---走出软件作坊:三五个人十来条枪 如何成为开发正规军(二十七)...
- 基于双TMS320C6678+双XC6VSX315T的6U VPX高速数据处理平台
- JavaScript是如何工作的:事件循环和异步编程的崛起+ 5种使用 async/await 更好地编码方式!...
- Exynos4412 所用内存 —— DDR2
- Git bash 编码格式配置_02
- 原生JS去除二维数组中重复了的一维数组
- mysql 阿里云 版本_阿里云虚拟主机mysql已经支持版本切换,支持MySQL 5.7.25
- Adopt Open JDK官方文档(四)基于虚拟机的编译环境
- NOIP2016愤怒的小鸟 题解报告 【状压DP】
- LightOJ 1258 Making Huge Palindromes(KMP)
- ORA-01157报错cannot identify/lock data file
- mysql查询数据库修改记录_11. 查询数据库各种历史记录
- matlab 滤波器设计工具,滤波器设计工具快速入门
- linux下ftp工具
- PS CS6增加导出ICO图标文件(ICOFormat64.8bi)64位系统
- vue-element-template模板
- 计数器控制led灯的亮灭
- 我有酒,你有故事吗?
- PhoneApplicationFrame以及设置Obscured/Unobscured的event handler
- pdf批量修改属性工具软件使用教程