内容:

1.RDD创建的几个方式

2.RDD创建实战

3.RDD内幕

第一个RDD:代表了星火应用程序输入数据的来源

通过转型来对RDD进行各种算子的转换实现算法

RDD的3种基本的创建方式

1,使用程序中的集合创建RDD;

2,使用本地文件系统创建RDD;

3,使用HDS创建RDD

其他:

4,基于DB创建RDD

5,基于NoSQL的,例如HBase的

如图6所示,基于S3创建RDD

如图7所示,基于数据流创建RDD

1.通过集合创建RDD的实际意义:测试

2.使用本地文件系统创建RDD的作用:测试大量数据文件

3.使用HDFS创建RDD:生产环境最常用的RDD创建方式

hadoop是基础设施,spark是计算核心

下面以代码演示通过集合创建RDD:

Object RDDBasedOnCollections {

def main(args:Array[String]) {

val conf = new SparkConf()    //创建SparkConf对象

conf.setAppName(“RDDBasedOnCollections”)  //设置应用程序名称,在程序运行的监控界面可以看到这个名称

conf.setMaster(“local”)

val sc = new SparkContext(conf)  //创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息。

val numbers = 1 to 100  //创建一个scala集合

val rdd = sc.parallelize(numbers)   //创建一个ParallelCollectionRDD

val sum = rdd.reduce(_+_)  //1+2=3  3+3=6  6+4=10 ...

println(“1+2+......+99+100=” + sum)

}

}

你可以在再智能设备 例如手机 平板 电视 上使用Spark,也可以在PC和Server使用使用Spark。Spark可以运行在一切设备上,只要有JVM即可。

如果是单台机,可以通过多线程方式模拟分布式

Local模式 默认情况下如果失败了 就是失败了。

下面是SparkContext的createTaskScheduler方法的源码:

/**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

// When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

case "yarn-standalone" | "yarn-cluster" =>
        if (master == "yarn-standalone") {
          logWarning(
            "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
        }
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
        } catch {
          // TODO: Enumerate the exact reasons why it can fail
          // But irrespective of it, it means we cannot proceed !
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }
        val backend = try {
          val clazz =
            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }
        scheduler.initialize(backend)
        (backend, scheduler)

case "yarn-client" =>
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

} catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

val backend = try {
          val clazz =
            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

scheduler.initialize(backend)
        (backend, scheduler)

case MESOS_REGEX(mesosUrl) =>
        MesosNativeLibrary.load()
        val scheduler = new TaskSchedulerImpl(sc)
        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
        val backend = if (coarseGrained) {
          new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
        } else {
          new MesosSchedulerBackend(scheduler, sc, mesosUrl)
        }
        scheduler.initialize(backend)
        (backend, scheduler)

case SIMR_REGEX(simrUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
        scheduler.initialize(backend)
        (backend, scheduler)

case zkUrl if zkUrl.startsWith("zk://") =>
        logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
          "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
        createTaskScheduler(sc, "mesos://" + zkUrl)

case _ =>
        throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
  }
}

通过源码可以看出如果使用LOCAL_N_FAILURES_REGEX模式,设置线程数和最大失败次数,如果失败了可以重试。所以Spark作为一个单机版软件也是非常强悍的。

未指定并行度的情况下,spark看集群有多少core就用多少个Core(并行度)。

spark会最大化使用计算资源,计算效率非常高。但如果管理不当会更耗资源。

前面的对象RDDBasedOnCollections 运行时只有一个stage。原因是

代码中只有一个reduce,reduce是Action,不会产生RDD,所以也没有Shuffle。

hadoop的mr已没有任何应用场景了。

ParallelCollectionRDD 的源码如下:

private object ParallelCollectionRDD {
  /**
   * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
   * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
   * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
   * is an inclusive Range, we use inclusive range for the last slice.
   */
  def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    if (numSlices < 1) {
      throw new IllegalArgumentException("Positive number of slices required")
    }
    // Sequences need to be sliced at the same set of index positions for operations
    // like RDD.zip() to behave as expected
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map(i => {
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      })
    }
    seq match {
      case r: Range => {
        positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
          // If the range is inclusive, use inclusive range for the last slice
          if (r.isInclusive && index == numSlices - 1) {
            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
          }
          else {
            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
          }
        }).toSeq.asInstanceOf[Seq[Seq[T]]]
      }
      case nr: NumericRange[_] => {
        // For ranges of Long, Double, BigInteger, etc
        val slices = new ArrayBuffer[Seq[T]](numSlices)
        var r = nr
        for ((start, end) <- positions(nr.length, numSlices)) {
          val sliceSize = end - start
          slices += r.take(sliceSize).asInstanceOf[Seq[T]]
          r = r.drop(sliceSize)
        }
        slices
      }
      case _ => {
        val array = seq.toArray // To prevent O(n^2) operations for List etc
        positions(array.length, numSlices).map({
          case (start, end) =>
            array.slice(start, end).toSeq
        }).toSeq
      }
    }
  }
}

 可以看出ParallelCollectionRDD可以有两处参数,seq: Seq[T], numSlices: Int,numSlices如果不指定将会默认利用所有CPU,获得最高并行度。如果指定numSlices将会按指定的分片(并行度)运行Spark程序

