RDD累加器和广播变量

在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark提供了两种类型的变量:
1.累加器accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)
2.广播变量broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。

累加器

不使用累加器

使用累加器

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果。

val xx: Accumulator[Int] = sc.accumulator(0)

代码演示

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}object AccumulatorTest {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//使用scala集合完成累加var counter1: Int = 0;var data = Seq(1,2,3)data.foreach(x => counter1 += x )println(counter1)//6println("+++++++++++++++++++++++++")//使用RDD进行累加var counter2: Int = 0;val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]dataRDD.foreach(x => counter2 += x)println(counter2)//0//注意:上面的RDD操作运行结果是0//因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量//而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2//最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系//那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!//如果解决?---使用累加器val counter3: Accumulator[Int] = sc.accumulator(0)dataRDD.foreach(x => counter3 += x)println(counter3)//6}
}

广播变量

不使用广播变量

使用广播变量

代码演示

package cn.itcast.coreimport org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object BroadcastVariablesTest {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//不使用广播变量val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap//scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))//根据水果编号取水果名称val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))fruitNames.foreach(println)//注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,//那么会导致,被各个Task共用到的fruitMap会被多次传输//应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可//如何做到?---使用广播变量println("=====================")val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))fruitNames2.foreach(println)}
}

SparkCore:RDD累加器和广播变量(最详细的介绍)!!!!!!相关推荐

  1. Spark 的共享变量之累加器和广播变量

    前言 本期将介绍下 Spark 编程中两种类型的共享变量:累加器和广播变量. 简单说,累加器是用来对信息进行聚合的,而广播变量则是用来高效分发较大对象的. 学习目标 闭包的概念 累加器的原理 广播变量 ...

  2. Flink的累加器和广播变量、广播流、分布式缓存

    1.Accumulator累加器  Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...

  3. Spark编程指引(四)----共享变量(广播变量和累加器)

    转自:http://blog.csdn.net/happyanger6/article/details/46576831 共享变量 通常情况下,当向Spark操作(如map,reduce)传递一个函数 ...

  4. Spark共享变量(广播变量、累加器)

    Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator) 累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象. 共享变量出现的原因: 通常在向 ...

  5. spark共享变量(广播变量Broadcast Variable,累加器Accumulators)

    2019独角兽企业重金招聘Python工程师标准>>> 一般来说,当一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所 ...

  6. 【大数据开发】SparkCore——利用广播变量优化ip地址统计、Spark2.x自定义累加器

    文章目录 一.Broadcast广播变量 1.1 广播变量的逻辑过程 1.2 [优化ip地址统计](https://blog.csdn.net/weixin_37090394/article/deta ...

  7. spark 获取广播变量_Spark流式程序中广播变量和累加器为何使用单例模式

    Spark中广播变量详解以及如何动态更新广播变量​mp.weixin.qq.com 1.广播变量是只读的,使用单例模式可以减少Spark流式程序中每次job生成执行,频繁创建广播变量带来的开销 2. ...

  8. 031 广播变量与累加器

    1.广播变量机制 将传递给task的值,变成传递给executor. 为什么可以共用,因为task是executor下的线程. 只读的变量,在task中不允许修改 2.累加器介绍 在只写的变量,在ta ...

  9. Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)

    1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...

最新文章

  1. 【Ray Tracing The Next Week 超详解】 光线追踪2-6 Cornell box
  2. 用 Java 写一个植物大战僵尸简易版!
  3. 【渝粤教育】电大中专幼儿园课程论 (7)作业 题库
  4. mysql查询某张表的所有外键_oracle中查询所有外键引用到某张表的记录
  5. view类不响应自定义消息_安卓平台如何给控件添加自定义操作?
  6. 如何利用System.Net.Mail类发送EMAIL
  7. 中移动飞信2010Beta1.0体验版
  8. SVN中trunk,branches,tags用法详解
  9. 接口测试及常用接口测试工具
  10. 视频流中的DTS/PTS到底是什么?
  11. ​最强全集,数据科学领域,那些你不能不知道的大咖们!
  12. 桌面快捷方式图标异常怎么办
  13. 1一9数字行书写法_1一9数字行书写法
  14. 电脑桌面不见了怎么办?只能调出任务管理器!
  15. vue slot具名插槽
  16. 中南大学杰出校友_杰出客户服务的10个要点。
  17. 专业技术职务代码-GBT8561-2001
  18. 怎么关闭计算机硬件加速,怎么关闭硬件加速?关闭硬件加速的操作技巧分享
  19. ctf新手总结--web做题
  20. mysql threads create_MySql轻松入门系列——第一站 从源码角度轻松认识mysql整体框架图...

热门文章

  1. redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池
  2. SIM卡的密码PIN与PUK密码PIN
  3. vue中template的三种写法
  4. 2022-02-13 机器学习基本概念
  5. python数据分析与应用第五章实训 2_第五章实训(二)
  6. kindle出现电池感叹号,充电黄灯亮,怎么解决?按AWZ客服的回复弄好了。
  7. AutoHotKey 新手入门教程
  8. 商业智能助力 银行业数据“挖金”
  9. Appium-Long Press(长按)
  10. 自定义UTI注册自己的APP