Spark—RDD编程常用转换算子代码实例

Spark rdd 常用 Transformation 实例:

1、def map[U: ClassTag](f: T => U): RDD[U]   将函数应用于RDD的每一元素,并返回一个新的RDD

packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}

object RddTestextendsApp{

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//map

var source = sc.parallelize(1 to 10)

source.collect().foreach(e=>print(e+","))//1 2 3 4 5 6 7 8 9 10

var sourceMap = source.map(_*10)

sourceMap.collect().foreach(e=>print(e+","))//10 20 30 40 50 60 70 80 90 100

}

2、def filter(f: T => Boolean): RDD[T] 通过提供的产生boolean条件的表达式来返回符合结果为True新的RDD

//filter

var source = sc.parallelize(1 to 10)

source.collect().foreach(e=>print(e+" "))//1 2 3 4 5 6 7 8 9 10

var sourceMap = source.filter(_.

sourceMap.collect().foreach(e=>print(e+" "))//1 2 3 4

3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]   将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。

//flatMap

var source = sc.parallelize(1 to 5)

source.collect().foreach(e=>print(e+" "))//1 2 3 4 5

var sourceMap = source.flatMap(x=>(1to x))

sourceMap.collect().foreach(e=>print(e+" "))//1 1 2 1 2 3 1 2 3 4 1 2 3 4 5

4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]    将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。

packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}

object RddTest {

def main(args: Array[String]): Unit={

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//mapPartitions

var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))

source.collect().foreach(e=> print(e + " "))//(lucy,female) (jack,male) (jams,male)

var sourceMap =source.mapPartitions(partitionsFun)

sourceMap.collect().foreach(e=> print(e + " ")) //jams jack

}

def partitionsFun(iter:Iterator[(String,String)]): Iterator[String]={

var males=List[String]()while(iter.hasNext){

val next=iter.next()

next match {case (_,"male") => males =next._1::malescase _ =>}

}returnmales.iterator

}

}

5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]  将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。

packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}

object RddTest {

def main(args: Array[String]): Unit={

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//mapPartitionsWithIndex

var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))

source.collect().foreach(e=> print(e + " "))//(lucy,female) (jack,male) (jams,male)

var sourceMap =source.mapPartitionsWithIndex(partitionsFunWithIndex)

sourceMap.collect().foreach(e=> print(e + " ")) //[1]jams [1]jack

}

def partitionsFunWithIndex(index:Int,iter:Iterator[(String,String)]): Iterator[String]={

var males=List[String]()while(iter.hasNext){

val next=iter.next()

next match {case (_,"male") => males="["+index+"]"+next._1 :: malescase _ =>}

}

males.iterator

}

}

6、def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。

packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}

object RddTest {

def main(args: Array[String]): Unit={

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//sample

var source = sc.parallelize(1 to 10)

source.collect().foreach(e=> print(e + " "))//1 2 3 4 5 6 7 8 9 10

var sourceMap = source.sample(true,0.4,2)

sourceMap.collect().foreach(e=> print(e + " ")) //1 2 2

}

}

7、def union(other: RDD[T]): RDD[T]  将两个RDD中的元素进行合并,返回一个新的RDD

//union

var source = sc.parallelize(1 to 3)

source.collect().foreach(e=> print(e + " "))//1 2 3

var rdd = sc.parallelize(6 to 9)

var sourceMap=source.union(rdd)

sourceMap.collect().foreach(e=> print(e + " "))//1 2 3 6 7 8 9

8、def intersection(other: RDD[T]): RDD[T]  将两个RDD做交集,返回一个新的RDD

//intersection

var source = sc.parallelize(1 to 8)

source.collect().foreach(e=> print(e + " "))//1 2 3 4 5 6 7 8

var rdd = sc.parallelize(6 to 9)

var sourceMap=source.intersection(rdd)

sourceMap.collect().foreach(e=> print(e + " "))//6 8 7

9、def distinct(): RDD[T]  将当前RDD进行去重后,返回一个新的RDD

//distinct

var source = sc.parallelize(List(1,1,2,2,3,3,4,4,5,5))

source.collect().foreach(e=> print(e + " "))//1 1 2 2 3 3 4 4 5 5

var sourceMap =source.distinct()

sourceMap.collect().foreach(e=> print(e + " "))//4 2 1 3 5

10、def partitionBy(partitioner: Partitioner): RDD[(K, V)]  根据设置的分区器重新将RDD进行分区,返回新的RDD

//partitionBy

var source = sc.parallelize(List((1,"111"),(2,"222"),(3,"333"),(4,"444")),4)

source.collect().foreach(e=> print(e + " "))

print("分区数:"+source.partitions.size)//(1,111) (2,222) (3,333) (4,444) 分区数:4

var sourceMap = source.partitionBy(new HashPartitioner(2))

sourceMap.collect().foreach(e=> print(e + " "))

print("分区数:"+sourceMap.partitions.size)//(2,222) (4,444) (1,111) (3,333) 分区数:2

11、def reduceByKey(func: (V, V) => V): RDD[(K, V)]   根据Key值将相同Key的元组的值用func进行计算,返回新的RDD

//reduceByKey

var source = sc.parallelize(List(("hello",1),("world",1),("hello",1),("world",1)))

source.collect().foreach(e=> print(e + " "))//(hello,1) (world,1) (hello,1) (world,1)

var sourceMap = source.reduceByKey((x,y)=>x+y)

sourceMap.collect().foreach(e=> print(e + " "))//(hello,2) (world,2)

12、def groupByKey(): RDD[(K, Iterable[V])]   将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD

//groupByKey

var source = sc.parallelize(List(("hello",1),("world",1),("hello",1),("world",1)))

source.collect().foreach(e=> print(e + " "))//(hello,1) (world,1) (hello,1) (world,1)