实际上Spark的并行度到底应该设置为多少呢?

最佳实践:spark并行度:每个core可以承载2-4个partition,

例如:32个core的话可以设为64-128

跟数据规模没有关系,只跟每个Task计算partition时使用的内存使用量和cpu使用时间有关。

blockmanager管理数据的优先位置,在程序启动时就完成了这个过程。SparkContext在构建DAGScheduler对DAG进行Stage划分时已经决定好了每一个数据分片的优先位置。

无论数据是放在内存还是磁盘还是Tachyon上,都由BlockManager管理。

下面再看一下ParallelCollectionPartition的源码:

private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil) {
  // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
  // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
  // instead.
  // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.

override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray

//读取数据时调用ParallelCollectionRDD.slice并转换为数组。
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray

//对数组分片,将每一片数据变成ParallelCollectionPartition
  }

override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }

override def getPreferredLocations(s: Partition): Seq[String] = {
    locationPrefs.getOrElse(s.index, Nil)

//获取数据的优先位置。
  }
}

下面通过读取本地文件创建RDD:

val rdd = sc.textFile(“D://README.txt”)  //注意是双斜杠

//计算所有行的长度的总和

val lineLength = rdd.map(line => line.length)

val sum = lineLength.reduce(_+_)

println(“The total character of the file is ” +  sum)

下面看一下textFile的源码:

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}

可以看出在textFile是读取HDFS或本地文件系统或其他hadoop支持的文件系统上的文件,并将其转换为RDD。在textFile内部调用了hadoopFile函数。

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}

可以看出在hadoopFile内创建了一个HadoopRDD,HadoopRDD的创建要依赖于Hadoop底层本身。

def hadoopRDD[K, V](
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // Add necessary security credentials to the JobConf before broadcasting it.
  SparkHadoopUtil.get.addCredentials(conf)
  new HadoopRDD(this,conf,inputFormatClass,keyClass,valueClass,minPartitions)
}

FileInputFormat是java的写的,是org.apache.hadoop。mapred 的包。所以这里是用星火操作的Hadoop的实现。

有人说星火的缺点是没有文件系统,但其实这正是星火的优点,正因为没有文件系统,所以才可以跨一切文件系统。

用HBase的/ MySQL的/ ORACLE的话要考虑数据本地性,要认真写getPreferredLacation.getPreferredLacation决定计算发生在什么地方.DAGScheduler在对DAG划分不同阶段时,阶段内部具体任务已经决定了数据优先位置。所以MySQL的/预言数据库机上要安装Spark.HBase节点上也要安装的火花。

实际生产环境下,HBase的和火花安装在同一节点上是可能的,但MySQL的/ oracle的节点上安装火花的可能性较小,这时就需要用的Tachyon作为中间件,导入数据库的数据,也可以把数据库中的数据导入配置单元中,在蜂巢节点上运行的火花。

