RDD创建方式

1)从Hadoop文件系统(如HDFS、Hive、HBase)输入创建。
2)从父RDD转换得到新RDD。
3)通过parallelize或makeRDD将单机数据创建为分布式RDD。

4)基于DB(Mysql)、NoSQL(HBase)、S3(SC3)、数据流创建。

从集合创建RDD

  • parallelize

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

从一个Seq集合创建RDD。

参数1:Seq集合,必须。

参数2:分区数,默认为该Application分配到的资源的CPU核数

  1. scala> var rdd = sc.parallelize(1 to 10)

  2. rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :21

  3. scala> rdd.collect

  4. res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  5. scala> rdd.partitions.size

  6. res4: Int = 15

  7. //设置RDD为3个分区

  8. scala> var rdd2 = sc.parallelize(1 to 10,3)

  9. rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :21

  10. scala> rdd2.collect

  11. res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  12. scala> rdd2.partitions.size

  13. res6: Int = 3

  • makeRDD

def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

这种用法和parallelize完全相同

def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): RDD[T]

该用法可以指定每一个分区的preferredLocations。

  1. scala> var collect = Seq((1 to 10,Seq("slave007.lxw1234.com","slave002.lxw1234.com")),

  2. (11 to 15,Seq("slave013.lxw1234.com","slave015.lxw1234.com")))

  3. collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),

  4. List(slave007.lxw1234.com, slave002.lxw1234.com)), (Range(11, 12, 13, 14, 15),List(slave013.lxw1234.com, slave015.lxw1234.com)))

  5. scala> var rdd = sc.makeRDD(collect)

  6. rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at makeRDD at :23

  7. scala> rdd.partitions.size

  8. res33: Int = 2

  9. scala> rdd.preferredLocations(rdd.partitions(0))

  10. res34: Seq[String] = List(slave007.lxw1234.com, slave002.lxw1234.com)

  11. scala> rdd.preferredLocations(rdd.partitions(1))

  12. res35: Seq[String] = List(slave013.lxw1234.com, slave015.lxw1234.com)

指定分区的优先位置,对后续的调度优化有帮助。

从外部存储创建RDD

  • textFile

//从hdfs文件创建.

  1. //从hdfs文件创建

  2. scala> var rdd = sc.textFile("hdfs:///tmp/lxw1234/1.txt")

  3. rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at textFile at :21

  4. scala> rdd.count

  5. res48: Long = 4

  6. //从本地文件创建

  7. scala> var rdd = sc.textFile("file:///etc/hadoop/conf/core-site.xml")

  8. rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[28] at textFile at :21

  9. scala> rdd.count

  10. res49: Long = 97

注意这里的本地文件路径需要在Driver和Executor端存在。

  • 从其他HDFS文件格式创建

hadoopFile

sequenceFile

objectFile

newAPIHadoopFile

  • 从Hadoop接口API创建

hadoopRDD

newAPIHadoopRDD

比如:从HBase创建RDD

  1. scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

  2. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

  3. scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat

  4. import org.apache.hadoop.hbase.mapreduce.TableInputFormat

  5. scala> import org.apache.hadoop.hbase.client.HBaseAdmin

  6. import org.apache.hadoop.hbase.client.HBaseAdmin

  7. scala> val conf = HBaseConfiguration.create()

  8. scala> conf.set(TableInputFormat.INPUT_TABLE,"lxw1234")

  9. scala> var hbaseRDD = sc.newAPIHadoopRDD(

  10. conf,classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])

  11. scala> hbaseRDD.count

  12. res52: Long = 1

