目录

Spark应用开发-基于IDEA

创建工程

WordCount本地运行

WordCount集群运行

注意

修改代码如下

打成jar包

改名

上传jar包

提交到Yarn

WordCount-Java8版[了解]

说明:

WordCount流程图解

WordCount,主要流程如下图所示:


Spark应用开发-基于IDEA

实际开发Spark 应用程序使用IDEA集成开发环境,Spark课程所有代码均使用Scala语言开发,利用函数式编程分析处理数据,更加清晰简洁。

企业中也使用Java语言开发Spark程序,但较少,后续也可以给大家演示

创建工程

创建Maven Project工程

添加依赖至POM文件中,内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast</groupId><artifactId>spark_v8_bak</artifactId><version>1.0-SNAPSHOT</version><repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository><repository><id>apache</id><url>https://repository.apache.org/content/repositories/snapshots/</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository></repositories><properties><encoding>UTF-8</encoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><scala.version>2.11.8</scala.version><hadoop.version>2.7.5</hadoop.version><spark.version>2.4.5</spark.version></properties><dependencies><!--依赖Scala语言--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!--SparkCore依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><!--SparkSQL依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><!--SparkSQL+ Hive依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive-thriftserver_2.11</artifactId><version>${spark.version}</version></dependency><!-- spark-streaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.11</artifactId><version>${spark.version}</version></dependency><!--spark-streaming+Kafka依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency><!--StructuredStreaming+Kafka依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.5</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><plugins><!-- 指定编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin><!-- 指定编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

WordCount本地运行

http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html

package cn.itcast.helloimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {//1.创建SparkContextval conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")//设置运行参数val sc: SparkContext = new SparkContext(conf)//创建scsc.setLogLevel("WARN") //设置日志级别//2.读取文本文件//RDD:A Resilient Distributed Dataset (RDD)//弹性分布式数据集,我们可以把它理解为一个分布式的集合//Spark对于Scala集合的封装,使用起来更方便,就像操作起来就像本地集合一样简单,那这样程序员用起来就很happy//RDD[每一行数据]val fileRDD: RDD[String] = sc.textFile("data/input/words.txt")//3.处理数据,每一行按" "切分,每个单词记为1,按照单词进行聚合//3.1每一行按" "切分//RDD[单词]val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))//_表示每一行//3.2每个单词记为1//val unit: RDD[(String, Int)] = wordRDD.map(word=>(word,1))//(hello,1),(hello,1),(hello,1),(hello,1)val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1))//_表示每个单词//3.3按照单词进行聚合//reduceByKey是Spark提供的API,Scala没有,如果是Scala得先groupBy,再对Value进行操作//reduceByKey即根据key进行reduce(聚合)//_+_//第1个_表示之前聚合的历史值//第2个_表示当前这一次操作的值//RDD[(hello,4)]....val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_)//4.将结果收集到本地,变为本地集合val result: Array[(String, Int)] = resultRDD.collect()//5.打印//result.foreach(println)println(result.toBuffer)//array转为buffer可以直接打印内容//为了测试,线程休眠,查看WEB UI界面Thread.sleep(1000 * 120)//6.关闭sc.stop()}
}

WordCount集群运行

注意

写入HDFS如果存在权限问题:

进行如下设置:

hadoop fs -chmod -R 777  /

并在代码中添加:

System.setProperty("HADOOP_USER_NAME", "root")

修改代码如下

将开发测试完成的WordCount程序打成jar保存,使用【spark-submit】分别提交运行在本地模式LocalMode和集群模式Standalone集群。先修改代码,通过master设置运行模式及传递处理数据路径,代码如下:

