9.spark core之共享变量
简介
spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。
- 集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
- 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。
spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。
- 广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
- 累加器用于在驱动器中对数据结果进行聚合。
广播变量
原理
- 广播变量只能在Driver端定义,不能在Executor端定义。
- 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
- 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
用法
- 通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
- 通过value属性访问该对象的值
- 变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)
实例
查询每个国家的呼号个数
python
# 将呼号前缀(国家代码)作为广播变量
signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes):country = lookupCountry(sign_count[0], signPrefixes.value)count = sign_count[1]return (country, count)countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
scala
// 将呼号前缀(国家代码)作为广播变量
val signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes):country = lookupCountry(sign_count[0], signPrefixes.value)count = sign_count[1]return (country, count)val countryContactCounts = contactCounts.map{case (sign, count) => {val country = lookupInArray(sign, signPrefixes.value)(country, count)}}.reduceByKey((x, y) => x+y)countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
java
// 将呼号前缀(国家代码)作为广播变量
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {String sign = callSignCount._1();String country = lookupCountry(sign, signPrefixes.value());return new Tuple2(country, callSignCount._2()); }
}).reduceByKey(new SumInts());countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
累加器
原理
- 累加器在Driver端定义赋初始值。
- 累加器只能在Driver端读取最后的值,在Excutor端更新。
用法
- 通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
- Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
- 驱动器程序可以调用累加器的value属性来访问累加器的值
实例
累加空行
python
file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)def extractCallSigns(line):global blankLines # 访问全局变量if (line == ""):blankLines += 1return line.split(" ")callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value
scala
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0val callSigns = file.flatMap(line => {if (line == "") {blankLines += 1 //累加器加1}line.split(" ")
})callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)
java
JavaRDD<String> rdd = sc.textFile(args[1]);final Accumulator<Integer> blankLines = sc.accumulator(0);JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String line) {if ("".equals(line)) {blankLines.add(1);}return Arrays.asList(line.split(" "));}
});callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());
忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。
转载于:https://blog.51cto.com/12967015/2172863
9.spark core之共享变量相关推荐
- 2021年大数据Spark(十九):Spark Core的共享变量
目录 共享变量 广播变量 累加器 案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...
- rdd数据存内存 数据量_spark系列:spark core 数据交互技术点(数据模型)
spark core实现了spark的基本功能:存储交互.任务调度.内存管理.错误恢复等:本片文章主要介绍与数据交互相关的核心技术点. 本文目录: RDD特性及交互 shuffle操作及调优 RDD持 ...
- Spark Core
Spark Core DAG概念 有向无环图 Spark会根据用户提交的计算逻辑中的RDD的转换(变换方法)和动作(action方法)来生成RDD之间的依赖关系 ...
- 2021年大数据Spark(十八):Spark Core的RDD Checkpoint
目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...
- SparkSQL 与 Spark Core的关系
不多说,直接上干货! SparkSQL 与 Spark Core的关系 Spark SQL构建在Spark Core之上,专门用来处理结构化数据(不仅仅是SQL). Spark SQL在Spark C ...
- sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)
本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...
- 分布式实时计算—Spark—Spark Core
原文作者:bingoabin 原文地址:Spark Core 目录 一.Spark Core 1. 主要功能 2. Spark Core子框架 3. Spark架构 4. Spark计算模型 二.组件 ...
- Spark源码和调优简介 Spark Core
作者:calvinrzluo,腾讯 IEG 后台开发工程师 本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了 ...
- Spark Core项目实战(3) | 页面单跳转化率统计
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...
最新文章
- android viewgroup 事件,android中viewgroup的事件传递分析
- 数位dp 的简单入门
- 定时任务 ||​​​​​​​​​​​​​​ Cron表达式
- Python中小括号( )、中括号[ ]和大括号{}分别代表什么?
- 【Python】直接赋值、浅拷贝和深度拷贝解析
- 汇编语言——十六进制数据前加0规则
- Linux记录-CPU指标介绍
- 使用C语言读取properties文件V1.0
- JavaEE 7 正式发布
- HDU 5832——A water problem 2016CCPC网络赛1001
- 实现同步请求_图解 Promise 实现原理(二)—— Promise 链式调用
- git 使用_Git-介绍与使用
- Abp框架默认表中如何添加字段
- 数据库引擎 SQLite 发布的新行为准则,为何引众怒?
- python 计算两个经纬度的距离_使用经纬度和海拔(高程)计算两点之间的距离...
- SSM框架原理,作用及使用方法
- mysql静默安装_Windows环境Mysql静默安装
- 小柯-java 进阶
- 后台模拟页面登陆_微信直播小程序后台,微信朋友圈可直达小程序直播了
- 评论与回复数据表设计
热门文章
- iOS实现tableViewCell或collectionCell中点击界面按钮跳转
- 《C++游戏编程入门(第4版)》——1.12 习题
- 搜索引擎广告过滤Chrome插件
- Android WebView访问SSL证书网页(onReceivedSslError)
- 深入理解JavaScript系列(10):JavaScript核心(晋级高手必读篇)
- DZX2无法读取会员短消息
- 校验正确获取对象或者数组的属性方法(babel-plugin-idx/_.get)
- 2018年12月,华为HCNP大面积更新题目,军哥独家解题咯
- HP 服务器使用 SmartStart CD 引导安装 windows 2008 操作系统
- linux文本分析利器awk