Spark2.1.1

一 Spark Submit本地解析

1.1 现象

提交命令:

spark-submit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

进程:

hadoop 225653 0.0 0.0 11256 364 ? S Aug24 0:00 bash /$spark-dir/bin/spark-class org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

hadoop 225654 0.0 0.0 34424 2860 ? Sl Aug24 0:00 /$jdk_dir/bin/java -Xmx128m -cp /spark-dir/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

1.2 执行过程

1.2.1 脚本执行

-bash-4.1$ cat bin/spark-submit
#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

注释:这里执行了另一个脚本spark-class,具体如下:

-bash-4.1$ cat bin/spark-class

...

build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}

CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <(build_command "$@")

...

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

注释:这里执行java class: org.apache.spark.launcher.Main,并传入参数,具体如下:

1.2.2 代码执行

org.apache.spark.launcher.Main
...builder = new SparkSubmitCommandBuilder(help);...List<String> cmd = builder.buildCommand(env);...List<String> bashCmd = prepareBashCommand(cmd, env);for (String c : bashCmd) {System.out.print(c);System.out.print('\0');}...

注释:其中会调用SparkSubmitCommandBuilder来生成Spark Submit命令,具体如下:

org.apache.spark.launcher.SparkSubmitCommandBuilder
...private List<String> buildSparkSubmitCommand(Map<String, String> env)
...addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
...String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
...if (isClientMode) {
...addOptionString(cmd, driverExtraJavaOptions);
...}
...addPermGenSizeOpt(cmd);cmd.add("org.apache.spark.deploy.SparkSubmit");cmd.addAll(buildSparkSubmitArgs());return cmd;...

注释:这里创建了本地命令,其中java class:org.apache.spark.deploy.SparkSubmit,同时会把各种JavaOptions放到启动命令里(比如SPARK_JAVA_OPTS,DRIVER_EXTRA_JAVA_OPTIONS等),具体如下:

