Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。如果你对Scala语言还不太熟悉,可以阅读网络教程 A Scala Tutorial for Java Programmers 或者相关 Scala书籍 进行学习。

本文将介绍3个Scala Spark编程实例,分别是WordCount、TopK和SparkJoin,分别代表了Spark的三种典型应用。

1. WordCount编程实例

WordCount是一个最简单的分布式应用实例,主要功能是统计输入目录中所有单词出现的总次数,编写步骤如下:

步骤1:创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参数指定为“yarn-standalone”,第二个参数是自定义的字符串,举例如下:

1
2
val sc = new SparkContext(args(0), "WordCount",
    System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

步骤2:读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用的是Hadoop中的TextInputFormat解析输入数据,举例如下:

1
val textFile = sc.textFile(args(1))

当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,此时你可以使用SparkContext中的hadoopRDD函数,举例如下:

1
2
val inputFormatClass = classOf[SequenceFileInputFormat[Text,Text]]
var hadoopRdd = sc.hadoopRDD(conf, inputFormatClass, classOf[Text], classOf[Text])

或者直接创建一个HadoopRDD对象:

1
2
var hadoopRdd = new HadoopRDD(sc, conf,
     classOf[SequenceFileInputFormat[Text,Text, classOf[Text], classOf[Text])

步骤3:通过RDD转换算子操作和转换RDD,对于WordCount而言,首先需要从输入数据中每行字符串中解析出单词,然后将相同单词放到一个桶中,最后统计每个桶中每个单词出现的频率,举例如下:

1
2
3
    val result = hadoopRdd.flatMap{
        case(key, value)  => value.toString().split("\\s+");
}.map(word => (word, 1)). reduceByKey (_ + _)

其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换为另一条记录(一对一关系),reduceByKey函数将key相同的数据划分到一个桶中,并以key为单位分组进行计算,这些函数的具体含义可参考:Spark Transformation。

步骤4:将产生的RDD数据集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈数将数据集保存到HDFS目录下,默认采用Hadoop提供的TextOutputFormat,每条记录以“(key,value)”的形式打印输出,你也可以采用saveAsSequenceFile函数将数据保存为SequenceFile格式等,举例如下:

1
result.saveAsSequenceFile(args(2))

当然,一般我们写Spark程序时,需要包含以下两个头文件:

1
2
import org.apache.spark._
import SparkContext._

WordCount完整程序已在“Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文中进行了介绍,在次不赘述。

需要注意的是,指定输入输出文件时,需要指定hdfs的URI,比如输入目录是hdfs://hadoop-test/tmp/input,输出目录是hdfs://hadoop-test/tmp/output,其中,“hdfs://hadoop-test”是由Hadoop配置文件core-site.xml中参数fs.default.name指定的,具体替换成你的配置即可。

2. TopK编程实例

TopK程序的任务是对一堆文本进行词频统计,并返回出现频率最高的K个词。如果采用MapReduce实现,则需要编写两个作业:WordCount和TopK,而使用Spark则只需一个作业,其中WordCount部分已由前面实现了,接下来顺着前面的实现,找到Top K个词。注意,本文的实现并不是最优的,有很大改进空间。

步骤1:首先需要对所有词按照词频排序,如下:

1
2
3
val sorted = result.map {
  case(key, value) => (value, key); //exchange key and value
}.sortByKey(true, 1)

步骤2:返回前K个:

1
val topK = sorted.top(args(3).toInt)

步骤3:将K各词打印出来:

1
topK.foreach(println)

注意,对于应用程序标准输出的内容,YARN将保存到Container的stdout日志中。在YARN中,每个Container存在三个日志文件,分别是stdout、stderr和syslog,前两个保存的是标准输出产生的内容,第三个保存的是log4j打印的日志,通常只有第三个日志中有内容。

本程序完整代码、编译好的jar包和运行脚本可以从这里下载。下载之后,按照“Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文操作流程运行即可。

3. SparkJoin编程实例

在推荐领域有一个著名的开放测试集是movielens给的,下载链接是:http://grouplens.org/datasets/movielens/,该测试集包含三个文件,分别是ratings.dat、sers.dat、movies.dat,具体介绍可阅读:README.txt,本节给出的SparkJoin实例则通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是:ml-1m。程序代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.spark._
import SparkContext._
object SparkJoin {
  def main(args: Array[String]) {
    if (args.length != 4 ){
      println("usage is org.test.WordCount <master> <rating> <movie> <output>")
      return
    }
    val sc = new SparkContext(args(0), "WordCount",
    System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
    // Read rating from HDFS file
    val textFile = sc.textFile(args(1))
    //extract (movieid, rating)
    val rating = textFile.map(line => {
        val fileds = line.split("::")
        (fileds(1).toInt, fileds(2).toDouble)
       })
    val movieScores = rating
       .groupByKey()
       .map(data => {
         val avg = data._2.sum / data._2.size
         (data._1, avg)
       })
     // Read movie from HDFS file
     val movies = sc.textFile(args(2))
     val movieskey = movies.map(line => {
       val fileds = line.split("::")
        (fileds(0).toInt, fileds(1))
     }).keyBy(tup => tup._1)
     // by join, we get <movie, averageRating, movieName>
     val result = movieScores
       .keyBy(tup => tup._1)
       .join(movieskey)
       .filter(f => f._2._1._2 > 4.0)
       .map(f => (f._1, f._2._1._2, f._2._2._2))
    result.saveAsTextFile(args(3))
  }
}

你可以从这里下载代码、编译好的jar包和运行脚本。

这个程序直接使用Spark编写有些麻烦,可以直接在Shark上编写HQL实现,Shark是基于Spark的类似Hive的交互式查询引擎,具体可参考:Shark。

4. 总结

Spark 程序设计对Scala语言的要求不高,正如Hadoop程序设计对Java语言要求不高一样,只要掌握了最基本的语法就能编写程序,且常见的语法和表达方式是很少的。通常,刚开始仿照官方实例编写程序,包括Scala、Java和Python三种语言实例。

原创文章,转载请注明: 转载自董的博客

本文链接地址: http://dongxicheng.org/framework-on-yarn/spark-scala-writing-application/

Apache Spark学习:利用Scala语言开发Spark应用程序相关推荐

  1. Spark开发学习之使用idea开发Spark应用

    Spark学习之使用idea开发Spark应用 该文章是基于jdk1.8,idea开发工具,maven都配置好的前提下进行讲述的. 背景 由于已经在远程centos服务器上部署了saprk服务,但基于 ...

  2. maven环境下使用java、scala混合开发spark应用

    熟悉java的开发者在开发spark应用时,常常会遇到spark对java的接口文档不完善或者不提供对应的java接口的问题.这个时候,如果在java项目中能直接使用scala来开发spark应用,同 ...

  3. 使用Scala语言编写Spark应用程序实现数据去重

    使用Scala语言编写Spark应用程序实现数据去重 一.题目需求 二.建立目录结构 (一)创建 sparkapp4 文件夹并切换 (二)创建 data 文件夹(存放A.txt B.txt) (三)创 ...

  4. Spark学习笔记1——第一个Spark程序:单词数统计

    Spark学习笔记1--第一个Spark程序:单词数统计 笔记摘抄自 [美] Holden Karau 等著的<Spark快速大数据分析> 添加依赖 通过 Maven 添加 Spark-c ...

  5. android studio scala插件,Scala 语言开发Andorid ,开发环境的搭建(一)

    Scala 语言开发Andorid ,开发环境的搭建 厌倦 Java 繁琐的语法,为了更优雅的开发 Android 程序,Scala 代替 Java 是一个不错的尝试. 开发前可以学习 Scala 的 ...

  6. 利用PHP语言开发手机app后台服务器的框架是什么?或者说开发流程是怎么样的?

    最近正在做一个手机APP的服务端API开发,虽然是基于Ruby on Rails的,做的也不太专业,不过大致相通,希望能够给你一些启发. 首先,如果是比较简单的手机APP,例如新闻客户端这样的 不会涉 ...

  7. php开发的app商城,如何利用PHP语言开发手机APP

    如何利用PHP语言开发手机APP 一般的PHP框架都可以用来做app后台服务器.因为原理上客户端从你这边拿的都是字符串数据,所以就算你不用框架也没有问题,不过会引发后续的问题.PHP提供API给客户端 ...

  8. 使用Scala语言开发GUI界面的计算24点的游戏应用

    今年开始学习Scala语言,对它的强大和精妙叹为观止,同时也深深感到,要熟练掌握这门语言,还必须克服很多艰难险阻. 这时,我就在想,如果能有一种方式,通过实际的应用实例,以寓教于乐的方式,引导我们逐步 ...

  9. 大数据DTSpark蘑菇云行动之 第一课:Scala语言开发环境搭建

    大数据DTSpark"蘑菇云"行动之 第一课:Scala语言开发环境搭建 第一次听王家林老师的课,感觉很不错,特别是家林老师对技术的那种热情深深的感染了我.希望在以后的日子学有所成 ...

最新文章

  1. 数据结构与算法(2-1)线性表之顺序存储(顺序表)
  2. seaborn系列 (4) | 分类图catplot()
  3. net5:Theme主题样式的动态变换,在内容页content中操作影响模板页的操作
  4. legend2---开发常用语句
  5. 渲染管道(3)几何阶段二“坐标转换”
  6. python与java、php、go的优势对比
  7. bzoj 1369: Gem 树形dp
  8. 码支付如何对接网站_支付宝当面付门店码如何做?
  9. ASP.NET Core的配置(1):读取配置信息
  10. 谷歌笔试题(Google十二岁生日晚)
  11. python输出字典的前十项,从字典列表中获取前5个值?
  12. 就业技术书文件表格_429页标准指南,教你如何管理工程监理文件资料,丰富图表一看就会...
  13. order by 子查询_【框架】118:mybatis之多表高级查询
  14. Python 第二篇:python字符串、列表和字典的基本操作方法
  15. 百度小程序html转码,百度小程序全局配置
  16. 基于运动特征的视频质量评价方法(基于H.264)
  17. 云智慧悄然“变身”业务运维,到底发生了什么?
  18. PGP加密软件使用教程-云安全技术
  19. 计算机组装配置兼容,电脑组装时怎么选择配置主板
  20. python计算样本方差_Python numpy 样本方差估计

热门文章

  1. 学习笔记(十三)——vim编辑与linux命令
  2. expdp oracle 并行_关于Expdp/Impdp 并行导入导出详细测试结果和并行参数的正确理解!!...
  3. Ubuntu使用docker安装redmine
  4. java map移除key为空_Java实现过滤掉map集合中key或value为空的值示例
  5. Python的setuptools详解【3】打包wheel并提交给pypi
  6. python的QT5:如何用QT5实现菜单
  7. 动态性是Java的特性吗_Java的动态特性有哪些?
  8. 嗅觉计算机应用,重磅!美国科技巨头宣布!计算机终于有了“嗅觉”了!
  9. git 改了一段代码不想要了_Git - 如何将master/其他分支上修改代码不提交直接移到新建分支...
  10. wps打包exe文件_如何使用PTEmaker将PPT打包为exe可执行文件(图文) - 电脑教程