https://www.cnblogs.com/yongjian/p/6425772.html

概述

键值对RDD是Spark操作中最常用的RDD,它是很多程序的构成要素,因为他们提供了并行操作各个键或跨界点重新进行数据分组的操作接口。

创建

Spark中有许多中创建键值对RDD的方式,其中包括

  • 文件读取时直接返回键值对RDD
  • 通过List创建键值对RDD

在Scala中,可通过Map函数生成二元组

1
2
3
4
5
6
7
8
9
10
val listRDD = sc.parallelize(List(1,2,3,4,5))
val result = listRDD.map(x => (x,1))
result.foreach(println)
//结果
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)

键值对RDD的转化操作

基本RDD转化操作在此同样适用。但因为键值对RDD中包含的是一个个二元组,所以需要传递的函数会由原来的操作单个元素改为操作二元组。

下表总结了针对单个键值对RDD的转化操作,以 { (1,2) , (3,4) , (3,6) }  为例,f表示传入的函数

函数名 目的 示例 结果
reduceByKey(f) 合并具有相同key的值 rdd.reduceByKey( ( x,y) => x+y ) { (1,2) , (3,10) }
groupByKey() 对具有相同key的值分组 rdd.groupByKey() { (1,2) , (3, [4,6] ) }
mapValues(f) 对键值对中的每个值(value)应用一个函数,但不改变键(key) rdd.mapValues(x => x+1) { (1,3) , (3,5) , (3,7) }
combineBy Key( createCombiner, mergeValue, mergeCombiners, partitioner) 使用不同的返回类型合并具有相同键的值 下面有详细讲解 -
flatMapValues(f) 对键值对RDD中每个值应用返回一个迭代器的函数,然后对每个元素生成一个对应的键值对。常用语符号化 rdd.flatMapValues(x => ( x to 5 ))

{ (1, 2) ,  (1, 3) ,   (1, 4) , (1, 5) ,  (3, 4) , (3, 5) }

keys() 获取所有key rdd.keys() {1,3,3}
values() 获取所有value rdd.values() {2,4,6}
sortByKey() 根据key排序 rdd.sortByKey() { (1,2) , (3,4) , (3,6) }

下表总结了针对两个键值对RDD的转化操作,以rdd1 = { (1,2) , (3,4) , (3,6) }  rdd2 = { (3,9) } 为例,

函数名 目的 示例 结果
subtractByKey 删掉rdd1中与rdd2的key相同的元素 rdd1.subtractByKey(rdd2) { (1,2) }
join 内连接 rdd1.join(rdd2)

{(3, (4, 9)), (3, (6, 9))}

leftOuterJoin 左外链接 rdd1.leftOuterJoin (rdd2)

{(3,( Some( 4), 9)), (3,( Some( 6), 9))}

rightOuterJoin 右外链接 rdd1.rightOuterJoin(rdd2)

{(1,( 2, None)), (3, (4, Some( 9))), (3, (6, Some( 9)))}

cogroup 将两个RDD钟相同key的数据分组到一起 rdd1.cogroup(rdd2) {(1,([ 2],[])), (3, ([4, 6],[ 9]))}

combineByKey

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner,mapSideCombine)

combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner)

combineByKey( createCombiner, mergeValue, mergeCombiners)

函数功能:

聚合各分区的元素,而每个元素都是二元组。功能与基础RDD函数aggregate()差不多,可让用户返回与输入数据类型不同的返回值。

combineByKey函数的每个参数分别对应聚合操作的各个阶段。所以,理解此函数对Spark如何操作RDD会有很大帮助。

参数解析:

createCombiner:分区内 创建组合函数

mergeValue:分区内 合并值函数

mergeCombiners:多分区 合并组合器函数

partitioner:自定义分区数,默认为HashPartitioner

mapSideCombine:是否在map端进行Combine操作,默认为true

工作流程:

  1. combineByKey会遍历分区中的所有元素,因此每个元素的key要么没遇到过,要么和之前某个元素的key相同。
  2. 如果这是一个新的元素,函数会调用createCombiner创建那个key对应的累加器初始值
  3. 如果这是一个在处理当前分区之前已经遇到的key,会调用mergeCombiners把该key累加器对应的当前value与这个新的value合并

