spark应用程序转换_Spark—RDD编程常用转换算子代码实例
Spark—RDD编程常用转换算子代码实例
Spark rdd 常用 Transformation 实例:
1、def map[U: ClassTag](f: T => U): RDD[U] 将函数应用于RDD的每一元素,并返回一个新的RDD
packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}
object RddTestextendsApp{
val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")
val sc= newSparkContext(sparkConf)//map
var source = sc.parallelize(1 to 10)
source.collect().foreach(e=>print(e+","))//1 2 3 4 5 6 7 8 9 10
var sourceMap = source.map(_*10)
sourceMap.collect().foreach(e=>print(e+","))//10 20 30 40 50 60 70 80 90 100
}
2、def filter(f: T => Boolean): RDD[T] 通过提供的产生boolean条件的表达式来返回符合结果为True新的RDD
//filter
var source = sc.parallelize(1 to 10)
source.collect().foreach(e=>print(e+" "))//1 2 3 4 5 6 7 8 9 10
var sourceMap = source.filter(_.
sourceMap.collect().foreach(e=>print(e+" "))//1 2 3 4
3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。
//flatMap
var source = sc.parallelize(1 to 5)
source.collect().foreach(e=>print(e+" "))//1 2 3 4 5
var sourceMap = source.flatMap(x=>(1to x))
sourceMap.collect().foreach(e=>print(e+" "))//1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。
packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}
object RddTest {
def main(args: Array[String]): Unit={
val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")
val sc= newSparkContext(sparkConf)//mapPartitions
var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))
source.collect().foreach(e=> print(e + " "))//(lucy,female) (jack,male) (jams,male)
var sourceMap =source.mapPartitions(partitionsFun)
sourceMap.collect().foreach(e=> print(e + " ")) //jams jack
}
def partitionsFun(iter:Iterator[(String,String)]): Iterator[String]={
var males=List[String]()while(iter.hasNext){
val next=iter.next()
next match {case (_,"male") => males =next._1::malescase _ =>}
}returnmales.iterator
}
}
5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。
packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}
object RddTest {
def main(args: Array[String]): Unit={
val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")
val sc= newSparkContext(sparkConf)//mapPartitionsWithIndex
var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))
source.collect().foreach(e=> print(e + " "))//(lucy,female) (jack,male) (jams,male)
var sourceMap =source.mapPartitionsWithIndex(partitionsFunWithIndex)
sourceMap.collect().foreach(e=> print(e + " ")) //[1]jams [1]jack
}
def partitionsFunWithIndex(index:Int,iter:Iterator[(String,String)]): Iterator[String]={
var males=List[String]()while(iter.hasNext){
val next=iter.next()
next match {case (_,"male") => males="["+index+"]"+next._1 :: malescase _ =>}
}
males.iterator
}
}
6、def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。
packagetop.ruandbimportorg.apache.spark.{SparkConf, SparkContext}
object RddTest {
def main(args: Array[String]): Unit={
val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")
val sc= newSparkContext(sparkConf)//sample
var source = sc.parallelize(1 to 10)
source.collect().foreach(e=> print(e + " "))//1 2 3 4 5 6 7 8 9 10
var sourceMap = source.sample(true,0.4,2)
sourceMap.collect().foreach(e=> print(e + " ")) //1 2 2
}
}
7、def union(other: RDD[T]): RDD[T] 将两个RDD中的元素进行合并,返回一个新的RDD
//union
var source = sc.parallelize(1 to 3)
source.collect().foreach(e=> print(e + " "))//1 2 3
var rdd = sc.parallelize(6 to 9)
var sourceMap=source.union(rdd)
sourceMap.collect().foreach(e=> print(e + " "))//1 2 3 6 7 8 9
8、def intersection(other: RDD[T]): RDD[T] 将两个RDD做交集,返回一个新的RDD
//intersection
var source = sc.parallelize(1 to 8)
source.collect().foreach(e=> print(e + " "))//1 2 3 4 5 6 7 8
var rdd = sc.parallelize(6 to 9)
var sourceMap=source.intersection(rdd)
sourceMap.collect().foreach(e=> print(e + " "))//6 8 7
9、def distinct(): RDD[T] 将当前RDD进行去重后,返回一个新的RDD
//distinct
var source = sc.parallelize(List(1,1,2,2,3,3,4,4,5,5))
source.collect().foreach(e=> print(e + " "))//1 1 2 2 3 3 4 4 5 5
var sourceMap =source.distinct()
sourceMap.collect().foreach(e=> print(e + " "))//4 2 1 3 5
10、def partitionBy(partitioner: Partitioner): RDD[(K, V)] 根据设置的分区器重新将RDD进行分区,返回新的RDD
//partitionBy
var source = sc.parallelize(List((1,"111"),(2,"222"),(3,"333"),(4,"444")),4)
source.collect().foreach(e=> print(e + " "))
print("分区数:"+source.partitions.size)//(1,111) (2,222) (3,333) (4,444) 分区数:4
var sourceMap = source.partitionBy(new HashPartitioner(2))
sourceMap.collect().foreach(e=> print(e + " "))
print("分区数:"+sourceMap.partitions.size)//(2,222) (4,444) (1,111) (3,333) 分区数:2
11、def reduceByKey(func: (V, V) => V): RDD[(K, V)] 根据Key值将相同Key的元组的值用func进行计算,返回新的RDD
//reduceByKey
var source = sc.parallelize(List(("hello",1),("world",1),("hello",1),("world",1)))
source.collect().foreach(e=> print(e + " "))//(hello,1) (world,1) (hello,1) (world,1)
var sourceMap = source.reduceByKey((x,y)=>x+y)
sourceMap.collect().foreach(e=> print(e + " "))//(hello,2) (world,2)
12、def groupByKey(): RDD[(K, Iterable[V])] 将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD
//groupByKey
var source = sc.parallelize(List(("hello",1),("world",1),("hello",1),("world",1)))
source.collect().foreach(e=> print(e + " "))//(hello,1) (world,1) (hello,1) (world,1)
var sourceMap =source.groupByKey()
sourceMap.collect().foreach(e=> print(e + " "))//(hello,CompactBuffer(1, 1)) (world,CompactBuffer(1, 1))
13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] 根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。
packagetop.ruandbimportorg.apache.spark.{ SparkConf, SparkContext}
object RddTest {
def main(args: Array[String]): Unit={
val sparkConf= new SparkConf().setAppName("RddTest").setMaster("local[2]")
val sc= newSparkContext(sparkConf)//combineByKey 计算平均成绩
var scores = Array(("lucy", 89), ("jack", 77), ("lucy", 100), ("james", 65), ("jack", 99),
("james", 44))
var input=sc.parallelize(scores);
input.collect().foreach(e=> print(e + " "))//(lucy,89) (jack,77) (lucy,100) (james,65) (jack,99) (james,44)
var output = input.combineByKey((v) => (v, 1),
(acc: (Int, Int), v)=> (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int))=> (acc1._1 + acc2._1, acc1._2 +acc2._2))
output.collect().foreach(e=> print(e + " "))//(james,(109,2)) (jack,(176,2)) (lucy,(189,2))
var result = output.map{case (key,value) => (key,value._1/value._2.toDouble)}
result.collect().foreach(e=> print(e + " "))//(james,54.5) (jack,88.0) (lucy,94.5)
}
}
14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] 通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。
spark应用程序转换_Spark—RDD编程常用转换算子代码实例相关推荐
- python如何实现选项功能_python几种常用功能如何实现 python几种常用功能实现代码实例...
本篇文章小编给大家分享一下python几种常用功能实现代码实例,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看. 1.python 程序退出的几种方式 import sys s ...
- spark期末大作业RDD编程初级实践
1.需求描述 本次实验需要:系统:linux unbuntu14.04,处理器:至少需要两个处器,一个内核,内存:至少4G,硬盘空间:大小需要20GB.Hadoop:2.7.1以上版本,JDK:1.8 ...
- Spark RDD编程模型及算子介绍(二)
文章目录 常见的Action算子 常见分区操作算子 常见的Action算子 countByKey算子:统计Key出现的次数,部分代码如下: rdd_file = sc.textFile(". ...
- vba代码编程800例_VBA编程常用“积木”过程代码Address的含义
蓝字关注,加微信NZ9668获资料信息 VBA解决方案 系列丛书作者 头条百家平台 VBA资深创作者 _______________________________ 大家好,今日继续和大家分享 ...
- spark更改分区_spark RDD分区是否可以指定分区
更多详细内容 数据分区: 在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能. mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输 ...
- python大小写转换if_python代码实例大小写转换,首字母大写,去除特殊字符
下面是编程之家 jb51.cc 通过网络收集整理的代码片段. 编程之家小编现在分享给大家,也给大家做个参考. #字母大小写转换 #首字母转大写 #去除字符串中特殊字符(如:'_','.',',','; ...
- python excel转csv两列互换,python excel转换csv代码实例
为了提高工作效率(偷懒),用python去解决. 工作需要,需要将excel文件转化为csv文件,要是手工的一个个去转换,每个sheet页不但有几十个字段,中间还夹杂着空格,然后按顺序转换成csv文件 ...
- Java8时间转换(LocalDateTime)代码实例
1.将LocalDateTime转为自定义的时间格式的字符串 1 2 3 4 public static String getDateTimeAsString(LocalDateTime localD ...
- python读取txt文件代码-Python txt文件常用读写操作代码实例
python读取txt文件 #方式一: file = r'D: est.txt' with open(file, 'rb+') as f: #可读可写二进制,文件若不存在就创建 data = f.re ...
最新文章
- R语言-常用对象及元素类型
- GitHub因“纳粹”评论遭解雇的犹太员工被复职,CEO公开致歉,开除他的HR走人...
- Pycharm常用的使用方法
- [ZZ]如何在Web页面上直接打开、编辑、创建Office文档
- 用c写按键精灵脚本语言,按键精灵之插件编写
- U-Boot顶层Makefile分析
- 【ajax】前端ajax传值的几种方法
- Ubuntu16.04.1安装Caffe(GPU)
- 【剑指offer】题目20 顺时针打印矩阵
- h5页面如何切图_HTML5自助切图
- 企业如何实践开源协同
- Ubuntu - Firefox 视频无法播放问题解决方法
- 点击折叠菜单(HTML/CSS/JS)
- 贝叶斯分类器matlab实现
- unity免费资源获取
- php访问80端口强制跳转443,nginx 80端口重定向到443端口
- Android虚拟机AVD has terminated
- 510cms渗透过程,挂马并提权
- python大数据计算_大数据计算平台 python
- 思科三层交换机开启ipv6路由功能_思科路由器配置 IPv6 和 OSPFv3 路由
热门文章
- git checkout 和 git reset
- PopupWindow和AlertDialog区别
- 学成在线--20.新增课程(最后完善)
- python面试题总结(1)--语言特性
- 机器学习之线性回归 损失函数、代价函数、目标函数
- html如何设置滑轮效果,HTML中鼠标滚轮事件onmousewheel处理
- Python中的虚拟环境-virtualenv
- xampp php5.6 7.1共存,New XAMPP with PHP 7.2.8, 7.1.20, 7.0.31 5.6.37
- ARM中各始终之间的关系,FCLK HCLK PCLK的关系
- linux unix shell programming,UnixampLinux Shell Programming I.ppt