IBM 技术文档:Spark, 快速数据分析的又一选择

原文出处:http://www.ibm.com/developerworks/library/os-spark/

摘要:尽管Hadoop在分布式数据分析领域备受瞩目,但还是有其他选择比典型的Hadoop平台更具优势。Spark是一种可伸缩(scalable)的基于内存计算(In-Memory Computing)的数据分析平台,比Hadoop集群存储方法更有性能优势。Spark采用Scala语言实现,提供了单一的数据处理环境。本文讲述Spark的集群计算方法,并与Hadoop进行比较。

Spark与Hadoop一样,是一种开源的集群计算环境,但在特定工作负载情况下比Hadoop更高效。Spark采用基于内存的分布式数据集,优化了迭代式的工作负载以及交互式查询。

Spark采用Scala语言实现,使用Scala作为应用框架。与Hadoop不同的是,Spark和Scala紧密集成,Scala像管理本地collective对象那样管理分布式数据集。

Spark支持分布式数据集上的迭代式任务,实际上它可以在Hadoop文件系统上与Hadoop一起运行,这是由第三方集群框架Mesos实现的。Spark由加州大学伯克利分校开发,用于构建大规模、低延时的数据分析应用。


Spark集群计算架构

Spark是一种类似于Hadoop的新型集群计算框架。不同的是,Spark用于特定工作负载类型的集群计算,这种计算在多个并行操作之间重用工作数据集(如机器学习算法)。为了优化这种类型的计算,Spark引入基于内存的集群计算,即将数据集缓存在内存中,减少访问延迟。

Spark还引入了一个抽象概念,即弹性分布式数据集RDD(resilient distributed datasets )。RDD是一个分布在一组节点之间的只读的对象集合。这些集合是弹性的,即能够在部分数据集丢失的情况下重建。重建部分数据集的过程需要一种维护血统(lineage,即重建部分数据集所需的信息,说明数据是根据什么过程产生的)的容错机制支持。一个RDD可以是:(1)一个从文件创建的Scala对象,或(2)一个并行切片(分布在各个节点之间),或(3)从其他RDD转换得来,或(4)改变已有RDD的持久性,如请求将已有RDD缓存在内存中。

Spark应用称为driver,实现单个节点或一组节点上的操作。与Hadoop一样,Spark支持单节点和多节点集群。对于多节点操作,Spark依附于Mesos集群管理器。Mesos为分布式应用提供了有效的资源共享和隔离的平台(见图1)。这种配置允许Spark与Hadoop共用一个节点共享池。

图1 Spark依赖于Mesos集群管理器实现资源共享和隔离

Spark编程模型

Driver在数据集上执行两种操作:行为(action)和转换(transformation)。action,即在数据集上执行计算,并向driver返回一个值;transformation,即从已有数据集创建新的数据集。例如,执行Reduce操作(使用某个函数)、遍历数据集(即在每个元素上执行一个函数,类似Map操作),属于action;Map操作、Cache操作(即请求新的数据集缓存在内存中),属于transformation。

下面我们将简单介绍一下这两种操作的实例。不过首先熟悉一下Scala语言。


Scala简介


很多著名网站都使用Scala,像Twitter,LinkedIn,及Foursquare(其web应用框架叫Lift)。此外,有证据表明金融机构也对Scala的性能感兴趣(例如使用EDF Trading进行衍生工具定价)。

Scala是一种多范式的编程语言,支持命令式、函数式和面向对象的编程范式。从面向对象的角度来看,Scala中的每个值都是一个对象。同理,从函数式编程的角度来看,每个函数也都是一个值。Scala还是一种静态类型语言,其类型系统表达能力强且安全。

此外,Scala还是一种虚拟机语言,Scala编译器生成字节码,使用JRE2直接在Java虚拟机(JVM)上运行。这样,Scala可以在几乎任何支持JVM的地方运行(需要增加Scala运行时库),并使用已有的Java库和Java代码。

最后,Scala是可扩展的,可以以库的形式轻易无缝地集成到其他语言中去。

Scala实例


现在我们来看看Scala的几个实例。Scala有自己的解释器,可以交互式地使用它。本文不对Scala语言进行具体论述,可以参考这里。

清单1 使用解释器快速了解一下Scala语言。启动Scala之后,出现命令提示符,你就可以在交互模式下评估表达式和程序。创建变量有两种方式,一是使用val创建不可变变量(称为单一赋值的变量),二是使用var创建可变变量。如果试图对val变量进行更改,将提示错误。

清单1 Scala中的变量

$ scala

Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).

Type in expressions to have them evaluated.

Type :help for more information.

scala> val a = 1

a: Int = 1

scala> var b = 2

b: Int = 2

scala> b = b + a

