Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)

累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象

共享变量出现的原因:

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。Spark 的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

 广播变量的引入

Spark 会自动把闭包中所有引用到的变量发送到工作节点上。虽然这很方便,但也很低效。原因有二:首先,默认的任务发射机制是专门为小任务进行优化的;其次,事实上你可能会在多个并行操作中使用同一个变量,但是 Spark 会为每个操作分别发送。

用一段代码来更直观的解释:

list是在driver端创建的,但是因为需要在excutor端使用,所以driver会把list以task的形式发送到excutor端,如果有很多个task,就会有很多给excutor端携带很多个list,如果这个list非常大的时候,就可能会造成内存溢出(如下图所示)。这个时候就引出了广播变量。

使用广播变量后:

使用广播变量的过程很简单:

(1) 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。任何可序列化的类型都可以这么实现。

(2) 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。

(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

案例如下

object BroadcastTest {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName("broadcast")

val sc = new SparkContext(conf)

val list = List("hello java")

val broadcast = sc.broadcast(list)

val linesRDD = sc.textFile("./word")

linesRDD.filter(line => {

broadcast.value.contains(line)

}).foreach(println)

sc.stop()

}

}

注意事项:

能不能将一个RDD使用广播变量广播出去?

不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。

广播变量只能在Driver端定义,不能在Executor端定义。

在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值

我们发现打印的结果为

依然是driver和excutor端的数据不能共享的问题。excutor端修改了变量,根本不会让driver端跟着修改,这个就是累加器出现的原因。

累加器的作用:

提供了将工作节点中的值聚合到驱动器程序中的简单语法。(如下图)

常用场景:

调试时对作业执行过程中的事件进行计数。

 累加器的用法如下所示:

(1)通过在driver中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值initialValue 的类型。

(2)Spark闭包(函数序列化)里的excutor代码可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。

(3)driver程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue() )来访问累加器的值。

案例如下:

object AccumulatorTest {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName("accumulator")

val sc = new SparkContext(conf)

val accumulator = sc.accumulator(0); //创建accumulator并初始化为0

val linesRDD = sc.textFile("./word")

val result = linesRDD.map(s => {

accumulator.add(1) //有一条数据就增加1

s

})

result.collect();

println("words lines is :" + accumulator.value)

sc.stop()

}

}

输出结果:

注意事项

累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新(如下图)。

Spark共享变量(广播变量、累加器)相关推荐

  1. spark共享变量(广播变量Broadcast Variable,累加器Accumulators)

    2019独角兽企业重金招聘Python工程师标准>>> 一般来说,当一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所 ...

  2. spark 获取广播变量_Spark流式程序中广播变量和累加器为何使用单例模式

    Spark中广播变量详解以及如何动态更新广播变量​mp.weixin.qq.com 1.广播变量是只读的,使用单例模式可以减少Spark流式程序中每次job生成执行,频繁创建广播变量带来的开销 2. ...

  3. Spark编程指引(四)----共享变量(广播变量和累加器)

    转自:http://blog.csdn.net/happyanger6/article/details/46576831 共享变量 通常情况下,当向Spark操作(如map,reduce)传递一个函数 ...

  4. spark 广播变量大数据_Spark基础知识(三)--- Spark的广播变量和累加器

    在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些 ...

  5. 【Spark】广播变量和累加器

    文章目录 一.Spark广播变量 二.累加器 Reference 一.Spark广播变量 多进程编程中,不同进程可以通过创建共享内存,进行进程间通信.而在分布式中,Spark通过[广播变量]和[累加器 ...

  6. Flink实操 : 广播变量/累加器/分布式缓存

    . 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...

  7. spark 获取广播变量_spark使用广播变量

    import java.io.{File, FileReader} import java.util import org.apache.spark.SparkConf import org.apac ...

  8. Spark中广播变量(boardcast)的使用

    1 使用广播变量的使用步骤 (1)声明广播变量 val board = sc.boardcast(XX) (2)使用 .value 获得广播变量的值 (3)变量只会被发到各个节点一次,且作为" ...

  9. 广播变量kyro_Spark简介

    简介: spark是一个内存计算框架 , 解决了mapreduce在迭代与交互方面的不足 迭代: 多轮算法计算形式,pagerank,逻辑回归等 交互: 实时数据,数据仓库查询等 spark重要概念: ...

最新文章

  1. Docker容器管理总结
  2. cs专业学生对于家乡的思考
  3. 鸿蒙思维和小央美,中心路汝南路站附近艺术培训
  4. js `` 手机不支持
  5. c语言延时系统,基于VB的单片机C语言延时程序各参数计算系统
  6. 2020年苹果App Store销售额达6430亿美元 同比增长24%
  7. Python深入理解yield
  8. HDU 3966 Aragorn's Story(树链剖分)题解
  9. C# 如何遍历删除某个控件上的所有子控件
  10. 24. Django部署:项目部署
  11. Swift 再等等?我的答案是:快上车
  12. JNPF开发平台3.3企业版框架 力软敏捷开发框架源码7.0.6旗舰版
  13. 一个新人眼中的O2O
  14. 从csv文件中导入数据到MySQL数据库
  15. Web前端开发需要具备哪些职责?需要掌握哪些技能?
  16. MSRA的2022秋招各大厂SSP+ offer
  17. SRS(简单实时视频服务) 笔记(3)- 配置文件和Http回调
  18. win10 uwp 如何使用DataTemplate
  19. ZUST ACM 学生博客链接【浙江科技学院】
  20. JAVA笔记:shape类(接口,继承,lambda表达式...)

热门文章

  1. ansible基本模块-server
  2. event事件的坐标 offsetWidth client scroll
  3. 解决 IntelliJ 乱码问题
  4. 存储过程插入100 条数据
  5. 运用C#在采集时进行自动验证登录[转]
  6. 数据库 事务提交和回滚
  7. Linux CentOS安装JDK
  8. 讨论下怎么能实现保证新闻时效性推荐算法机制
  9. MYSQL数据库字符编码设置
  10. Struts2笔记——通配符和动态方法调用