Spark Java API:broadcast、accumulator
broadcast
官方文档描述:
Broadcast a read-only variable to the cluster, returning a
[[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
The variable will be sent to each cluster only once.
- 1
- 2
- 3
- 4
函数原型:
def broadcast[T](value: T): Broadcast[T]
- 1
广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。
源码分析:
def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) { // This is a warning instead of an exception in order to avoid breaking user programs that // might have created RDD broadcast variables but not used them: logWarning("Can not directly broadcast RDDs; instead, call collect() and " + "broadcast the result (see SPARK-5063)") } val bc = env.broadcastManager.newBroadcast[T](value, isLocal) val callSite = getCallSite logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
实例:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
final Broadcast<List<Integer>> broadcast = javaSparkContext.broadcast(data);
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() { List<Integer> iList = broadcast.value(); @Override public Integer call(Integer v1) throws Exception { Integer isum = 0; for(Integer i : iList) isum += i; return v1 + isum; }
});
System.out.println(result.collect());
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
accumulator
官方文档描述:
Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can “add”
values to using the add
method. Only the master can access the accumulator’s value
.
函数原型:
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T]) : Accumulator[T]
- 1
- 2
- 3
累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和sum。Spark原生地只支持数字类型的累加器,开发者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程(对于Python还不支持) 。
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。
源码分析:
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) : Accumulator[T] = { val acc = new Accumulator(initialValue, param, Some(name)) cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc
}
- 1
- 2
- 3
- 4
- 5
- 6
实例:
class VectorAccumulatorParam implements AccumulatorParam<Vector> { @Override //合并两个累加器的值。//参数r1是一个累加数据集合//参数r2是另一个累加数据集合public Vector addInPlace(Vector r1, Vector r2) {r1.addAll(r2);return r1; } @Override //初始值 public Vector zero(Vector initialValue) { return initialValue; } @Override//添加额外的数据到累加值中//参数t1是当前累加器的值//参数t2是被添加到累加器的值 public Vector addAccumulator(Vector t1, Vector t2) { t1.addAll(t2); return t1; }
}
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);final Accumulator<Integer> accumulator = javaSparkContext.accumulator(0);
Vector initialValue = new Vector();
for(int i=6;i<9;i++) initialValue.add(i);
//自定义累加器
final Accumulator accumulator1 = javaSparkContext.accumulator(initialValue,new VectorAccumulatorParam());
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { accumulator.add(1); Vector term = new Vector(); term.add(v1); accumulator1.add(term); return v1; }
});
System.out.println(result.collect());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator.value());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator1.value());
Spark Java API:broadcast、accumulator相关推荐
- Spark Java API:foreach、foreachPartition、lookup
foreach 官方文档描述: Applies a function f to all elements of this RDD. 函数原型: def foreach(f: VoidFunction[ ...
- Spark Java API:Transformation
mapPartitions 官方文档描述: Return a new RDD by applying a function to each partition of this RDD. mapPart ...
- Spark Java API:Action
reduce 官方文档描述: Reduces the elements of this RDD using the specified commutative and associative bina ...
- 第103课:动手实战联合使用Spark Streaming、Broadcast、Accumulator实现在线黑名单过滤和计数
第103课:动手实战联合使用Spark Streaming.Broadcast.Accumulator实现在线黑名单过滤和计数 /* 王家林老师授课http://weibo.com/ilovepain ...
- 大白话说Java反射:入门、使用、原理,BAT 面试官 如何面试
写在最前面,我总结出了很多互联网公司的面试题及答案,并整理成了文档,以及各种学习的进阶学习资料,免费分享给大家.扫码加微信好友进[程序员面试学习交流群],免费领取.也欢迎各位一起在群里探讨技术. 文章 ...
- java spark 环境_在 IntelliJ IDEA 中配置 Spark(Java API) 运行环境
1. 新建Maven项目 初始Maven项目完成后,初始的配置(pom.xml)如下: 2. 配置Maven 向项目里新建Spark Core库 xmlns:xsi="http://www. ...
- java日志:三、JCL使用
java日志:三.JCL使用 1 介绍 全称为Jakarta Commons Logging,是Apache提供的一个通用日志API.它是为"所有的Java日志实现",提供一个统一 ...
- 大白话说Java泛型:入门、使用、原理
文章首发于[博客园-陈树义],点击跳转到原文<大白话说Java泛型:入门.使用.原理> 远在 JDK 1.4 版本的时候,那时候是没有泛型的概念的.当时 Java 程序员们写集合类的代码都 ...
- 【腾讯面试题】Java集合:List、Set以及Map
Java集合:List.Set以及Map 概述 Collection接口 List:有序,可重复 ArraysList Vector LinkedList Set:无序,唯一 HashSet Link ...
最新文章
- C语言 #ifndef 引起的redefinition of xxx 问题解决
- python基础-字符串
- 微型计算机主机的主要组成部件是,微型计算机主机的主要组成部分有
- mysql存储过程知识点_知识点:Mysql 基本用法之存储过程
- Spring MVC中@Controller和@RequestMapping注解详解
- sql语句count_带COUNT()函数SQL SELECT语句
- 什么是RUTP协议?RUDP协议在视频会议系统中的应用
- 漫画:从今天开始,我不再买书了!
- 树莓派 与 Xbox360手柄 基于pygame 的一次邂逅
- java借口不同程序_「JAVA」多态的灵魂,面向接口的程序设计,这才是你该懂得的接口(interface)...
- 第1章 数据库应用系统开发方法
- Docker CentOS 7.x underlay网络 桥接网络配置
- 两种方法判断是否为移动端访问,跳转到对应wap页面
- Flutter Dio二次封装
- 计算机本科学生毕业设计课题,计算机科学与技术本科毕业设计选题.doc
- 三天打鱼两天晒网问题Java解法
- 和前司运维(张松)谈网络
- GTJ2018如何导出全部工程量_广联达GCCP5.0和GTJ2018软件新版发版通知
- 什么是国际期货主账户?如何操作?
- BMG5100千兆5G智能杆网关
热门文章
- mysql mybatis配置_mybatis详解 与配置mybatis+spring+mysql.doc
- 偏置面命令_UG10.0 入门图文教程——同步建模之移动面
- 在表示计算机内存储器容量时 1gb等于,在表示计算机内存储器容量时,1GB等于_________MB...
- cesium模型不遮挡点线面_cesium点线面测试数据
- test.php创建,基于phpstudy简单快速搭建一个php程序
- go语言高并发与微服务实战_go-micro+gin+etcd微服务实战之服务注册与发现
- jetson nano poe_工业POE交换机工作流程及使用注意事项介绍
- vb设计一个由计算机,计算机VB程序的设计第一章.ppt
- free mybatis 不生效_关于 Mybatis 设置懒加载无效的问题
- rtsp服务器搭建_直播系统搭建所用到的技术原理