b: Int = 3

scala> a = 2

<console>6: error: reassignment to val

a = 2

^

接下来,定义一个简单的函数,计算一个Int类型的平方并返回这个值。使用def定义函数,后面紧跟函数名和参数列表。不需要指定返回值,函数本身可以推断出返回值。注意,这与变量赋值操作类似。这里我演示了在3这个对象上执行这个函数,返回一个名为res0的结果变量(该变量是Scala解释器自动创建的)。见清单2。

清单2 Scala中的函数

scala> def square(x: Int) = x*x

square: (x: Int)Int

scala> square(3)

res0: Int = 9

scala> square(res0)

res1: Int = 81

接着,我们看看如何在Scala中创建简单的类(见清单3)。定义一个简单的类Dog,接受String类型的参数(相当于构造器)。注意这里类直接接受参数,而不需要在类主体中定义这个类参数。类中只有一个打印该字符串的函数。创建一个类的实例,然后调用这个函数。注意解释器会插入一些竖线,它们不是代码的一部分。

清单3 Scala中的类

scala> class Dog( name: String ) {

|   def bark() = println(name + " barked")

| }

defined class Dog

scala> val stubby = new Dog("Stubby")

stubby: Dog = Dog@1dd5a3d

scala> stubby.bark

Stubby barked

scala>

完成工作以后,只需要敲入:quit就可以退出Scala解释器。


安装Scala和Spark

首先下载和配置Scala。清单4给出了Scala的下载命令,并准备安装。根据Spark文档,这里使用2.8版本。

清单4 Scala安装

$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.8.1.final.tgz

$ sudo tar xvfz scala-2.8.1.final.tgz --directory /opt/

为了使Scala可见,将以下语句添加到.bashrc文件中(假设你使用Bash):

export SCALA_HOME=/opt/scala-2.8.1.finalexport PATH=SCALAHOME/bin:PATH

然后按照清单5测试安装。这组命令加载bashrc文件,然后快速测试了Scala解释器。

清单5 配置并在交互模式下运行Scala

$ scala

Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).

Type in expressions to have them evaluated.

Type :help for more information.

scala> println("Scala is installed!")

Scala is installed!

scala> :quit

$

现在可以看到Scala命令提示符了,输入:quit退出。注意Scala在JVM上下文中执行,所以还需要JVM。我用的是Ubuntu,默认自带了OpenJDK。

接下来,根据清单6获取最新的Spark框架。

清单6 下载和安装Spark框架

$ wget https://github.com/mesos/spark/tarball/0.3-scala-2.8/mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz

$ sudo tar xvfz mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz

然后,设置Spark配置文件 ./conf/spar-env.sh,添加SCALA_HOME环境变量:

export SCALA_HOME=/opt/scala-2.8.1.final

最后,使用简单构建工具(sbt, simple build tool)更新Spark。sbt是Scala的构建工具,Spark中也使用它。在mesos-spark-c86af80子目录下执行更新和编译:

$ sbt/sbt update compile

注意这一步需要连接到互联网。完成以后,按照清单7测试一下Spark。这个测试例子运行SparkPi计算pi的估计值(在单位正方形中随机取点)。命令格式是示例程序(spark.examples.SparkPi),加上主机参数(即定义Mesos master)。本例实在localhost上运行,因为这是一个单节点集群。注意清单7执行了两个任务,但是它们是顺序执行的(任务0结束后任务1才开始)。

清单7 快速测试Spark

$ ./run spark.examples.SparkPi local[1]

11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registered actor on port 50501

11/08/26 19:52:33 INFO spark.MapOutputTrackerActor: Registered actor on port 50501

11/08/26 19:52:33 INFO spark.SparkContext: Starting job...

11/08/26 19:52:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache

11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions

11/08/26 19:52:33 INFO spark.CacheTrackerActor: Asked for current cache locations

11/08/26 19:52:33 INFO spark.LocalScheduler: Final stage: Stage 0

11/08/26 19:52:33 INFO spark.LocalScheduler: Parents of final stage: List()

11/08/26 19:52:33 INFO spark.LocalScheduler: Missing parents: List()

11/08/26 19:52:33 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...

11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 0

11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes

11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 0

11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 1

11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 0)

11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes

11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 1

11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 1)

11/08/26 19:52:33 INFO spark.SparkContext: Job finished in 0.145892763 s

Pi is roughly 3.14952

$

通过增加线程数目,不仅可以增加线程执行的并行度,同时还能缩短执行时间(见清单8)。

清单8 使用两个线程测试Spark

$ ./run spark.examples.SparkPi local[2]

11/08/26 20:04:30 INFO spark.MapOutputTrackerActor: Registered actor on port 50501

