Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...
本博文的主要内容是:
1、rdd基本操作实战
2、transformation和action流程图
3、典型的transformation和action
RDD有3种操作:
1、 Trandformation 对数据状态的转换,即所谓算子的转换
2、 Action 触发作业,即所谓得结果的
3、 Contoller 对性能、效率和容错方面的支持,如cache、persist、checkpoint
Contoller包括cache、persist、checkpoint。
/** * Return a new RDD by applying a function to all elements of this RDD. */def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}
传入类型是T,返回类型是U。
元素之间,为什么reduce操作,要符合结合律和交换律?答:因为,交换律,不知,哪个数据先过来。所以,必须符合交换律。 在交换律基础上,想要reduce操作,必须要符合结合律。 /** * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */def reduce(f: (T, T) => T): T = withScope { val cleanF = sc.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { if (iter.hasNext) { Some(iter.reduceLeft(cleanF)) } else { None } } var jobResult: Option[T] = None val mergeResult = (index: Int, taskResult: Option[T]) => { if (taskResult.isDefined) { jobResult = jobResult match { case Some(value) => Some(f(value, taskResult.get)) case None => taskResult } } } sc.runJob(this, reducePartition, mergeResult) // Get the final result out of our Option, or throw an exception if the RDD was empty jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))}
RDD.scala(源码)
这里,新建包com.zhouls.spark.cores
package com.zhouls.spark.cores /** * Created by Administrator on 2016/9/27. */object TextLines { } 下面,开始编代码本地模式
自动 ,会写好
源码来看,
所以, val lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\textlines.txt") //通过HadoopRDD以及MapPartitionsRDD获取文件中每一行的内容本身
val lineCount = lines.map(line => (line,1)) //每一行变成行的内容与1构成的Tuple
val textLines = lineCount.reduceByKey(_+_)
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))
成功!
现在,将此行代码,
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))改一改
textLines.foreach(pair => println(pair._1 + ":" + pair._2))
总结:
本地模式里,
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))改一改
textLines.foreach(pair => println(pair._1 + ":" + pair._2))
运行正常,因为在本地模式下,是jvm,但这样书写,是不正规的。
集群模式里,
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))改一改
textLines.foreach(pair => println(pair._1 + ":" + pair._2))
运行无法通过,因为结果是分布在各个节点上。
collect源码:
/** * Return an array that contains all of the elements in this RDD. */def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*)} 得出,collect后array中就是一个元素,只不过这个元素是一个Tuple。
Tuple是元组。通过concat合并!
foreach源码:
/** * Applies a function f to all elements of this RDD. */def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}
rdd实战(rdd基本操作实战)至此!
rdd实战(transformation流程图)
拿wordcount为例!
启动hdfs集群
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh
启动spark集群
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh
启动spark-shell
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g
scala> val partitionsReadmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")
或者
scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")
scala> val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)
.saveAsTextFile("~/partition1README.txt")
注意,~目录,不是这里。
为什么,我的,不是这样的显示呢?
RDD的transformation和action执行的流程图
典型的transformation和action
转载于:https://www.cnblogs.com/zlslch/p/5913334.html
Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...相关推荐
- Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...
1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...
- Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)
不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce. ...
- HBase编程 API入门系列之HTable pool(6)
HTable是一个比较重的对此,比如加载配置文件,连接ZK,查询meta表等等,高并发的时候影响系统的性能,因此引入了"池"的概念. 引入"HBase里的连接池" ...
- Hadoop MapReduce编程 API入门系列之查找相同字母组成的字谜(三)
找出相同单词的所有单词.现在,是拿取部分数据集(如下)来完成本项目. 项目需求 一本英文书籍包含成千上万个单词或者短语,现在我们需要在大量的单词中,找出相同字母组成的所有anagrams(字谜). 思 ...
- Spark SQL 编程API入门系列之SparkSQL数据源
不多说,直接上干货! SparkSQL数据源:从各种数据源创建DataFrame 因为 spark sql,dataframe,datasets 都是共用 spark sql 这个库的,三者共享同样的 ...
- Spark MLlib编程API入门系列之特征选择之R模型公式(RFormula)
不多说,直接上干货! 特征选择里,常见的有:VectorSlicer(向量选择) RFormula(R模型公式) ChiSqSelector(卡方特征选择). RFormula用于将数据中的字段通过R ...
- Windows SDK编程 API入门系列(转)
之一 -那'烦人'的Windows数据类型 原创文章,转载请注明作者及出处. 首发 http://blog.csdn.net/beyondcode http://www.cnblogs.com/bey ...
- HBase编程 API入门系列之put(客户端而言)(1)
心得,写在前面的话,也许,中间会要多次执行,连接超时,多试试就好了. [hadoop@HadoopSlave1 conf]$ cat regionservers HadoopMaster Hadoop ...
- Hadoop MapReduce编程 API入门系列之join(二十六)
天气记录数据库 气象站数据库 气象站和天气记录合并之后的示意图如下所示. 011990-99999 SIHCCAJAVRI 195005150700 0 011990-99999 SIHCCAJAVR ...
最新文章
- 2022-2028年中国钢化玻璃行业市场研究及前瞻分析报告
- 两个经典递归问题:菲波那契数列 + 汉诺塔
- Vue.js示例:GitHub提交(watch数据,created钩子,filters过滤); 网格组件(功能:1.检索,2排序);...
- 为什么RESTful很糟糕?
- 信息系统项目管理师-信息化与信息系统核心知识点思维脑图
- python机器学习、数据分析常用第三方库(实时更新)
- DOM(二)——修改内容、属性、样式
- android 8 wifi 不稳定,Android 8.0又背锅?网络兼容问题导致WiFi狂掉线
- Linq lambda表达式经验总结
- qtableview选中第一行时表头会变色_超新颖的Word目录制作法,包你一看就会!【Word教程】...
- linux驱动基础开发2——linux 驱动开发前奏(模块编程)-转
- Vue:中向对象中添加数据
- django 模型-----模型查询
- 《PowerMock实战手册》读书笔记及个人总结
- cad2004教程_CAD插件自动编号软件安装包+安装教程
- 【转载】Goldendict下优质词典简介及安装 (2016-07-29 23:33:20)
- 【新星计划】Matlab pid参数调节工具箱
- PowerGraph:Distributed Graph-Parellel Computation on Natural Graph
- TortoiseSVN配置外部对比工具
- 钉钉小程序获取用户信息
热门文章
- python打卡摄像头黑屏_500行Python代码打造刷脸考勤系统 !
- 权限柜作用_超市条码寄存柜使用要点
- bazel 链接第三方动态库_惠州权威信息系统安全检测第三方
- Python 如何创建多维的list
- LeetCode Hot100 ---- 动态规划专题
- TensorFlow 2.0快速上手指南12条:“Keras之父”亲授 | 高赞热贴
- C++笔记——自定义函数
- 边缘计算边缘计算edge_Edge AI-边缘上的计算机视觉推理
- dcase_util教程
- 64位Ubuntu kylin 16.04下使用DNW下载uboot到tiny4412的EMMC