Spark RDD使用详解2--RDD创建方式相关推荐

  1. Spark RDD 论文详解(三)Spark 编程接口

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  2. Spark RDD 论文详解(一)摘要和介绍

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  3. Spark RDD 论文详解(二)RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  4. Spark RDD 论文详解(七)讨论

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  5. Spark RDD 论文详解(四)表达 RDDs

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  6. Spark RDD 论文详解(五)实现

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  7. Spark 内存管理详解(下):内存管理

    本文转自:Spark内存管理详解(下)--内存管理 本文最初由IBM developerWorks中国网站发表,其链接为Apache Spark内存管理详解 在这里,正文内容分为上下两篇来阐述,这是下 ...

  8. “iOS 推送通知”详解:从创建到设置到运行

    "iOS 推送通知"详解:从创建到设置到运行 转自 http://www.csdn.net/article/2012-02-18/311976 这是一篇编译的文章,内容均出自Par ...

  9. CSS3新特性详解(二):CSS3 字体@font-face详解、如何创建和修改woff字体文件及text-shadow等文本效果

      关于CSS3新特性,在上篇博文中"CSS3新特性详解(一):CSS3选择器.边框.背景使用细节及案例演示",讨论了CSS3选择器.边框和背景,本文讨论字体@font-face使 ...

  10. niosii spi 外部_NIOS II SPI详解 如何使用SPI方式传输

    NIOS II SPI 详解 如何使用 SPI 方式传输 1 .说明 本文是依据笔者阅读< Embedded Peripherals (ver 9.0, Mar 2009, 4 MB).pdf ...

最新文章

  1. 小猫咪小狗狗也有智能「手表」了,可监测健康,识别情绪,还防乱跑|CES 2022...
  2. 算法笔记之回溯法(2)
  3. 怎样在linux中创建硬盘,在linux中添加新硬盘并创建LVM组
  4. [转载] python numpy.sqrt_python中numpy库ndarray多维数组的的运算:np.abs(x)、np.sqrt(x)、np.modf(x)等...
  5. 苹果mac矢量图形设计软件:Illustrator
  6. Apache部署多个WordPress网站
  7. java.lang.IllegalStateException: Web app root system property already set to different value
  8. oracle表修改语句怎么写,Oracle修改表结构语句
  9. Scrum敏捷开发实践
  10. Chromium浏览器的一些使用总结
  11. CSDN日报191021:我与CSDN的这十年——笔耕不辍,青春热血
  12. 安卓手机变Linux服务器丨AidLux上手体验
  13. 论文翻译:混合维在庞加莱几何三维骨架的动作识别
  14. VS2010运行DirectShow的错误—typedef void * POINTER_64 PVOID64
  15. c语言odbc编程,c语言之odbc编程指南c语言之odbc编程指南.doc
  16. Guava系列:Shorts、Doubles、Chars、Floats、Ints、Longs、Bytes使用方法
  17. ❤520情人节送女朋友的生日礼物~html+css+js实现抖音炫酷樱花3D相册(含音乐)
  18. 2003-2021年高铁线路信息数据
  19. EX Sports中文Telegram社区正式成立啦 欢迎中国地区的伙伴加入
  20. Vue升序降序(前端价格排序)

热门文章

  1. led大屏按实际尺寸设计画面_年会活动要用LED大屏还是投影?专业行家都是看这些数据。...
  2. 软件工程领域相关的技术标准_女生是否适合学习软件工程专业,以及是否能够有好的就业机会...
  3. java继承circle类_java的继承
  4. Redis数据库(四)——Redis集群模式(主从复制、哨兵、Cluster)
  5. linux 显示器分辨率设置太小了,显示器不显示 如何在设置回来,当“显示设置”中的分辨率不可用时,如何使用xrandr设置自定义分辨率...
  6. hcia是什么等级的证书_华为的HCNA,HCNP,HCIE认证证书都有什么用?
  7. 被替换的项目不是替换值长度的倍数_面试官,为啥HashMap的长度是2的n次方?
  8. python读取字典数据_Python:读取列表[{}]中的字典数据
  9. 人行二代征信报告模版_人行首度明确,替代数据纳入征信管理!美化信用报告,难了...
  10. poi报空指针_POI 导出文件 报空指针异常 --Docker 中