11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registered actor on port 50501

11/08/26 20:04:30 INFO spark.SparkContext: Starting job...

11/08/26 20:04:30 INFO spark.CacheTracker: Registering RDD ID 0 with cache

11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions

11/08/26 20:04:30 INFO spark.CacheTrackerActor: Asked for current cache locations

11/08/26 20:04:30 INFO spark.LocalScheduler: Final stage: Stage 0

11/08/26 20:04:30 INFO spark.LocalScheduler: Parents of final stage: List()

11/08/26 20:04:30 INFO spark.LocalScheduler: Missing parents: List()

11/08/26 20:04:30 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...

11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 0

11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 1

11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes

11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes

11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 0

11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 1

11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 1)

11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 0)

11/08/26 20:04:30 INFO spark.SparkContext: Job finished in 0.101287331 s

Pi is roughly 3.14052

$

使用Scala创建简单的Spark应用

要想创建Spark应用,需要将Spark及其依赖打包成一个JAR文件。在Spark主目录下使用sbt创建JAR包:

$ sbt/sbt assembly

结果文件是./core/target/scala_2.8.1/Spark Core-assembly-0.3.jar。将这个文件添加到CLASSPATH。本例不使用这个JAR,因为我们是在Scala解释器中运行的,不需要编译。

本例使用标准的MapReduce转换(见清单9)。首先导入必需的Spark类,然后定义自己的类(SparkTest),在类中定义主函数,负责解析输入参数。这些参数定义了Spark的执行环境(这里是单节点集群)。然后,创建SparkContext对象,告诉Spark怎样访问集群。这个对象需要两个参数:Mesos主节点名(这个参数已经传进来了)和Job名。从命令行解析分片数,告诉Spark这个job需要使用多少线程。最后一步设置是指定MapReduce操作需要使用的文本文件。

现在可以真正地使用Spark了,由一系列转换(transformation)组成。调用flatMap函数返回一个RDD(根据特定的函数对文本行进行分割)。然后将这个RDD传给map函数(创建键值对),最后通过ReduceByKey函数聚集键值对。ReduceByKey将键值对传递给匿名函数 _+_,这个函数接受两个参数(键和值),返回它们组合在一起的结果(即一个String和一个Int)。然后将这个值写入到一个文本文件(位于output目录)。

清单 9 Scala/Spark中的MapReduce(SparkTest.scala)

import spark.SparkContext

import SparkContext._

object SparkTest {

def main( args: Array[String]) {

if (args.length == 0) {

System.err.println("Usage: SparkTest <host> [<slices>]")

System.exit(1)

}

val spark = new SparkContext(args(0), "SparkTest")

val slices = if (args.length > 1) args(1).toInt else 2

val myFile = spark.textFile("test.txt")

val counts = myFile.flatMap(line => line.split(" "))

.map(word => (word, 1))

.reduceByKey(_ + _)

counts.saveAsTextFile("out.txt")

}

}

SparkTest.main(args)

使用下面命令执行脚本:

$ scala SparkTest.scala local[1]

这个MapReduce测试文件将输出到output目录(output/part-00000)


其他大数据分析框架


还有很多其他大数据分析平台也值得一看。这些平台有的只是简单的基于脚本,有的提供类似于Hadoop的产品环境。

bashreduce是最简单的一种平台之一。顾名思义,它是在bash环境下,在多个机器上执行MapReduce类型的操作。bashreduce需要集群机器之间的无密码的SSH,并且可以通过UNIX风格的工具(sort, awk, netcat之类)提交脚本请求任务。

GraphLab是另一种有趣的MapReduce抽象实现,侧重机器学习算法的并行实现。GraphLab中,Map阶段定义了可以独立执行(在独立的主机上)的计算,Reduce阶段合并这些计算结果。

最后,还有最近新起的Twitter Storm。Storm是“实时处理的Hadoop”,重点是流处理和连续计算(即在计算时就将结果以流的形式输出)。Storm是用Clojure(现代版的Lisp语言)写的,同时也支持其他各种语言(如Ruby、Python)。Twitter于2011年9月将Storm开源。

更多资料请参考这里。


Spark的未来

Spark对于大数据分析方法这个日益庞大的家族而言,无疑增添了有趣的一笔。Spark处理分布式数据集的框架不仅是有效的,而且是高效的(通过简洁的Scala脚本)。Spark和Scala目前都还尚处于开发中。尽管如此,随着加入更多的关键互联网特性,它越来越从有趣的开源软件过渡为基础的web技术。

本文转自http://blog.sciencenet.cn/blog-425672-519991.html,仅供学习收藏,所有权利归原作者所有。

