本博文的主要内容是:

1、rdd基本操作实战

2、transformation和action流程图

3、典型的transformation和action

RDD有3种操作:

1、  Trandformation      对数据状态的转换,即所谓算子的转换

2、  Action    触发作业,即所谓得结果的

3、  Contoller  对性能、效率和容错方面的支持,如cache、persist、checkpoint

Contoller包括cache、persist、checkpoint。

/** * Return a new RDD by applying a function to all elements of this RDD. */def map[U: ClassTag](f: T => U): RDD[U] = withScope {  val cleanF = sc.clean(f)  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}

传入类型是T,返回类型是U。

元素之间,为什么reduce操作,要符合结合律和交换律?答:因为,交换律,不知,哪个数据先过来。所以,必须符合交换律。   在交换律基础上,想要reduce操作,必须要符合结合律。

/** * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */def reduce(f: (T, T) => T): T = withScope {  val cleanF = sc.clean(f)  val reducePartition: Iterator[T] => Option[T] = iter => {    if (iter.hasNext) {      Some(iter.reduceLeft(cleanF))    } else {      None    }  }  var jobResult: Option[T] = None  val mergeResult = (index: Int, taskResult: Option[T]) => {    if (taskResult.isDefined) {      jobResult = jobResult match {        case Some(value) => Some(f(value, taskResult.get))        case None => taskResult      }    }  }  sc.runJob(this, reducePartition, mergeResult)  // Get the final result out of our Option, or throw an exception if the RDD was empty  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))}

RDD.scala(源码)

这里,新建包com.zhouls.spark.cores

package com.zhouls.spark.cores

/**  * Created by Administrator on 2016/9/27.  */object TextLines {

}

下面,开始编代码本地模式

自动 ,会写好

源码来看,

所以, val lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\textlines.txt") //通过HadoopRDD以及MapPartitionsRDD获取文件中每一行的内容本身

val lineCount = lines.map(line => (line,1)) //每一行变成行的内容与1构成的Tuple

val textLines = lineCount.reduceByKey(_+_)

textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))

成功!


现在,将此行代码,

     textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))

总结:

本地模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行正常,因为在本地模式下,是jvm,但这样书写,是不正规的。
集群模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行无法通过,因为结果是分布在各个节点上。
collect源码:
/** * Return an array that contains all of the elements in this RDD. */def collect(): Array[T] = withScope {  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)  Array.concat(results: _*)}

得出,collect后array中就是一个元素,只不过这个元素是一个Tuple。
Tuple是元组。通过concat合并!

foreach源码:
/** * Applies a function f to all elements of this RDD. */def foreach(f: T => Unit): Unit = withScope {  val cleanF = sc.clean(f)  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}
  

rdd实战(rdd基本操作实战)至此!

rdd实战(transformation流程图)

拿wordcount为例!

启动hdfs集群

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

启动spark集群

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

启动spark-shell

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g

scala> val partitionsReadmeRdd =  sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")

或者

scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")

scala>  val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)

.saveAsTextFile("~/partition1README.txt")

注意,~目录,不是这里。

为什么,我的,不是这样的显示呢?

RDD的transformation和action执行的流程图

典型的transformation和action

转载于:https://www.cnblogs.com/zlslch/p/5913334.html

Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...相关推荐

  1. Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...

    1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...

  2. Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

    不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce. ...

  3. HBase编程 API入门系列之HTable pool(6)

    HTable是一个比较重的对此,比如加载配置文件,连接ZK,查询meta表等等,高并发的时候影响系统的性能,因此引入了"池"的概念. 引入"HBase里的连接池" ...

  4. Hadoop MapReduce编程 API入门系列之查找相同字母组成的字谜(三)

    找出相同单词的所有单词.现在,是拿取部分数据集(如下)来完成本项目. 项目需求 一本英文书籍包含成千上万个单词或者短语,现在我们需要在大量的单词中,找出相同字母组成的所有anagrams(字谜). 思 ...

  5. Spark SQL 编程API入门系列之SparkSQL数据源

    不多说,直接上干货! SparkSQL数据源:从各种数据源创建DataFrame 因为 spark sql,dataframe,datasets 都是共用 spark sql 这个库的,三者共享同样的 ...

  6. Spark MLlib编程API入门系列之特征选择之R模型公式(RFormula)

    不多说,直接上干货! 特征选择里,常见的有:VectorSlicer(向量选择) RFormula(R模型公式) ChiSqSelector(卡方特征选择). RFormula用于将数据中的字段通过R ...

  7. Windows SDK编程 API入门系列(转)

    之一 -那'烦人'的Windows数据类型 原创文章,转载请注明作者及出处. 首发 http://blog.csdn.net/beyondcode http://www.cnblogs.com/bey ...

  8. HBase编程 API入门系列之put(客户端而言)(1)

    心得,写在前面的话,也许,中间会要多次执行,连接超时,多试试就好了. [hadoop@HadoopSlave1 conf]$ cat regionservers HadoopMaster Hadoop ...

  9. Hadoop MapReduce编程 API入门系列之join(二十六)

    天气记录数据库 气象站数据库 气象站和天气记录合并之后的示意图如下所示. 011990-99999 SIHCCAJAVRI 195005150700 0 011990-99999 SIHCCAJAVR ...

最新文章

  1. 2022-2028年中国钢化玻璃行业市场研究及前瞻分析报告
  2. 两个经典递归问题:菲波那契数列 + 汉诺塔
  3. Vue.js示例:GitHub提交(watch数据,created钩子,filters过滤); 网格组件(功能:1.检索,2排序);...
  4. 为什么RESTful很糟糕?
  5. 信息系统项目管理师-信息化与信息系统核心知识点思维脑图
  6. python机器学习、数据分析常用第三方库(实时更新)
  7. DOM(二)——修改内容、属性、样式
  8. android 8 wifi 不稳定,Android 8.0又背锅?网络兼容问题导致WiFi狂掉线
  9. Linq lambda表达式经验总结
  10. qtableview选中第一行时表头会变色_超新颖的Word目录制作法,包你一看就会!【Word教程】...
  11. linux驱动基础开发2——linux 驱动开发前奏(模块编程)-转
  12. Vue:中向对象中添加数据
  13. django 模型-----模型查询
  14. 《PowerMock实战手册》读书笔记及个人总结
  15. cad2004教程_CAD插件自动编号软件安装包+安装教程
  16. 【转载】Goldendict下优质词典简介及安装 (2016-07-29 23:33:20)
  17. 【新星计划】Matlab pid参数调节工具箱
  18. PowerGraph:Distributed Graph-Parellel Computation on Natural Graph
  19. TortoiseSVN配置外部对比工具
  20. 钉钉小程序获取用户信息

热门文章

  1. python打卡摄像头黑屏_500行Python代码打造刷脸考勤系统 !
  2. 权限柜作用_超市条码寄存柜使用要点
  3. bazel 链接第三方动态库_惠州权威信息系统安全检测第三方
  4. Python 如何创建多维的list
  5. LeetCode Hot100 ---- 动态规划专题
  6. TensorFlow 2.0快速上手指南12条:“Keras之父”亲授 | 高赞热贴
  7. C++笔记——自定义函数
  8. 边缘计算边缘计算edge_Edge AI-边缘上的计算机视觉推理
  9. dcase_util教程
  10. 64位Ubuntu kylin 16.04下使用DNW下载uboot到tiny4412的EMMC