Hadoop 3.2.1 [YARN] Source Code Analysis: A Look at LinuxContainerExecutor [Part 1]
1. Preface
Compared with DefaultContainerExecutor, LinuxContainerExecutor starts and stops containers as the user who owns the application, which is more secure; it also lets users isolate CPU resources through Cgroups.
Its overall logic resembles DefaultContainerExecutor's, but it runs the launch script as the application's owner, which it does by calling container-executor, a setuid executable implemented in C. LinuxContainerExecutor was added when Hadoop introduced its security mechanisms, and the CPU isolation mechanism discussed later is implemented in this ContainerExecutor as well.
In the end, launching a container mainly comes down to executing the launch_container.sh script.
Two points deserve attention:
- The executor constructs and runs the launch_container.sh script; the script contains the commands the task needs to execute.
- launch_container.sh exports many environment variables, but because it is started with setsid in a new session (and environment changes only ever propagate to child processes), those variables do not leak into programs outside the container.
- default_container_executor_session.sh
#!/bin/bash
echo $$ > /opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.tmp
/bin/mv -f /opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.tmp /opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid
exec setsid /bin/bash "/opt/hadoop/yarndata/usercache/henghe/appcache/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/launch_container.sh"
- default_container_executor.sh
#!/bin/bash
/bin/bash "/opt/hadoop/yarndata/usercache/henghe/appcache/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/default_container_executor_session.sh"
rc=$?
echo $rc > "/opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.exitcode.tmp"
/bin/mv -f "/opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.exitcode.tmp" "/opt/hadoop/yarndata/nmPrivate/application_1610438405292_0008/container_e07_1610438405292_0008_01_000001/container_e07_1610438405292_0008_01_000001.pid.exitcode"
exit $rc
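Both the pid file and the exit-code file above are produced with the same write-then-rename pattern: the value is written to a .tmp file and then moved into place with mv -f. On a single filesystem mv is an atomic rename, so the NodeManager, which polls for these files, never reads a half-written one. A minimal sketch of the pattern, using a throwaway temp directory instead of the nmPrivate paths:

```shell
# Write a value "atomically": first to a .tmp file, then rename it into
# place. A reader polling for container.pid.exitcode either sees nothing
# or sees the complete value, never a partial write.
workdir=$(mktemp -d)
rc=42

echo "$rc" > "$workdir/container.pid.exitcode.tmp"
/bin/mv -f "$workdir/container.pid.exitcode.tmp" "$workdir/container.pid.exitcode"

cat "$workdir/container.pid.exitcode"
```

The pid file works the same way: `echo $$` records the session leader's PID, and the rename publishes it only once the write is complete.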
setsid is used here to run the program in a new session (the environment variables exported in launch_container.sh affect only processes within that session).
A child process normally inherits its parent's session ID, process group ID, and controlling terminal; setsid creates a fresh session, so the child is detached from the parent and no longer under the parent's control.
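To make the scoping concrete: exported variables only ever propagate downward, from a shell to its children, never back up to the parent; setsid's distinct contribution is the new session, which detaches the container from the NodeManager's controlling terminal. The downward-only propagation can be sketched as:

```shell
# A variable exported inside a child shell is visible to that child's
# own commands, but the parent shell's copy is untouched.
MARKER=parent
child_view=$(bash -c 'export MARKER=child; echo "$MARKER"')

echo "child saw: $child_view"
echo "parent kept: $MARKER"
```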
export: set or display environment variables.
Syntax: export [-fnp] [name[=value]]
Notes: when the shell executes a program it passes along a set of environment variables. export adds, modifies, or removes variables in that set for subsequently executed programs; its effect is limited to the current login session.
Options:
-f  treat the names as shell function names.
-n  remove the export attribute from the named variables; the variables themselves are kept, but are no longer passed into the environment of subsequent commands.
-p  list all variables exported to child processes.
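A quick demonstration of the interesting flag (`-n` is a bash extension, so the sketch below assumes bash):

```shell
# export puts a variable into the environment of subsequent commands;
# export -n keeps the shell variable but stops passing it to children.
export FOO=bar
seen_before=$(bash -c 'echo "${FOO:-unset}"')

export -n FOO
seen_after=$(bash -c 'echo "${FOO:-unset}"')

echo "before: $seen_before, after: $seen_after, local: $FOO"
```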
2. Container Execution Order
ContainerLaunch#call is responsible for initializing and launching a Container.
The overall steps and their logical relationships are covered section by section below.
3. prepareContainer
Prepares the container's execution environment.
3.1. Entry point: ContainerLaunch#prepareContainer.

private void prepareContainer(Map<Path, List<String>> localResources,
    List<String> containerLocalDirs) throws IOException {
  exec.prepareContainer(new ContainerPrepareContext.Builder()
      .setContainer(container)
      .setLocalizedResources(localResources)
      .setUser(container.getUser())
      .setContainerLocalDirs(containerLocalDirs)
      .setCommands(container.getLaunchContext().getCommands())
      .build());
}
3.2. Interface definition

/**
 * Prepare the container prior to the launch environment being written.
 * @param ctx Encapsulates information necessary for launching containers.
 * @throws IOException if errors occur during container preparation
 */
public void prepareContainer(ContainerPrepareContext ctx)
    throws IOException {
}
3.3. ContainerPrepareContext
Prepares the container environment before the launch script and arguments are written out; built via ContainerPrepareContext.Builder.

// container information
private final Container container;
// localized resource paths
private final Map<Path, List<String>> localizedResources;
// user
private final String user;
// container working directories
private final List<String> containerLocalDirs;
// commands to execute
private final List<String> commands;
3.4. prepareContainer implementation

@Override
public void prepareContainer(ContainerPrepareContext ctx) throws IOException {
  // build the ContainerRuntimeContext
  ContainerRuntimeContext.Builder builder =
      new ContainerRuntimeContext.Builder(ctx.getContainer());
  // set the localized resources / user / dirs / commands / ContainerId
  builder.setExecutionAttribute(LOCALIZED_RESOURCES, ctx.getLocalizedResources())
      .setExecutionAttribute(USER, ctx.getUser())
      .setExecutionAttribute(CONTAINER_LOCAL_DIRS, ctx.getContainerLocalDirs())
      .setExecutionAttribute(CONTAINER_RUN_CMDS, ctx.getCommands())
      .setExecutionAttribute(CONTAINER_ID_STR,
          ctx.getContainer().getContainerId().toString());
  try {
    linuxContainerRuntime.prepareContainer(builder.build());
  } catch (ContainerExecutionException e) {
    throw new IOException("Unable to prepare container: ", e);
  }
}
4. writeLaunchEnv [ContainerExecutor implementation]
Writes the environment information into the container launch script.
This method is implemented by the abstract class ContainerExecutor#writeLaunchEnv; it writes the launch environment of a container out to a specified path.
- Steps
1. Emit the script header.
2. Enable fail-fast and check exit codes.
3. Redirect the run log.
4. Redirect the error log.
5. Set environment variables.
6. Set up job resources (symlinks to the required jar/config files via ln -sf).
7. Dump debug information.
8. List the directory contents.
9. Emit the launch command.
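Step 2 deserves a note before diving into the code: `set -e` aborts the script at the first failing command, and `set -o pipefail` makes a pipeline fail if any stage fails, not just the last one; without it an early failure in a pipeline would be masked. A small sketch:

```shell
# By default a pipeline's status is that of its LAST command, so the
# failing `false` below is masked; pipefail surfaces it.
plain_status=0
bash -c 'false | true' || plain_status=$?
pipefail_status=0
bash -c 'set -o pipefail; false | true' || pipefail_status=$?

# With set -e, execution stops at the first failure: "after" never runs.
out=$(bash -c 'set -e; echo before; false; echo after') || true

echo "plain=$plain_status pipefail=$pipefail_status out=$out"
```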
4.1. Implementation
/**
 * This method writes out the launch environment of a container to a
 * specified path.
 *
 * @param out the output stream to which the environment is written (usually
 * a script file which will be executed by the Launcher)
 * @param environment the environment variables and their values
 * @param resources the resources which have been localized for this
 * container. Symlinks will be created to these localized resources
 * @param command the command that will be run
 * @param logDir the log dir to which to copy debugging information
 * @param user the username of the job owner
 * @param outFilename the path to which to write the launch environment
 * @param nmVars the set of environment vars that are explicitly set by NM
 * @throws IOException if any errors happened writing to the OutputStream,
 * while creating symlinks
 */
@VisibleForTesting
public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
    Map<Path, List<String>> resources, List<String> command, Path logDir,
    String user, String outFilename, LinkedHashSet<String> nmVars)
    throws IOException {
  ContainerLaunch.ShellScriptBuilder sb =
      ContainerLaunch.ShellScriptBuilder.create();

  // script header: #!/bin/bash
  // Add "set -o pipefail -e" to validate launch_container script.
  sb.setExitOnFailure();
  // emits: set -o pipefail -e

  // Redirect stdout and stderr for launch_container script
  sb.stdout(logDir, CONTAINER_PRE_LAUNCH_STDOUT);
  // emits: export PRELAUNCH_OUT=".../prelaunch.out"
  //        exec >"${PRELAUNCH_OUT}"
  sb.stderr(logDir, CONTAINER_PRE_LAUNCH_STDERR);
  // emits: export PRELAUNCH_ERR=".../prelaunch.err"
  //        exec 2>"${PRELAUNCH_ERR}"

  if (environment != null) {
    sb.echo("Setting up env variables");

    // Whitelist environment variables are treated specially.
    // Only add them if they are not already defined in the environment.
    // Add them using special syntax to prevent them from eclipsing
    // variables that may be set explicitly in the container image (e.g,
    // in a docker image). Put these before the others to ensure the
    // correct expansion is used.
    sb.echo("Setting up env variables#whitelistVars");
    for (String var : whitelistVars) {
      if (!environment.containsKey(var)) {
        String val = getNMEnvVar(var);
        if (val != null) {
          sb.whitelistedEnv(var, val);
        }
      }
    }
    // emits lines such as (full script in section 4.2):
    //   export JAVA_HOME=${JAVA_HOME:-"/Library/java/JavaVirtualMachines/jdk1.8.0_271.jdk/Contents/Home"}
    //   export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-"/opt/tools/hadoop-3.2.1/etc/hadoop"}

    // Now write vars that were set explicitly by nodemanager, preserving
    // the order they were written in.
    sb.echo("Setting up env variables#env");
    for (String nmEnvVar : nmVars) {
      sb.env(nmEnvVar, environment.get(nmEnvVar));
    }
    // emits lines such as (full script in section 4.2):
    //   export CONTAINER_ID="container_1611681788558_0001_01_000001"
    //   export NM_PORT="62016"
    //   export JVM_PID="$$"

    // Now write the remaining environment variables.
    sb.echo("Setting up env variables#remaining");
    for (Map.Entry<String, String> env :
        sb.orderEnvByDependencies(environment).entrySet()) {
      if (!nmVars.contains(env.getKey())) {
        sb.env(env.getKey(), env.getValue());
      }
    }
    // emits lines such as (full script in section 4.2):
    //   export SPARK_YARN_STAGING_DIR="hdfs://localhost:8020/user/henghe/.sparkStaging/application_1611681788558_0001"
    //   export CLASSPATH="$PWD:$PWD/__spark_conf__:..."
  }

  if (resources != null) {
    sb.echo("Setting up job resources");
    Map<Path, Path> symLinks = resolveSymLinks(resources, user);
    for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
      // link each localized resource into the work dir
      sb.symlink(symLink.getKey(), symLink.getValue());
    }
    // emits lines such as (full script in section 4.2):
    //   mkdir -p __spark_libs__
    //   ln -sf -- ".../filecache/35/spark-examples_2.11-2.4.5.jar" "__app__.jar"
  }

  // dump debugging information if configured
  if (getConf() != null && getConf().getBoolean(
      YarnConfiguration.NM_LOG_CONTAINER_DEBUG_INFO,
      YarnConfiguration.DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO)) {
    sb.echo("Copying debugging information");
    sb.copyDebugInformation(new Path(outFilename),
        new Path(logDir, outFilename));
    sb.listDebugInformation(new Path(logDir, DIRECTORY_CONTENTS));
    // emits the "Copying debugging information" cp/chmod lines plus the
    // ls -l and find -L . -maxdepth 5 dumps into directory.info
    // (full script in section 4.2)
  }

  sb.echo("Launching container");
  // launch the container
  sb.command(command);
  // emits: echo "Launching container"
  //        exec /bin/bash -c "$JAVA_HOME/bin/java ... ApplicationMaster ..."

  // finally, write out the assembled script
  LOG.warn("ContainerExecutor#writeLaunchEnv : " + sb.toString());
  PrintStream pout = null;
  try {
    pout = new PrintStream(out, false, "UTF-8");
    sb.write(pout);
  } finally {
    if (out != null) {
      out.close();
    }
  }
}
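The redirections that sb.stdout and sb.stderr emit rely on `exec` with no command, which re-points the current shell's file descriptors for everything that follows; that is how prelaunch.out and prelaunch.err capture all output produced before the real command is exec'd. A runnable sketch with throwaway paths:

```shell
# "exec >file" redirects the current shell's stdout from this point on;
# "exec 2>file" does the same for stderr. This mirrors the PRELAUNCH_OUT
# and PRELAUNCH_ERR lines in the generated launch script.
logdir=$(mktemp -d)
cat > "$logdir/demo.sh" <<'EOF'
export PRELAUNCH_OUT="$1/prelaunch.out"
exec >"${PRELAUNCH_OUT}"
export PRELAUNCH_ERR="$1/prelaunch.err"
exec 2>"${PRELAUNCH_ERR}"
echo "goes to prelaunch.out"
echo "goes to prelaunch.err" >&2
EOF
bash "$logdir/demo.sh" "$logdir"

cat "$logdir/prelaunch.out"
```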
4.2. Generated launch script
# script header
#!/bin/bash
# fail fast and check exit codes
set -o pipefail -e
# run log
export PRELAUNCH_OUT="/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/prelaunch.out"
exec >"${PRELAUNCH_OUT}"
# error log
export PRELAUNCH_ERR="/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/prelaunch.err"
exec 2>"${PRELAUNCH_ERR}"
# set up env variables
echo "Setting up env variables"
# env variables: whitelist vars
echo "Setting up env variables#whitelistVars"
export JAVA_HOME=${JAVA_HOME:-"/Library/java/JavaVirtualMachines/jdk1.8.0_271.jdk/Contents/Home"}
export HADOOP_COMMON_HOME=${HADOOP_COMMON_HOME:-"/opt/workspace/apache/hadoop-3.2.1-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/../../../../hadoop-common-project/hadoop-common/target"}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-"/opt/tools/hadoop-3.2.1/etc/hadoop"}
export HADOOP_HOME=${HADOOP_HOME:-"/opt/workspace/apache/hadoop-3.2.1-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/../../../../hadoop-common-project/hadoop-common/target"}
export PATH=${PATH:-"/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/opt/tools/apache-maven-3.6.3/bin:/opt/tools/scala-2.12.10/bin:/usr/local/mysql-5.7.28-macos10.14-x86_64/bin:/Library/java/JavaVirtualMachines/jdk1.8.0_271.jdk/Contents/Home/bin:/opt/tools/hadoop-3.2.1/bin:/opt/tools/hadoop-3.2.1/etc/hadoop:henghe:/opt/tools/ozone-1.0.0/bin:/opt/tools/spark-2.4.5/bin:/opt/tools/spark-2.4.5/conf:/opt/tools/redis-5.0.7/src:/opt/tools/datax/bin:/opt/tools/apache-ant-1.9.6/bin:/opt/tools/hbase-2.0.2/bin"}
# env variables: NM-set vars
echo "Setting up env variables#env"
export HADOOP_TOKEN_FILE_LOCATION="/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1611681788558_0001/container_1611681788558_0001_01_000001/container_tokens"
export CONTAINER_ID="container_1611681788558_0001_01_000001"
export NM_PORT="62016"
export NM_HOST="boyi-pro.lan"
export NM_HTTP_PORT="8042"
export LOCAL_DIRS="/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1611681788558_0001"
export LOCAL_USER_DIRS="/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/"
export LOG_DIRS="/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001"
export USER="henghe"
export LOGNAME="henghe"
export HOME="/home/"
export PWD="/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1611681788558_0001/container_1611681788558_0001_01_000001"
export JVM_PID="$$"
export MALLOC_ARENA_MAX="4"
# env variables: remaining vars
echo "Setting up env variables#remaining"
export SPARK_YARN_STAGING_DIR="hdfs://localhost:8020/user/henghe/.sparkStaging/application_1611681788558_0001"
export APPLICATION_WEB_PROXY_BASE="/proxy/application_1611681788558_0001"
export CLASSPATH="$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__"
export APP_SUBMIT_TIME_ENV="1611681915166"
export SPARK_USER="henghe"
export PYTHONHASHSEED="0"
# set up job resources (symlink required jars/config files via ln -sf)
echo "Setting up job resources"
mkdir -p __spark_libs__
ln -sf -- "/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache/35/spark-examples_2.11-2.4.5.jar" "__app__.jar"
mkdir -p __spark_libs__
ln -sf -- "/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache/180/__spark_conf__.zip" "__spark_conf__"
# (many more similar lines omitted)
# mkdir -p __spark_libs__
# ln -sf -- "xxxxx" "__spark_libs__/xxxxx.jar"
# debugging information
echo "Copying debugging information"
# Creating copy of launch script
cp "launch_container.sh" "/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/launch_container.sh"
chmod 640 "/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/launch_container.sh"
# determine directory contents
# Determining directory contents
echo "ls -l:" 1>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
ls -l 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
echo "find -L . -maxdepth 5 -ls:" 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
find -L . -maxdepth 5 -ls 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
echo "broken symlinks(find -L . -maxdepth 5 -type l -ls):" 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
find -L . -maxdepth 5 -type l -ls 1>>"/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/directory.info"
# launch command
echo "Launching container"
exec /bin/bash -c "$JAVA_HOME/bin/java -server -Xmx1024m -Djava.io.tmpdir=$PWD/tmp -Dspark.yarn.app.container.log.dir=/opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'org.apache.spark.examples.SparkPi' --jar file:/opt/tools/spark-2.4.5/examples/jars/spark-examples_2.11-2.4.5.jar --arg '10' --properties-file $PWD/__spark_conf__/__spark_conf__.properties 1> /opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/stdout 2> /opt/tools/hadoop-3.2.1/logs/userlogs/application_1611681788558_0001/container_1611681788558_0001_01_000001/stderr"
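The resource-setup portion of the script leans on `ln -sf -- target link`: `-s` creates a symlink, `-f` silently replaces an existing one (so localization is idempotent across relaunches), and `--` ends option parsing in case a name starts with a dash. A sketch with throwaway paths standing in for the filecache:

```shell
# Symlink a "localized" file into the container work dir, twice, the way
# launch_container.sh does; -f makes the second run replace the first
# link instead of failing with "File exists".
workdir=$(mktemp -d)
echo "jar-bytes" > "$workdir/localized.jar"

cd "$workdir"
ln -sf -- "$workdir/localized.jar" "__app__.jar"
ln -sf -- "$workdir/localized.jar" "__app__.jar"   # idempotent

cat "__app__.jar"
```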
5. activateContainer
This method is straightforward: it records the container's pid-file path in a cache.
- Cached state

// pid file path for each ContainerId
private final ConcurrentMap<ContainerId, Path> pidFiles =
    new ConcurrentHashMap<>();
- Implementation

/**
 * Mark the container as active.
 *
 * @param containerId the container ID
 * @param pidFilePath the path where the executor should write the PID
 * of the launched process
 */
public void activateContainer(ContainerId containerId, Path pidFilePath) {
  try {
    writeLock.lock();
    this.pidFiles.put(containerId, pidFilePath);
  } finally {
    writeLock.unlock();
  }
}
6. deactivateContainer
The opposite of activateContainer: removes the container's entry from the cache.

/**
 * Mark the container as inactive. For inactive containers this method
 * has no effect.
 *
 * @param containerId the container ID
 */
public void deactivateContainer(ContainerId containerId) {
  try {
    writeLock.lock();
    this.pidFiles.remove(containerId);
  } finally {
    writeLock.unlock();
  }
}
7. launchContainer
ContainerLaunch#call invokes it as follows:

// launch the Container
ret = launchContainer(new ContainerStartContext.Builder()
    .setContainer(container)
    .setLocalizedResources(localResources)
    .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
    .setNmPrivateTokensPath(nmPrivateTokensPath)
    .setUser(user)
    .setAppId(appIdStr)
    .setContainerWorkDir(containerWorkDir)
    .setLocalDirs(localDirs)
    .setLogDirs(logDirs)
    .setFilecacheDirs(filecacheDirs)
    .setUserLocalDirs(userLocalDirs)
    .setContainerLocalDirs(containerLocalDirs)
    .setContainerLogDirs(containerLogDirs)
    .setUserFilecacheDirs(userFilecacheDirs)
    .setApplicationLocalDirs(applicationLocalDirs)
    .build());
7.1. Abstract definition

/**
 * Launch the container on the node. This is a blocking call and returns
 * only when the container exits.
 *
 * @param ctx Encapsulates information necessary for launching containers.
 * @return the return status of the launch
 * @throws IOException if the container launch fails
 * @throws ConfigurationException if config error was found
 */
public abstract int launchContainer(ContainerStartContext ctx)
    throws IOException, ConfigurationException;
7.2. ContainerStartContext
Launching takes a ContainerStartContext argument, which mainly carries the paths the Container needs.

// the container, e.g. container_1612228249380_0002_01_000001
private final Container container;
// localized resources, Map<Path, List<String>>; example entry:
//   key  : /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache/141/commons-lang-2.6.jar
//   value: [/opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache/141/commons-lang-2.6.jar]
private final Map<Path, List<String>> localizedResources;
// launch script location, e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/nmPrivate/application_1612228249380_0002/container_1612228249380_0002_01_000001/launch_container.sh
private final Path nmPrivateContainerScriptPath;
// token file location, e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/nmPrivate/application_1612228249380_0002/container_1612228249380_0002_01_000001/container_1612228249380_0002_01_000001.tokens
private final Path nmPrivateTokensPath;
// user, e.g. henghe
private final String user;
// application ID, e.g. application_1612228249380_0002
private final String appId;
// container work dir, e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1612228249380_0002/container_1612228249380_0002_01_000001
private final Path containerWorkDir;
// local dirs (possibly several), e.g. /opt/tools/hadoop-3.2.1/local-dirs
private final List<String> localDirs;
// log dirs (possibly several), e.g. /opt/tools/hadoop-3.2.1/logs/userlogs
private final List<String> logDirs;
// public file cache dirs
private final List<String> filecacheDirs;
// user local dirs (possibly several), e.g. /opt/tools/hadoop-3.2.1/local-dirs/filecache
private final List<String> userLocalDirs;
// container local dirs (possibly several), e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1612228249380_0002/
private final List<String> containerLocalDirs;
// container log dirs (possibly several), e.g.
//   /opt/tools/hadoop-3.2.1/logs/userlogs/application_1612228249380_0002/container_1612228249380_0002_01_000001
private final List<String> containerLogDirs;
// user file cache dirs, e.g. /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/filecache
private final List<String> userFilecacheDirs;
// application local dirs, e.g.
//   /opt/tools/hadoop-3.2.1/local-dirs/usercache/henghe/appcache/application_1612228249380_0002
private final List<String> applicationLocalDirs;
7.3. Execution flow
Resolve resource references -> handle the pidFilePath -> delegate to linuxContainerRuntime.launchContainer.
7.4. Code
- launchContainer delegates to handleLaunchForLaunchType:
private int handleLaunchForLaunchType(ContainerStartContext ctx,
    ApplicationConstants.ContainerLaunchType type) throws IOException,
    ConfigurationException {
  // get the container
  Container container = ctx.getContainer();
  // get the user
  String user = ctx.getUser();

  // verify that this username may run tasks
  verifyUsernamePattern(user);

  // get the ContainerId
  ContainerId containerId = container.getContainerId();

  // resolve resource references
  resourcesHandler.preExecute(containerId, container.getResource());
  String resourcesOptions = resourcesHandler.getResourcesOption(containerId);
  String tcCommandFile = null;
  List<String> numaArgs = null;

  try {
    // run the resource handler chain
    if (resourceHandlerChain != null) {
      List<PrivilegedOperation> ops = resourceHandlerChain.preStart(container);

      if (ops != null) {
        List<PrivilegedOperation> resourceOps = new ArrayList<>();

        resourceOps.add(new PrivilegedOperation(
            PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
            resourcesOptions));

        for (PrivilegedOperation op : ops) {
          switch (op.getOperationType()) {
          case ADD_PID_TO_CGROUP:
            resourceOps.add(op);
            break;
          case TC_MODIFY_STATE:
            tcCommandFile = op.getArguments().get(0);
            break;
          case ADD_NUMA_PARAMS:
            numaArgs = op.getArguments();
            break;
          default:
            LOG.warn("PrivilegedOperation type unsupported in launch: "
                + op.getOperationType());
          }
        }

        if (resourceOps.size() > 1) {
          //squash resource operations
          try {
            PrivilegedOperation operation = PrivilegedOperationExecutor
                .squashCGroupOperations(resourceOps);
            resourcesOptions = operation.getArguments().get(0);
          } catch (PrivilegedOperationException e) {
            LOG.error("Failed to squash cgroup operations!", e);
            throw new ResourceHandlerException(
                "Failed to squash cgroup operations!");
          }
        }
      }
    }
  } catch (ResourceHandlerException e) {
    LOG.error("ResourceHandlerChain.preStart() failed!", e);
    throw new IOException("ResourceHandlerChain.preStart() failed!", e);
  }

  try {
    // handle the pidFilePath
    Path pidFilePath = getPidFilePath(containerId);
    if (pidFilePath != null) {
      ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
          ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs);

      if (type.equals(ApplicationConstants.ContainerLaunchType.RELAUNCH)) {
        linuxContainerRuntime.relaunchContainer(runtimeContext);
      } else {
        // delegate to the concrete container runtime
        linuxContainerRuntime.launchContainer(runtimeContext);
      }
    } else {
      LOG.info("Container was marked as inactive. Returning terminated error");
      return ContainerExecutor.ExitCode.TERMINATED.getExitCode();
    }
  } catch (ContainerExecutionException e) {
    return handleExitCode(e, container, containerId);
  } finally {
    resourcesHandler.postExecute(containerId);
    postComplete(containerId);
  }
  return 0;
}
8. cleanupBeforeRelaunch
Cleans up before the container's next launch: deletes the symlinks that were created for its localized resources.

/**
 * Perform any cleanup before the next launch of the container.
 * @param container container
 */
public void cleanupBeforeRelaunch(Container container)
    throws IOException, InterruptedException {
  if (container.getLocalizedResources() != null) {
    // resolve the resource symlinks
    Map<Path, Path> symLinks = resolveSymLinks(
        container.getLocalizedResources(), container.getUser());

    for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
      LOG.debug("{} deleting {}", container.getContainerId(),
          symLink.getValue());
      // delete the resource link
      deleteAsUser(new DeletionAsUserContext.Builder()
          .setUser(container.getUser())
          .setSubDir(symLink.getValue())
          .build());
    }
  }
}
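The deletion above targets only the symlinks in the container's work directory; the localized files in the user's filecache survive, which is what allows a relaunch to simply re-create the links. The filesystem behavior, sketched in shell with throwaway paths:

```shell
# Removing a symlink deletes only the link itself; the file it points
# to (the localized resource in the filecache) is left intact.
workdir=$(mktemp -d)
echo "localized" > "$workdir/filecache-entry"
ln -s "$workdir/filecache-entry" "$workdir/resource-link"

rm "$workdir/resource-link"

ls "$workdir"
```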