Spark On Yarn 运行项目
在spark中,支持4中运行模式:
- Local:往往使用本地开发的时候使用。
- StandAlone:是spark自带的,如果一个集群是StandAlone模式的话,那么就需要在多台机器上同时部署Spark环境。缺点是改一个机器的配置,其余所有机器的配置都需要同步才生效。
- YARN:推荐使用YARN,统一使用YARN进行整个集群作业(MR、Spark)的资源调度。
- Mesos
不管使用什么模式,spark应用程序的代码是一样的,只需要在提交的时候通过--master
参数来指定运行模式即可。
Spark支持可插拔的集群管理模式。
对于Yarn而言,Spark Application仅仅是一个客户端而已。
Spark On Yarn两种模式
Driver的运行位置
- Client:Driver运行在Client端(提交Spark作业的机器),client会和请求到的container进行通信,来完成作业的调度和执行,Client是不能退出的。ApplicationMaster的职责就是到Yarn Resource Manager中申请资源。
- Cluster:Driver运行在ApplicationMaster中,Client只要提交完作业就可以关掉,因为作业已经在Yarn中运行了。ApplicationMaster的职责不仅要去Yarn Resource Manager中申请资源,还要处理作业的调度。
运行输出日志位置
- client:日志信息在控制台,便于测试
- Cluster:运行在终端,看不到的。因为日志在Driver上,只能通过
yarn -logs applicationId
来进行查看。
Spark On Yarn执行命令
命令:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # 省略,则默认是Client模式
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \ # 设置partition数量,partition代表并行度,默认200
/home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.5.jar \
4
首先启动Yarn
在~/app/hadoop-2.6.0-cdh5.15.1/sbin
中启动start-all.sh
,浏览器中访问:
Client运行模式
执行:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \ # 设置partition数量,partition代表并行度,默认200
/home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.5.jar \
4
出现报错信息:
Exception in thread "main" org.apache.spark.SparkException: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
如果想运行在Yarn之上,那么就必须设置HADOOP_CONF_DIR
或者是YARN_CONF_DIR
两种解决方式:
- 方式一:添加环境变量
HADOOP_CONF_DIR=/home/iie4bu/app/hadoop-2.6.0-cdh5.15.1/etc
- 方式二:修改
spark-env.sh
,在这个文件中添加HADOOP_CONF_DIR
修改完之后,再次运行上面的命令,控制台输出结果:
Cluster运行模式
执行:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \ # 设置partition数量,partition代表并行度,默认200
/home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.5.jar \
4
控制台中没有输出结果:
但是可以看到applicationId
通过执行命令:yarn logs -applicationId application_1586230035025_0002
,可以看到执行结果:
将之前的项目打包
之前的项目代码SparkStatCleanJobYARN
修改如下:
package cn.ac.iie.logimport org.apache.spark.sql.{SaveMode, SparkSession}/*** 使用Spark完成我们的数据清洗操作, 运行在Yarn之上*/
object SparkStatCleanJobYARN {def main(args: Array[String]): Unit = {if(args.length != 2){println("Usage: SparkStatCleanJobYARN <inputPath> <outputPath>")System.exit(1)}val Array(inputPath, outputPath) = argsval spark = SparkSession.builder().getOrCreate()val acessRDD = spark.sparkContext.textFile(inputPath)// acessRDD.take(10).foreach(println)// RDD => DFval accessDF = spark.createDataFrame(acessRDD.map(x => AccessConvertUtil.parseLog(x)), AccessConvertUtil.struct)// accessDF.printSchema()// accessDF.show(false)accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite).partitionBy("day").save(outputPath)spark.stop()}}
对于TopNStatJobYARN
代码修改如下:
package cn.ac.iie.logimport org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}import scala.collection.mutable.ListBuffer/*** TopN 统计spark作业, 运行在Yarn上*/
object TopNStatJobYARN {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().config("spark.sql.sources.partitionColumnTypeInference.enabled", "false").getOrCreate()if(args.length != 2){println("Usage: TopNStatJobYARN <inputPath> <day>")System.exit(1)}val Array(inputPath, day) = argsval accessDF = spark.read.format("parquet").load(inputPath)// accessDF.printSchema()// accessDF.show(false)//val day = "20190702"StatDao.deleteDay(day)// 最受欢迎的TopN netTypenetTypeAccessTopNStat(spark, accessDF, day)// 按照地市进行统计TopN课程cityTypeAccessTopNStat(spark, accessDF,day)// 按照流量进行统计netTypeTrafficTopNStat(spark, accessDF, day)spark.stop}/*** 按流量进行统计* @param spark* @param accessDF*/def netTypeTrafficTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {val trafficsTopNDF = accessDF.filter(accessDF.col("day") === day && accessDF.col("netType") === "wifi").groupBy("day", "uid").agg(sum("num").as("traffics")).orderBy(desc("traffics"))// .show(false)// 将统计结果写入到Mysql中try {trafficsTopNDF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayNetTypeTrafficsStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval traffics = info.getAs[Long]("traffics")list.append(DayNetTypeTrafficsStat(day, uid, traffics))})StatDao.insertDayNetTypeTrafficsAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}}/*** 按照地市进行统计Top3课程** @param spark* @param accessDF*/def cityTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === day && accessDF.col("netType") === "wifi").groupBy("day", "uid", "city").agg(count("uid").as("times")).orderBy(desc("times"))cityAccessTopNDF.show(false)// window 函数在Spark SQL的使用val top3DF = cityAccessTopNDF.select(cityAccessTopNDF("day"), cityAccessTopNDF("uid"), cityAccessTopNDF("city"), cityAccessTopNDF("times"), row_number().over(Window.partitionBy("city").orderBy(cityAccessTopNDF("times").desc)).as("times_rank")).filter("times_rank <= 3")//.show(false)// 将统计结果写入到Mysql中try {top3DF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayCityNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval city = info.getAs[String]("city")val times = info.getAs[Long]("times")val timesRank = info.getAs[Int]("times_rank")list.append(DayCityNetTypeAccessStat(day, uid, city, times, timesRank))})StatDao.insertDayNetTypeCityAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}}/*** 最受欢迎的TopN netType** @param spark* @param accessDF*/def netTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {val wifiAccessTopNDF = accessDF.filter(accessDF.col("day") === day && accessDF.col("netType") === "wifi").groupBy("day", "uid").agg(count("uid").as("times")).orderBy(desc("times"))wifiAccessTopNDF.show(false)// accessDF.createOrReplaceTempView("access_logs")
// val wifiAccessTopNDF = spark.sql("select day,uid,count(1) as times from access_logs where day='20190702' and netType='wifi' group by day,uid order by times desc")
// wifiAccessTopNDF.show(false)// 将统计结果写入到Mysql中try {wifiAccessTopNDF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval times = info.getAs[Long]("times")list.append(DayNetTypeAccessStat(day, uid, times))})StatDao.insertNetTypeAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}}
}
appName和master,可以在命令行端进行指定。
因为我们的后台环境中,已经有相关的spark环境,没有ipdatabase,mysql
等我们环境的依赖库,所以需要把spark依赖去掉。
对应的pom.xml文件内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ac.iie.spark</groupId><artifactId>sql</artifactId><version>1.0-SNAPSHOT</version><inceptionYear>2008</inceptionYear><properties><scala.version>2.11.12</scala.version><spark.version>2.4.5</spark.version></properties><dependencies><!-- Scala --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><scope>provided</scope></dependency><!-- Spark --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><!-- hive --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>LATEST</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>com.ggstar</groupId><artifactId>ipdatabase</artifactId><version>1.0</version></dependency><!-- jdbc --><dependency><groupId>org.spark-project.hive</groupId><artifactId>hive-jdbc</artifactId><version>1.2.1.spark2</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.6</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><!-- If you have classpath issue like NoDefClassError,... --><!-- useManifestOnlyJar>false</useManifestOnlyJar --><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><mainClass></mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins></build></project>
而且需要添加相应的plugin
使用命令mvn assembly:assembly
将jar文件上传到服务器中
将原始数据上传到hdfs中
执行命令:
spark-submit \
--class cn.ac.iie.log.SparkStatCleanJobYARN \
--name SparkStatCleanJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--files /home/iie4bu/lib/ipDatabase.csv,/home/iie4bu/lib/ipRegion.xlsx \
/home/iie4bu/lib/sql-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://manager:9000/imooc/input hdfs://manager:9000/imooc/clean
注意:--files
的使用
查看我们导入的文件内容:
使用spark-shell:./spark-shell --master local[2] --jars /home/iie4bu/software/mysql-connector-java-5.1.35.jar
scala> spark.read.format("parquet").load("hdfs://manager:9000/imooc/clean/day=20190730/part-00000-52c8b5fd-8496-4402-a5a8-31159291a5bc.c000.snappy.parquet").show(false)
说明数据清洗成功。
接下来统计作业:
spark-submit --class cn.ac.iie.log.TopNStatJobYARN --name TopNStatJobYARN --master yarn --executor-memory 1G --num-executors 1 /home/iie4bu/lib/sql-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://manager:9000/imooc/clean/ 20190702
可以看到mysql中的数据已经成功添加进去了。
Spark On Yarn 运行项目相关推荐
- Spark面试:Spark on yarn 运行流程
一: Spark on YARN能让Spark计算模型在云梯YARN集群上运行,直接读取云梯上的数据,并充分享受云梯YARN集群丰富的计算资源. 二: 基于YARN的Spark作业首先由客户端生成作业 ...
- Spark On Yarn 运行模式(详细)
Spark 在spark 中,支持4中运行模式: 1)Local:开发时使用 2)Standalone: 是spark 自带的,如果一个集群是standalong 的话,那么就需要在多台机器上同时部署 ...
- 我的Bug日常:spark基于yarn运行时抛错,内存不足Required executor memory (1024 MB), offHeap memory (0) MB。问题已解决,亲测有用~~~
问题描述: 在yarn集群上运行spark时,报了如下错误 主要内容: Exception in thread "main" java.lang.IllegalArgumentEx ...
- 本地提交spark_spark快速入门(三)-------spark部署及运行模式
spark支持多种部署方案,包括spark自带的standalone资源调度模式(StandAlone):运行在hadoop的yarn资源调度框架中(SparkOnYARN):local本地模式:可以 ...
- 三, Spark 四种运行环境配置总结
三, Spark 运行环境 Spark 的运行模式有 Local(也称单节点模式),Standalone(集群模式),Spark on Yarn(运行在Yarn上),Mesos以及K8s, Windo ...
- Spark On Yarn VCore Userd 值不正常(DefaultResourceCalculator / DominantResourceCalculator )
Spark On Yarn VCore Userd 值不正常,目前集群有两个任务再跑,每个任务使用1cores. 在执行下面的脚本的时候.资源使用如下图: 执行脚本: spark-submit \ ...
- spark 查看yarn日志_spark周边项目之Livy
无Spark Client环境的部署实现 首先,熟悉spark开发的人都知道spark的部署模式分为三种,分别为Local.Standalone.YARN,通过YARN又分为YARN-Client和Y ...
- Spark Standalone -- 独立集群模式、Spark 提交任务的两种模式、spark在yarn上运行的环境搭建、自己写的spark代码如何提交到yarn上并运行...
目录 Spark Standalone -- 独立集群模式 Standalone 架构图 Standalone 的搭建 1.上传.解压.重命名 2.配置环境变量 3.修改配置文件 conf 4.同步到 ...
- Spark 在YARN上运行
在YARN上运行Spark 安全 在YARN上启动Spark 添加其他JAR 准备工作 组态 调试您的应用程序 Spark特性 重要笔记 的Kerberos YARN特定的Kerberos配置 Ker ...
最新文章
- R异常数据检测及处理方法
- [转]linux解压 tar命令
- 2021的第一个offer来自mbzuai
- SpringBoot_入门-微服务简介
- 设计模式的分类和六大设计原则
- Linux Server 安装 raid 1
- swt matlab 中 swa,Matlab小波工具箱的使用3
- JavaScript知识点之“事件机制”
- linux standby模式,搭建11g 单机 linux standby 操作文档
- python3—列表
- pytorch冻结模型
- KVM套件-linux基础
- [转载]数字全息与计算全息
- 用于实时实例分割的Deep Snake算法
- java三种功能加强模式
- Java常见问题之Data too long for column 'orResponse' at row 1
- 『为金融数据打标签』「2. 元标签方法」
- 数据库数据修改报错The instance of entity type ‘XXX‘ cannot be tracked
- AI 图像识别的测试
- 汽车学堂 自 动 驾 驶 决 策 与 控 制 算 法——现 代 控 制 理 论学习(一)
热门文章
- 验证视图状态 MAC 失败的解决办法
- SQL语句的基本语法一
- 实体类(VO,DO,DTO)的划分
- Django2.2-LookupError No installed app with label admin
- 从PHP5到PHP7自我封装MongoDB以及平滑升级
- php轻量级的性能分析工具xhprof的安装使用
- c语言的查询功能,求C语言实现查询功能(如果选择3,如何实现查询)
- PHP学级与年级的转换函数_PHP addslashes()和stripslashes():字符串转义与还原
- 如何看创建媒体日期_每天约4万个网约车投诉,看AI如何接招_媒体_澎湃新闻
- foreach ($cc as $key = $item);$item加与不加的区别