IBM 技术文档:Spark, 快速数据分析的又一选择相关推荐

  1. 代码中如何让无序标记的内容并排_英语技术文档中如何正确使用无序列表和有序列表?...

    Foreword 之前跟大家分享过英语技术文档中如何正确使用时态和英语技术文档中如何正确使用人称,这一篇再跟大家分享一下如何正确使用无序列表和有序列表. 其实,在技术文档中,除了无序列表和有序列表,另 ...

  2. 【资源推荐】良心之作!超过 10000+ 的互联网团队正在使用的在线 API 文档、技术文档工具...

    搞开发的同学都知道一个好的 API 文档是有多重要! 每当接手一个别人开发好的项目,看着那些没有注释的代码,真的头大. 程序员都很希望别人能写技术文档,因为可以提高自己开发的效率,而往往自己却很不希望 ...

  3. VuePress 手摸手教你搭建一个类Vue文档风格的技术文档/博客

    前言: VuePress是尤大为了支持 Vue 及其子项目的文档需求而写的一个项目,VuePress界面十分简洁,并且非常容易上手,一个小时就可以将项目架构搭好.现在已经有很多这种类型的文档,如果你有 ...

  4. 良心之作!超过 10000+ 的互联网团队正在使用的在线 API 文档、技术文档工具

    搞开发的同学都知道一个好的 API 文档是有多重要! 每当接手一个别人开发好的项目,看着那些没有注释的代码,真的头大. 程序员都很希望别人能写技术文档,因为可以提高自己开发的效率,而往往自己却很不希望 ...

  5. 这谁写的技术文档?我想锤死他...

    点击上方"朱小厮的博客",选择"设为星标" 后台回复"书",获取 后台回复"k8s",可领取k8s资料 很多技术人自己非 ...

  6. 技术文档的撰写_如何撰写出色的技术博客文章

    技术文档的撰写 从创意到完美结果的五个步骤 (Five steps to get from idea to polished result) I've been working in the open ...

  7. python技术文档_Python技术文档最佳实践

    所有好的产品都应该有一份简洁易读的使用说明书,除了苹果的产品.苹果认为他们的产品应该设计成为无须说明,用户天生就应该知道如何使用的那种. 但是很显然,对于软件来说,其复杂性之高,往往要求有与之配套的详 ...

  8. chrome vue.js插件文档_常用web研发技术文档,这里都给你准备好了

    研发学习,工作过程中,技术文档是重要的工具之一,但是不少同学使用文档的姿势有点问题,遇到问题就一顿百度,拿着很多不一定对的博客文章翻来翻去还找不到答案,反而浪费了很多时间,我觉得解决日常问题更高效的方 ...

  9. Module-ScyllaDB技术文档

    ScyllaDB 技术文档.md 简介说明 由于 scylladb 数据库本身就是基于 cassandra 的"优化版". ScyllaDB 是用 C++ 重写的 Cassandr ...

最新文章

  1. python中data.find_all爬取网站为空列表_Python网络爬虫之Scrapy 框架-分布式【第二十九节】...
  2. Windows下使用Telnet 命令测试端口号
  3. [patl2-001]紧急救援
  4. 7. SQL -- 创建数据库(表,字段)
  5. android 电池栏的高度,Android如何取得状态栏、任务栏高度
  6. 企业为什么要开通银企直联_企业为什么要做网站推广
  7. mysql workbench 1064_MySQL Workbench:查询错误(1064):第1行“ VISIBLE”附近的语?mysql-问答-阿里云开发者社区-阿里云...
  8. 回溯算法之全排列问题
  9. 2019-07-03
  10. MySQL设置默认编码
  11. Python输入,输出,Python导入
  12. [转]Flex 中的皮肤
  13. MySQL数据以全量和增量方式,同步到ES搜索引擎
  14. 从零开始学习CANoe(三)—— 系统变量的创建和使用
  15. linux 查看隐藏文件大小,Linux运维知识之linux下显示隐藏目录或隐藏文件占用空间大小...
  16. 基于E-Mail的隐蔽控制:机理与防御
  17. delphi盒子希腊打开潘多拉魔盒?债务重组或是唯一出
  18. Cutting a Rod
  19. 论文笔记-对话系统综述
  20. opencv中几种阈值分割

热门文章

  1. 马云:格局不够大,人生成就再高也有限!
  2. ZJOI2008 树的统计 树链剖分学习
  3. 关于JAVA的一道面试题
  4. leetcode 198 python
  5. java列表框_Java图形用户界面之列表框
  6. CCF201909-2 小明种苹果(续)
  7. SQL-必知必会-触发器
  8. 深度学习和神经网络——第二周笔记
  9. 读者福利:复盘2018上半年精选文章,还有礼品等着你!
  10. 一文带你梳理Clang编译步骤及命令