在spark中,支持4中运行模式:

  • Local:往往使用本地开发的时候使用。
  • StandAlone:是spark自带的,如果一个集群是StandAlone模式的话,那么就需要在多台机器上同时部署Spark环境。缺点是改一个机器的配置,其余所有机器的配置都需要同步才生效。
  • YARN:推荐使用YARN,统一使用YARN进行整个集群作业(MR、Spark)的资源调度。
  • Mesos

不管使用什么模式,spark应用程序的代码是一样的,只需要在提交的时候通过--master参数来指定运行模式即可。

Spark支持可插拔的集群管理模式。
对于Yarn而言,Spark Application仅仅是一个客户端而已。

Spark On Yarn两种模式

Driver的运行位置

  • Client:Driver运行在Client端(提交Spark作业的机器),client会和请求到的container进行通信,来完成作业的调度和执行,Client是不能退出的。ApplicationMaster的职责就是到Yarn Resource Manager中申请资源。
  • Cluster:Driver运行在ApplicationMaster中,Client只要提交完作业就可以关掉,因为作业已经在Yarn中运行了。ApplicationMaster的职责不仅要去Yarn Resource Manager中申请资源,还要处理作业的调度。

运行输出日志位置

  • client:日志信息在控制台,便于测试
  • Cluster:运行在终端,看不到的。因为日志在Driver上,只能通过yarn -logs applicationId来进行查看。

Spark On Yarn执行命令

命令:

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # 省略,则默认是Client模式
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \ # 设置partition数量,partition代表并行度,默认200
/home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.5.jar \
4

首先启动Yarn

~/app/hadoop-2.6.0-cdh5.15.1/sbin中启动start-all.sh,浏览器中访问:

Client运行模式

执行:

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \ # 设置partition数量,partition代表并行度,默认200
/home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.5.jar \
4

出现报错信息:

Exception in thread "main" org.apache.spark.SparkException: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

如果想运行在Yarn之上,那么就必须设置HADOOP_CONF_DIR 或者是YARN_CONF_DIR
两种解决方式:

  • 方式一:添加环境变量HADOOP_CONF_DIR=/home/iie4bu/app/hadoop-2.6.0-cdh5.15.1/etc
  • 方式二:修改spark-env.sh,在这个文件中添加HADOOP_CONF_DIR

修改完之后,再次运行上面的命令,控制台输出结果:

Cluster运行模式

执行:

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \ # 设置partition数量,partition代表并行度,默认200
/home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.5.jar \
4

控制台中没有输出结果:

但是可以看到applicationId
通过执行命令:yarn logs -applicationId application_1586230035025_0002,可以看到执行结果:

将之前的项目打包

之前的项目代码SparkStatCleanJobYARN修改如下:

