Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。如果你对Scala语言还不太熟悉,可以阅读网络教程A Scala Tutorial for Java Programmers或者相关Scala书籍进行学习。

本文将介绍3个Scala Spark编程实例,分别是WordCount、TopK和SparkJoin,分别代表了Spark的三种典型应用。

1. WordCount编程实例

WordCount是一个最简单的分布式应用实例,主要功能是统计输入目录中所有单词出现的总次数,编写步骤如下:

步骤1:创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参数指定为“yarn-standalone”,第二个参数是自定义的字符串,举例如下:

  1. val sc = new SparkContext(args(0), "WordCount",
  2. System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

复制代码

步骤2:读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用的是Hadoop中的TextInputFormat解析输入数据,举例如下:

  1. val textFile = sc.textFile(args(1))

复制代码

当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,此时你可以使用SparkContext中的hadoopRDD函数,举例如下:

  1. val inputFormatClass = classOf[SequenceFileInputFormat[Text,Text]]
  2. var hadoopRdd = sc.hadoopRDD(conf, inputFormatClass, classOf[Text], classOf[Text])

复制代码

或者直接创建一个HadoopRDD对象:

  1. var hadoopRdd = new HadoopRDD(sc, conf,
  2. classOf[SequenceFileInputFormat[Text,Text, classOf[Text], classOf[Text])

复制代码

步骤3:通过RDD转换算子操作和转换RDD,对于WordCount而言,首先需要从输入数据中每行字符串中解析出单词,然后将相同单词放到一个桶中,最后统计每个桶中每个单词出现的频率,举例如下:

  1. val result = hadoopRdd.flatMap{
  2. case(key, value)  => value.toString().split("\\s+");
  3. }.map(word => (word, 1)). reduceByKey (_ + _)

复制代码

其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换为另一条记录(一对一关系),reduceByKey函数将key相同的数据划分到一个桶中,并以key为单位分组进行计算,这些函数的具体含义可参考:Spark Transformation。

步骤4:将产生的RDD数据集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈数将数据集保存到HDFS目录下,默认采用Hadoop提供的TextOutputFormat,每条记录以“(key,value)”的形式打印输出,你也可以采用saveAsSequenceFile函数将数据保存为SequenceFile格式等,举例如下:

  1. result.saveAsSequenceFile(args(2))

复制代码

当然,一般我们写Spark程序时,需要包含以下两个头文件:

  1. import org.apache.spark._
  2. import SparkContext._

复制代码

WordCount完整程序已在“Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文中进行了介绍,在次不赘述。

需要注意的是,指定输入输出文件时,需要指定hdfs的URI,比如输入目录是hdfs://hadoop-test/tmp/input,输出目录是hdfs://hadoop-test/tmp/output,其中,“hdfs://hadoop-test”是由Hadoop配置文件core-site.xml中参数fs.default.name指定的,具体替换成你的配置即可。

2. TopK编程实例

TopK程序的任务是对一堆文本进行词频统计,并返回出现频率最高的K个词。如果采用MapReduce实现,则需要编写两个作业:WordCount和TopK,而使用Spark则只需一个作业,其中WordCount部分已由前面实现了,接下来顺着前面的实现,找到Top K个词。注意,本文的实现并不是最优的,有很大改进空间。

步骤1:首先需要对所有词按照词频排序,如下:

  1. val sorted = result.map {
  2. case(key, value) => (value, key); //exchange key and value
  3. }.sortByKey(true, 1)

复制代码

步骤2:返回前K个:

  1. val topK = sorted.top(args(3).toInt)

复制代码

步骤3:将K各词打印出来:

  1. topK.foreach(println)

复制代码

注意,对于应用程序标准输出的内容,YARN将保存到Container的stdout日志中。在YARN中,每个Container存在三个日志文件,分别是stdout、stderr和syslog,前两个保存的是标准输出产生的内容,第三个保存的是log4j打印的日志,通常只有第三个日志中有内容。

本程序完整代码、编译好的jar包和运行脚本可以从这里下载。下载之后,按照“Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文操作流程运行即可。

3. SparkJoin编程实例

在推荐领域有一个著名的开放测试集是movielens给的,下载链接是: http://grouplens.org/datasets/movielens/ ,该测试集包含三个文件,分别是ratings.dat、sers.dat、movies.dat,具体介绍可阅读:README.txt,本节给出的SparkJoin实例则通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是:ml-1m。程序代码如下:

  1. import org.apache.spark._
  2. import SparkContext._
  3. object SparkJoin {
  4. def main(args: Array[String]) {
  5. if (args.length != 4 ){
  6. println("usage is org.test.WordCount <master> <rating> <movie> <output>")
  7. return
  8. }
  9. val sc = new SparkContext(args(0), "WordCount",
  10. System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
  11. // Read rating from HDFS file
  12. val textFile = sc.textFile(args(1))
  13. //extract (movieid, rating)
  14. val rating = textFile.map(line => {
  15. val fileds = line.split("::")
  16. (fileds(1).toInt, fileds(2).toDouble)
  17. })
  18. val movieScores = rating
  19. .groupByKey()
  20. .map(data => {
  21. val avg = data._2.sum / data._2.size
  22. (data._1, avg)
  23. })
  24. // Read movie from HDFS file
  25. val movies = sc.textFile(args(2))
  26. val movieskey = movies.map(line => {
  27. val fileds = line.split("::")
  28. (fileds(0).toInt, fileds(1))
  29. }).keyBy(tup => tup._1)
  30. // by join, we get <movie, averageRating, movieName>
  31. val result = movieScores
  32. .keyBy(tup => tup._1)
  33. .join(movieskey)
  34. .filter(f => f._2._1._2 > 4.0)
  35. .map(f => (f._1, f._2._1._2, f._2._2._2))
  36. result.saveAsTextFile(args(3))
  37. }
  38. }

复制代码

你可以从这里下载代码、编译好的jar包和运行脚本。

这个程序直接使用Spark编写有些麻烦,可以直接在Shark上编写HQL实现,Shark是基于Spark的类似Hive的交互式查询引擎,具体可参考:Shark。

4. 总结

Spark 程序设计对Scala语言的要求不高,正如Hadoop程序设计对Java语言要求不高一样,只要掌握了最基本的语法就能编写程序,且常见的语法和表达方式是很少的。通常,刚开始仿照官方实例编写程序,包括Scala、Java和Python三种语言实例。

Spark开发语言Scala语言相关推荐

  1. Apache Spark学习:利用Scala语言开发Spark应用程序

    Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情.如果你对Scala语言还不太熟悉,可以阅读网络教程 A Scala Tutorial for Ja ...

  2. windows下spark开发环境配置

    --本篇随笔由同事葛同学提供. windows下spark开发环境配置 特注:windows下开发spark不需要在本地安装hadoop,但是需要winutils.exe.hadoop.dll等文件, ...

  3. Spark开发——Spark简介及入门

    目录 什么是Spark? Spark有哪些特点和优势 1.计算速度 2.易用性 3.通用性 4.兼容性 Spark架构 Spark基本概念 Spark结构设计 使用Scala语言实现Spark本地词频 ...

  4. Spark的安装与使用 第1关:Scala语言开发环境的部署

    Scala是一种函数式面向对象语言,它融汇了许多前所未有的特性,而同时又运行于JVM之上.随着开发者对Scala的兴趣日增,以及越来越多的工具支持,无疑Scala语言将成为你手上一件必不可少的工具. ...

  5. Spark快速大数据分析——Scala语言基础(壹)

    Spark快速大数据分析--Scala语言基础(壹) 文章目录 Spark快速大数据分析--Scala语言基础(壹) 前记 Scala的历史 环境搭建: 1.SBT构建工具和REPL: 2.使用IDE ...

  6. Spark编程基础-(二)Scala语言基础

    1. Scala语言概述 1.1 计算机的起源 阿隆佐邱奇设计了演算的系统,形式系统. 阿兰图灵提出图灵机. 冯诺依曼是计算机体系结构的奠基者.1945年提出计算机体系结构. 图1 冯诺依曼体系结构 ...

  7. 大数据Spark高可用环境之Scala语言的下载安装

    大数据Spark高可用环境之Scala语言的下载安装 这里写目录标题 大数据Spark高可用环境之Scala语言的下载安装 1.安装Scala 1.1 Scala的下载 1.2 传入XShell 2. ...

  8. Scala语言开发环境的部署

    任务描述 本关任务:安装与配置Scala开发环境. 相关知识 Scala是一种函数式面向对象语言,它融汇了许多前所未有的特性,而同时又运行于JVM之上.随着开发者对Scala的兴趣日增,以及越来越多的 ...

  9. Scala语言简介以及开发环境部署

    一.概述 1.1 为什么学习scala Spark就是使用Scala编写的,为了更好的学习Spark,需要掌握Scala这门语言 Spark的兴起,带动了Scala语言的发展 1.2 Scala与Ja ...

最新文章

  1. 清华大学矣晓沅:“九歌”——基于深度学习的中国古典诗歌自动生成系统
  2. 024_html列表
  3. Linux Centos7 下安装Mysql - 8.0.15
  4. android 相机 全功能,一加7系首个Android 11公测代码暗示了相机应用的诸多功能更新...
  5. 亿级搜索系统的基石,如何保障实时数据质量?
  6. 以及其任何超类对此上下文都是未知的_web前端入门到实战:Javascript 中的「上下文」你只需要看这一篇
  7. 给 IConfiguration 写一个 GetAppSetting 扩展方法
  8. IDEA 常用配置以及快捷
  9. html设置渐变色背景图片,css中渐变色作为背景图来使用总结
  10. xpath 取标签下所有文字内容_如何理解葡萄酒标签上的所有内容(下)
  11. SOLIDWORKS产品设计学习
  12. [渝粤题库]西北工业大学中国古代法制史
  13. Ubuntu18.04和Win10共享文件夹
  14. paypal支付接口说明
  15. 计算机组成原理实训重要吗,计算机组成原理实训_报告.doc
  16. APP开放源码第一弹《纳豆》
  17. 折半查找法(二分法)流程图
  18. 基于人脸的常见表情识别——模型搭建、训练与测试¶
  19. 利用Python实现员工信息管理系统 64行超简
  20. 英语学习详细笔记(二)be动词,一般动词

热门文章

  1. 2022年高压电工考试技巧及高压电工复审考试
  2. Cointelegraph中文HUB | Roxe:全球支付网络的倡导者和领先者暨团队亚太见面会
  3. 爱情中的决策树和贝叶斯
  4. C++上机练习:调整方阵
  5. 将Excel表格导入mysql数据表_将excel表导入数据库的方法步骤
  6. android桌面widget怎么设置它大小与屏幕同宽啊,理解与应用Android桌面组件AppWidget...
  7. CSDN“重兵”出击2018TokenSky区块链大会首尔站,区块链布局升级
  8. Flutter全局路由封装及路由栈维护
  9. 清华大学计算机学院刘凯,刘 凯-清华大学化学工程系
  10. NeuroImage:MEG/EEG数据中常见的错误