本篇博客,Alice为大家带来关于如何在IDEA上编写Spark程序的教程。

文章目录

  • 写在前面
  • 准备材料
  • 图解WordCount
  • pom.xml
  • 本地执行
  • 集群上运行
  • Java8版[了解]

写在前面

本次讲解我会通过一个非常经典的案例,同时也是在学MapReduce入门时少不了的一个例子——WordCount 来完成不同场景下Spark程序代码的书写。大家可以在敲代码时可以思考这样一个问题,用Spark是不是真的比MapReduce简便?

准备材料

wordcount.txt

hello me you her
hello you her
hello her
hello

图解WordCount

pom.xml

  • 创建Maven项目并补全目录、配置pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.czxy</groupId><artifactId>spark_demo</artifactId><version>1.0-SNAPSHOT</version><!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 --><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>jboss</id><url>http://repository.jboss.com/nexus/content/groups/public</url></repository></repositories><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.11.8</scala.version><scala.compat.version>2.11</scala.compat.version><hadoop.version>2.7.4</hadoop.version><spark.version>2.2.0</spark.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive-thriftserver_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version></dependency><!-- <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>${spark.version}</version></dependency>--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency><!--<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0-mr1-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.0-cdh5.14.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.2.0-cdh5.14.0</version></dependency>--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.4</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.3.1</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.3.1</version></dependency><dependency><groupId>com.typesafe</groupId><artifactId>config</artifactId><version>1.3.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><!-- 指定编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin><!-- 指定编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>
  • maven-assembly-plugin和maven-shade-plugin的区别

可以参考这篇博客https://blog.csdn.net/lisheng19870305/article/details/88300951

本地执行

package com.czxy.scalaimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/** @Auther: Alice菌* @Date: 2020/2/19 08:39* @Description:流年笑掷 未来可期。以梦为马,不负韶华!*/
/*** 本地运行*/
object Spark_wordcount {def main(args: Array[String]): Unit = {// 1.创建SparkContextvar config = new SparkConf().setAppName("wc").setMaster("local[*]")val sc = new SparkContext(config)sc.setLogLevel("WARN")// 2.读取文件// A Resilient Distributed Dataset (RDD)弹性分布式数据集// 可以简单理解为分布式的集合,但是Spark对它做了很多的封装// 让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了val fileRDD: RDD[String] = sc.textFile("G:\\2020干货\\Spark\\wordcount.txt")// 3.处理数据// 3.1 对每一行数据按空格切分并压平形成一个新的集合中// flatMap是对集合中的每一个元素进行操作,再进行压平val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))// 3.2 每个单词记为1val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1))// 3.3 根据key进行聚合,统计每个单词的数量// wordAndOneRDD.reduceByKey((a,b)=>a+b)// 第一个_: 之前累加的结果// 第二个_: 当前进来的数据val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_)// 4. 收集结果val result: Array[(String, Int)] = wordAndCount.collect()// 控制台打印结果result.foreach(println)}
}

运行的结果:

集群上运行

package com.czxy.scalaimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/** @Auther: Alice菌* @Date: 2020/2/19 09:12* @Description:流年笑掷 未来可期。以梦为马,不负韶华!*/
/*** 集群运行*/
object Spark_wordcount_cluster {def main(args: Array[String]): Unit = {// 1. 创建SparkContextval config = new SparkConf().setAppName("wc")val sc = new SparkContext(config)sc.setLogLevel("WARN")// 2. 读取文件// A Resilient Distributed Dataset (RDD) 弹性分布式数据集// 可以简单理解为分布式的集合,但是spark对它做了很多的封装// 让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了val fileRDD: RDD[String] = sc.textFile(args(0)) // 文件输入路径// 3. 处理数据// 3.1对每一行数据按照空格进行切分并压平形成一个新的集合// flatMap是对集合中的每一个元素进行操作,再进行压平val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))// 3.2 每个单词记为1val wordAndOneRDD = wordRDD.map((_,1))// 3.3 根据key进行聚合,统计每个单词的数量// wordAndOneRDD.reduceByKey((a,b)=>a+b)// 第一个_:之前累加的结果// 第二个_:当前进来的数据val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_)wordAndCount.saveAsTextFile(args(1)) // 文件输出路径}
}
  • 打包

  • 上传
  • 执行命令提交到Spark-HA集群
/export/servers/spark/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master spark://node01:7077,node02:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
/root/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output4
  • 执行命令提交到YARN集群
/export/servers/spark/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/root/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output5

这里我们提交到YARN集群

运行结束后在hue中查看结果


Java8版[了解]

Spark是用Scala实现的,而scala作为基于JVM的语言,与Java有着良好集成关系。用Java语言来写前面的案例同样非常简单,只不过会有点冗长。

package com.czxy.scala;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;/*** @Auther: Alice菌* @Date: 2020/2/21 09:48* @Description: 流年笑掷 未来可期。以梦为马,不负韶华!*/
public class Spark_wordcount_java8 {public static void main(String[] args){SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> fileRDD = jsc.textFile("G:\\2020干货\\Spark\\wordcount.txt");JavaRDD<String> wordRDD = fileRDD.flatMap(s -> Arrays.asList(s.split(" ")).iterator());JavaPairRDD<String, Integer> wordAndOne = wordRDD.mapToPair(w -> new Tuple2<>(w, 1));JavaPairRDD<String, Integer> wordAndCount = wordAndOne.reduceByKey((a, b) -> a + b);//wordAndCount.collect().forEach(t->System.out.println(t));wordAndCount.collect().forEach(System.out::println);//函数式编程的核心思想:行为参数化!}}

运行后的结果是一样的。


本次的分享就到这里,受益的小伙伴或对大数据技术感兴趣的朋友记得点赞关注Alice哟(^U^)ノ~YO

如何在IDEA上编写Spark程序?(本地+集群+java三种模式书写代码)相关推荐

