简介

  spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。

  • 集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
  • 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。

  spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。

  • 广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
  • 累加器用于在驱动器中对数据结果进行聚合。

广播变量

原理

  • 广播变量只能在Driver端定义,不能在Executor端定义。
  • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
  • 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

用法

  • 通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
  • 通过value属性访问该对象的值
  • 变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)

    实例

      查询每个国家的呼号个数

    python

# 将呼号前缀(国家代码)作为广播变量
signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes):country = lookupCountry(sign_count[0], signPrefixes.value)count = sign_count[1]return (country, count)countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

scala

// 将呼号前缀(国家代码)作为广播变量
val signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes):country = lookupCountry(sign_count[0], signPrefixes.value)count = sign_count[1]return (country, count)val countryContactCounts = contactCounts.map{case (sign, count) => {val country = lookupInArray(sign, signPrefixes.value)(country, count)}}.reduceByKey((x, y) => x+y)countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

java

// 将呼号前缀(国家代码)作为广播变量
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {String sign = callSignCount._1();String country = lookupCountry(sign, signPrefixes.value());return new Tuple2(country, callSignCount._2()); }
}).reduceByKey(new SumInts());countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

累加器

原理

  • 累加器在Driver端定义赋初始值。
  • 累加器只能在Driver端读取最后的值,在Excutor端更新。

用法

  • 通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
  • Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
  • 驱动器程序可以调用累加器的value属性来访问累加器的值

实例

  累加空行

python

file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)def extractCallSigns(line):global blankLines # 访问全局变量if (line == ""):blankLines += 1return line.split(" ")callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value

scala

val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0val callSigns = file.flatMap(line => {if (line == "") {blankLines += 1 //累加器加1}line.split(" ")
})callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)

java

JavaRDD<String> rdd = sc.textFile(args[1]);final Accumulator<Integer> blankLines = sc.accumulator(0);JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String line) {if ("".equals(line)) {blankLines.add(1);}return Arrays.asList(line.split(" "));}
});callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());

忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。

转载于:https://blog.51cto.com/12967015/2172863

9.spark core之共享变量相关推荐

  1. 2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量

    目录 共享变量 广播变量 累加器 ​​​​​​​案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...

  2. rdd数据存内存 数据量_spark系列:spark core 数据交互技术点(数据模型)

    spark core实现了spark的基本功能:存储交互.任务调度.内存管理.错误恢复等:本片文章主要介绍与数据交互相关的核心技术点. 本文目录: RDD特性及交互 shuffle操作及调优 RDD持 ...

  3. Spark Core

    Spark Core     DAG概念         有向无环图         Spark会根据用户提交的计算逻辑中的RDD的转换(变换方法)和动作(action方法)来生成RDD之间的依赖关系 ...

  4. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  5. SparkSQL 与 Spark Core的关系

    不多说,直接上干货! SparkSQL 与 Spark Core的关系 Spark SQL构建在Spark Core之上,专门用来处理结构化数据(不仅仅是SQL). Spark SQL在Spark C ...

  6. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

  7. 分布式实时计算—Spark—Spark Core

    原文作者:bingoabin 原文地址:Spark Core 目录 一.Spark Core 1. 主要功能 2. Spark Core子框架 3. Spark架构 4. Spark计算模型 二.组件 ...

  8. Spark源码和调优简介 Spark Core

    作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...

  9. Spark Core项目实战(3) | 页面单跳转化率统计

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

最新文章

  1. android viewgroup 事件,android中viewgroup的事件传递分析
  2. 数位dp 的简单入门
  3. 定时任务 ||​​​​​​​​​​​​​​ Cron表达式
  4. Python中小括号( )、中括号[ ]和大括号{}分别代表什么?
  5. 【Python】直接赋值、浅拷贝和深度拷贝解析
  6. 汇编语言——十六进制数据前加0规则
  7. Linux记录-CPU指标介绍
  8. 使用C语言读取properties文件V1.0
  9. JavaEE 7 正式发布
  10. HDU 5832——A water problem 2016CCPC网络赛1001
  11. 实现同步请求_图解 Promise 实现原理(二)—— Promise 链式调用
  12. git 使用_Git-介绍与使用
  13. Abp框架默认表中如何添加字段
  14. 数据库引擎 SQLite 发布的新行为准则,为何引众怒?
  15. python 计算两个经纬度的距离_使用经纬度和海拔(高程)计算两点之间的距离...
  16. SSM框架原理,作用及使用方法
  17. mysql静默安装_Windows环境Mysql静默安装
  18. 小柯-java 进阶
  19. 后台模拟页面登陆_微信直播小程序后台,微信朋友圈可直达小程序直播了
  20. 评论与回复数据表设计

热门文章

  1. iOS实现tableViewCell或collectionCell中点击界面按钮跳转
  2. 《C++游戏编程入门(第4版)》——1.12 习题
  3. 搜索引擎广告过滤Chrome插件
  4. Android WebView访问SSL证书网页(onReceivedSslError)
  5. 深入理解JavaScript系列(10):JavaScript核心(晋级高手必读篇)
  6. DZX2无法读取会员短消息
  7. 校验正确获取对象或者数组的属性方法(babel-plugin-idx/_.get)
  8. 2018年12月,华为HCNP大面积更新题目,军哥独家解题咯
  9. HP 服务器使用 SmartStart CD 引导安装 windows 2008 操作系统
  10. linux文本分析利器awk