package cn.itcast.helloimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {//为了程序健壮性,判断是否传递参数if(args.length != 2){println("Usage: SparkSubmit <input> <output>............")System.exit(1)//非0表示非正常退出程序}//1.创建SparkContextval conf: SparkConf = new SparkConf().setAppName("wc")//.setMaster("local[*]")//设置运行参数val sc: SparkContext = new SparkContext(conf)//创建scsc.setLogLevel("WARN") //设置日志级别//2.读取文本文件//RDD:A Resilient Distributed Dataset (RDD)//弹性分布式数据集,我们可以把它理解为一个分布式的集合//Spark对于Scala集合的封装,使用起来更方便,就像操作起来就像本地集合一样简单,那这样程序员用起来就很happy//RDD[每一行数据]val fileRDD: RDD[String] = sc.textFile(args(0))//3.处理数据,每一行按" "切分,每个单词记为1,按照单词进行聚合//3.1每一行按" "切分//RDD[单词]val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))//_表示每一行//3.2每个单词记为1//val unit: RDD[(String, Int)] = wordRDD.map(word=>(word,1))//(hello,1),(hello,1),(hello,1),(hello,1)val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1))//_表示每个单词//3.3按照单词进行聚合//reduceByKey是Spark提供的API,Scala没有,如果是Scala得先groupBy,再对Value进行操作//reduceByKey即根据key进行reduce(聚合)//_+_//第1个_表示之前聚合的历史值//第2个_表示当前这一次操作的值//RDD[(hello,4)]....val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_)//4.将结果收集到本地,变为本地集合//val result: Array[(String, Int)] = resultRDD.collect()//5.输出//result.foreach(println)//println(result.toBuffer)//array转为buffer可以直接打印内容resultRDD.saveAsTextFile(s"${args(1)}-${System.currentTimeMillis()}")//文件输出路径//为了测试,线程休眠,查看WEB UI界面Thread.sleep(1000 * 120)//6.关闭sc.stop()}
}

打成jar包

改名

上传jar包

上传至HDFS文件系统目录【/spark/apps/】下,方便在其他机器提交任务时也可以读取。

创建HDFS目录

hdfs dfs -mkdir -p /spark/apps/

上传jar包

hdfs dfs -put /root/wc.jar /spark/apps/

提交到Yarn

SPARK_HOME=/export/server/spark${SPARK_HOME}/bin/spark-submit \--master yarn \--deploy-mode cluster \--driver-memory 512m \--executor-memory 512m \--num-executors 1 \--total-executor-cores 2 \--class cn.itcast.hello.WordCount \hdfs://node1:8020/spark/apps/wc.jar \hdfs://node1:8020/wordcount/input/words.txt hdfs://node1:8020/wordcount/output

http://node1:8088/cluster

​​​​​​​WordCount-Java8[了解]

说明:

Scala中函数的本质是对象

Java8中函数的本质可以理解为匿名内部类对象,即Java8中的函数本质也是对象

Java8中的函数式编程的语法,lambda表达式

(参数)->{函数体}