第15课:RDD创建内幕彻底解密相关推荐

  1. 15.RDD 创建内幕解析

    第15课:RDD创建内幕 RDD的创建方式 Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体 ...

  2. 第71课:Spark SQL窗口函数解密与实战学习笔记

    第71课:Spark SQL窗口函数解密与实战学习笔记 本期内容: 1 SparkSQL窗口函数解析 2 SparkSQL窗口函数实战 窗口函数是Spark内置函数中最有价值的函数,因为很多关于分组的 ...

  3. [scala-spark]9. RDD创建操作

    1. 从集合创建RDD parallelize def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implici ...

  4. Spark RDD创建操作

    从集合创建RDD parallelize def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit a ...

  5. Spark算子:RDD创建操作

    关键字:Spark RDD 创建.parallelize.makeRDD.textFile.hadoopFile.hadoopRDD.newAPIHadoopFile.newAPIHadoopRDD ...

  6. 蚂蚁区块链第8课 如何创建新的账户,获取私钥和identity标识?

    1,摘要 在创建TEE硬件隐私合约链(标准合约链)的时候,配套已经创建了一个证书对和2个公私钥对,对应了这1个账号(duncanwang).不理解辉哥说什么的参考<蚂蚁区块链第4课 如何创建TE ...

  7. Hash-based Shuffle内幕彻底解密

    Hash-based Shuffle内幕彻底解密 视频学习来源:DT-大数据梦工厂 IMF传奇行动视频 本期内容: 1 Hash Shuffle彻底解密 2 Shuffle Pluggable解密 3 ...

  8. 太空射击第15课: 道具

    太空射击第15课: 道具(第1部分) 在本课中,我们将添加一些偶尔出现的道具. 本教程所需素材从这里可以下载 视频 您可以在此处观看本课程的视频 道具 我们的游戏已经走了很远,但我们仍然缺少的一件事是 ...

  9. 《实用VC编程之玩转控件》第15课:Tree树形控件

    本文转载自:VC驿站 https://www.cctry.com/thread-297465-1-1.html 1.控件简介: Tree树形控件也是我们编程过程中比较常用的一个控件,而且在其他软件中也 ...

  10. C语言探索之旅 | 第二部分第六课:创建你自己的变量类型

    -- 简书作者 谢恩铭 转载请注明出处 第二部分第六课:创建你自己的变量类型 上一课C语言探索之旅 | 第二部分第五课:预处理之后,我们进入令人激动也非常有意思的一课. 众所周知,C语言是面向过程的编 ...

最新文章

  1. linux下如何实现mysql数据库每天自动备份定时备份
  2. Linux学习笔记 1 环境变量 2 vi命令
  3. 判断整数序列是不是二元查找树的后序遍历结果
  4. 计算机网络校园网简单设计与实现,简单校园网的设计与实现.docx
  5. java中如何声明外键约束,外键约束不正确 - java-mysql
  6. IOT(9)--- 基础知识
  7. 查找算法---二分查找(递归方式)
  8. tp3.2 批量插入
  9. ScrollView中Spinner问题
  10. 基于内外环PD控制的四旋翼飞行器控制系统simulink仿真
  11. 新买的显卡需要用软件测试吗,怎么测试显卡性能?新手测试教程,秒懂
  12. lisp princ详解_LISP – 输入和输出
  13. 【studio】整理了下studio中make Project、clean Project、Rebuild Project的区别
  14. 微信小程序制作水印相机
  15. oracle18c安装教程6,Oracle 18c 安装详细过程(最全面)
  16. 一文了解DeFi主经济商,为何说它是DEX主导市场的关键
  17. 解决老Mac强行双系统后Mac系统引导丢失出现no bootable device
  18. python美元汇率兑换程序代码_如何实现python汇率转换代码
  19. 【转】【转】一个一年工作经验的java工程师从工作初到今天的所有收藏的学习java的网站(有些很经典...
  20. 弱网测试工具使用--web/pc/app

热门文章

  1. 51单片机学习历程---单片机入门
  2. origin视频教程
  3. 知识图谱_数据挖掘主要技术
  4. DSP存储器与寄存器管理
  5. 【老生谈算法】matlab实现模拟退火算法——模拟退火算法
  6. Error creating bean with name错误,spring-boot报错
  7. Eclipse使用基础教程
  8. Ubuntu下bin文件的安装
  9. 自适应权重的交叉熵计算
  10. WPS如何使参考文献对齐