代码例子:

//统计男女个数

1
2
3
4
5
6
7
8
9
10
val conf = new SparkConf ().setMaster ("local").setAppName ("app_1")
   val sc = new SparkContext (conf)
   val people = List(("男", "李四"), ("男", "张三"), ("女", "韩梅梅"), ("女", "李思思"), ("男", "马云"))
   val rdd = sc.parallelize(people,2)
   val result = rdd.combineByKey(
     (x: String) => (List(x), 1),  //createCombiner
     (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), //mergeValue
     (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) //mergeCombiners
   result.foreach(println)

结果

(男, ( List( 张三,  李四,  马云),3 ) )
(女, ( List( 李思思,  韩梅梅),2 ) )

流程分解:

解析:两个分区,分区一按顺序V1、V2、V3遍历

  • V1,发现第一个key=男时,调用createCombiner,即

    (x: String) => (List(x), 1)
  • V2,第二次碰到key=男的元素,调用mergeValue,即
    (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1)
  • V3,发现第一个key=女,继续调用createCombiner,即
    (x: String) => (List(x), 1)
  • … …
  • 待各V1、V2分区都计算完后,数据进行混洗,调用mergeCombiners,即
    (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))

add by jan 2017-02-27 18:34:39

以下例子都基于此RDD

1
2
3
4
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

reduceByKey(func)

reduceByKey(func)的功能是,使用func函数合并具有相同键的值。

比如,reduceByKey((a,b) => a+b),有四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),对具有相同key的键值对进行合并后的结果就是:("spark",3)、("hadoop",8)。可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,比如,对于两个具有相同key的键值对("spark",1)、("spark",2),a就是1,b就是2。

1
2
3
4
scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)

  

groupByKey()

roupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),采用groupByKey()后得到的结果是:("spark",(1,2))和("hadoop",(3,5))。

1
2
3
4
5
6
7
scala> pairRDD.groupByKey()
res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[15] at groupByKey at <console>:34
//从上面执行结果信息中可以看出,分组后,value被保存到Iterable[Int]中
scala> pairRDD.groupByKey().foreach(println)
(Spark,CompactBuffer(11))
(Hive,CompactBuffer(1))
(Hadoop,CompactBuffer(1))

  

keys

keys只会把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{"spark","spark","hadoop","hadoop"}。

1
2
3
4
5
6
7
scala> pairRDD.keys
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at keys at <console>:34
scala> pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark

  

values

values只会把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的RDD,采用keys后得到的结果是一个RDD[Int],内容是{1,2,3,5}。

1
2
3
4
5
6
7
8
scala> pairRDD.values
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at values at <console>:34
  
scala> pairRDD.values.foreach(println)
1
1
1
1

  

sortByKey()

sortByKey()的功能是返回一个根据键排序的RDD。

1
2
3
4
5
6
7
scala> pairRDD.sortByKey()
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at sortByKey at <console>:34
scala> pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)

  

mapValues(func)

我们经常会遇到一种情形,我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func),它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。比如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的pairRDD,如果执行pairRDD.mapValues(x => x+1),就会得到一个新的键值对RDD,它包含下面四个键值对("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。

1
2
3
4
5
6
7
scala> pairRDD.mapValues(x => x+1)
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at <console>:34
scala> pairRDD.mapValues(x => x+1).foreach(println)
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)

  

join

join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

比如,pairRDD1是一个键值对集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一个键值对集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{("spark",1,"fast"),("spark",2,"fast")}。对于这个实例,我们下面在spark-shell中运行一下:

1
2
3
4
5
6
7
8
9
10
11
12
scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
pairRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:27
  
scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))
pairRDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[25] at parallelize at <console>:27
  
scala> pairRDD1.join(pairRDD2)
res9: org.apache.spark.rdd.RDD[(String, (Int, String))] = MapPartitionsRDD[28] at join at <console>:32
  
scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))

