2019独角兽企业重金招聘Python工程师标准>>>

combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。

其中的参数:

createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C

mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C

mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C

numPartitions:结果RDD分区数,默认保持原有的分区数

partitioner:分区函数,默认为HashPartitioner

mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

看下面例子:

def main(args: Array[String]): Unit = {//默认分区12个val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))val rdd1 = sc.makeRDD(Array((1, "A"), (1, "B"), (2, "A"), (2, "D"), (3, "E"), (1, "A")))rdd1.combineByKey((v: String) => List(v),(c: List[String], v: String) => c.::(v),(c1: List[String], c2: List[String]) => c1 ::: c2,2).collect.foreach(println(_))}

16/12/20 15:59:37 INFO DAGScheduler: ResultStage 1 (collect at ShellTest.scala:27) finished in 0.216 s
16/12/20 15:59:37 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:27, took 1.356017 s
(2,List(A, D))
(1,List(A, B, A))
(3,List(E))
16/12/20 15:59:37 INFO SparkContext: Invoking stop() from shutdown hook

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.

直接看例子:

 
  1. scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
  2. scala> rdd1.foldByKey(0)(_+_).collect
  3. res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1))
  4. //将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操
  5. //作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:
  6. //("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)

再看:

 
  1. scala> rdd1.foldByKey(2)(_+_).collect
  2. res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
  3. //先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函
  4. //数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)

再看乘法操作:

 
  1. scala> rdd1.foldByKey(0)(_*_).collect
  2. res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
  3. //先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0),
  4. //即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0)
  5. //其他K也一样,最终都得到了V=0
  6. scala> rdd1.foldByKey(1)(_*_).collect
  7. res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
  8. //映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。

在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。

转载于:https://my.oschina.net/chensanti234/blog/809084

Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey相关推荐

  1. Spark中的键值对操作-scala

    1.PairRDD介绍 Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRD ...

  2. Redis命令介绍之键值对操作

    前文已经提及过Redis中对于键值对操作的一些命令,如DEL.EXPIRE / PEXPIRE.TTL / PTTL以及EXISTS.今天我们继续介绍Redis中对键值对操作的相关命令. TYPE T ...

  3. Spark 杂记--- 键值对操作RDD

    1. 将一个普通的RDD转换为键值对RDD时,可以通过调用map()函数来实现,传递的函数需要返回键值对.   scala 版: scala> val lines =sc.parallelize ...

  4. Redis中的键值过期操作

    1.过期设置 Redis 中设置过期时间主要通过以下四种方式: expire key seconds:设置 key 在 n 秒后过期: pexpire key milliseconds:设置 key ...

  5. java redis 过期_Redis中的键值过期操作

    1.过期设置 Redis 中设置过期时间主要通过以下四种方式: expire key seconds:设置 key 在 n 秒后过期: pexpire key milliseconds:设置 key ...

  6. Python 字典创建、更新、按键值排序、取最大键值对等操作

    1. 字典创建 In [1]: d = {}In [2]: d Out[2]: {}In [3]: d = dict()In [4]: d Out[4]: {}In [5]: dict(a=1,b=2 ...

  7. Redis 键值过期操作

    过期设置 Redis 中设置过期时间主要通过以下四种方式: expire key seconds:设置 key 在 n 秒后过期: pexpire key milliseconds:设置 key 在 ...

  8. Mysql的键值对操作ELT FIELD

    返回索引值对应的字符串 ELT(N,str1,str2,str3,...)如果N =1返回str1; 如果N= 2返回str2; 如果参数的数量小于1或大于N返回NULL; 按照索引进行返回值mysq ...

  9. javascript 键值转换

    for (var i = 0; i < headerFields.length; i++) {fieldToIndex[headerFields[i]] = i;} 转载于:https://ww ...

  10. java字典转描述_java固定键值转换,使用枚举实现字典?

    胡子哥哥 public enum MyDict {    ChineseEnglish(0, "汉语词典"),    EnglishChinese(1,"英汉词典&quo ...

最新文章

  1. 【全网之最】JavaScript中字符串以特定字符分隔开之后,获取最后一个分割出来的字符串,多用于获取文件的后缀名(格式)
  2. [C/C++标准库]_[初级]_[优先队列priority_queue的使用]
  3. 工艺仿真软件_【技术简讯】电解抛光仿真软件Elsyca EPOS技术简介
  4. BaseActivity的抽取
  5. 计组—缓存Cache
  6. 13篇顶会!25岁成985高校博导,入职半年发ICML,网友:万点暴击
  7. __asm__ __volatile__ 嵌入式内嵌汇编语法解构
  8. netfilter与用户空间通信
  9. android 获取系统所有安装的应用程序
  10. 04---项目后端业务实现
  11. 腾讯云 接口验签 使用qcloud-java-sdk
  12. MySQL有哪些“饮鸩止渴”提高性能的方法?
  13. 微信公众号学习--点亮图片
  14. flutter检测网络状态
  15. sigsuspend 与sigwait 的区别
  16. linux蓝牙接收文件路径,Linux 蓝牙系列(3) 蓝牙传输文件测试
  17. 大一初学c语言——程序设计基础
  18. 学习笔记(5):C#急速入门-单行和多行注释
  19. 大数据融合与数据仓库 -- 一些思考
  20. OpenMP 不允许使用 != 操作

热门文章

  1. Atitit attilax在自然语言处理领域的成果
  2. atitit.web 推送实现方案集合(2)---百度云,jpush 极光推送 ,个推的选型比较.o99
  3. Atitit.可视化编程jbpm6 的环境and 使用总结...
  4. paip.提升用户体验---c++ QLabel标签以及QLineEdit文本框控件透明 设置
  5. paip.vs2010 开发ASP浏览时的设置
  6. 创金合信基金公司:专户模式案例
  7. 汇添富基金总经理张晖:以高质量发展打造中国最受认可的资产管理品牌
  8. 申通完美支撑“双11”亿级包裹背后的云基础设施
  9. 毕设题目:Matlab语音识别
  10. 【图像处理】基于matlab GUI图像全局+局部美化【含Matlab源码 1461期】