“戏”说Spark-Spark核心-RDD转换行动类算子详解
算子概述
对于RDD可以有两种计算方式:
转换(返回值还是一个RDD)---懒执行
操作(返回值不是一个RDD)---立即执行
转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
我们可以形象的使用下图表示Spark的输入、运行转换、输出。
Spark的输入、运行转换、输出。在运行转换中通过算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
·输入:在Spark程序运行中,数据从外部数据空间(例如,HDFS、Scala集合或数据)输入到Spark,数据就进入了Spark运行时数据空间,会转化为Spark中的数据块,通过BlockManager进行管理。
·运行:在Spark数据输入形成RDD后,便可以通过变换算子fliter等,对数据操作并将RDD转化为新的RDD,通过行动(Action)算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
·输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS)或Scala数据或集合中( collect输出到Scala集合,count返回Scala Int型数据)。
Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、ShuffledRDD等子类。Spark将常用的大数据操作都转化成为RDD的子类。
常用算子总结:
官方文档中常用算子:
翻译:
Action算子:
Transformations算子:
如何区分Transformations算子和Action类算子?
常用的Transformations算子+Action算子案例演示:代码可直接运行

package spark.mySpark.transformationAndaction;
import groovy.lang.Tuple;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.sysFuncNames_return;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
//java中的元组使用scala中的Tuple
import scala.Tuple2;
/**
* @author
* Spark算子演示:
* Transformation类算子
* map
* flatMap
* filter
* sortByKey
* reduceByKey
* sample
* Action类算子:
* count
* collect
* foreach
*/
public class transformation_test {
@SuppressWarnings("resource")
public static void main(String[] args) {
//因为java是面向对象的语言,当使用java来写Spark代码的时候,是传递对象,自动的提示生成返回值可以简化开发
//快捷键:Ctrl+1
//Spark应用程序的配置文件对象,可以设置:1:运行模式,2:应用程序Application的名称,3:运行时的资源的需求
SparkConf sparkConf = new SparkConf().setAppName("transformation_test").setMaster("local[3]");
//SparkContext是非常的重要的,它是通往集群的唯一的通道
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
//加载文件生成RDD
JavaRDD<String> textFileRDD= sparkContext.textFile("words.txt");
//==========================filter(function(T,Boolean))=========================//
//filter算子是Transformation类算子,返回一个由通过filter()的函数的元素组成的RDD,结果为true的元素会返回,可以用于过滤
//第一个泛型是textFileRDD里内容的类型,Boolean是返回值类型
JavaRDD<String> filterRDD = textFileRDD.filter(new Function<String, Boolean>() {
/**
* 分布式的程序:对象需要走网络传输
* 添加序列化id
*/
private static final long serialVersionUID = 1L;
public Boolean call(String line) throws Exception {
//过滤掉java
System.out.println("是否执行filter算子");
return !line.contains("java");
}
});
//============================foreach========================================//
//foreach算子是Action类算子,遍历RDD的计算结果
filterRDD.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
public void call(String word) throws Exception {
System.out.println(word);
}
});
//================================collect=========================//
//collect算子是Action类算子:将在集群中运行任务的结果拉回Driver端
//注意:当计算结果很大的时候,会导致Driver端OOM
List<String> list = filterRDD.collect();
for(String string:list){
System.out.println(string);
}
//===============================================map=========================//
//map算子是transformation类算子,一般用于改变RDD的内数据的内容格式
//String输入数据的类型,第二个为返回值类型
JavaRDD<Integer> mapRDD = textFileRDD.map(new Function<String, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(String line) throws Exception {
return line.contains("java")?1:0;
}
});
mapRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
public void call(Integer num) throws Exception {
System.out.println(num);
}
});
//============================================sample=========================//
//sample算子是一个Transformation类算子,通常用于大数据中的抽样
//withReplacement:是否为放回式的抽样,false为不放会式抽样。fraction为抽样的比例,seed为随机种子:随机抽样算法的初始值
JavaRDD<String> sampleRDD = textFileRDD.sample(true, 0.5);
long count= sampleRDD.count();
System.out.println(count);
//=========================================flatmap=========================//
//flatmap:map+flat,input 1 output *
//map :input 1 output 1
//切分单词
JavaRDD<String> flatMapRDD = textFileRDD.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
//返回迭代器
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
List<String> collect = flatMapRDD.collect();
for(String string:collect){
System.out.println("word="+string);
}
//===============================sortByKey=========================//
//在java的API中:将RDD转化为(K,V)格式的RDD,需要使用**toPair
//第一个为输入数据的类型,第二个,第三个参数为返回的K,V
List<Tuple2<String,Integer>> temp = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
// reduceByKey为Transformation类算子
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception {
//循环反复将v1+v2的值累加
return v1+v2;
}
//变换(K,V)格式的RDD
}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)
throws Exception {
return new Tuple2<Integer, String>(tuple._2, tuple._1);
}
}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(Tuple2<Integer, String> line)
throws Exception {
return new Tuple2<String, Integer>(line._2,line._1);
}
}). collect();;
//注意:当使用本地的local[*>1]的时候,使用foreach遍历数据的时候会出错?
//具体的什么问题我也不是很清楚?
/**
foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Integer> tuple) throws Exception {
System.out.println(tuple);
}
});
**/
for(Tuple2<String, Integer> list1:temp){
System.out.println(list1);
}
//关闭SparkContext
sparkContext.stop();
}
}

