RDD的创建

官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

如何将数据封装到RDD集合中,主要有两种方式:并行化本地集合(Driver Program中)和引用加载外部存储系统(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集

并行化集合

由一个已经存在的 Scala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。

演示范例代码,从List列表构建RDD集合:

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** Spark 采用并行化的方式构建Scala集合Seq中的数据为RDD*  - 将Scala集合转换为RDD*      sc.parallelize(seq)*  - 将RDD转换为Scala中集合*      rdd.collect()*      rdd.collectAsMap()*/
object SparkParallelizeTest {def main(args: Array[String]): Unit = {// 创建应用程序入口SparkContext实例对象val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 1、Scala中集合Seq序列存储数据val linesSeq: Seq[String] = Seq("hello me you her","hello you her","hello her","hello")// 2、并行化集合创建RDD数据集/*def parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T]*/val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)//val inputRDD: RDD[String] = sc.makeRDD(linesSeq, numSlices = 2)// 3、调用集合RDD中函数处理分析数据val resultRDD: RDD[(String, Int)] = inputRDD.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)// 4、保存结果RDD到外部存储系统(HDFS、MySQL、HBase。。。。)resultRDD.foreach(println)// 应用程序运行结束,关闭资源sc.stop()}
}

外部存储系统

由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop支持的数据集,比如 HDFS、Cassandra、HBase 等。实际使用最多的方法:textFile,读取HDFS或LocalFS上文本文件,指定文件路径和RDD分区数目。

范例演示:从文件系统读取数据,设置分区数目为2,代码如下。

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 从HDFS/LocalFS文件系统加载文件数据,封装为RDD集合, 可以设置分区数目*  - 从文件系统加载*      sc.textFile("")*  - 保存文件系统*      rdd.saveAsTextFile("")*/
object SparkFileSystemTest {def main(args: Array[String]): Unit = {// 创建应用程序入口SparkContext实例对象val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 1、从文件系统加载数据,创建RDD数据集/*def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String]*/val inputRDD: RDD[String] = sc.textFile("data/input/words.txt",2)println(s"Partitions Number : ${inputRDD.getNumPartitions}")// 2、调用集合RDD中函数处理分析数据val resultRDD: RDD[(String, Int)] = inputRDD.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)// 3、保存结果RDD到外部存储系统(HDFS、MySQL、HBase。。。。)resultRDD.foreach(println)// 应用程序运行结束,关闭资源sc.stop()}}

其中文件路径:可以指定文件名称,可以指定文件目录,可以使用通配符指定。

小文件读取

在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB,几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用SparkContext中提供:wholeTextFiles类,专门读取小文件数据。

范例演示:读取10个小文件数据,每个文件大小小于1MB,设置RDD分区数目为2。

package cn.itcast.coreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 采用SparkContext#wholeTextFiles()方法读取小文件*/
object SparkWholeTextFileTest {def main(args: Array[String]): Unit = {// 创建应用程序入口SparkContext实例对象val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// wholeTextFiles()val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10", minPartitions = 2)filesRDD.map(_._1).foreach(println)val inputRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\n"))println(s"Partitions Number = ${inputRDD.getNumPartitions}")println(s"Count = ${inputRDD.count()}")// 应用程序运行结束,关闭资源sc.stop()}
}

实际项目中,可以先使用wholeTextFiles方法读取数据,设置适当RDD分区,再将数据保存到文件系统,以便后续应用读取处理,大大提升性能。

2021年大数据Spark(十三):Spark Core的RDD创建相关推荐

  1. 大数据技术之Spark(一)——Spark概述

    大数据技术之Spark(一)--Spark概述 文章目录 前言 一.Spark基础 1.1 Spark是什么 1.2 Spark VS Hadoop 1.3 Spark优势及特点 1.3.1 优秀的数 ...

  2. 成都大数据Hadoop与Spark技术培训班

    成都大数据Hadoop与Spark技术培训班 中国信息化培训中心特推出了大数据技术架构及应用实战课程培训班,通过专业的大数据Hadoop与Spark技术架构体系与业界真实案例来全面提升大数据工程师.开 ...

  3. 大数据求索(8):Spark Streaming简易入门一

    大数据求索(8):Spark Streaming简易入门一 一.Spark Streaming简单介绍 Spark Streaming是基于Spark Core上的一个应用程序,可伸缩,高吞吐,容错( ...

  4. 2021年大数据HBase(十三):HBase读取和存储数据的流程

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase读取和存储数据的流程 一.HBase读取数据的流程 ...

  5. 大数据入门:Spark RDD、DataFrame、DataSet

    在Spark的学习当中,RDD.DataFrame.DataSet可以说都是需要着重理解的专业名词概念.尤其是在涉及到数据结构的部分,理解清楚这三者的共性与区别,非常有必要.今天的大数据入门分享,我们 ...

  6. 大数据hadoop,spark数据分析之 基于大数据平台的运营商在线服务系统设计

    今天向大家介绍一个帮助往届学生完成的毕业设计项目,大数据hadoop,spark数据分析之 基于大数据平台的运营商在线服务系统设计. 基于大数据平台的运营商在线服务系统设计 随着通信行业的业务拓展以及 ...

  7. 行业大数据 -- 基于hadoop+spark+mongodb+mysql开发医院临床知识库系统(建议收藏)

    一.前言 从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS ...

  8. mllib逻辑回归 spark_大数据技术之Spark mllib 逻辑回归

    本篇教程探讨了大数据技术之Spark mllib 逻辑回归,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入. 逻辑回归 逻辑回归其实是一个分类算法而不是回归算法.通常是利用已知的 ...

  9. 大数据实时处理-基于Spark的大数据实时处理及应用技术培训

    随着互联网.移动互联网和物联网的发展,我们已经切实地迎来了一个大数据 的时代.大 数据是指无法在一定时间内用常规软件工具对其内容进行抓取.管理和处理的数据集合,对大数据的分析已经成为一个非常重要且紧迫 ...

  10. 2021年大数据HBase(八):Apache Phoenix的基本介绍

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 前言 系列历史文章 Apache Phoenix的基本介绍 Apache ...

最新文章

  1. .Net 中如何测试静态方法
  2. tensorflow2.0的cpu与gpu运行时间对比
  3. boost::contract模块实现virtual private protected的测试程序
  4. mysql or会用到索引吗_mysql or条件可以使用索引而避免全表
  5. 以NETSCREEN-50为例,说明防火墙配置步骤
  6. Wannafly挑战赛23F-计数【原根,矩阵树定理,拉格朗日插值】
  7. PDH光端机的主要作用以及特点有哪些?
  8. 汽车租赁php参考文献,国内外汽车租赁文献综述
  9. 蚁群算法(实验分析)
  10. 为什么一般的眼科医院很难发现眼底疾病?这个是关键!
  11. PDF文件被加密问题
  12. 由于找不到VCRUNTIME140.dll,无法继续执行代码问题解决
  13. 代码实现:圣诞树效果(易懂,必会)/用html实现圣诞树效果
  14. 企业云邮箱申请,TOM企业邮箱,2021不见不散
  15. Docker容器处于Removal in process 无法删除解决方案
  16. ffmpeg 向H.264编码的视频中添加 SEI
  17. 计算机组成原理-chp4-指令系统
  18. 安卓手机安装charles证书后,抓包依然提示unkown问题(An unknown issue occurred processing the certificate )
  19. 论文浏览(27) Long-Term Feature Banks for Detailed Video Understanding
  20. Windows下文件或文件夹不能删除时的解决办法

热门文章

  1. 在Chrome中打开网页时出现以下问题 您的连接不是私密连接 攻击者可能会试图从 x.x.x.x 窃取您的信息(例如:密码、通讯内容或信用卡信息)
  2. 后端怎么防止重复提交?(常用的做法)
  3. 2022-2028年中国汽车橡胶密封件行业市场深度分析及发展趋势分析报告
  4. 2022-2028年中国阻燃母料行业市场深度分析及发展规模预测报告
  5. debian10 简单搭建squid
  6. hadoop,spark,scala,flink 大数据分布式系统汇总
  7. torch中的copy()和clone()
  8. 功率半导体碳化硅(SiC)技术
  9. CVPR2019论文看点:自学习Anchor原理
  10. dp,sp,px相互转化