org.apache.spark.deploy.SparkSubmit
  def main(args: Array[String]): Unit = {val appArgs = new SparkSubmitArguments(args) //parse command line parameterif (appArgs.verbose) {// scalastyle:off println
printStream.println(appArgs)// scalastyle:on println
}appArgs.action match {case SparkSubmitAction.SUBMIT => submit(appArgs)case SparkSubmitAction.KILL => kill(appArgs)case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)}}private def submit(args: SparkSubmitArguments): Unit = {val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) //merge all parameters from: command line, properties file, system property, etc...
def doRunMain(): Unit = {...runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)...}...private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments): (Seq[String], Seq[String], Map[String, String], String) = {if (deployMode == CLIENT || isYarnCluster) {childMainClass = args.mainClass...if (isYarnCluster) {childMainClass = "org.apache.spark.deploy.yarn.Client"...private def runMain(childArgs: Seq[String],childClasspath: Seq[String],sysProps: Map[String, String],childMainClass: String,verbose: Boolean): Unit = {// scalastyle:off printlnif (verbose) {printStream.println(s"Main class:\n$childMainClass")printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")printStream.println(s"System properties:\n${sysProps.mkString("\n")}")printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")printStream.println("\n")}// scalastyle:on println
val loader =if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {new ChildFirstURLClassLoader(new Array[URL](0),Thread.currentThread.getContextClassLoader)} else {new MutableURLClassLoader(new Array[URL](0),Thread.currentThread.getContextClassLoader)}Thread.currentThread.setContextClassLoader(loader)for (jar <- childClasspath) {addJarToClasspath(jar, loader)}for ((key, value) <- sysProps) {System.setProperty(key, value)}var mainClass: Class[_] = nulltry {mainClass = Utils.classForName(childMainClass)} catch {...val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)...mainMethod.invoke(null, childArgs.toArray)...

注释:这里首先会解析命令行参数,比如mainClass,准备运行环境包括System Property以及classpath等,然后使用一个新的classloader:ChildFirstURLClassLoader来加载用户的mainClass,然后反射调用mainClass的main方法,这样用户的app.package.AppClass的main方法就开始执行了。

org.apache.spark.SparkConf
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {import SparkConf._/** Create a SparkConf that loads defaults from system properties and the classpath */def this() = this(true)...if (loadDefaults) {loadFromSystemProperties(false)}private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {// Load any spark.* system propertiesfor ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {set(key, value, silent)}this}

注释:这里可以看到spark是怎样加载配置的

1.2.3 --verbose

spark-submit --master local[*] --class app.package.AppClass --jars /$other-dir/other.jar  --driver-memory 1g --verbose app-1.0.jar

输出示例:

Main class:
app.package.AppClass
Arguments:

System properties:
spark.executor.logs.rolling.maxSize -> 1073741824
spark.driver.memory -> 1g
spark.driver.extraLibraryPath -> /$hadoop-dir/lib/native
spark.eventLog.enabled -> true
spark.eventLog.compress -> true
spark.executor.logs.rolling.time.interval -> daily
SPARK_SUBMIT -> true
spark.app.name -> app.package.AppClass
spark.driver.extraJavaOptions -> -XX:+PrintGCDetails -XX:+UseG1GC -XX:G1HeapRegionSize=32M -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:-UseCompressedClassPointers -XX:CompressedClassSpaceSize=3G -XX:+PrintGCTimeStamps -Xloggc:/export/Logs/hadoop/g1gc.log
spark.jars -> file:/$other-dir/other.jar
spark.sql.adaptive.enabled -> true
spark.submit.deployMode -> client
spark.executor.logs.rolling.maxRetainedFiles -> 10
spark.executor.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar
spark.eventLog.dir -> hdfs://myhdfs/spark/history
spark.master -> local[*]
spark.sql.crossJoin.enabled -> true
spark.driver.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar
Classpath elements:
file:/$other-dir/other.jar
file:/app-1.0.jar

启动时添加--verbose参数后,可以输出所有的运行时信息,有助于判断问题。

转载于:https://www.cnblogs.com/barneywill/p/9820684.html

【原创】大数据基础之Spark(1)Spark Submit即Spark任务提交过程相关推荐

  1. 如何用形象的比喻大数据的技术生态Hadoop、Hive、Spark 之间是什么关系?

    最近我的同学给我发了一篇特别有意思的关于大数据的技术生态Hadoop.Hive.Spark 关系的解读文章.个人觉得非常有意思,通俗易懂,我转载到这里,希望大家一起学习. Luis 大数据 255 人 ...

  2. 好程序员大数据教程:SparkShell和IDEA中编写Spark程序

    好程序员大数据教程:SparkShell和IDEA中编写Spark程序,spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用Scala编写Spa ...

  3. 赵强老师:大数据从入门到精通(20)Spark RDD-赵强老师-专题视频课程

    赵强老师:大数据从入门到精通(20)Spark RDD-116人已学习 课程收益     本系列课程将基于RedHat Linux 7.4版本.Hadoop 2.7.3.Spark 2 版本全面介绍大 ...

  4. 《大数据原理与应用》林子雨:一. 大数据基础

    <大数据原理与应用>林子雨:一. 大数据基础 思维导图 PS:边学边记 重点: 理解大数据的概念及其处理架构Hadoop 难点: 掌握大数据处理架构Hadoop的安装与使用方法 第1章 大 ...

  5. 大数据技术原理与应用 第一篇 大数据基础

    目录 第一章 大数据概述 一. 大数据时代 1.1 三次信息化浪潮 1.2 信息科技发展 1.3 数据产生方式的变革 1.4 大数据的影响 二. 大数据的概念 2.1 大数据的特征 2.2 大数据关键 ...

  6. 大数据基础入门 ------文章来源于:某个入门课程

    文章目录 第一课:大数据基础入门 什么是大数据? java和大数据的关系 学习大数据需要的基础和路线 第二课:Hadoop的背景起源 一 分布式存储 如何解决大数据的存储?(HDFS) 第三课: ha ...

  7. 大数据_03【大数据基础知识】

    大数据_03 [大数据基础知识] 01 大数据概述 02 什么是大数据?(Big Data) 03 传统数据与大数据的对比 04 大数据的特点 4.1 传统数据与大数据处理服务器系统安装对比 4.2 ...

  8. 大数据基础篇~JavaSE第一章

    大数据基础篇教程分享01 目前计算机专业现状,java开发工程师虽然容易找工作(这个也是在你在大学期间java基础学习的比较好,然后会一些框架,才能找到工作)但是java开发常常熬夜,楼主就是卷不动j ...

  9. 大数据基础——Hadoop大数据平台搭建

    文章目录 前言 Hadoop大数据平台搭建 一.Hadoop原理和功能介绍 二.Hadoop安装部署 三.Hadoop常用操作 总结 前言 分布式机器学习为什么需求大数据呢?随着海量用户数据的积累,单 ...

  10. 大数据基础之Hadoop(三)—— MapReduce

    作者:duktig 博客:https://duktig.cn (文章首发) 优秀还努力.愿你付出甘之如饴,所得归于欢喜. 本篇文章源码参看:https://github.com/duktig666/b ...

最新文章

  1. word 语音识别的数据丢失
  2. 《系统集成项目管理工程师》必背100个知识点-57沟通管理
  3. tr69 GatewayInfo 节点添加
  4. 我想和你一起去这样一个地方
  5. 第一篇:Dapper快速学习
  6. nyoj239 月老的难题 二分图 匈牙利算法
  7. php用wordanalysis抓取姓名_利用vba查询/抓取 外部数据
  8. 事件处理-注册时间 // 事件处理-修饰符 // 事件处理-键盘事件的修饰符 // 事件处理-系统修饰符 // 事件处理-鼠标修饰符
  9. 左侧栏下拉框HTML代码,html5下拉菜单代码
  10. java编写两邮件传输,JAVA邮件发送(文字+图片+附件)【源码】
  11. C++ 转 Python 这三年,我都经历了什么?
  12. matlab2014调用vs2015进行混合编译生成mex文件
  13. jzoj2941. 贿赂
  14. MySQL-第十二篇管理结果集
  15. 如何将文件地址转为url_如何快速替换WordPress站点新旧URL地址?
  16. 通达OA - 数据备份与恢复指南
  17. 基于matlab的中值滤波算法浅析
  18. 蓝桥杯基础练习python
  19. 基于多视图几何的三维重建
  20. 详解EBS接口开发之销售订单挑库发放

热门文章

  1. AcWing 900. 整数划分(完全背包计数问题)
  2. AcWing 848. 有向图的拓扑序列(拓扑排序模板)
  3. 使用 anacoda 安装scrapy
  4. 统计自然语言处理基础_聚类
  5. ES6 iterator 迭代器
  6. oracle之 监听器无法启动的几个原因总结
  7. 如何修改一个类的私有成员?
  8. switch分解试验部分-LAB8:SVI实验
  9. Android开发者网址导航
  10. 让你更好的使用jQuery插件