思考:假如有1亿条数据,如何过滤掉出现次数最多的字符串:抽样-wordcount统计-调换(K,V)-排序取Top1-filter过滤即可。
scala版本的思考题代码:

package spark.myspark.functions
import org.apache.spark.{SparkContext, SparkConf}
//思考:假如有1亿条数据,如何动态的统计出出现次数最多的编程语言,然后过滤掉
// 思路:抽样-wordcount统计-调换(K,V)-排序取Top1-filter过滤即可。
/**
* 使用到的算子:
* sample
* flatmap
* map
* reduceByKey
* sortByKey
* take(n)-----Action算子
* first()----源码中即take(1)
* filter
*/
object Sample_test {
def main(args: Array[String]) {
val conf= new SparkConf().setAppName("sample").setMaster("local")
val context =new SparkContext(conf)
val textRDD= context.textFile("words.txt")
//抽样
val sampleRDD=textRDD.sample(false,0.9)
//拿到编程语言---(语言,1)---根据key求和---转换(语言,出现次数)--(次数,语言)---排序---取Top1---取Top1对应的编程语言
val WordRDD= sampleRDD.map(x=>{(x.split(" ")(1),1)}).reduceByKey(_+_)
//first=take(1)----Action类的算子,返回一个非RDD的值
val code= WordRDD.map(x=>{(x._2,x._1)}).sortByKey(false).first()._2
//过滤
textRDD.filter(x=>{!x.contains(code)}).foreach(println)
context.stop()
}
}

注意:有多少个Action类的算子就有多少个Job任务
reduceByKey和sortByKey会产生Shuffle
注意:一个Spark应用程序的编写流程
详细请参考: “戏”说Spark-Spark Stage切分
"戏"说Spark-Spark Shuffle详解
补充:依赖包及源码包:链接: http://pan.baidu.com/s/1nuTS8WT 密码:9bf7
1:源码包下载地址(包含依赖包):
技能补充:
如何使用Idea以maven的方式编译源码包
1:下载源码,地址: http://spark.apache.org/downloads.html ,选择相应的版本
2:将源码工程import到Idea
3:以maven的方式构建,可能需要等2-3小时,需要下载Spark相关的依赖包
如何使用Eclipse查看源码?
1:下载源码
2:Ctrl需要查看的类,然后选择源码包即可查看
思维导图构建你的知识体系:
参考:
Spark笔记:RDD基本操作(上) : http://www.cnblogs.com/sharpxiajun/p/5506822.html
Spark的算子的分类:  http://www.cnblogs.com/zlslch/p/5723857.html
Spark函数详解系列之RDD基本转换 : http://www.cnblogs.com/MOBIN/p/5373256.html
http://www.jianshu.com/p/c7eef3eb6225