Spark 键值对RDD操作相关推荐

  1. Spark键值对RDD的转化操作

    1.1  键值对RDD的转化操作\\ 1.1.1  转化操作列表 针对一个Pair RDD的转化操作 : . 针对两个Pair RDD的转化操作 : 转载于:https://www.cnblogs.c ...

  2. spark编程基础--5.2键值对RDD

    键值对RDD的创建 常用的键值对转换操作 reduceByKey(func) groupByKey() keys values sortByKey() mapValues(func) join com ...

  3. php中数组的指针函数参数传递参数,循环语句、函数的参数及作用域、数组键值及指针操作函数(8月23日作业)...

    实例演示while(),do~while() 实例 /** * while循环 */ $num = 1; $sum = 0; while ($num <= 100) { $sum +=$num; ...

  4. 【Redis】Redis 哈希 Hash 键值对集合操作 ( 哈希 Hash 键值对集合简介 | 查询操作 | 增加操作 | 修改操作 )

    文章目录 一.哈希 Hash 键值对集合 二.查询操作 1.Redis 中查询 Hash 键值对数据 2.查询 Hash 键是否存在 3.查询 Hash 中所有的键 Field 4.查询 Hash 中 ...

  5. vue如何获取数组中的键值_vue中操作数组的相关方法

    1,锁定数组的长度(只读模式)[ Array.join() ] 2.将数组合并成字符串(返回字符串)[ Array.join() ] 3.返回逆序数组(倒叙排列数组)[ Array..reverse( ...

  6. Spark 杂记--- 键值对操作RDD

    1. 将一个普通的RDD转换为键值对RDD时,可以通过调用map()函数来实现,传递的函数需要返回键值对.   scala 版: scala> val lines =sc.parallelize ...

  7. Spark中的键值对操作-scala

    1.PairRDD介绍 Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRD ...

  8. Spark的RDD操作之Join大全

    一.RDD的Join操作有哪些? (一)Join:Join类似于SQL的inner join操作,返回结果是前面和后面集合中配对成功的,过滤掉关联不上的.源代码如下: /** * Return an ...

  9. Spark——RDD操作详解

    转载自:https://blog.csdn.net/zhaojw_420/article/details/53261965 一.基本RDD 1.针对各个元素的转化操作  最常用的转化操作是map()和 ...

最新文章

  1. DL框架之MXNet :深度学习框架之MXNet 的简介、安装、使用方法、应用案例之详细攻略
  2. 类继承(c++细节篇六)
  3. C++走向远洋——26(项目二,2,构造函数与析构函数)
  4. 电压放大倍数公式运放_运放电路:同相放大还是反相放大?
  5. linux启动顺序怎么修改,怎样修改启动顺序?
  6. 阅读笔记,软件需求分析
  7. 小哥哥你有98K吗?利用Python制作一款多功能变声器!
  8. root用户连接mysql数据库出错 1045 access denied for user 'root'@'localhost' using password yes
  9. MongoDB,还有一个角度看数据
  10. MAC OS 如何安装命令行工具:Command Line Tools
  11. vb6.0动态加载odbc驱动(mysql数据源)_vb教程之在VB中动态加载ODBC数据源
  12. 微信养号防封攻略_防封群微信怎么卖
  13. 微信小程序漂亮按钮(中间矩形两端半圆形)
  14. 运行时设置数据库连接字符串
  15. TCP/IP网络编程 学习笔记_9 --域名系统(DNS)
  16. 什么是虚拟主机管理系统?
  17. 数据可视化技术有什么特点
  18. 魔兽世界服务端源码各个重要文件详细情况说明开服一条龙
  19. oracle 11 wm concat,【Oracle开发】关于11.2上WMSYS.WM_CONCAT的workaround
  20. uni-app实战教程

热门文章

  1. java jni linux_java jni实现linux环境下绑定硬件的License
  2. C++学习笔记(三)
  3. 软件工程学习笔记《目录》
  4. NAU8810相关问题
  5. 计算机网络(一)计算机网络体系
  6. Linux系统编程(二)孤儿进程和僵尸进程
  7. 树莓派第一次开机自动连接WIFI(不用显示屏方法)
  8. 【算法】【殊途同归】搜索算法之(深度优先 || 广度优先) (约束条件 || 限界函数)
  9. 1057 数零壹 (20 分)
  10. sys/queue.h分析(图片复制不过来,查看原文)