1.需求的出现

  当我们在driver端调度spark作用的过程中,需要向各个节点发送任务“数据”--Rdd,一个般一个Rdd会对应多个任务,没一个任务可以交给一个excutor执行,而一个excutor可以开启多个线程去计算,那么此时每个线程都要从Driver端获取Rdd,那样就会产生大量的副本,当需要向excutor传递大型变量的时候,就会产生大量的网络占用,而且多次序列化,与反序列化都会占用资源。

2.解决方案

  Spark采用了广播变量的方案,解决了产生副本过多的问题,driver会将在任务执行过程中需要发送的序列化变量对象进行切割,形成多个chunk,储存在BlockkManager中,每个excutor一样都会有一个blockManager,当excutor需要变量的时候首先会从自身的BlockManager中去寻找,如果没有才去Driver或者其他执行器进行抓取,这样就可以确保在一个excutor中只需要一份变量副本。也就减少了大量变量副本而产生的网络占用了。

验证实例:

一、不采用广播变量

1)定义方法可以监控Spark任务执行端的信息

def sendInfo(obj: Object, m: String, param: String) = {
val ip = java.net.InetAddress.getLocalHost.getHostAddress
val pid = java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
val tname = Thread.currentThread().getName
val classname = obj.getClass.getSimpleName
val objHash = obj.hashCode()
val info = ip + "/" + pid + "/" + tname + "/" + classname + "@" + objHash + "/" + m + "(" + param + ")" + "\r\n"//发送数据给nc 服务器
val sock = new java.net.Socket("s101", 8888)
val out = sock.getOutputStream
out.write(info.getBytes())
out.flush()
out.close()
}

2)首先创建一个可序列化的dog类,(entends Serializable)

scala> class dog extends Serializable

3)创建一个dog对象,并将其作为变量传入spark作业任务中

//创建对象scala> val d = new dog
d: dog = dog@321b8863//创建rdd
scala> val rdd1 = sc.makeRDD(1 to 10 ,10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
val rdd2 = rdd1.map(e=>{sendInfo(d,"x","x");e})
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:31//触发Spark任务

 scala> rdd2.collect
 res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

4)监控结果:

192.168.48.101/4140/Executor task launch worker-0/dog@1920051793/x(x)192.168.48.102/3355/Executor task launch worker-0/dog@867775714/x(x)
192.168.48.102/3360/Executor task launch worker-0/dog@634533975/x(x)
192.168.48.102/3355/Executor task launch worker-1/dog@1176307278/x(x)192.168.48.104/3364/Executor task launch worker-0/dog@1762941990/x(x)
192.168.48.104/3357/Executor task launch worker-0/dog@1176451488/x(x)192.168.48.103/3450/Executor task launch worker-0/dog@2076792302/x(x)
192.168.48.103/3445/Executor task launch worker-0/dog@1844856176/x(x)192.168.48.103/3445/Executor task launch worker-1/dog@1152883024/x(x)
192.168.48.103/3450/Executor task launch worker-1/dog@1619414885/x(x)

  通过dog对象的地址可以看出,每个executor的每个线程都会创建(反序列化,从Driver端抓取(是否会从其他executor抓取还有待验证))一个新的对象。

二、广播变量的实现

1)定义方法可以监控Spark任务执行端的信息

def sendInfo(obj: Object, m: String, param: String) = {
val ip = java.net.InetAddress.getLocalHost.getHostAddress
val pid = java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
val tname = Thread.currentThread().getName
val classname = obj.getClass.getSimpleName
val objHash = obj.hashCode() val info = ip + "/" + pid + "/" + tname + "/" + classname + "@" + objHash + "/" + m + "(" + param + ")" + "\r\n" //发送数据给nc 服务器 val sock = new java.net.Socket("s101", 8888) val out = sock.getOutputStream out.write(info.getBytes()) out.flush() out.close() }

2)首先创建一个可序列化的dog类,(entends Serializable)

scala> class dog extends Serializable

3)创建一个dog对象,并将其作为变量传入spark作业任务中

//创建对象scala> val d = new dog
d: dog = dog@321b8863//创建广播变量scala> val d1 = sc.broadcast(d)//创建rdd
scala> val rdd1 = sc.makeRDD(1 to 10 ,10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
val rdd2 = rdd1.map(e=>{sendInfo(d1.value,"x","x");e}) rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:31//触发Spark任务

 scala> rdd2.collect
 res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

4)监控结果:(由于集群设计的问题导致很难看出结论)

192.168.48.104/3559/Executor task launch worker-0/dog@1092230054/x(x)
192.168.48.104/3559/Executor task launch worker-1/dog@1092230054/x(x)
192.168.48.104/3562/Executor task launch worker-0/dog@2084510396/x(x)192.168.48.103/3646/Executor task launch worker-0/dog@613174275/x(x)
192.168.48.103/3655/Executor task launch worker-0/dog@603565101/x(x)192.168.48.102/3561/Executor task launch worker-0/dog@369123253/x(x)
192.168.48.102/3561/Executor task launch worker-1/dog@369123253/x(x)
192.168.48.102/3546/Executor task launch worker-1/dog@2120472001/x(x)
192.168.48.102/3546/Executor task launch worker-0/dog@2120472001/x(x)192.168.48.101/5281/Executor task launch worker-0/dog@976431494/x(x)

看到这个结果开始我是有点懵的,为什么同一个节点的 worker-0和worker-1是同一个dog,但是同样的worker-0却是不同的dog。后来才想起来之前修改过配置文件spark-env.sh。

