spark reduceByKey源码解析
使用关联和可交换的归约函数合并每个key的value。 在将结果发送给reducer之前,这还将在每个Mapper上本地执行合并,类似于MapReduce中的“ combiner”。
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)}
调用了combineByKeyWithClassTag
,继续查看
::实验::通用函数,使用一组自定义的聚合函数来组合每个键的元素。
对于“组合类型” C,将RDD [(K,V)]转换为RDD [(K,C)]类型的结果
用户提供三个功能:
createCombiner,它将V变成C(例如,创建一个元素列表)
mergeValue,将V合并为C(例如,将其添加到列表的末尾)
mergeCombiners,将两个C合并为一个。
此外,用户可以控制输出RDD的分区,以及是否执行map侧聚合(如果一个映射器可以使用相同的键产生多个项目)。
注意:
V和C可以不同-例如,可以将类型(Int,Int)的RDD分组为类型(Int,Seq [Int])的RDD。
def combineByKeyWithClassTag[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0if (keyClass.isArray) {if (mapSideCombine) {throw new SparkException("Cannot use map-side combining with array keys.")}if (partitioner.isInstanceOf[HashPartitioner]) {throw new SparkException("HashPartitioner cannot partition array keys.")}}val aggregator = new Aggregator[K, V, C](self.context.clean(createCombiner),self.context.clean(mergeValue),self.context.clean(mergeCombiners))if (self.partitioner == Some(partitioner)) {self.mapPartitions(iter => {val context = TaskContext.get()new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))}, preservesPartitioning = true)} else {new ShuffledRDD[K, V, C](self, partitioner).setSerializer(serializer).setAggregator(aggregator).setMapSideCombine(mapSideCombine)}}
spark reduceByKey源码解析相关推荐
- spark word2vec 源码详细解析
spark word2vec 源码详细解析 简单介绍spark word2vec skip-gram 层次softmax版本的源码解析 word2vec 的原理 只需要看层次哈弗曼树skip-gram ...
- spark源码解析之基本概念
从两方面来阐述spark的组件,一个是宏观上,一个是微观上. 1. spark组件 要分析spark的源码,首先要了解spark是如何工作的.spark的组件: 了解其工作过程先要了解基本概念 官方罗 ...
- Spark任务提交后是如何完成提交过程的?源码解析!
Spark任务提交后是如何完成提交过程的?源码解析! 我们熟知的提交命令: sparksubmit v class xxx master spark://xxx7077 .... 然后我们 ...
- spark shell 删除失效_Spark任务提交源码解析
1. 前言 反反复复捣鼓了很久,终于开始学习Spark的源码了,果不其然,那真的很有趣.这里我打算一本正经的胡说八道来讲一下Spark作业的提交过程. 基础mac系统基础环境如下: JDK 1.8 I ...
- 第42课: Spark Broadcast内幕解密:Broadcast运行机制彻底解密、Broadcast源码解析、Broadcast最佳实践
第42课: Spark Broadcast内幕解密:Broadcast运行机制彻底解密.Broadcast源码解析.Broadcast最佳实践 Broadcast在机器学习.图计算.构建日常的各种算 ...
- [源码解析] 深度学习分布式训练框架 horovod (11) --- on spark --- GLOO 方案
[源码解析] 深度学习分布式训练框架 horovod (11) - on spark - GLOO 方案 文章目录 [源码解析] 深度学习分布式训练框架 horovod (11) --- on spa ...
- [源码解析] 深度学习分布式训练框架 horovod (10) --- run on spark
[源码解析] 深度学习分布式训练框架 horovod (10) - run on spark 文章目录 [源码解析] 深度学习分布式训练框架 horovod (10) --- run on spark ...
- Spark特征处理之RFormula源码解析
##RFormula简单介绍 RFormula通过R模型公式来操作列. 支持R操作中的部分操作包括'~', '.', ':', '+'以及'-'. 1. ~分隔目标和对象2. +合并对象," ...
- Spark ALS recommendForAll源码解析实战之Spark1.x vs Spark2.x
文章目录 Spark ALS recommendForAll源码解析实战 1. 软件版本: 2. 本文要解决的问题 3. 源码分析实战 3.1 Spark2.2.2 ALS recommendForA ...
最新文章
- 第1课 - make和makefile
- pcb二次钻孔_线路板中的二次孔是什么?线路板钻孔有哪些常见问题?
- VS2015中DataGridView的DataGridViewComBoboxCell列值无效及数据绑定错误的解决方法
- 关于selecteditem.value和selecteditem.text
- jenkins插件调用job_【Jenkins插件_实践】1.Job Import Plugin迁移Jobs
- java 平均分割list_Java 实现将List平均分成若干个集合
- Python入门学习笔记(7)
- oracle升级补丁报错,oracle rac升级补丁及中间的错误提示
- R开发环境(Eclipse+StatET)
- linux 64 mysql下载官网_Linux下安装MySQL5.7
- ckfinder 配置 php,GitHub - itxq/ckfinder: CkFinder3.5.1 for PHP 优化版 (添加又拍云存储)...
- 再安利几个看片追剧的App
- Android系统结构
- 项目管理-WBS与RACI的使用
- Scrapy爬虫轻松抓取网站数据
- Android 后台服务(Service)
- Python 城市分类
- matlab 半高斯拟合,高斯曲线拟合求半宽高
- 摩拜单车,还能走多远?
- IBM Thinkpad 相关软件详解
热门文章
- python跟php服务器对比_python学习笔记一和PHP的一些对比
- illustrator插件开发指南pdf_Jenkins之pipeline开发工具
- linux服务器学习笔记:linux忘记密码怎么办?
- python读取成功_Python如何从文件读取数据()
- studioone唱歌效果精调_Sidechain是如何工作的,为什么sidechain可以带来如此酷炫的效果...
- pacman吃豆人_“植物河豚”狗爪豆,你吃过吗?
- php如何判断是ajax,php如何判断是ajax
- linux activemq 日志,log4j通过ActiveMQ远程记录日志设计配置
- python解读器_Python装饰器完全解读
- lua能在stm32arm上运行吗_IOS App能在Mac运行!苹果这黑科技能撼动微软吗?