package cn.ac.iie.logimport org.apache.spark.sql.{SaveMode, SparkSession}/*** 使用Spark完成我们的数据清洗操作, 运行在Yarn之上*/
object SparkStatCleanJobYARN {def main(args: Array[String]): Unit = {if(args.length != 2){println("Usage: SparkStatCleanJobYARN <inputPath> <outputPath>")System.exit(1)}val Array(inputPath, outputPath) = argsval spark = SparkSession.builder().getOrCreate()val acessRDD = spark.sparkContext.textFile(inputPath)// acessRDD.take(10).foreach(println)// RDD => DFval accessDF = spark.createDataFrame(acessRDD.map(x => AccessConvertUtil.parseLog(x)), AccessConvertUtil.struct)// accessDF.printSchema()// accessDF.show(false)accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite).partitionBy("day").save(outputPath)spark.stop()}}

对于TopNStatJobYARN代码修改如下:

package cn.ac.iie.logimport org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}import scala.collection.mutable.ListBuffer/*** TopN 统计spark作业, 运行在Yarn上*/
object TopNStatJobYARN {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().config("spark.sql.sources.partitionColumnTypeInference.enabled", "false").getOrCreate()if(args.length != 2){println("Usage: TopNStatJobYARN <inputPath> <day>")System.exit(1)}val Array(inputPath, day) = argsval accessDF = spark.read.format("parquet").load(inputPath)// accessDF.printSchema()// accessDF.show(false)//val day = "20190702"StatDao.deleteDay(day)// 最受欢迎的TopN netTypenetTypeAccessTopNStat(spark, accessDF, day)// 按照地市进行统计TopN课程cityTypeAccessTopNStat(spark, accessDF,day)// 按照流量进行统计netTypeTrafficTopNStat(spark, accessDF, day)spark.stop}/*** 按流量进行统计* @param spark* @param accessDF*/def netTypeTrafficTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {val trafficsTopNDF = accessDF.filter(accessDF.col("day") === day && accessDF.col("netType") === "wifi").groupBy("day", "uid").agg(sum("num").as("traffics")).orderBy(desc("traffics"))// .show(false)// 将统计结果写入到Mysql中try {trafficsTopNDF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayNetTypeTrafficsStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval traffics = info.getAs[Long]("traffics")list.append(DayNetTypeTrafficsStat(day, uid, traffics))})StatDao.insertDayNetTypeTrafficsAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}}/*** 按照地市进行统计Top3课程** @param spark* @param accessDF*/def cityTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === day && accessDF.col("netType") === "wifi").groupBy("day", "uid", "city").agg(count("uid").as("times")).orderBy(desc("times"))cityAccessTopNDF.show(false)// window 函数在Spark SQL的使用val top3DF = cityAccessTopNDF.select(cityAccessTopNDF("day"), cityAccessTopNDF("uid"), cityAccessTopNDF("city"), cityAccessTopNDF("times"), row_number().over(Window.partitionBy("city").orderBy(cityAccessTopNDF("times").desc)).as("times_rank")).filter("times_rank <= 3")//.show(false)// 将统计结果写入到Mysql中try {top3DF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayCityNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval city = info.getAs[String]("city")val times = info.getAs[Long]("times")val timesRank = info.getAs[Int]("times_rank")list.append(DayCityNetTypeAccessStat(day, uid, city, times, timesRank))})StatDao.insertDayNetTypeCityAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}}/*** 最受欢迎的TopN netType** @param spark* @param accessDF*/def netTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {val wifiAccessTopNDF = accessDF.filter(accessDF.col("day") === day && accessDF.col("netType") === "wifi").groupBy("day", "uid").agg(count("uid").as("times")).orderBy(desc("times"))wifiAccessTopNDF.show(false)//    accessDF.createOrReplaceTempView("access_logs")
//    val wifiAccessTopNDF = spark.sql("select day,uid,count(1) as times from access_logs where day='20190702' and netType='wifi' group by day,uid order by times desc")
//    wifiAccessTopNDF.show(false)// 将统计结果写入到Mysql中try {wifiAccessTopNDF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval times = info.getAs[Long]("times")list.append(DayNetTypeAccessStat(day, uid, times))})StatDao.insertNetTypeAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}}
}

appName和master,可以在命令行端进行指定。
因为我们的后台环境中,已经有相关的spark环境,没有ipdatabase,mysql等我们环境的依赖库,所以需要把spark依赖去掉。
对应的pom.xml文件内容如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ac.iie.spark</groupId><artifactId>sql</artifactId><version>1.0-SNAPSHOT</version><inceptionYear>2008</inceptionYear><properties><scala.version>2.11.12</scala.version><spark.version>2.4.5</spark.version></properties><dependencies><!-- Scala --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><scope>provided</scope></dependency><!-- Spark --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><!-- hive --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>LATEST</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>com.ggstar</groupId><artifactId>ipdatabase</artifactId><version>1.0</version></dependency><!-- jdbc --><dependency><groupId>org.spark-project.hive</groupId><artifactId>hive-jdbc</artifactId><version>1.2.1.spark2</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.6</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><!-- If you have classpath issue like NoDefClassError,... --><!-- useManifestOnlyJar>false</useManifestOnlyJar --><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><mainClass></mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins></build></project>

而且需要添加相应的plugin
使用命令mvn assembly:assembly

将jar文件上传到服务器中

将原始数据上传到hdfs中
执行命令:

spark-submit \
--class cn.ac.iie.log.SparkStatCleanJobYARN \
--name SparkStatCleanJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--files /home/iie4bu/lib/ipDatabase.csv,/home/iie4bu/lib/ipRegion.xlsx \
/home/iie4bu/lib/sql-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://manager:9000/imooc/input hdfs://manager:9000/imooc/clean

注意:--files的使用

查看我们导入的文件内容:
使用spark-shell:./spark-shell --master local[2] --jars /home/iie4bu/software/mysql-connector-java-5.1.35.jar

scala> spark.read.format("parquet").load("hdfs://manager:9000/imooc/clean/day=20190730/part-00000-52c8b5fd-8496-4402-a5a8-31159291a5bc.c000.snappy.parquet").show(false)


说明数据清洗成功。
接下来统计作业:

spark-submit --class cn.ac.iie.log.TopNStatJobYARN --name TopNStatJobYARN --master yarn --executor-memory 1G --num-executors 1 /home/iie4bu/lib/sql-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://manager:9000/imooc/clean/ 20190702

可以看到mysql中的数据已经成功添加进去了。

Spark On Yarn 运行项目相关推荐

