broadcast


官方文档描述:

Broadcast a read-only variable to the cluster, returning a
[[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
The variable will be sent to each cluster only once.
  • 1
  • 2
  • 3
  • 4

函数原型:

def broadcast[T](value: T): Broadcast[T]
  • 1

广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

源码分析:

def broadcast[T: ClassTag](value: T): Broadcast[T] = {  assertNotStopped()  if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {    // This is a warning instead of an exception in order to avoid breaking user programs that    // might have created RDD broadcast variables but not used them:    logWarning("Can not directly broadcast RDDs; instead, call collect() and "      + "broadcast the result (see SPARK-5063)")  }  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)  val callSite = getCallSite  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)  cleaner.foreach(_.registerBroadcastForCleanup(bc))  bc
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

实例:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
final Broadcast<List<Integer>> broadcast = javaSparkContext.broadcast(data);
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {    List<Integer> iList = broadcast.value();    @Override    public Integer call(Integer v1) throws Exception {        Integer isum = 0;        for(Integer i : iList)            isum += i;        return v1 + isum;    }
});
System.out.println(result.collect());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

accumulator


官方文档描述:

Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can “add” 
  values to using the add method. Only the master can access the accumulator’s value.

函数原型:

def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])   : Accumulator[T]
  • 1
  • 2
  • 3

累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和sum。Spark原生地只支持数字类型的累加器,开发者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程(对于Python还不支持) 。 
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。

源码分析:

def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])  : Accumulator[T] = {  val acc = new Accumulator(initialValue, param, Some(name))  cleaner.foreach(_.registerAccumulatorForCleanup(acc))  acc
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

实例:

class VectorAccumulatorParam implements AccumulatorParam<Vector> {    @Override    //合并两个累加器的值。//参数r1是一个累加数据集合//参数r2是另一个累加数据集合public Vector addInPlace(Vector r1, Vector r2) {r1.addAll(r2);return r1;    }    @Override //初始值   public Vector zero(Vector initialValue) {        return initialValue;    }    @Override//添加额外的数据到累加值中//参数t1是当前累加器的值//参数t2是被添加到累加器的值    public Vector addAccumulator(Vector t1, Vector t2) {        t1.addAll(t2);        return t1;    }
}
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);final Accumulator<Integer> accumulator = javaSparkContext.accumulator(0);
Vector initialValue = new Vector();
for(int i=6;i<9;i++)    initialValue.add(i);
//自定义累加器
final Accumulator accumulator1 = javaSparkContext.accumulator(initialValue,new VectorAccumulatorParam());
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {    @Override    public Integer call(Integer v1) throws Exception {        accumulator.add(1);        Vector term = new Vector();        term.add(v1);        accumulator1.add(term);        return v1;    }
});
System.out.println(result.collect());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator.value());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator1.value());

Spark Java API:broadcast、accumulator相关推荐

  1. Spark Java API:foreach、foreachPartition、lookup

    foreach 官方文档描述: Applies a function f to all elements of this RDD. 函数原型: def foreach(f: VoidFunction[ ...

  2. Spark Java API:Transformation

    mapPartitions 官方文档描述: Return a new RDD by applying a function to each partition of this RDD. mapPart ...

  3. Spark Java API:Action

    reduce 官方文档描述: Reduces the elements of this RDD using the specified commutative and associative bina ...

  4. 第103课:动手实战联合使用Spark Streaming、Broadcast、Accumulator实现在线黑名单过滤和计数

    第103课:动手实战联合使用Spark Streaming.Broadcast.Accumulator实现在线黑名单过滤和计数 /* 王家林老师授课http://weibo.com/ilovepain ...

  5. 大白话说Java反射:入门、使用、原理,BAT 面试官 如何面试

    写在最前面,我总结出了很多互联网公司的面试题及答案,并整理成了文档,以及各种学习的进阶学习资料,免费分享给大家.扫码加微信好友进[程序员面试学习交流群],免费领取.也欢迎各位一起在群里探讨技术. 文章 ...

  6. java spark 环境_在 IntelliJ IDEA 中配置 Spark(Java API) 运行环境

    1. 新建Maven项目 初始Maven项目完成后,初始的配置(pom.xml)如下: 2. 配置Maven 向项目里新建Spark Core库 xmlns:xsi="http://www. ...

  7. java日志:三、JCL使用

    java日志:三.JCL使用 1 介绍 全称为Jakarta Commons Logging,是Apache提供的一个通用日志API.它是为"所有的Java日志实现",提供一个统一 ...

  8. 大白话说Java泛型:入门、使用、原理

    文章首发于[博客园-陈树义],点击跳转到原文<大白话说Java泛型:入门.使用.原理> 远在 JDK 1.4 版本的时候,那时候是没有泛型的概念的.当时 Java 程序员们写集合类的代码都 ...

  9. 【腾讯面试题】Java集合:List、Set以及Map

    Java集合:List.Set以及Map 概述 Collection接口 List:有序,可重复 ArraysList Vector LinkedList Set:无序,唯一 HashSet Link ...

最新文章

  1. C语言 #ifndef 引起的redefinition of xxx 问题解决
  2. python基础-字符串
  3. 微型计算机主机的主要组成部件是,微型计算机主机的主要组成部分有
  4. mysql存储过程知识点_知识点:Mysql 基本用法之存储过程
  5. Spring MVC中@Controller和@RequestMapping注解详解
  6. sql语句count_带COUNT()函数SQL SELECT语句
  7. 什么是RUTP协议?RUDP协议在视频会议系统中的应用
  8. 漫画:从今天开始,我不再买书了!
  9. 树莓派 与 Xbox360手柄 基于pygame 的一次邂逅
  10. java借口不同程序_「JAVA」多态的灵魂,面向接口的程序设计,这才是你该懂得的接口(interface)...
  11. 第1章 数据库应用系统开发方法
  12. Docker CentOS 7.x underlay网络 桥接网络配置
  13. 两种方法判断是否为移动端访问,跳转到对应wap页面
  14. Flutter Dio二次封装
  15. 计算机本科学生毕业设计课题,计算机科学与技术本科毕业设计选题.doc
  16. 三天打鱼两天晒网问题Java解法
  17. 和前司运维(张松)谈网络
  18. GTJ2018如何导出全部工程量_广联达GCCP5.0和GTJ2018软件新版发版通知
  19. 什么是国际期货主账户?如何操作?
  20. BMG5100千兆5G智能杆网关

热门文章

  1. mysql mybatis配置_mybatis详解 与配置mybatis+spring+mysql.doc
  2. 偏置面命令_UG10.0 入门图文教程——同步建模之移动面
  3. 在表示计算机内存储器容量时 1gb等于,在表示计算机内存储器容量时,1GB等于_________MB...
  4. cesium模型不遮挡点线面_cesium点线面测试数据
  5. test.php创建,基于phpstudy简单快速搭建一个php程序
  6. go语言高并发与微服务实战_go-micro+gin+etcd微服务实战之服务注册与发现
  7. jetson nano poe_工业POE交换机工作流程及使用注意事项介绍
  8. vb设计一个由计算机,计算机VB程序的设计第一章.ppt
  9. free mybatis 不生效_关于 Mybatis 设置懒加载无效的问题
  10. rtsp服务器搭建_直播系统搭建所用到的技术原理