一. Introduction

LinuxContainerExecutor, by contrast, starts and stops containers as the application's owner, which makes it more secure. In addition, LinuxContainerExecutor lets users isolate CPU resources through cgroups.

Its overall logic is similar to DefaultContainerExecutor's, with one key difference: it runs the launch script as the user who owns the application. It does this by invoking container-executor, a setuid executable written in C. LinuxContainerExecutor was added when Hadoop introduced its security mechanisms, and the CPU isolation mechanism discussed later is also implemented in this ContainerExecutor.
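It is enabled by setting yarn.nodemanager.container-executor.class to org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor in yarn-site.xml. As a hedged aside (the paths and the "hadoop" group below are assumptions about a typical installation, not taken from this article), the setuid helper only works when installed with root ownership and the setuid bit, per the Hadoop secure-mode documentation:

# Illustrative check of the setuid helper used by LinuxContainerExecutor.
# $HADOOP_HOME and the "hadoop" group are assumptions of this sketch.
ls -l "$HADOOP_HOME/bin/container-executor"
# expected: ---Sr-s--- 1 root hadoop ... container-executor
# If not, fix ownership and mode (6050 = setuid+setgid, no world access):
chown root:hadoop "$HADOOP_HOME/bin/container-executor"
chmod 6050 "$HADOOP_HOME/bin/container-executor"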

The launch flow is carried out in a platform-independent way; at bottom, it simply executes the launch_container.sh script.
Two points deserve attention:

  1. The core of it is constructing and executing the launch_container.sh script; the script contains the work the container needs to run.
  2. launch_container.sh exports a large number of environment variables with export,
    but since the script is started with setsid, in a session of its own, and exported variables only ever propagate down to child processes, none of them affect outside programs.
  • default_container_executor_session.sh
#!/bin/bash
echo $$ > /opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.tmp
/bin/mv -f /opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.tmp /opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid
exec setsid /bin/bash "/opt/hadoop/yarndata/usercache/henghe/appcache/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/launch_container.sh"
  • default_container_executor.sh
#!/bin/bash
/bin/bash "/opt/hadoop/yarndata/usercache/henghe/appcache/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/default_container_executor_session.sh"
rc=$?
echo $rc > "/opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.exitcode.tmp"
/bin/mv -f "/opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.exitcode.tmp" "/opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.exitcode"
exit $rc

setsid is used here: the program runs in a new session (the environment variables exported in launch_container.sh are visible only inside that session).
setsid creates a brand-new session. A child process normally inherits its parent's session ID, process group ID, and open terminal; when a child process needs to detach from its parent so that the parent no longer controls it, the setsid command is the tool for the job.
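A minimal sketch to observe the effect (assumes a Linux box with the util-linux setsid command and procps ps; not from the original article):

bash -c 'echo "pid=$$ sid=$(ps -o sid= -p $$)"'
# -> the child still runs inside the calling shell's session.
setsid bash -c 'echo "pid=$$ sid=$(ps -o sid= -p $$)"'
# -> the child is now a session leader: its sid equals its own pid,
#    and it no longer shares a session/controlling terminal with us.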


export
Purpose: set or display environment variables.
Syntax: export [-fnp] [name]=[value]
Notes: when the shell executes a program, it supplies the program with a set of environment variables. export can add, modify, or delete exported variables for use by subsequently executed programs. Its effect is limited to the current login session. (See the demonstration below.)
Options:
-f  treat the names as shell function names.
-n  remove the export attribute from the named variables; the variables are not actually deleted, they just stop being passed into the environment of later commands.
-p  list all environment variables the shell exports to programs.
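A minimal bash demonstration of these options:

FOO=bar                          # a plain shell variable, not yet exported
bash -c 'echo "${FOO:-unset}"'   # prints "unset": children cannot see it
export FOO                       # add the export attribute
bash -c 'echo "${FOO:-unset}"'   # prints "bar"
export -n FOO                    # keep FOO locally, stop exporting it
bash -c 'echo "${FOO:-unset}"'   # prints "unset" again
export -p                        # list everything currently exported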

二. Container Execution Order

ContainerLaunch#call is responsible for initializing and starting a Container.
The overall sequence, which the following sections walk through one step at a time, is: prepareContainer -> writeLaunchEnv -> activateContainer -> launchContainer, with deactivateContainer undoing the bookkeeping afterwards and cleanupBeforeRelaunch run before any relaunch.

三. prepareContainer

Prepares the container's execution environment.

3.1. Entry point: ContainerLaunch#prepareContainer

private void prepareContainer(Map<Path, List<String>> localResources,
    List<String> containerLocalDirs) throws IOException {
  exec.prepareContainer(new ContainerPrepareContext.Builder()
      .setContainer(container)
      .setLocalizedResources(localResources)
      .setUser(container.getUser())
      .setContainerLocalDirs(containerLocalDirs)
      .setCommands(container.getLaunchContext().getCommands())
      .build());
}

3.2. Interface definition

/**
 * Prepare the container prior to the launch environment being written.
 * @param ctx Encapsulates information necessary for launching containers.
 * @throws IOException if errors occur during container preparation
 */
public void prepareContainer(ContainerPrepareContext ctx) throws IOException {
}

3.3. ContainerPrepareContext fields

The container environment is prepared before the launch script/arguments are written out. The context is built via ContainerPrepareContext.Builder.

// container info
private final Container container;
// localized resource file paths
private final Map<Path, List<String>> localizedResources;
// user
private final String user;
// container working directories
private final List<String> containerLocalDirs;
// commands to execute
private final List<String> commands;

3.4. The prepareContainer implementation (LinuxContainerExecutor)

@Override
public void prepareContainer(ContainerPrepareContext ctx) throws IOException {
  // Build the ContainerRuntimeContext
  ContainerRuntimeContext.Builder builder =
      new ContainerRuntimeContext.Builder(ctx.getContainer());
  // Set the localized resources / user / local dirs / commands / ContainerId
  builder.setExecutionAttribute(LOCALIZED_RESOURCES, ctx.getLocalizedResources())
      .setExecutionAttribute(USER, ctx.getUser())
      .setExecutionAttribute(CONTAINER_LOCAL_DIRS, ctx.getContainerLocalDirs())
      .setExecutionAttribute(CONTAINER_RUN_CMDS, ctx.getCommands())
      .setExecutionAttribute(CONTAINER_ID_STR,
          ctx.getContainer().getContainerId().toString());
  try {
    linuxContainerRuntime.prepareContainer(builder.build());
  } catch (ContainerExecutionException e) {
    throw new IOException("Unable to prepare container: ", e);
  }
}

四. writeLaunchEnv [implemented in ContainerExecutor]

Writes the environment information into the container's launch script.

This method is implemented by the abstract class ContainerExecutor (ContainerExecutor#writeLaunchEnv).

It writes the launch environment of a container out to a specified path.

  • Steps
    1. Emit the script header
    2. Fail fast and check exit codes (set -o pipefail -e)
    3. Redirect the run log (prelaunch.out)
    4. Redirect the error log (prelaunch.err)
    5. Set the environment variables
    6. Set up the job resources (symlink the required jar/config files via ln -sf)
    7. Emit debug information
    8. Record the directory contents
    9. Emit the launch command

4.1. Implementation

The full script this method generates for one concrete run (a Spark ApplicationMaster container) is reproduced in 4.2 below; the step comments here indicate which statements emit which part of that script.

/**
 * This method writes out the launch environment of a container to a
 * specified path.
 *
 * @param out the output stream to which the environment is written (usually
 * a script file which will be executed by the Launcher)
 * @param environment the environment variables and their values
 * @param resources the resources which have been localized for this
 * container. Symlinks will be created to these localized resources
 * @param command the command that will be run
 * @param logDir the log dir to which to copy debugging information
 * @param user the username of the job owner
 * @param outFilename the path to which to write the launch environment
 * @param nmVars the set of environment vars that are explicitly set by NM
 * @throws IOException if any errors happened writing to the OutputStream,
 * while creating symlinks
 */
@VisibleForTesting
public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
    Map<Path, List<String>> resources, List<String> command, Path logDir,
    String user, String outFilename, LinkedHashSet<String> nmVars)
    throws IOException {
  ContainerLaunch.ShellScriptBuilder sb =
      ContainerLaunch.ShellScriptBuilder.create();

  // Steps 1/2: script header plus fail-fast. Emits:
  //   #!/bin/bash
  //   set -o pipefail -e
  // Add "set -o pipefail -e" to validate launch_container script.
  sb.setExitOnFailure();

  // Steps 3/4: redirect stdout and stderr for the launch_container script.
  // Emits the PRELAUNCH_OUT/PRELAUNCH_ERR exports and the exec redirections.
  sb.stdout(logDir, CONTAINER_PRE_LAUNCH_STDOUT);
  sb.stderr(logDir, CONTAINER_PRE_LAUNCH_STDERR);

  // Step 5: set environment variables.
  if (environment != null) {
    sb.echo("Setting up env variables");

    // Whitelist environment variables are treated specially.
    // Only add them if they are not already defined in the environment.
    // Add them using special syntax to prevent them from eclipsing
    // variables that may be set explicitly in the container image (e.g.,
    // in a docker image). Put these before the others to ensure the
    // correct expansion is used.
    // Emits e.g.: export JAVA_HOME=${JAVA_HOME:-"..."}
    sb.echo("Setting up env variables#whitelistVars");
    for (String var : whitelistVars) {
      if (!environment.containsKey(var)) {
        String val = getNMEnvVar(var);
        if (val != null) {
          sb.whitelistedEnv(var, val);
        }
      }
    }

    // Now write vars that were set explicitly by nodemanager, preserving
    // the order they were written in.
    // Emits e.g.: export CONTAINER_ID="...", NM_PORT, LOCAL_DIRS, ...
    sb.echo("Setting up env variables#env");
    for (String nmEnvVar : nmVars) {
      sb.env(nmEnvVar, environment.get(nmEnvVar));
    }

    // Now write the remaining environment variables.
    // Emits e.g.: export CLASSPATH="...", SPARK_USER="...", ...
    sb.echo("Setting up env variables#remaining");
    for (Map.Entry<String, String> env :
        sb.orderEnvByDependencies(environment).entrySet()) {
      if (!nmVars.contains(env.getKey())) {
        sb.env(env.getKey(), env.getValue());
      }
    }
  }

  // Step 6: set up job resources by symlinking the localized files.
  // Emits: ln -sf -- "<localized path>" "<link name>"
  if (resources != null) {
    sb.echo("Setting up job resources");
    Map<Path, Path> symLinks = resolveSymLinks(resources, user);
    for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
      sb.symlink(symLink.getKey(), symLink.getValue());
    }
  }

  // Steps 7/8: dump debugging information if configured - copy the launch
  // script into the log dir and record the working-directory contents.
  if (getConf() != null &&
      getConf().getBoolean(YarnConfiguration.NM_LOG_CONTAINER_DEBUG_INFO,
          YarnConfiguration.DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO)) {
    sb.echo("Copying debugging information");
    sb.copyDebugInformation(new Path(outFilename),
        new Path(logDir, outFilename));
    sb.listDebugInformation(new Path(logDir, DIRECTORY_CONTENTS));
  }

  // Step 9: emit the launch command itself.
  sb.echo("Launching container");
  sb.command(command);

  // The final script content.
  LOG.warn("ContainerExecutor#writeLaunchEnv : " + sb.toString());
  PrintStream pout = null;
  try {
    pout = new PrintStream(out, false, "UTF-8");
    sb.write(pout);
  } finally {
    if (out != null) {
      out.close();
    }
  }
}

4.2. The generated launch script

# Script header
#!/bin/bash
# Fail fast and check exit codes
set -o pipefail -e
# Redirect the run log
export PRELAUNCH_OUT="/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/prelaunch.out"
exec >"${PRELAUNCH_OUT}"
# Redirect the error log
export PRELAUNCH_ERR="/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/prelaunch.err"
exec 2>"${PRELAUNCH_ERR}"
# Set environment variables
echo "Setting up env variables"
# Environment variables: whitelisted vars
echo "Setting up env variables#whitelistVars"
export JAVA_HOME=${JAVA_HOME:-"/Library/java/JavaVirtualMachines/jdk1.8.0_271.jdk/Contents/Home"}
export HADOOP_COMMON_HOME=${HADOOP_COMMON_HOME:-"/opt/workspace/apache/hadoop-3.2.1-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/../../../../hadoop-common-project/hadoop-common/target"}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-"/opt/tools/hadoop-3.2.1/etc/hadoop"}
export HADOOP_HOME=${HADOOP_HOME:-"/opt/workspace/apache/hadoop-3.2.1-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/../../../../hadoop-common-project/hadoop-common/target"}
export PATH=${PATH:-"/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/tools/apache-maven-3.6.3/bin:/opt/tools/scala-2.12.10/bin:/usr/local/mysql-5.7.28-macos10.14-x86_64/bin:/Library/java/JavaVirtualMachines/jdk1.8.0_271.jdk/Contents/Home/bin:/opt/tools/hadoop-3.2.1/bin:/opt/tools/hadoop-3.2.1/etc/hadoop:henghe:/opt/tools/ozone-1.0.0/bin:/opt/tools/spark-2.4.5/bin:/opt/tools/spark-2.4.5/conf:/opt/tools/redis-5.0.7/src:/opt/tools/datax/bin:/opt/tools/apache-ant-1.9.6/bin:/opt/tools/hbase-2.0.2/bin"}
# Environment variables: NM-set vars
echo "Setting up env variables#env"
export HADOOP_TOKEN_FILE_LOCATION="/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1611681788558_0001/container_1611681788558_0001_01_000001/container_tokens"
export CONTAINER_ID="container_1611681788558_0001_01_000001"
export NM_PORT="62016"
export NM_HOST="boyi-pro.lan"
export NM_HTTP_PORT="8042"
export LOCAL_DIRS="/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1611681788558_0001"
export LOCAL_USER_DIRS="/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/"
export LOG_DIRS="/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001"
export USER="henghe"
export LOGNAME="henghe"
export HOME="/home/"
export PWD="/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1611681788558_0001/container_1611681788558_0001_01_000001"
export JVM_PID="$$"
export MALLOC_ARENA_MAX="4"
# Environment variables: remaining vars
echo "Setting up env variables#remaining"
export SPARK_YARN_STAGING_DIR="hdfs://localhost:8020/user/henghe/.sparkStaging/application_1611681788558_0001"
export APPLICATION_WEB_PROXY_BASE="/proxy/application_1611681788558_0001"
export CLASSPATH="$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__"
export APP_SUBMIT_TIME_ENV="1611681915166"
export SPARK_USER="henghe"
export PYTHONHASHSEED="0"
# Set up job resources (symlink the required jar/config files via ln -sf)
echo "Setting up job resources"
mkdir -p __spark_libs__
ln -sf -- "/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache/35/spark-examples_2.11-2.4.5.jar" "__app__.jar"
mkdir -p __spark_libs__
ln -sf -- "/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache/180/__spark_conf__.zip" "__spark_conf__"
# (many more symlinks omitted here)
# mkdir -p __spark_libs__
# ln -sf -- "xxxxx"  "__spark_libs__/xxxxx.jar"
# Debug information
echo "Copying debugging information"
# Creating copy of launch script
cp "launch_container.sh" "/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/launch_container.sh"
chmod 640 "/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/launch_container.sh"
# Determining directory contents
echo "ls -l:" 1>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
ls -l 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
echo "find -L . -maxdepth 5 -ls:" 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
find -L . -maxdepth 5 -ls 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
echo "broken symlinks(find -L . -maxdepth 5 -type l -ls):" 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
find -L . -maxdepth 5 -type l -ls 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
# Launch command
echo "Launching container"
exec /bin/bash -c "$JAVA_HOME/bin/java -server -Xmx1024m -Djava.io.tmpdir=$PWD/tmp -Dspark.yarn.app.container.log.dir=/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'org.apache.spark.examples.SparkPi' --jar file:/opt/tools/spark-2.4.5/examples/jars/spark-examples_2.11-2.4.5.jar --arg '10' --properties-file $PWD/__spark_conf__/__spark_conf__.properties 1> /opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/stdout 2> /opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/stderr"
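To inspect this script on a real cluster, enable the debug switch checked in steps 7/8 of 4.1. A hedged sketch (the config key is what YarnConfiguration#NM_LOG_CONTAINER_DEBUG_INFO resolves to; the paths simply reuse the example above):

# yarn-site.xml: set yarn.nodemanager.log-container-debug-info.enabled=true,
# then after the container has run:
LOGDIR=/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001
cat "$LOGDIR/launch_container.sh"   # the copied launch script
cat "$LOGDIR/directory.info"        # the recorded directory listing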

五. activateContainer

This method is simple: it just adds the container's information to a cache. (The pid-file path registered here appears to be the same .pid file that default_container_executor_session.sh, shown in the introduction, writes the session PID into.)

  • Cached state
// pid-file path per ContainerId
private final ConcurrentMap<ContainerId, Path> pidFiles =
    new ConcurrentHashMap<>();
  • Implementation
/**
 * Mark the container as active.
 *
 * @param containerId the container ID
 * @param pidFilePath the path where the executor should write the PID
 * of the launched process
 */
public void activateContainer(ContainerId containerId, Path pidFilePath) {
  try {
    writeLock.lock();
    this.pidFiles.put(containerId, pidFilePath);
  } finally {
    writeLock.unlock();
  }
}

六. deactivateContainer

The inverse of activateContainer: it removes the container's entry from the cache.

/**
 * Mark the container as inactive. For inactive containers this method
 * has no effect.
 *
 * @param containerId the container ID
 */
public void deactivateContainer(ContainerId containerId) {
  try {
    writeLock.lock();
    this.pidFiles.remove(containerId);
  } finally {
    writeLock.unlock();
  }
}

七. launchContainer

The call site in ContainerLaunch#call:

// Launch the container
ret = launchContainer(new ContainerStartContext.Builder()
    .setContainer(container)
    .setLocalizedResources(localResources)
    .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
    .setNmPrivateTokensPath(nmPrivateTokensPath)
    .setUser(user)
    .setAppId(appIdStr)
    .setContainerWorkDir(containerWorkDir)
    .setLocalDirs(localDirs)
    .setLogDirs(logDirs)
    .setFilecacheDirs(filecacheDirs)
    .setUserLocalDirs(userLocalDirs)
    .setContainerLocalDirs(containerLocalDirs)
    .setContainerLogDirs(containerLogDirs)
    .setUserFilecacheDirs(userFilecacheDirs)
    .setApplicationLocalDirs(applicationLocalDirs)
    .build());

7.1. Abstract method definition

/**
 * Launch the container on the node. This is a blocking call and returns
 * only when the container exits.
 *
 * @param ctx Encapsulates information necessary for launching containers.
 * @return the return status of the launch
 * @throws IOException if the container launch fails
 * @throws ConfigurationException if config error was found
 */
public abstract int launchContainer(ContainerStartContext ctx)
    throws IOException, ConfigurationException;

7.2. ContainerStartContext

Launching requires a ContainerStartContext argument, which mainly carries the paths the container needs.

// the container, e.g. container_1612228249380_0002_01_000001
private final Container container;
// localized resources, a Map<Path, List<String>>; one sample entry:
//   key:   /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache/141/commons-lang-2.6.jar
//   value: ArrayList, 0 -> /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache/141/commons-lang-2.6.jar
private final Map<Path, List<String>> localizedResources;
// launch script location (nmPrivateContainerScriptPath), e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/nmPrivate/application_1612228249380_0002/container_1612228249380_0002_01_000001/launch_container.sh
private final Path nmPrivateContainerScriptPath;
// token file location, e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/nmPrivate/application_1612228249380_0002/container_1612228249380_0002_01_000001/container_1612228249380_0002_01_000001.tokens
private final Path nmPrivateTokensPath;
// user: henghe
private final String user;
// application ID: application_1612228249380_0002
private final String appId;
// the container's working directory, e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1612228249380_0002/container_1612228249380_0002_01_000001
private final Path containerWorkDir;
// local dirs (a list, possibly several), e.g. /opt/tools/hadoop-3.2.1/local-dirs
private final List<String> localDirs;
// log dirs (a list), e.g. /opt/tools/hadoop-3.2.1/logs/userlogs
private final List<String> logDirs;
// public file-cache dirs
private final List<String> filecacheDirs;
// user-local dirs (a list), e.g. /opt/tools/hadoop-3.2.1/local-dirs/filecache
private final List<String> userLocalDirs;
// container-local dirs (a list), e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1612228249380_0002/
private final List<String> containerLocalDirs;
// container log dirs (a list), e.g.
//   /opt/tools/hadoop-3.2.1/logs/userlogs/application_1612228249380_0002/container_1612228249380_0002_01_000001
private final List<String> containerLogDirs;
// user file-cache dirs, e.g. /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache
private final List<String> userFilecacheDirs;
// application-local dirs, e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1612228249380_0002
private final List<String> applicationLocalDirs;

7.3. Execution flow

Handle resource references -> handle the pidFilePath -> invoke linuxContainerRuntime.launchContainer

7.4. Code

  • launchContainer (it delegates to the handleLaunchForLaunchType method shown below):
private int handleLaunchForLaunchType(ContainerStartContext ctx,
    ApplicationConstants.ContainerLaunchType type) throws IOException,
    ConfigurationException {
  // Get the container
  Container container = ctx.getContainer();
  // Get the user
  String user = ctx.getUser();

  // Verify the username pattern, i.e. whether this user may run tasks.
  verifyUsernamePattern(user);

  // Get the ContainerId
  ContainerId containerId = container.getContainerId();

  // Handle resource references
  resourcesHandler.preExecute(containerId, container.getResource());
  String resourcesOptions = resourcesHandler.getResourcesOption(containerId);
  String tcCommandFile = null;
  List<String> numaArgs = null;

  try {
    // Run the resource handler chain
    if (resourceHandlerChain != null) {
      List<PrivilegedOperation> ops = resourceHandlerChain.preStart(container);

      if (ops != null) {
        List<PrivilegedOperation> resourceOps = new ArrayList<>();

        resourceOps.add(new PrivilegedOperation(
            PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
            resourcesOptions));

        for (PrivilegedOperation op : ops) {
          switch (op.getOperationType()) {
          case ADD_PID_TO_CGROUP:
            resourceOps.add(op);
            break;
          case TC_MODIFY_STATE:
            tcCommandFile = op.getArguments().get(0);
            break;
          case ADD_NUMA_PARAMS:
            numaArgs = op.getArguments();
            break;
          default:
            LOG.warn("PrivilegedOperation type unsupported in launch: "
                + op.getOperationType());
          }
        }

        if (resourceOps.size() > 1) {
          //squash resource operations
          try {
            PrivilegedOperation operation = PrivilegedOperationExecutor
                .squashCGroupOperations(resourceOps);
            resourcesOptions = operation.getArguments().get(0);
          } catch (PrivilegedOperationException e) {
            LOG.error("Failed to squash cgroup operations!", e);
            throw new ResourceHandlerException(
                "Failed to squash cgroup operations!");
          }
        }
      }
    }
  } catch (ResourceHandlerException e) {
    LOG.error("ResourceHandlerChain.preStart() failed!", e);
    throw new IOException("ResourceHandlerChain.preStart() failed!", e);
  }

  try {
    // Handle the pidFilePath
    Path pidFilePath = getPidFilePath(containerId);
    if (pidFilePath != null) {
      ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
          ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs);

      if (type.equals(ApplicationConstants.ContainerLaunchType.RELAUNCH)) {
        linuxContainerRuntime.relaunchContainer(runtimeContext);
      } else {
        // Hand off to the concrete container runtime
        linuxContainerRuntime.launchContainer(runtimeContext);
      }
    } else {
      LOG.info("Container was marked as inactive. Returning terminated error");
      return ContainerExecutor.ExitCode.TERMINATED.getExitCode();
    }
  } catch (ContainerExecutionException e) {
    return handleExitCode(e, container, containerId);
  } finally {
    resourcesHandler.postExecute(containerId);
    postComplete(containerId);
  }
  return 0;
}

八. cleanupBeforeRelaunch

Performs cleanup of the container before a relaunch, deleting the various symlinked resources. …

/**
 * Perform any cleanup before the next launch of the container.
 * @param container         container
 */
public void cleanupBeforeRelaunch(Container container)
    throws IOException, InterruptedException {
  if (container.getLocalizedResources() != null) {
    // Get the resource symlinks
    Map<Path, Path> symLinks = resolveSymLinks(
        container.getLocalizedResources(), container.getUser());

    for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
      LOG.debug("{} deleting {}", container.getContainerId(),
          symLink.getValue());
      // Delete the resource as the container's user
      deleteAsUser(new DeletionAsUserContext.Builder()
          .setUser(container.getUser())
          .setSubDir(symLink.getValue())
          .build());
    }
  }
}