var sourceMap =source.groupByKey()

sourceMap.collect().foreach(e=> print(e + " "))//(hello,CompactBuffer(1, 1)) (world,CompactBuffer(1, 1))

13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]   根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。

packagetop.ruandbimportorg.apache.spark.{ SparkConf, SparkContext}

object RddTest {

def main(args: Array[String]): Unit={

val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")

val sc= newSparkContext(sparkConf)//combineByKey 计算平均成绩

var scores = Array(("lucy", 89), ("jack", 77), ("lucy", 100), ("james", 65), ("jack", 99),

("james", 44))

var input=sc.parallelize(scores);

input.collect().foreach(e=> print(e + " "))//(lucy,89) (jack,77) (lucy,100) (james,65) (jack,99) (james,44)

var output = input.combineByKey((v) => (v, 1),

(acc: (Int, Int), v)=> (acc._1 + v, acc._2 + 1),

(acc1: (Int, Int), acc2: (Int, Int))=> (acc1._1 + acc2._1, acc1._2 +acc2._2))

output.collect().foreach(e=> print(e + " "))//(james,(109,2)) (jack,(176,2)) (lucy,(189,2))

var result = output.map{case (key,value) => (key,value._1/value._2.toDouble)}

result.collect().foreach(e=> print(e + " "))//(james,54.5) (jack,88.0) (lucy,94.5)

}

}

14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,

combOp: (U, U) => U): RDD[(K, U)]   通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。

spark应用程序转换_Spark—RDD编程常用转换算子代码实例相关推荐

  1. python如何实现选项功能_python几种常用功能如何实现 python几种常用功能实现代码实例...

    本篇文章小编给大家分享一下python几种常用功能实现代码实例,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看. 1.python 程序退出的几种方式 import sys s ...

  2. spark期末大作业RDD编程初级实践

    1.需求描述 本次实验需要:系统:linux unbuntu14.04,处理器:至少需要两个处器,一个内核,内存:至少4G,硬盘空间:大小需要20GB.Hadoop:2.7.1以上版本,JDK:1.8 ...

  3. Spark RDD编程模型及算子介绍(二)

    文章目录 常见的Action算子 常见分区操作算子 常见的Action算子 countByKey算子:统计Key出现的次数,部分代码如下: rdd_file = sc.textFile(". ...

  4. vba代码编程800例_VBA编程常用“积木”过程代码Address的含义

    蓝字关注,加微信NZ9668获资料信息  VBA解决方案   系列丛书作者  头条百家平台 VBA资深创作者 _______________________________ 大家好,今日继续和大家分享 ...

  5. spark更改分区_spark RDD分区是否可以指定分区

    更多详细内容 数据分区: 在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能. mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输 ...

  6. python大小写转换if_python代码实例大小写转换,首字母大写,去除特殊字符

    下面是编程之家 jb51.cc 通过网络收集整理的代码片段. 编程之家小编现在分享给大家,也给大家做个参考. #字母大小写转换 #首字母转大写 #去除字符串中特殊字符(如:'_','.',',','; ...

  7. python excel转csv两列互换,python excel转换csv代码实例

    为了提高工作效率(偷懒),用python去解决. 工作需要,需要将excel文件转化为csv文件,要是手工的一个个去转换,每个sheet页不但有几十个字段,中间还夹杂着空格,然后按顺序转换成csv文件 ...

  8. Java8时间转换(LocalDateTime)代码实例

    1.将LocalDateTime转为自定义的时间格式的字符串 1 2 3 4 public static String getDateTimeAsString(LocalDateTime localD ...

  9. python读取txt文件代码-Python txt文件常用读写操作代码实例

    python读取txt文件 #方式一: file = r'D: est.txt' with open(file, 'rb+') as f: #可读可写二进制,文件若不存在就创建 data = f.re ...

最新文章

  1. R语言-常用对象及元素类型
  2. GitHub因“纳粹”评论遭解雇的犹太员工被复职,CEO公开致歉,开除他的HR走人...
  3. Pycharm常用的使用方法
  4. [ZZ]如何在Web页面上直接打开、编辑、创建Office文档
  5. 用c写按键精灵脚本语言,按键精灵之插件编写
  6. U-Boot顶层Makefile分析
  7. 【ajax】前端ajax传值的几种方法
  8. Ubuntu16.04.1安装Caffe(GPU)
  9. 【剑指offer】题目20 顺时针打印矩阵
  10. h5页面如何切图_HTML5自助切图
  11. 企业如何实践开源协同
  12. Ubuntu - Firefox 视频无法播放问题解决方法
  13. 点击折叠菜单(HTML/CSS/JS)
  14. 贝叶斯分类器matlab实现
  15. unity免费资源获取
  16. php访问80端口强制跳转443,nginx 80端口重定向到443端口
  17. Android虚拟机AVD has terminated
  18. 510cms渗透过程,挂马并提权
  19. python大数据计算_大数据计算平台 python
  20. 思科三层交换机开启ipv6路由功能_思科路由器配置 IPv6 和 OSPFv3 路由

热门文章

  1. git checkout 和 git reset
  2. PopupWindow和AlertDialog区别
  3. 学成在线--20.新增课程(最后完善)
  4. python面试题总结(1)--语言特性
  5. 机器学习之线性回归 损失函数、代价函数、目标函数
  6. html如何设置滑轮效果,HTML中鼠标滚轮事件onmousewheel处理
  7. Python中的虚拟环境-virtualenv
  8. xampp php5.6 7.1共存,New XAMPP with PHP 7.2.8, 7.1.20, 7.0.31 5.6.37
  9. ARM中各始终之间的关系,FCLK HCLK PCLK的关系
  10. linux unix shell programming,UnixampLinux Shell Programming I.ppt