  1. Spark面试:Spark on yarn 运行流程

    一: Spark on YARN能让Spark计算模型在云梯YARN集群上运行,直接读取云梯上的数据,并充分享受云梯YARN集群丰富的计算资源. 二: 基于YARN的Spark作业首先由客户端生成作业 ...

  2. Spark On Yarn 运行模式(详细)

    Spark 在spark 中,支持4中运行模式: 1)Local:开发时使用 2)Standalone: 是spark 自带的,如果一个集群是standalong 的话,那么就需要在多台机器上同时部署 ...

  3. 我的Bug日常:spark基于yarn运行时抛错,内存不足Required executor memory (1024 MB), offHeap memory (0) MB。问题已解决,亲测有用~~~

    问题描述: 在yarn集群上运行spark时,报了如下错误 主要内容: Exception in thread "main" java.lang.IllegalArgumentEx ...

  4. 本地提交spark_spark快速入门(三)-------spark部署及运行模式

    spark支持多种部署方案,包括spark自带的standalone资源调度模式(StandAlone):运行在hadoop的yarn资源调度框架中(SparkOnYARN):local本地模式:可以 ...

  5. 三, Spark 四种运行环境配置总结

    三, Spark 运行环境 Spark 的运行模式有 Local(也称单节点模式),Standalone(集群模式),Spark on Yarn(运行在Yarn上),Mesos以及K8s, Windo ...

  6. Spark On Yarn VCore Userd 值不正常(DefaultResourceCalculator / DominantResourceCalculator )

    Spark On Yarn  VCore Userd 值不正常,目前集群有两个任务再跑,每个任务使用1cores. 在执行下面的脚本的时候.资源使用如下图: 执行脚本: spark-submit \ ...

  7. spark 查看yarn日志_spark周边项目之Livy

    无Spark Client环境的部署实现 首先,熟悉spark开发的人都知道spark的部署模式分为三种,分别为Local.Standalone.YARN,通过YARN又分为YARN-Client和Y ...

  8. Spark Standalone -- 独立集群模式、Spark 提交任务的两种模式、spark在yarn上运行的环境搭建、自己写的spark代码如何提交到yarn上并运行...

    目录 Spark Standalone -- 独立集群模式 Standalone 架构图 Standalone 的搭建 1.上传.解压.重命名 2.配置环境变量 3.修改配置文件 conf 4.同步到 ...

  9. Spark 在YARN上运行

    在YARN上运行Spark 安全 在YARN上启动Spark 添加其他JAR 准备工作 组态 调试您的应用程序 Spark特性 重要笔记 的Kerberos YARN特定的Kerberos配置 Ker ...

最新文章

  1. R异常数据检测及处理方法
  2. [转]linux解压 tar命令
  3. 2021的第一个offer来自mbzuai
  4. SpringBoot_入门-微服务简介
  5. 设计模式的分类和六大设计原则
  6. Linux Server 安装 raid 1
  7. swt matlab 中 swa,Matlab小波工具箱的使用3
  8. JavaScript知识点之“事件机制”
  9. linux standby模式,搭建11g 单机 linux standby 操作文档
  10. python3—列表
  11. pytorch冻结模型
  12. KVM套件-linux基础
  13. [转载]数字全息与计算全息
  14. 用于实时实例分割的Deep Snake算法
  15. java三种功能加强模式
  16. Java常见问题之Data too long for column 'orResponse' at row 1
  17. 『为金融数据打标签』「2. 元标签方法」
  18. 数据库数据修改报错The instance of entity type ‘XXX‘ cannot be tracked
  19. AI 图像识别的测试
  20. 汽车学堂 自 动 驾 驶 决 策 与 控 制 算 法——现 代 控 制 理 论学习(一)

热门文章

  1. 验证视图状态 MAC 失败的解决办法
  2. SQL语句的基本语法一
  3. 实体类(VO,DO,DTO)的划分
  4. Django2.2-LookupError No installed app with label admin
  5. 从PHP5到PHP7自我封装MongoDB以及平滑升级
  6. php轻量级的性能分析工具xhprof的安装使用
  7. c语言的查询功能,求C语言实现查询功能(如果选择3,如何实现查询)
  8. PHP学级与年级的转换函数_PHP addslashes()和stripslashes():字符串转义与还原
  9. 如何看创建媒体日期_每天约4万个网约车投诉,看AI如何接招_媒体_澎湃新闻
  10. foreach ($cc as $key = $item);$item加与不加的区别