broadcast


官方文档描述:

Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. The variable will be sent to each cluster only once.

源码分析:

这里使用告警方式代替异常,为了是避免用户进程中断;可能有用户创建了广播变量但是没有使用他们;


  /*** Broadcast a read-only variable to the cluster, returning a* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.* The variable will be sent to each cluster only once.*/def broadcast[T: ClassTag](value: T): Broadcast[T] = {assertNotStopped()require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")val bc = env.broadcastManager.newBroadcast[T](value, isLocal)val callSite = getCallSitelogInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)cleaner.foreach(_.registerBroadcastForCleanup(bc))bc}

广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

实例


List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
final Broadcast<List<Integer>> broadcast = javaSparkContext.broadcast(data);
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {    List<Integer> iList = broadcast.value();    @Override    public Integer call(Integer v1) throws Exception {        Integer isum = 0;        for(Integer i : iList)            isum += i;        return v1 + isum;    }
});
System.out.println(result.collect());

accumulator


源码分析:

// Methods for creating shared variables/*** Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"* values to using the `+=` method. Only the driver can access the accumulator's `value`.*/@deprecated("use AccumulatorV2", "2.0.0")def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = {val acc = new Accumulator(initialValue, param)cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))acc}/*** Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display* in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the* driver can access the accumulator's `value`.*/@deprecated("use AccumulatorV2", "2.0.0")def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T] = {val acc = new Accumulator(initialValue, param, Some(name))cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))acc}

累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和sum。Spark原生地只支持数字类型的累加器,开发者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程(对于Python还不支持) 。
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。

class VectorAccumulatorParam implements AccumulatorParam<Vector> {    @Override    //合并两个累加器的值。//参数r1是一个累加数据集合//参数r2是另一个累加数据集合public Vector addInPlace(Vector r1, Vector r2) {r1.addAll(r2);return r1;    }    @Override //初始值   public Vector zero(Vector initialValue) {        return initialValue;    }    @Override//添加额外的数据到累加值中//参数t1是当前累加器的值//参数t2是被添加到累加器的值    public Vector addAccumulator(Vector t1, Vector t2) {        t1.addAll(t2);        return t1;    }
}
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);final Accumulator<Integer> accumulator = javaSparkContext.accumulator(0);
Vector initialValue = new Vector();
for(int i=6;i<9;i++)    initialValue.add(i);
//自定义累加器
final Accumulator accumulator1 = javaSparkContext.accumulator(initialValue,new VectorAccumulatorParam());
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {    @Override    public Integer call(Integer v1) throws Exception {        accumulator.add(1);        Vector term = new Vector();        term.add(v1);        accumulator1.add(term);        return v1;    }
});
System.out.println(result.collect());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator.value());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator1.value());

参考文章:


https://www.cnblogs.com/jinggangshan/p/8117155.html

转载于:https://www.cnblogs.com/AlanWilliamWalker/p/10960858.html

spark变量使用broadcast、accumulator相关推荐

  1. 第103课:动手实战联合使用Spark Streaming、Broadcast、Accumulator实现在线黑名单过滤和计数

    第103课:动手实战联合使用Spark Streaming.Broadcast.Accumulator实现在线黑名单过滤和计数 /* 王家林老师授课http://weibo.com/ilovepain ...

  2. Spark 基础 —— sc.broadcast

    broadcast:广播,扩散. broadcast,广播,顾名思义,就是分布式集群环境下,将数据从一个节点发送到其他各个节点上去.这样的场景很多,比如 driver 上有一张表,其他节点上运行的 t ...

  3. 理解spark闭包以及broadcast(转载)

    什么叫闭包: 跨作用域访问函数变量. 又指的一个拥有许多变量和绑定了这些变量的环境的表达式(通常是一个函数),因而这些变量也是该表达式的一部分. Spark闭包的问题引出:  在spark中实现统计L ...

  4. spark hint中Broadcast Hints、COALESCE and REPARTITION Hints

    spark默认的hint只有以下5种 COALESCE and REPARTITION Hints(两者区别比较) Spark SQL 2.4 added support for COALESCE a ...

  5. spark中的广播变量broadcast

    Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkC ...

  6. Spark广播变量Broadcast

    注意:原文出处https://www.jianshu.com/p/3bd18acd2f7f Broadcast 顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去.这样的场景很多 ...

  7. Spark共享变量(广播变量、累加器)

    Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator) 累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象. 共享变量出现的原因: 通常在向 ...

  8. 9、共享变量(Broadcast Variable和Accumulator)

    一.共享变量 1.共享变量工作原理 Spark一个非常重要的特性就是共享变量.默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中.此时每个task只能操 ...

  9. spark broadcast的TorrentBroadcast实现

    在spark中,默认采用的broadcast的方式Torrent方式,其实现方式也是TorrentBroadcast类,当通过spark上下文调用broadcast广播某数据时,将会生成唯一的broa ...

最新文章

  1. 【Android 应用开发】 ActionBar 样式详解 -- 样式 主题 简介 Actionbar 的 icon logo 标题 菜单样式修改
  2. java中上转型概念_在java 中,什么叫向上转型?
  3. spring定时器(@Scheduled)
  4. linux rsync删文件速度,Linux下使用rsync最快速删除大量文件的方法
  5. linux中fcntl()、lockf、flock的区别
  6. Js中Array对象
  7. 三星上线售价99万8K电视 网友:是不是多打了一位数
  8. java点击按钮在控制台输出_java 怎么获取控制台的数据并且输出到GUI上
  9. 使用next.js完成从开发到部署
  10. 始于颜值 敬于才华 合于性格 久于善良 终于人品
  11. mysql 右模糊_面试小知识:MySQL索引相关
  12. mysql zf_ZF 连接 Mysql
  13. 谷歌浏览器离线更换皮肤-安装谷歌浏览器插件与问题解决
  14. O2O(online to offline)营销模式
  15. 前端访问后端本地资源
  16. 我的一些关于职场和职业发展的心得
  17. UVa10019:Funny Encryption Method
  18. F. Equalize the Array【学习进度条2】
  19. axios请求415错误Uncaught (in promise) Error: Request failed with status code 415
  20. 【MM32F5270开发板试用】六、如何用 星辰内核 + 国产RTOS 通过I2S播放 “星辰大海”

热门文章

  1. Matlab scatter 如何显示不同颜色点状
  2. Launcher3删除开机引导页面
  3. 【AngularJS】—— 4 表达式
  4. POJ 1028 Web Navigation
  5. Dell 2950服务器CPU-E1422错误解决方法
  6. 面临找工作,要记住给自己的这些话。
  7. powerdesigner箭头如何画_用Scratch编程画几何图形:如何画多边形
  8. 谷歌浏览器安卓版_谷歌翻译(在线翻译)下载-谷歌翻译下载安装安卓版v5.12.0...
  9. 结构体内指针数组调用_指针的这些技巧你都掌握了吗
  10. sqlalchemy exists 子查询_学好ORM框架SQLAlchemy面试必问