在每个节点启动了两个executor。也就是说 两个worker-0是属于不同的executor的所以,是不同的dog。

5)通过修改启动spark-shell时的参数配置,改变资源配置,实现一个节点只启动一个executor,一个executor启动多个线程。重复上述同样步骤

spark-shell --master spark://s101:7077 --executor-cores 4 --total-executor-cores 40

6)重复上述同样步骤,得到如下结果

192.168.48.104/4293/Executor task launch worker-2/dog@1746435851/x(x)
192.168.48.104/4293/Executor task launch worker-0/dog@1746435851/x(x)
192.168.48.104/4293/Executor task launch worker-1/dog@1746435851/x(x)192.168.48.102/4305/Executor task launch worker-1/dog@744462004/x(x)
192.168.48.102/4305/Executor task launch worker-0/dog@744462004/x(x)192.168.48.103/4374/Executor task launch worker-0/dog@1750215217/x(x)
192.168.48.103/4374/Executor task launch worker-1/dog@1750215217/x(x)
192.168.48.103/4374/Executor task launch worker-2/dog@1750215217/x(x)192.168.48.101/7610/Executor task launch worker-1/dog@1611268215/x(x)
192.168.48.101/7610/Executor task launch worker-2/dog@1611268215/x(x)

可以看出一个executor多个线程共享一个dog对象。

转载于:https://www.cnblogs.com/yanxun1/p/9800645.html

Spark 广播变量 TorrentBroadcast相关推荐

  1. Spark广播变量实现原理及基础编程

    Spark广播变量实现原理及基础编程 实现原理 广播变量用来高效分发较大的对象.向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用.比如,如果你的应用需要向所有节点发送一个较大的 ...

  2. Spark广播变量使用示例

    Spark广播变量使用示例 实现原理 广播变量用来高效分发较大的对象.向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用.比如,如果你的应用需要向所有节点发送一个较大的只读查询表 ...

  3. Spark广播变量之超大表left join小表时如何进行优化以及小表的正确位置

    Spark广播变量之大表left join小表时如何进行优化以及小表的正确位置放置,带着这个目标我们一探究竟. 项目场景: 最近工作中遇到一个场景: 有一个超大表3.5T和一个小表963K 需要做关联 ...

  4. Spark广播变量Broadcast

    注意:原文出处https://www.jianshu.com/p/3bd18acd2f7f Broadcast 顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去.这样的场景很多 ...

  5. spark广播变量的原理_spark使用广播变量

    import java.io.{File, FileReader} import java.util import org.apache.spark.SparkConf import org.apac ...

  6. spark 广播变量大数据_Spark基础知识(三)--- Spark的广播变量和累加器

    在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些 ...

  7. spark广播变量的使用(转)

    环境: ubuntu16.04 64 伪分布式 使用的spark是2.3.1 scala 2.11.8 参考连接: https://blog.csdn.net/android_xue/article/ ...

  8. spark广播变量 和 累加器

    1 为什么使用广播变量 和 累加器 变量存在的问题:在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的 ...

  9. spark学习-Spark广播变量与共享变量(1)

    1,概念 ###1.1 广播变量: 广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量.广播变量可被用于有效地给每个节点一个大输入数据集的副本.Spark还尝试使用高效地广播 ...

  10. Spark 广播变量和累加器

    Spark 的一个核心功能是创建两种特殊类型的变量:广播变量和累加器 广播变量(groadcast varible)为只读变量,它有运行SparkContext的驱动程序创建后发送给参与计算的节点.对 ...

最新文章

  1. hdu4911 简单树状数组
  2. 好色派:日省 6 小时,神策分析是我的“菜”
  3. 2.myql数据导入到solr,并建立solr索引(学习笔记)
  4. Python-day06-2018.7.9_编码以及小知识点补充
  5. .NET UIAutomation实现Word文档加密暴力破解
  6. 79. 单词搜索(dfs)
  7. 【ArcObject开发】实验:ArcObject地图开发基本操作
  8. 栈-线性表(代码、分析、汇编)
  9. python网络编程知识点_python 网络编程要点
  10. 怎样判断一个网站是不是前后端分离的?
  11. 二维haar小波matlab_洪泽湖入湖水沙序列的多时间尺度小波分析
  12. 系统学习NLP(二十六)--BERT详解
  13. 邮箱服务器端口以及各大型邮箱smtp服务器及端口收集
  14. 基数树(radix tree)
  15. Android学习路线(适合学生)
  16. ARTS-26(leetcode-119. 杨辉三角 II,AVOD最详细过程,Git用法,DMA原理,海天味业的企业估值(1))
  17. 22.3 MIDI 和音乐
  18. b站视频突破2倍方法,3倍?4倍?可以开10倍!!!
  19. UI设计初学者应该如何入门?
  20. 「镁客早报」三星第十一代商务旗舰W2019发布;美国实现120KW无线充电

热门文章

  1. Linux网络配置之虚拟网卡的配置(Red Hat 6.5)
  2. ImportError: No module named matplotlib.pyplot
  3. 烂泥:KVM虚拟机的关机与开启
  4. andrioid .9.png图片的制作
  5. KE之undefinded instruction问题记录
  6. linux6.8屏幕黑屏,centos6.8笔记本关盖就黑屏怎样设置不黑屏?
  7. linux内核奇遇记之md源代码解读之三
  8. mongo性能测试demo 代码正确运行
  9. python学习(五) 类
  10. 嵌入式软件工程师_嵌入式软件工程师适合去芯片公司吗?