Spark 广播变量BroadCast
一、 广播变量
广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。
二、为什么使用广播变量
在默认的,task执行的算子中,使用了外部的变量,每个task都会获取一份变量的副本,
map副本,传输到了各个task上之后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一下子就耗费掉1G的内存。对性能会有什么影响呢?不必要的内存的消耗和占用,就导致了,你在进行RDD持久化到内存,也许就没法完全在内存中放下;就只能写入磁盘,最后导致后续的操作在磁盘IO上消耗性能;
你的task在创建对象的时候,也许会发现堆内存放不下所有对象,也许就会导致频繁的垃圾回收器的回收,GC。GC的时候,一定是会导致工作线程停止,也就是导致Spark暂停工作那么一点时间。频繁GC的话,对Spark作业的运行的速度会有相当可观的影响。
优点:
不是每个task一份副本,而是变成每个节点Executor上一个副本。
1.举例来说:
50个Executor 1000个task。 一个map10M
默认情况下,1000个task 1000个副本
1000 * 10M = 10 000M = 10 G
10G的数据,网络传输,在集群中,耗费10G的内存资源。
如果使用 广播变量,
50个Executor ,50个副本,10M*50 = 500M的数据。
网络传输,而且不一定是从Drver传输到各个节点,还可能是从就近的节点
的Executor的BlockManager上获取变量副本,网络传输速度大大增加。
之前 10000M 现在 500M。
20倍网络传输性能的消耗。20倍内存消耗的减少。
三、如何使用
开始使用broadcast变量,使用完后,程序结束记得释放
sc = SparkContext(appName=AppNames.JOURNEY_AGGREGATOR_APP_NAME)broadCastForLog = Nonetry:broadCastForLog = ELogForDistributedApp.setLogConf2BroadCast(sc)elogging.initLogFromDict(broadCastForLog.value)except StandardError:pass.......#执行完程序逻辑,记得释放该变量if broadCastForLog is not None:broadCastForLog.unpersist(False)
#获取要被共享的大变量,这里是log配置
class ELogForDistributedApp(object):LOGHDFSPATH = "/user/hdfs/test/logging/logging_hdfs.json"@staticmethoddef setLogConf2BroadCast(sc):logFilePath = ELogForDistributedApp.LOGHDFSPATHif sc is not None:configDict = HDFSOperation.getConfigFromHDFS(logFilePath,sc)broadCast = sc.broadcast(configDict)#globals()['broadCast'] = broadCast#elogging.initLogFromDict(broadCast.value)return broadCast#print broadCast.valueelse:return None
def initLogFromDict(self):elogging.initLogFromDict(self.eloggingConfig)
从hdfs中找到相应配置文件
class HDFSOperation(object):@staticmethoddef getConfigFromHDFS(hdfsPath,sc):if sc is not None:filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystemhadoop_configuration = sc._jsc.hadoopConfiguration()fs =filesystem_class.get(hadoop_configuration)path_class = sc._gateway.jvm.org.apache.hadoop.fs.PathpathObj = path_class(hdfsPath)try:hdfsInStream = fs.open(pathObj)bufferedReader_class = sc._gateway.jvm.java.io.BufferedReaderinputStreamReader_class = sc._gateway.jvm.java.io.InputStreamReaderbufferedReader = bufferedReader_class(inputStreamReader_class(hdfsInStream))except IOError,msg:print str(msg)return Noneelse:return NoneconfigStr = ''while True:tmpStr = bufferedReader.readLine()if tmpStr == None:breakconfigStr += tmpStrtry:confDict = json.loads(configStr)except IOError,msg:print str(msg)return Nonereturn confDict
参考文档
- Spark Programming Guide1.6.3
- How can I update a broadcast variable in spark streaming?
Spark踩坑记——共享变量
转载于:https://www.cnblogs.com/ITtangtang/p/7967902.html
Spark 广播变量BroadCast相关推荐
- Spark广播变量Broadcast
注意:原文出处https://www.jianshu.com/p/3bd18acd2f7f Broadcast 顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去.这样的场景很多 ...
- spark共享变量(广播变量Broadcast Variable,累加器Accumulators)
2019独角兽企业重金招聘Python工程师标准>>> 一般来说,当一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所 ...
- Spark广播变量实现原理及基础编程
Spark广播变量实现原理及基础编程 实现原理 广播变量用来高效分发较大的对象.向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用.比如,如果你的应用需要向所有节点发送一个较大的 ...
- Spark广播变量使用示例
Spark广播变量使用示例 实现原理 广播变量用来高效分发较大的对象.向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用.比如,如果你的应用需要向所有节点发送一个较大的只读查询表 ...
- Spark广播变量之超大表left join小表时如何进行优化以及小表的正确位置
Spark广播变量之大表left join小表时如何进行优化以及小表的正确位置放置,带着这个目标我们一探究竟. 项目场景: 最近工作中遇到一个场景: 有一个超大表3.5T和一个小表963K 需要做关联 ...
- spark中的广播变量broadcast
Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkC ...
- spark广播变量的原理_spark使用广播变量
import java.io.{File, FileReader} import java.util import org.apache.spark.SparkConf import org.apac ...
- spark 广播变量大数据_Spark基础知识(三)--- Spark的广播变量和累加器
在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些 ...
- spark广播变量的使用(转)
环境: ubuntu16.04 64 伪分布式 使用的spark是2.3.1 scala 2.11.8 参考连接: https://blog.csdn.net/android_xue/article/ ...
- spark广播变量 和 累加器
1 为什么使用广播变量 和 累加器 变量存在的问题:在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的 ...
最新文章
- 第一章 DevOps概述
- 高级Java必看的10本书
- 浅析JavaScript解析赋值、浅拷贝和深拷贝的区别
- DotNetCore跨平台~发布脚本PowerShell的设计
- linux测试dvi接口,TeX Live DVI字体数据解析整数溢出漏洞
- Linux环境下基于策略的路由
- CentOS7 上以 RPM 包方式安装 Oracle 18c 单实例
- 如何const定义一个不可变数组
- C语言 进制转换 将十进制转换为任意进制
- 计算机工具栏文件夹选项在哪里,windows10系统下工具栏里找不到文件夹选项如何解决...
- 解决APKIDE(APK改之理)运行出现ApkIDE Exception:…报错的问题
- 移动app原型设计工具:Flinto for Mac
- 亚马逊防关联方法适合shopee平台吗?
- 读【选修计算机专业的伤与痛】
- Task 4 用户输入->知识库的查询语句
- TXT文件批量生成二维码
- 【Auto.js】QQ名片点赞
- 技术人的充电时刻,24个精彩研发案例在等你
- 十二星座物语,女生最喜欢的星座性格【10】
- One PUNCH Man——变量显著性检验
热门文章
- C++实现一个不能被继承的类
- keyshot分辨率多少合适_分辨率单位及换算详解
- STL标准模版库--vector的操作集合
- MySQL incompatible with sql_mode=only_full_group_by 问题解决
- element step控件点击事件
- java.util.stream.IntStream
- MyBatis的XML配置文件(一)
- Numpy系列(五)给数组增加一个维度
- Python import其他文件夹的文件
- day17 10.jdbc的crud操作