广播变量和累加器

广播变量

  • 广播变量理解图

未使用广播变量:

package SparkRadioimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Demo01 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("redio").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("error")val list = List[String]("honey","lucy")//Distribute a local Scala collection to form an RDD.val nameList: RDD[String] = sc.parallelize(List[String]("george", "honey", "lucy"))val result = nameList.filter(name => {!list.contains(name)})result.foreach(println)//george}
}

使用 广播变量

package SparkRadioimport org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Demo01 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("redio").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("error")val list = List[String]("honey","lucy")//广播变量val bc: Broadcast[List[String]] = sc.broadcast(list)//Distribute a local Scala collection to form an RDD.val nameList: RDD[String] = sc.parallelize(List[String]("george", "honey", "lucy"))val result = nameList.filter(name => {val innerList: List[String] = bc.value!innerList.contains(name)})result.foreach(println)//george}
}

注意事项

  • 能不能将一个RDD使用广播变量广播出去?

不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。

  • 广播变量只能在Driver端定义,不能在Executor端定义。
  • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

广播变量的意义

如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么只是每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。



累加器

  • 累加器理解图

package SparkRadioimport org.apache.spark.sql.SparkSessionobject AccumulatorTest {def main(args: Array[String]): Unit = {//另一种方式而已val spark: SparkSession = SparkSession.builder().appName("test").master("local").getOrCreate()val sc = spark.sparkContextsc.setLogLevel("error")val rddd1 = sc.textFile("./data/words.txt")var i = 0val rdd2 = rddd1.map(one => {i += 1println(s"Executor i = $i")one})rdd2.collect()println(s"i = $i")}
}
package SparkRadioimport org.apache.spark.sql.SparkSessionobject AccumulatorTest {def main(args: Array[String]): Unit = {//另一种方式而已val spark: SparkSession = SparkSession.builder().appName("test").master("local").getOrCreate()val sc = spark.sparkContextsc.setLogLevel("error")val accumulator = sc.longAccumulatorval rddd1 = sc.textFile("./data/words.txt")val rdd2 = rddd1.map(one => {accumulator.add(1)one})rdd2.collect()println(s"accumulator = ${accumulator.value}")//accumulator = 5}
}

注意事项

  • 累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新。

累加器的意义

在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

Spark _15 _广播变量和累加器相关推荐

  1. spark 获取广播变量_Spark流式程序中广播变量和累加器为何使用单例模式

    Spark中广播变量详解以及如何动态更新广播变量​mp.weixin.qq.com 1.广播变量是只读的,使用单例模式可以减少Spark流式程序中每次job生成执行,频繁创建广播变量带来的开销 2. ...

  2. Spark编程指引(四)----共享变量(广播变量和累加器)

    转自:http://blog.csdn.net/happyanger6/article/details/46576831 共享变量 通常情况下,当向Spark操作(如map,reduce)传递一个函数 ...

  3. spark广播变量 和 累加器

    1 为什么使用广播变量 和 累加器 变量存在的问题:在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的 ...

  4. Spark 广播变量和累加器

    Spark 的一个核心功能是创建两种特殊类型的变量:广播变量和累加器 广播变量(groadcast varible)为只读变量,它有运行SparkContext的驱动程序创建后发送给参与计算的节点.对 ...

  5. 【Spark】广播变量和累加器

    文章目录 一.Spark广播变量 二.累加器 Reference 一.Spark广播变量 多进程编程中,不同进程可以通过创建共享内存,进行进程间通信.而在分布式中,Spark通过[广播变量]和[累加器 ...

  6. Spark广播变量与累加器

    在之前的文章中,我介绍了flink广播状态,从而了解了flink广播状态实际上就是将一个流广播到下游所有算子之中.在本文中我将介绍spark中类似的概念,为了方便理解,先放张spark应用程序架构图. ...

  7. spark 广播变量大数据_Spark基础知识(三)--- Spark的广播变量和累加器

    在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些 ...

  8. Spark共享变量(广播变量、累加器)

    Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator) 累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象. 共享变量出现的原因: 通常在向 ...

  9. 031 广播变量与累加器

    1.广播变量机制 将传递给task的值,变成传递给executor. 为什么可以共用,因为task是executor下的线程. 只读的变量,在task中不允许修改 2.累加器介绍 在只写的变量,在ta ...

最新文章

  1. js空对象undefined测试
  2. 服务端JavaScript之Rhino
  3. 【 MATLAB 】DFT的性质讨论(二)序列的循环移位及其 MATLAB 实现(频域方法)
  4. (POST请求中的三种数据请求格式.application/x-www-form-urlencoded和multipart/form-data和application/json)
  5. python mysql倒序_day40:MySQL:python操作mysql:pymysql模块SQL注入攻击
  6. windows远程连接ubuntu 黑屏_Windows跟Windows远程连接传输文件
  7. Sublime Text 3插件之Emmet:HTML/CSS代码快速编写神器
  8. 4固定在底部_礼堂椅厂家教你如何固定座椅
  9. 服务器imm口加载硬盘,ibm x3250 M4如何进IMM(远程管理口)
  10. python 语句简写_自学Python-语句之列表推导式
  11. 写给考完SDOI2016R2D1的自己
  12. .DateTimeToStr函数专用优化版
  13. django-orm的表操作.
  14. WPS2019 所有宏被禁,用启用宏要付费,如何免费使用
  15. wx.uploadFile上传图片 在正式环境无响应问题
  16. 用图片来搜索 教你玩转Google按图搜索
  17. Latex大括号左对齐
  18. potato电脑版连接不上_土豆电脑版-potato chat下载 v2.13.200323 电脑版 - 安下载
  19. R语言可视化——熵曲线
  20. java调起本地摄像头,利用openCV进行人脸识别(一)

热门文章

  1. CodeForces - 1345E Quantifier Question(dfs实现拓扑序)
  2. HDU- 2973 YAPTCHA(威尔逊定理)
  3. iphone双卡_放心了:IT之家实测,苹果iPhone 12支持双卡5G
  4. Python配置-virtualenv和conda的区别
  5. codeforces contest 1142
  6. PTA第3章-9 字符串转换成十进制整数 (15 分)
  7. IDA+OD双剑合璧=逆向无敌
  8. 反汇编程序导致程序crash的解决思路
  9. C++ Boost 学习资源列表
  10. 3_5 ResponsibilityChainMode 责任链模式