  1. 如何在Hadoop上编写MapReduce程序

    1. 概述 1970年,IBM的研究员E.F.Codd博士在刊物<Communication of the ACM>上发表了一篇名为"A Relational Model of ...

  2. 微信小程序开发的三种模式

    摘要:截止到2018年6月底,正式上线发布的微信小程序已超过100万个.而越来越多的公司也已经在做微信小程序开发,许多人会觉得"微信小程序开发是开发者们的专利".答案是否定的,今天 ...

  3. 使用Scala编写Spark程序求基站下移动用户停留时长TopN

    使用Scala编写Spark程序求基站下移动用户停留时长TopN 1. 需求:根据手机基站日志计算停留时长的TopN 我们的手机之所以能够实现移动通信,是因为在全国各地有许许多多的基站,只要手机一开机 ...

  4. spark调用python程序包_pycharm编写spark程序,导入pyspark包的3中实现方法

    一种方法: File --> Default Setting --> 选中Project Interpreter中的一个python版本-->点击右边锯齿形图标(设置)-->选 ...

  5. 用java编写spark程序,简单示例及运行

     最近因为工作需要,研究了下spark,因为scala还不熟,所以先学习了java的spark程序写法,下面是我的简单测试程序的代码,大部分函数的用法已在注释里面注明. 我的环境:hadoop 2 ...

  6. 好程序员大数据教程:SparkShell和IDEA中编写Spark程序

    好程序员大数据教程:SparkShell和IDEA中编写Spark程序,spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用Scala编写Spa ...

  7. Eclipse helios 上编写arduino程序并进行烧录

    源:Eclipse helios 上编写arduino程序并进行烧录

  8. android用什么更新应用程序,如何在Android上更新应用程序 教你如何更新安卓手机APP...

    您从Play商店下载的大多数Android应用程序都会出于各种原因而定期提供更新:添加功能,错误修复,提高安全性...了解如何在Android智能手机上管理应用程序以及如何使它们保持最新. 如果您想要 ...

  9. DolphiScheduler平台上运行spark程序时,外部参数设置

    DolphiScheduler平台上运行spark程序时,外部参数设置 近期使用DS平台执行spark程序,遇到了部分参数设置的问题,代码中需要外部传入一个参数procDate(处理日期),具体设置如 ...

最新文章

  1. 面对996,程序员如何利用“碎片时间”涨薪?
  2. strlen函数,strcat函数,strcpy函数,strncpy函数,strcmp函数
  3. 笔记-知识产权与标准化知识-GB/T-12504-1990计算机软件质量保证计划规范
  4. 技术系列课|从NE264到NE265:视频编码技术缔造美好生活
  5. 非阻塞IO与异步IO
  6. “北斗女神”徐颖:科研时间一定大于996
  7. java 获取类方法_Java之反射机制三:获取类的方法
  8. 诗与远方:无题(十一)
  9. Linux与shell编程之一: Linux基础知识总结
  10. MATLAB——判断两个矩阵的元素是否完全相同
  11. html中.inner样式,关于通过innerHTML插入样式的问题
  12. django学习日志(模板的渲染过程)第八部分:字符串数据转义
  13. 期待只在最美的时光遇见你
  14. 微信服务器拒绝发送离线文件,解答:微信怎么接收QQ离线消息?-qq离线文件
  15. linux版qq怎么传文件,QQ for linux终于能在线传送文件了~
  16. java移动端部署_飞桨实战笔记:自编写模型如何在服务器和移动端部署
  17. transformer t5 relative position代码解读
  18. 基于Google Earth Engine的Landsat单窗算法地表温度(LST)反演
  19. 02简单数值特征的技巧
  20. XML是什么?有什么用?

热门文章

  1. labview 多位组合成1个字、1个字拆成多位
  2. 真的醉了!28天肝完阿里面试通关宝典,分享一点面试小经验
  3. ThinkPhp6+Vue智慧城市后台管理系统
  4. Keil 官网下载PACK包的地址
  5. 96微信编辑器如何调整文章格式?
  6. 年薪20-50万 | 艾视维科技“3D视觉算法工程师”岗位招人啦!
  7. office__让word记录并显示修改标记及接受(去除)修改标记
  8. HDU 5983(模拟魔方 模拟)
  9. 基于高德地图SDK实现跑步路线踩点
  10. cad线段总和lisp_CAD中数字求和