“戏”说Spark-Spark核心-RDD转换操作算子详解(一)相关推荐

  1. Spark的核心RDD(Resilient Distributed Datasets弹性分布式数据集)

    Spark的核心RDD(Resilient Distributed Datasets弹性分布式数据集) 铺垫 在hadoop中一个独立的计算,例如在一个迭代过程中,除可复制的文件系统(HDFS)外没有 ...

  2. spark 算子 详解

    参考文档:Spark算子详解及案例分析(分类助记) - 云+社区 - 腾讯云 1.combineByKey .作为spark 的核心算子之一,有必要详细了解.reduceByKey 和groupByK ...

  3. Linux下fdisk命令操作磁盘详解--添加、删除、转换分区

    linux下fdisk命令操作磁盘详解--添加.删除.转换分区等 fdisk 操作硬盘的命令格式如下: [root@localhost beinan]# fdisk 设备 比如我们通过 fdisk - ...

  4. [Python从零到壹] 八.数据库之MySQL和Sqlite基础知识及操作万字详解

    欢迎大家来到"Python从零到壹",在这里我将分享约200篇Python系列文章,带大家一起去学习和玩耍,看看Python这个有趣的世界.所有文章都将结合案例.代码和作者的经验讲 ...

  5. eslint php,ESlint操作步骤详解

    这次给大家带来ESlint操作步骤详解,ESlint操作的注意事项有哪些,下面就是实战案例,一起来看一下. vue-cli脚手架创建的项目默认使用ESlint规则,启动项目的时候因为各种语法报错,不得 ...

  6. Quill编辑器操作实例详解

    今天分享下"Quill编辑器操作实例详解"这篇文章,文中根据实例编码详细介绍,或许对大家的编程之路有着一定的参考空间与使用价值,需要的朋友接下来跟着云南仟龙Mark一起学习一下吧. ...

  7. Cesium 核心类Viewer-查看器详解

    Cesium 核心类Viewer-查看器详解 1 简介 A base widget for building applications. It composites all of the standa ...

  8. C语言文件操作超详解(万字解读,细致入微)

    目录 一.什么是文件 1.程序文件 2.数据文件 二.文件名 三.文件的打开和关闭 1.文件指针 2.文件操作--打开和关闭 fopen函数(包含在头文件stdio.h中)的解析: fclose函数( ...

  9. 主管护师计算机考试如何舞弊,人机对话操作步骤详解,2020主管护师考生必看!...

    原标题:人机对话操作步骤详解,2020主管护师考生必看! 距离2020年卫生资格考试不到100天了. 2020年度卫生资格考试除初级护师外采用人机对话的方式进行.很多考生第一次接触人机对话考试形式,那 ...

最新文章

  1. matlab 弹出提示,谁能告诉我为什么一打开matlab2014b就弹出一个框就自动退出
  2. 【怎样写代码】偷窥高手 -- 反射技术(三):深入窥视字段
  3. bs4库的prettify()方法|粉饰的意思。就是多了换行!
  4. Wannafly挑战赛29题解
  5. Fedora开启telnet服务
  6. 使用各种方法加速大型矩阵运算的效率对比
  7. NFS网络文件系统服务
  8. FD33里面的销售值不正确应该怎么办?
  9. GameMaker Studio 之中的攻击与受击判定盒
  10. 玩游戏该怎么选择硬盘
  11. RDIFramework.NET(.NET快速信息化系统开发框架) Web版介绍
  12. 【CSS】text-align:justify 的使用
  13. GIL与线程进程小知识点
  14. Sass的安装(windows 10)
  15. 台式计算机读取不了移动硬盘,电脑识别不了硬盘的原因
  16. 2021全国大学生电子设计竞赛F题参赛简记
  17. 高效能人士的七个习惯--由内而外全面造就自己
  18. Linux下使用clang-format格式化C++代码
  19. apache ii评分怎么评_如何正确进行APACHE II评分
  20. js 生成条形码例子

热门文章

  1. 电商评论数据爬取--R语言
  2. SpringBoot项目接入支付宝第三方登录
  3. 验证码这样做,瞬间高出一个逼格!
  4. 文件、文件夹操作应用
  5. 为四川汶川捐款可靠的三大途径(来源于百度)
  6. Windows10切换桌面关闭动画效果
  7. 开放windows服务器端口(以打开端口8080为例)
  8. 解决M1 MAC安装AI闪退问题(llustratorCC2020 M1直装适配版)支持M1 Mac芯片处理器
  9. 【送福利】拆礼盒赢iPhone 7,10000万礼盒100%中奖人人有礼!
  10. emergency和urgency的区别_请教与切磋:Hypertensive urgency和hypertensive emergency翻译