书写原则:能省则省,不能省则加上

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;
import java.util.List;public class WordCountJava8 {public static void main(String[] args){//1.创建scSparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);jsc.setLogLevel("WARN");//2.读取文件JavaRDD<String> fileRDD = jsc.textFile("data/input/words.txt");//3.处理数据//3.1每一行按照" "切割//java8中的函数格式: (参数列表)->{函数体;}  注意:原则也是能省则省//public interface FlatMapFunction<T, R> extends Serializable {//  Iterator<R> call(T t) throws Exception;//}//通过查看源码,我们发现,flatMap中需要的函数的参数是T(就是String)//返回值是Iterator//所以我们在函数体里面要返回IteratorJavaRDD<String> wordRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());//3.2每个单词记为1 (word,1)//public interface PairFunction<T, K, V> extends Serializable {//  Tuple2<K, V> call(T t) throws Exception;//}JavaPairRDD<String, Integer> wordAndOneRDD = wordRDD.mapToPair(word -> new Tuple2<>(word, 1));//3.3按照key进行聚合//public interface Function2<T1, T2, R> extends Serializable {//  R call(T1 v1, T2 v2) throws Exception;//}JavaPairRDD<String, Integer> wrodAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);//4.收集结果并输出List<Tuple2<String, Integer>> result = wrodAndCountRDD.collect();//result.forEach(t->System.out.println(t));result.forEach(System.out::println);//函数式编程的思想:行为参数化,你要干嘛,把要做的事情当作参数进行传递就可以了//5.关闭jsc.stop();}
}

​​​​​​​WordCount流程图解

WordCount,主要流程如下图所示:

2021年大数据Spark(十一):应用开发基于IDEA集成环境相关推荐

  1. 2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount

    目录 SparkStreaming实战案例一 WordCount 需求 准备工作 代码实现 第一种方式:构建SparkConf对象 第二种方式:构建SparkContext对象 完整代码如下所示: 应 ...

  2. 2021年大数据Spark(五十一):Structured Streaming 物联网设备数据分析

    目录 ​​​​​​​物联网设备数据分析 ​​​​​​​设备监控数据准备 ​​​​​​​创建Topic ​​​​​​​模拟数据 ​​​​​​​SQL风格 ​​​​​​​DSL风格 物联网设备数据分析 在 ...

  3. 2021年大数据Spark(三十一):Spark On Hive

    目录 Spark On Hive spark-sql中集成Hive Spark代码中集成Hive Spark On Hive Spark SQL模块从发展来说,从Apache Hive框架而来,发展历 ...

  4. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

  5. 2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明

    目录 Kafka快速回顾 消息队列: 发布/订阅模式: Kafka 重要概念: 常用命令 整合说明 两种方式 两个版本API 在实际项目中,无论使用Storm还是SparkStreaming与Flin ...

  6. 2021年大数据Spark(二十三):SparkSQL 概述

    目录 SparkSQL 概述 前世今生 Shark 框架-淘汰了 SparkSQL 模块 Hive 与 SparkSQL 官方定义 第一.针对结构化数据处理,属于Spark框架一个部分 第二.抽象数据 ...

  7. 2021年大数据Spark(一):框架概述

    目录 Spark框架概述 Spark 是什么 分布式内存迭代计算框架 官方定义: Spark框架概述 Spark 是加州大学伯克利分校AMP实验室(Algorithms Machines and Pe ...

  8. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

  9. 2021年大数据Spark(四十四):Structured Streaming概述

    Apache Spark在2016年的时候启动了Structured Streaming项目,一个基于Spark SQL的全新流计算引擎Structured Streaming,让用户像编写批处理程序 ...

最新文章

  1. 接入网易云信IM即时通讯的微信小程序聊天室
  2. 基于国密算法SM2SSL证书的https加密,如何实现?
  3. Java MySql 连接数据库
  4. 【rzxt】windows7怎么设置桌面背景 如何快速道下一个背景
  5. 【Java】网络编程
  6. iis7 64位 操作excel的一系列问题(未完待续)
  7. C/S模型之TCP协议
  8. 巅峰对决 Spring Boot VS .NET 6
  9. 文本特征提取专题_以python为工具【Python机器学习系列(十二)】
  10. 华为是不是培养人工智能人才花费最大的公司?
  11. 企业网站专业性诊断评价
  12. 怎么打开unity tweak tool
  13. python链家网爬虫_python3编写爬虫程序获取链家网租房信息
  14. 2022 年排名前 10 的聊天机器人[示例]
  15. 苹果开放降级通道_今天下午 iOS 降级通道打开?骗子!
  16. 《钢铁是怎样炼成的》的读后感作文5000字
  17. 不写情书,程序员为什么还要学写作?
  18. 因为 ‘PRIMARY‘ 文件组已满。请删除不需要的文件、删除文件组中的对象、将其他文件添加到文件组或为文件组中的现有文件启用自动增长
  19. 导出模型中顶点与其对应的uv坐标
  20. win10 修复打印机服务器,修复:打印机始终在Win10上打印两份

热门文章

  1. 【实验楼】python简明教程
  2. 2022-2028年中国刨花板市场投资分析及前景预测报告(全卷)
  3. 2022-2028年中国复合软管行业市场行情动态及发展趋向分析报告
  4. 利用牛顿法求平方根-Go语言实现
  5. python版本控制神器Virtualenvwrapper的使用
  6. oracle dba_tables各字段含义
  7. NLP突破性成果 BERT 模型详细解读 bert参数微调
  8. 自研GPU之火(续)
  9. NVIDIA TensorRT:可编程推理加速器
  10. vsftpd的主配置文件是什么linux,linux下vsftpd配置文件选项详细说明