Spark Accumulator

  • 累加器作用
  • 源码
  • 累加器原理图
  • Spark中累加器的执行流程:
  • 累加器使用demo
    • spark ui
  • 使用累加器中可能遇到的坑
  • 线程安全问题

累加器作用

  • 累加器:分布式只写变量(Executor端的task不能互相访问累加器的值)。
  • 累加器对信息进行聚合。

源码

累加器的基类,可以累加 IN 类型的输入,并产生 OUT 类型的输出。
OUT 应该是可以原子读取的类型(例如,Int、Long)或线程安全的(例如,synchronized collections),因为它将从其他线程读取。abstract class AccumulatorV2[IN, OUT] extends Serializable {...
}

累加器原理图

Spark中累加器的执行流程:

  • 1.首先序列化 driver 端 accumulator 到 executor ,序列化前调用 reset 重置 value 并使用 isZero 检测是否重置成
  • 2.有几个task,spark engine就调用copy方法拷贝几个累加器(不注册的)
  • 3.单个 executor 内使用 add 进行累加(注意在此过程中,被最初注册的累加器的值是不变的),
  • 4.最终 driver 端对多个 executor 间的 accumulaotr 使用merge 进行合并得到结果。

累加器使用demo

package accumulatorimport org.apache.spark.util.AccumulatorV2
import java.utilclass MyAccumulator extends AccumulatorV2[(String,String),util.Map[String,AnyRef]] {//初始化一个输出的变量var map = new util.HashMap[String, AnyRef]()//1.driver端序列化累加器前重置累加器override def reset(): Unit = map.clear()//2.driver端序列化累加器前重置后检测是否重置成功override def isZero: Boolean = map.isEmpty//3.拷贝累加器到executor的task中   override def copy(): AccumulatorV2[(String, String), util.Map[String, AnyRef]] = {val myAcc = new MyAccumulatormyAcc.map = this.mapmyAcc}//4.单个executor 内使用 add 进行累加override def add(v: (String, String)): Unit = {map.put(v._1,v._2)}//5.driver端对executor中各个task的累加器进行合并override def merge(other: AccumulatorV2[(String, String), util.Map[String, AnyRef]]): Unit = {val map1: util.HashMap[String, AnyRef] = this.mapval map2: util.Map[String, AnyRef] = other.valuemap1.putAll(map2)}//6.driver端累加器的输出格式override def value: util.Map[String, AnyRef] = map
}
package accumulatorimport com.alibaba.fastjson.{JSONArray, JSONObject}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}object AccumulatorTest {def main(args: Array[String]): Unit = {val session: SparkSession = SparkSession.builder().appName("regex").master("local").getOrCreate()val sc: SparkContext = session.sparkContextval rdd2: RDD[(String, String)] = sc.makeRDD(List(("home", "北京"),("name", "小宽"), ("age", "30"), ("tools", "篮球"), ("sex", "男")), 4)//注册累加器val myAcc2 = new MyAccumulatorsc.register(myAcc2,"myAcc2")println(rdd2.getNumPartitions)rdd2.foreachPartition(partition=>{partition.foreach(f=>{println(f)myAcc2.add(f._1,f._2)})})val jSONObject2 = new JSONObject(myAcc2.value)println(jSONObject2.toJSONString)session.close()}
}

spark ui

使用累加器中可能遇到的坑

当我们把累加器的操作放在 map 中执行的时候,后续如果有多个 action 操作共用该累加器的 RDD ,将会导致重复执行。也就意味着累加器会重复累加。为了避免这种错误,我们最好只在 action 算子如 foreach 中使用累加器,如果实在需要在 transformation 中使用,记得使用 cache 操作

1.累加器少加

 //注册累加器val myAcc: LongAccumulator = sc.longAccumulator("myAcc")//todo 1.累加器少加 : 累计器是lazy加载的 没action算子 不执行sc.parallelize(1 to 20).map(myAcc.add(_))println(myAcc.value) // 0

2.累加器多加

   //todo 2.累加器多加: 多个action算子 重复执行val rdd: RDD[Int] = sc.parallelize(1 to 10).map(f => {myAcc.add(1)f + 1})rdd.count()println("AccumuLator1: " + myAcc.value)rdd.reduce(_+_)println("AccumuLator2: " + myAcc.value)//AccumuLator1: 10//AccumuLator2: 20

map算子实际上被执行了两次,在reduce操作提交作业后累加器又完成了一轮技数,所以最终的累加器的值为20。究其原因是因为count虽然促使numberRDD累计出来,但是由于没有对其进行缓存,所以下次再次需要使用numberRDD这个数据集时,还需要从并行化数据集的部分开始执行计算

2.1避免累加器多加

//2.1 cache
//在count之前调用rdd的cache方法(或persist),这样在count后数据集就会被缓存下来
//reduce操作就会读取缓存的数据集,而无需从头开始计算。val rdd: RDD[Int] = sc.parallelize(1 to 10).map(f => {myAcc.add(1)f + 1})rdd.cache()rdd.count()println("AccumuLator1: " + myAcc.value)rdd.reduce(_+_)println("AccumuLator2: " + myAcc.value)//AccumuLator1: 10//AccumuLator2: 10
  //todo 2.2. 如果累计器在actions操作算子里面执行时,只会累加一次val rdd2: RDD[Int] = sc.parallelize(1 to 10)rdd2.foreach(f=>{myAcc.add(1)f+1})println("AccumuLator1: " + myAcc.value)rdd2.count()println("AccumuLator2: " + myAcc.value)//AccumuLator1: 10//AccumuLator2: 10

线程安全问题

一、踩坑经历
  自定义的accumulator是线程不安全的,会造成累加结果不正确。如果在累加器内使用集合就要用java的线程安全的集合,这里说一下scala原本是有线程安全的集合的但是在2.11之后全被弃用,看了源码建议使用java的线程安全集合

二、解决方法
  创建一个线程安全的集合变量(我用的是Java的ConcurrentHashMap),赋好初始值 ,在重写add方法时,将旧的值取出来累加后再放回去(取与放的动作要加上上锁操作)。这样就可以,测试的时候再也没出现累加少值的情况。

我原本hashSet但是线程不安全,改后用的是Collections.synchronizedList

class MyAccumulator extends AccumulatorV2[String,util.List[String]]{var hashSet = Collections.synchronizedList(new util.ArrayList[String]())override def reset(): Unit = hashSet.clear()override def isZero: Boolean = hashSet.isEmptyoverride def copy(): AccumulatorV2[String, util.List[String]] = {val myAcc = new MyAccumulatormyAcc.hashSet = this.hashSetmyAcc}override def add(v: String): Unit = {if(!hashSet.contains(v)){hashSet.add(v)}}override def merge(other: AccumulatorV2[String, util.List[String]]): Unit ={val hashSet1: util.List[String] = this.hashSetval hashSet2: util.List[String] = other.valuehashSet1.addAll(hashSet2)}override def value: util.List[String] = hashSet
}

Spark 之 Accumulator 累加器相关推荐

  1. Spark Accumulator累加器

    什么是累加器 累加器用来对信息进行聚合 1 算子在计算时,不会影响到driver里的变量的值(driver里的变量称之为共享变量) 2 算子使用的其实都是driver里的变量的一个副本 3 如果想要影 ...

  2. Accumulator累加器(一)

    累加器 应用场景:Driver端定义一个共享变量,将数据累加到该变量上,如果直接用foreach或map等迭代算子, 是无法将累加的变量返回到driver端,因为累加的过程发生在Executor端.一 ...

  3. Spark累加器(Accumulator)陷阱及解决办法

    Accumulator简介 Accumulator是spark提供的累加器,顾名思义,该变量只能够增加. 只有driver能获取到Accumulator的值(使用value方法),Task只能对其做增 ...

  4. Spark一路火花带闪电——Accumulator Broadcast

    文章目录 Accumulator累加器 Accumulator简介 陷阱及解决办法 Broadcast广播变量 Accumulator累加器 Accumulator简介 Accumulator是spa ...

  5. spark 累加器的使用探索

    spark 累加器的使用探索 1 spark不能在遍历rdd过程中修改全局map 2 spark 提供的累加器的使用 2.1 了解累加器 2.2 spark 提供的累加器的使用 2.3 完整代码 3 ...

  6. Spark累加器实现原理及基础编程

    Spark累加器实现原理及基础编程 实现原理 累加器用来把 Executor 端变量信息聚合到 Driver 端.在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这 ...

  7. ❤️Spark的关键技术回顾,持续更新!【推荐收藏加关注】❤️

    目录 前言 Spark的关键技术回顾 一.Spark复习题回顾 1.Spark使用的版本 2.Spark几种部署方式? 3.Spark的提交任务的方式? 4.使用Spark-shell的方式也可以交互 ...

  8. spark RDD官网RDD编程指南

    http://spark.apache.org/docs/latest/rdd-programming-guide.html#using-the-shell Overview(概述) 在较高的层次上, ...

  9. Spark使用总结与分享

    背景 使用spark开发已有几个月.相比于python/hive,scala/spark学习门槛较高.尤其记得刚开时,举步维艰,进展十分缓慢.不过谢天谢地,这段苦涩(bi)的日子过去了.忆苦思甜,为了 ...

最新文章

  1. 沙场秋点兵---走出软件作坊:三五个人十来条枪 如何成为开发正规军(二十七)...
  2. 基于双TMS320C6678+双XC6VSX315T的6U VPX高速数据处理平台
  3. JavaScript是如何工作的:事件循环和异步编程的崛起+ 5种使用 async/await 更好地编码方式!...
  4. Exynos4412 所用内存 —— DDR2
  5. Git bash 编码格式配置_02
  6. 原生JS去除二维数组中重复了的一维数组
  7. mysql 阿里云 版本_阿里云虚拟主机mysql已经支持版本切换,支持MySQL 5.7.25
  8. Adopt Open JDK官方文档(四)基于虚拟机的编译环境
  9. NOIP2016愤怒的小鸟 题解报告 【状压DP】
  10. LightOJ 1258 Making Huge Palindromes(KMP)
  11. ORA-01157报错cannot identify/lock data file
  12. mysql查询数据库修改记录_11. 查询数据库各种历史记录
  13. matlab 滤波器设计工具,滤波器设计工具快速入门
  14. linux下ftp工具
  15. PS CS6增加导出ICO图标文件(ICOFormat64.8bi)64位系统
  16. vue-element-template模板
  17. 计数器控制led灯的亮灭
  18. 我有酒,你有故事吗?
  19. PhoneApplicationFrame以及设置Obscured/Unobscured的event handler
  20. pdf批量修改属性工具软件使用教程

热门文章

  1. 剪辑软件生产力工具pr,ae,达芬奇对比
  2. 神兽归笼,又是一波斗智斗勇?这款QLED电视机让你带娃更省心
  3. 前端文字下划线的模拟
  4. 9 个美观大气的后台管理系统(收藏备用)
  5. cut命令的详细用法
  6. 上传文件连接失败问题
  7. 大整数加减乘除的实现
  8. 【Lilishop商城】No4-2.业务逻辑的代码开发,涉及到:会员B端第三方登录的开发-平台注册会员接口开发
  9. vue中样式穿透的三种写法
  10. Leetcode-数据结构-88. 合并两个有序数组