
  • 1. Preface
  • 2. Startup Analysis
    • 2.1. StreamExecutionEnvironment#execute
    • 2.2. StreamExecutionEnvironment#executeAsync
    • 2.3. YarnClusterDescriptor#deployJobCluster
    • 2.4. YarnClusterDescriptor#deployInternal [core]
    • 2.5. YarnClusterDescriptor#startAppMaster
  • 3. Manifest of Submitted Files
    • 3.1. Submitted package manifest
    • 3.2. Program manifest on HDFS

1. Preface

Continuing from the previous article, Flink 1.12.2 source code analysis: yarn-per-job mode [part 1].
As covered there, CliFrontend eventually invokes the code we wrote ourselves; the entry point is the main method.

  • Overall flow diagram
  • Detail diagram

The complete example code is as follows:

package org.apache.flink.streaming.examples.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket. The easiest way to
 * try this out is to open a text server (at port 12345) using the <i>netcat</i> tool via
 *
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 *
 * <p>and run this example with the hostname and the port as arguments.
 */
@SuppressWarnings("serial")
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println(
                    "No port specified. Please run 'SocketWindowWordCount "
                            + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
                            + "and port is the address of the text server");
            System.err.println(
                    "To start a simple text server, run 'netcat -l <port>' and "
                            + "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts =
                text.flatMap(
                                new FlatMapFunction<String, WordWithCount>() {
                                    @Override
                                    public void flatMap(String value, Collector<WordWithCount> out) {
                                        for (String word : value.split("\\s")) {
                                            out.collect(new WordWithCount(word, 1L));
                                        }
                                    }
                                })
                        .keyBy(value -> value.word)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .reduce(
                                new ReduceFunction<WordWithCount>() {
                                    @Override
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                                        return new WordWithCount(a.word, a.count + b.count);
                                    }
                                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /** Data type for words with count. */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
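To try the example end to end, start a text server with nc -l 12345 and submit the jar in yarn-per-job mode. With a stock Flink 1.12.2 distribution the submission would look roughly like ./bin/flink run -t yarn-per-job ./examples/streaming/SocketWindowWordCount.jar --hostname localhost --port 12345 (this command is an assumption, inferred from the execution.target=yarn-per-job and pipeline.jars values that show up in the configuration dumps below).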

2. Startup Analysis

2.1. StreamExecutionEnvironment#execute

Looking at the SocketWindowWordCount class, we can see that it is simply a job that counts words in 5-second windows.
As we know, the final entry point for executing Flink code is env.execute, so that is where we start.

  • StreamExecutionEnvironment#execute
    Here we can see that getStreamGraph(jobName) first generates the StreamGraph, which is then handed to execute(StreamGraph streamGraph) for execution.
/**
 * Triggers the program execution. The environment will execute all parts of the program that
 * have resulted in a "sink" operation. Sink operations are for example printing results or
 * forwarding them to a message queue.
 *
 * <p>The program execution will be logged and displayed with the provided name
 *
 * @param jobName Desired name of the job
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception which occurs during job execution.
 */
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    // build the StreamGraph and continue with it ...
    return execute(getStreamGraph(jobName));
}

/**
 * Triggers the program execution. The environment will execute all parts of the program that
 * have resulted in a "sink" operation. Sink operations are for example printing results or
 * forwarding them to a message queue.
 *
 * @param streamGraph the stream graph representing the transformations
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception which occurs during job execution.
 */
@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // ----------------------
    // submit the StreamGraph of the user program
    // ----------------------
    final JobClient jobClient = executeAsync(streamGraph);

    try {
        final JobExecutionResult jobExecutionResult;

        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            jobExecutionResult = jobClient.getJobExecutionResult().get();
        } else {
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }

        jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

        return jobExecutionResult;
    } catch (Throwable t) {
        // get() on the JobExecutionResult Future will throw an ExecutionException. This
        // behaviour was largely not there in Flink versions before the PipelineExecutor
        // refactoring so we should strip that exception.
        Throwable strippedException = ExceptionUtils.stripExecutionException(t);

        jobListeners.forEach(
                jobListener -> {
                    jobListener.onJobExecuted(null, strippedException);
                });
        ExceptionUtils.rethrowException(strippedException);

        // never reached, only make javac happy
        return null;
    }
}
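For reference, getStreamGraph(jobName) builds the StreamGraph from the transformations recorded on the environment. A sketch abridged from Flink 1.12.x (exact signatures may differ slightly between minor versions):

@Internal
public StreamGraph getStreamGraph(String jobName) {
    return getStreamGraph(jobName, true);
}

@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    // generate the StreamGraph from all transformations recorded so far
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}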

Next we follow executeAsync(streamGraph).

2.2. StreamExecutionEnvironment#executeAsync

This method looks up the matching PipelineExecutorFactory, which for yarn-per-job is YarnJobClusterExecutorFactory.
That factory then creates the executor that actually runs the pipeline, YarnJobClusterExecutor; the execute method shown below is inherited from its base class AbstractJobClusterExecutor.
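Before that, a condensed view of StreamExecutionEnvironment#executeAsync itself, abridged from Flink 1.12.x (null checks and error handling trimmed, so treat this as a sketch rather than the verbatim source):

@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    checkNotNull(
            configuration.get(DeploymentOptions.TARGET),
            "No execution.target specified in your configuration file.");

    // SPI lookup: for execution.target=yarn-per-job this yields YarnJobClusterExecutorFactory
    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);

    // the factory creates a YarnJobClusterExecutor, whose execute(...) is shown below
    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    .execute(streamGraph, configuration, userClassloader);

    final JobClient jobClient = jobClientFuture.get();
    jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
    return jobClient;
}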

@Override
public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {

    // translate the stream graph into a job graph
    // JobGraph(jobId: 536af83b56ddfc2ef4ffda8b43a21e15)
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

    // obtain the descriptor of the yarn cluster
    try (final ClusterDescriptor<ClusterID> clusterDescriptor =
            clusterClientFactory.createClusterDescriptor(configuration)) {

        // configuration at this point:
        //    {
        //        taskmanager.memory.process.size=1728m,
        //        jobmanager.execution.failover-strategy=region,
        //        jobmanager.rpc.address=localhost,
        //        execution.target=yarn-per-job,
        //        jobmanager.memory.process.size=1600m,
        //        jobmanager.rpc.port=6123,
        //        execution.savepoint.ignore-unclaimed-state=false,
        //        execution.attached=true,
        //        execution.shutdown-on-attached-exit=false,
        //        pipeline.jars=[file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar],
        //        parallelism.default=1,
        //        taskmanager.numberOfTaskSlots=1,
        //        pipeline.classpaths=[],
        //        $internal.deployment.config-dir=/opt/tools/flink-1.12.2/conf,
        //        $internal.yarn.log-config-file=/opt/tools/flink-1.12.2/conf/log4j.properties
        //    }
        final ExecutionConfigAccessor configAccessor =
                ExecutionConfigAccessor.fromConfiguration(configuration);

        // the resource specification of the cluster to start:
        //    masterMemoryMB = 1600
        //    taskManagerMemoryMB = 1728
        //    slotsPerTaskManager = 1
        final ClusterSpecification clusterSpecification =
                clusterClientFactory.getClusterSpecification(configuration);

        // deploy the cluster ...
        final ClusterClientProvider<ClusterID> clusterClientProvider =
                clusterDescriptor.deployJobCluster(
                        clusterSpecification, jobGraph, configAccessor.getDetachedMode());

        LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

        return CompletableFuture.completedFuture(
                new ClusterClientJobClientAdapter<>(
                        clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
    }
}

2.3. YarnClusterDescriptor#deployJobCluster

This simply delegates to the deployInternal method: start deploying the yarn per-job cluster.

@Override
public ClusterClientProvider<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
        throws ClusterDeploymentException {
    try {
        // start deploying the yarn per-job cluster
        return deployInternal(
                clusterSpecification,
                "Flink per-job cluster",
                // this is the class name of the yarn cluster entry point!
                getYarnJobClusterEntrypoint(),
                jobGraph,
                detached);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    }
}
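For per-job mode the entry point name passed here is the fully qualified class name of YarnJobClusterEntrypoint, i.e. the main class that will later run inside the YARN ApplicationMaster container. A sketch of the helper as I recall it from YarnClusterDescriptor in Flink 1.12.x:

// Abridged sketch: the AM entry point class used for per-job mode.
protected String getYarnJobClusterEntrypoint() {
    return YarnJobClusterEntrypoint.class.getName();
}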

2.4. YarnClusterDescriptor#deployInternal [core]

/**
 * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
 *
 * @param clusterSpecification Initial cluster specification for the Flink cluster to be
 *     deployed
 * @param applicationName name of the Yarn application to start
 * @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
 * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
 * @param detached True if the cluster should be started in detached mode
 */
private ClusterClientProvider<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached)
        throws Exception {

    // security-related configuration ...
    // currentUser: sysadmin (auth:SIMPLE)
    final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
        boolean useTicketCache =
                flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

        if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
            throw new RuntimeException(
                    "Hadoop security with Kerberos is enabled but the login user "
                            + "does not have Kerberos credentials or delegation tokens!");
        }
    }

    // a series of checks: is everything in place to start the cluster (jar, conf, yarn cores)
    isReadyForDeployment(clusterSpecification);

    // ------------------ Check if the specified queue exists --------------------
    checkYarnQueues(yarnClient);

    // ------------------ Check if the YARN ClusterClient has the requested resources --------------

    // build the application client
    // Create application via yarnClient
    final YarnClientApplication yarnApplication = yarnClient.createApplication();

    // response contents:
    //    application_id { id: 10, cluster_timestamp: 1615446205104 }
    //    maximumCapability {
    //        memory: 8192
    //        virtual_cores: 4
    //        resource_value_map { key: "memory-mb", value: 8192, units: "Mi", type: COUNTABLE }
    //        resource_value_map { key: "vcores",    value: 4,    units: "",   type: COUNTABLE }
    //    }
    final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

    // maximum resources of the yarn cluster: <memory:8192, vCores:4>
    Resource maxRes = appResponse.getMaximumResourceCapability();

    final ClusterResourceDescription freeClusterMem;
    try {
        // free cluster memory:
        //    totalFreeMemory = 104857600
        //    containerLimit = 104857600
        //    nodeManagersFree = {int[1]@4603}
        freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    } catch (YarnException | IOException e) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw new YarnDeploymentException(
                "Could not retrieve information about free cluster resources.", e);
    }

    // the minimum container memory yarn will allocate, default 1024 MB
    final int yarnMinAllocationMB =
            yarnConfiguration.getInt(
                    YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                    YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    // if the configured minimum allocation is <= 0, fail fast
    if (yarnMinAllocationMB <= 0) {
        throw new YarnDeploymentException(
                "The minimum allocation memory "
                        + "("
                        + yarnMinAllocationMB
                        + " MB) configured via '"
                        + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
                        + "' should be greater than 0.");
    }

    final ClusterSpecification validClusterSpecification;
    try {
        // validate that the requested resources can actually be satisfied
        validClusterSpecification =
                validateClusterResources(
                        clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
    } catch (YarnDeploymentException yde) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw yde;
    }

    LOG.info("Cluster specification: {}", validClusterSpecification);

    // determine the execution mode: NORMAL (attached) or DETACHED
    final ClusterEntrypoint.ExecutionMode executionMode =
            detached
                    ? ClusterEntrypoint.ExecutionMode.DETACHED
                    : ClusterEntrypoint.ExecutionMode.NORMAL;

    // record the execution mode: internal.cluster.execution-mode
    flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

    // start the AppMaster and obtain the application report
    ApplicationReport report =
            startAppMaster(
                    flinkConfiguration,
                    applicationName,
                    yarnClusterEntrypoint,
                    jobGraph,
                    yarnClient,
                    yarnApplication,
                    validClusterSpecification);

    // print the application id for user to cancel themselves.
    if (detached) {
        final ApplicationId yarnApplicationId = report.getApplicationId();
        logDetachedClusterInformation(yarnApplicationId, LOG);
    }

    // write the cluster entry point info into the configuration
    setClusterEntrypointInfoToConfig(report);

    return () -> {
        try {
            // build the cluster client that is handed back to the caller
            return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
        } catch (Exception e) {
            throw new RuntimeException("Error while creating RestClusterClient.", e);
        }
    };
}
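The yarnMinAllocationMB check matters because the YARN scheduler rounds every container request up to a multiple of the minimum allocation. This is not Flink code, just an illustration of the arithmetic YARN's default resource calculator applies:

// Rounds a memory request up to the next multiple of the scheduler minimum allocation.
// With the values above: normalize(1600, 1024) == 2048, i.e. the JobManager container
// actually reserves 2 GB even though only 1600 MB was requested.
static int normalize(int requestedMb, int minAllocationMb) {
    return ((requestedMb + minAllocationMb - 1) / minAllocationMb) * minAllocationMb;
}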

2.5. YarnClusterDescriptor#startAppMaster

Submits the application and starts the AppMaster.

private ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification)
        throws Exception {

    // ------------------ Initialize the file systems -------------------------
    org.apache.flink.core.fs.FileSystem.initialize(
            configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

    // obtain the file system; here: LocalFileSystem
    final FileSystem fs = FileSystem.get(yarnConfiguration);

    // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
    if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
            && fs.getScheme().startsWith("file")) {
        LOG.warn(
                "The file system scheme is '"
                        + fs.getScheme()
                        + "'. This indicates that the "
                        + "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
                        + "The Flink YARN client needs to store its files in a distributed file system");
    }

    ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

    // remote shared lib dirs, if configured
    final List<Path> providedLibDirs =
            Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);

    // file upload helper; observed state in this run:
    //    fileSystem = {LocalFileSystem@5001}
    //    applicationId = application_1615446205104_0010
    //    homeDir = file:/Users/sysadmin
    //    applicationDir = file:/Users/sysadmin/.flink/application_1615446205104_0010
    //    providedSharedLibs = size 0
    //    localResources (size 2):
    //        "log4j.properties" -> file:/Users/sysadmin/.flink/application_1615446205104_0010/log4j.properties, size 2620, timestamp 1615717946000, type FILE, visibility APPLICATION
    //        "SocketWindowWordCount.jar" -> file:/Users/sysadmin/.flink/application_1615446205104_0010/SocketWindowWordCount.jar, size 14708, timestamp 1615717946000, type FILE, visibility APPLICATION
    //    fileReplication = 3
    //    remotePaths:
    //        file:/opt/tools/flink-1.12.2/conf/log4j.properties
    //        file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar
    //    envShipResourceList: the matching YarnLocalResourceDescriptor entries for the two files above
    //    flinkDist = null
    final YarnApplicationFileUploader fileUploader =
            YarnApplicationFileUploader.from(
                    fs,
                    getStagingDir(fs),
                    providedLibDirs,
                    appContext.getApplicationId(),
                    getFileReplication());

    // The files need to be shipped and added to classpath.
    // e.g. /opt/tools/flink-1.12.2/conf/log4j.properties
    Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
    for (File file : shipFiles) {
        systemShipFiles.add(file.getAbsoluteFile());
    }

    // the log configuration file
    final String logConfigFilePath =
            configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
    if (logConfigFilePath != null) {
        systemShipFiles.add(new File(logConfigFilePath));
    }

    // Set-up ApplicationSubmissionContext for the application
    // ApplicationId: application_1615446205104_0010
    final ApplicationId appId = appContext.getApplicationId();

    // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
    setHAClusterIdIfNotSet(configuration, appId);

    if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
        // HA enabled: maximum attempts, yarn.application-attempts => 2
        // activate re-execution of failed applications
        appContext.setMaxAppAttempts(
                configuration.getInteger(
                        YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

        activateHighAvailabilitySupport(appContext);
    } else {
        // set number of application retries to 1 in the default case
        appContext.setMaxAppAttempts(
                configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
    }

    // collect the user jars, here:
    // 0 : file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar
    final Set<Path> userJarFiles = new HashSet<>();
    if (jobGraph != null) {
        userJarFiles.addAll(
                jobGraph.getUserJars().stream()
                        .map(f -> f.toUri())
                        .map(Path::new)
                        .collect(Collectors.toSet()));
    }

    // 0 : file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar
    final List<URI> jarUrls =
            ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
    if (jarUrls != null
            && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
        userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
    }

    // only for per job mode
    if (jobGraph != null) {
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                jobGraph.getUserArtifacts().entrySet()) {
            // only upload local files
            if (!Utils.isRemotePath(entry.getValue().filePath)) {
                Path localPath = new Path(entry.getValue().filePath);
                Tuple2<Path, Long> remoteFileInfo =
                        fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
                jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString());
            }
        }

        jobGraph.writeUserArtifactEntriesToConfiguration();
    }

    if (providedLibDirs == null || providedLibDirs.isEmpty()) {
        addLibFoldersToShipFiles(systemShipFiles);
    }

    // Register all files in provided lib dirs as local resources with public visibility
    // and upload the remaining dependencies as local resources with APPLICATION visibility.
    // systemClassPaths:
    //    0 = ""
    //    1 = "SocketWindowWordCount.jar"
    final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
    final List<String> uploadedDependencies =
            fileUploader.registerMultipleLocalResources(
                    systemShipFiles.stream()
                            .map(e -> new Path(e.toURI()))
                            .collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.FILE);
    systemClassPaths.addAll(uploadedDependencies);

    // upload and register ship-only files
    // Plugin files only need to be shipped and should not be added to classpath.
    if (providedLibDirs == null || providedLibDirs.isEmpty()) {
        Set<File> shipOnlyFiles = new HashSet<>();
        addPluginsFoldersToShipFiles(shipOnlyFiles);
        fileUploader.registerMultipleLocalResources(
                shipOnlyFiles.stream()
                        .map(e -> new Path(e.toURI()))
                        .collect(Collectors.toSet()),
                Path.CUR_DIR,
                LocalResourceType.FILE);
    }

    if (!shipArchives.isEmpty()) {
        fileUploader.registerMultipleLocalResources(
                shipArchives.stream()
                        .map(e -> new Path(e.toURI()))
                        .collect(Collectors.toSet()),
                Path.CUR_DIR,
                LocalResourceType.ARCHIVE);
    }

    // Upload and register user jars: SocketWindowWordCount.jar
    final List<String> userClassPaths =
            fileUploader.registerMultipleLocalResources(
                    userJarFiles,
                    userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                            ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                            : Path.CUR_DIR,
                    LocalResourceType.FILE);

    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
        systemClassPaths.addAll(userClassPaths);
    }

    // normalize classpath by sorting
    Collections.sort(systemClassPaths);
    Collections.sort(userClassPaths);

    // classpath assembler
    StringBuilder classPathBuilder = new StringBuilder();
    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
        for (String userClassPath : userClassPaths) {
            classPathBuilder.append(userClassPath).append(File.pathSeparator);
        }
    }
    for (String classPath : systemClassPaths) {
        classPathBuilder.append(classPath).append(File.pathSeparator);
    }

    // Setup jar for ApplicationMaster
    final YarnLocalResourceDescriptor localResourceDescFlinkJar =
            fileUploader.uploadFlinkDist(flinkJarPath);
    classPathBuilder
            .append(localResourceDescFlinkJar.getResourceKey())
            .append(File.pathSeparator);

    // write job graph to tmp file and add it to local resource
    // TODO: server use user main method to generate job graph
    if (jobGraph != null) {
        File tmpJobGraphFile = null;
        try {
            tmpJobGraphFile = File.createTempFile(appId.toString(), null);
            try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                    ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                obOutput.writeObject(jobGraph);
            }

            final String jobGraphFilename = "job.graph";
            configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);

            fileUploader.registerSingleLocalResource(
                    jobGraphFilename,
                    new Path(tmpJobGraphFile.toURI()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    false);
            classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
        } catch (Exception e) {
            LOG.warn("Add job graph to local resource fail.");
            throw e;
        } finally {
            if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
            }
        }
    }

    // Upload the flink configuration
    // write out configuration file
    File tmpConfigurationFile = null;
    try {
        tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
        BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

        String flinkConfigKey = "flink-conf.yaml";
        fileUploader.registerSingleLocalResource(
                flinkConfigKey,
                new Path(tmpConfigurationFile.getAbsolutePath()),
                "",
                LocalResourceType.FILE,
                true,
                true);
        classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
    } finally {
        if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
            LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
        }
    }

    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
        for (String userClassPath : userClassPaths) {
            classPathBuilder.append(userClassPath).append(File.pathSeparator);
        }
    }

    // To support Yarn Secure Integration Test Scenario
    // In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
    // the Yarn site XML and KRB5 configuration files. We are adding these files as container
    // local resources for the container applications (JM/TMs) to have proper secure cluster setup
    Path remoteYarnSiteXmlPath = null;
    if (System.getenv("IN_TESTS") != null) {
        File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
        LOG.info(
                "Adding Yarn configuration {} to the AM container local resource bucket",
                f.getAbsolutePath());
        Path yarnSitePath = new Path(f.getAbsolutePath());
        remoteYarnSiteXmlPath =
                fileUploader
                        .registerSingleLocalResource(
                                Utils.YARN_SITE_FILE_NAME,
                                yarnSitePath,
                                "",
                                LocalResourceType.FILE,
                                false,
                                false)
                        .getPath();
        if (System.getProperty("java.security.krb5.conf") != null) {
            configuration.set(
                    SecurityOptions.KERBEROS_KRB5_PATH,
                    System.getProperty("java.security.krb5.conf"));
        }
    }

    // kerberos setup
    Path remoteKrb5Path = null;
    boolean hasKrb5 = false;
    String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
    if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
        final File krb5 = new File(krb5Config);
        LOG.info(
                "Adding KRB5 configuration {} to the AM container local resource bucket",
                krb5.getAbsolutePath());
        final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
        remoteKrb5Path =
                fileUploader
                        .registerSingleLocalResource(
                                Utils.KRB5_FILE_NAME,
                                krb5ConfPath,
                                "",
                                LocalResourceType.FILE,
                                false,
                                false)
                        .getPath();
        hasKrb5 = true;
    }

    Path remotePathKeytab = null;
    String localizedKeytabPath = null;
    String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
    if (keytab != null) {
        boolean localizeKeytab =
                flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
        localizedKeytabPath =
                flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
        if (localizeKeytab) {
            // Localize the keytab to YARN containers via local resource.
            LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
            remotePathKeytab =
                    fileUploader
                            .registerSingleLocalResource(
                                    localizedKeytabPath,
                                    new Path(keytab),
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
        } else {
            // Assume Keytab is pre-installed in the container.
            localizedKeytabPath =
                    flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
        }
    }

    final JobManagerProcessSpec processSpec =
            JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                    flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
    final ContainerLaunchContext amContainer =
            setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

    // setup security tokens
    if (UserGroupInformation.isSecurityEnabled()) {
        // set HDFS delegation tokens when security is enabled
        LOG.info("Adding delegation token to the AM container.");
        List<Path> yarnAccessList =
                ConfigUtils.decodeListFromConfig(
                        configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
        Utils.setTokensFor(
                amContainer,
                ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
                yarnConfiguration);
    }

    amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
    // uploads are done, close the connection
    fileUploader.close();

    // Setup CLASSPATH and environment variables for ApplicationMaster
    final Map<String, String> appMasterEnv = new HashMap<>();

    // set user specified app master environment variables
    appMasterEnv.putAll(
            ConfigurationUtils.getPrefixedKeyValuePairs(
                    ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));

    // set Flink app class path: _FLINK_CLASSPATH ->
    // :SocketWindowWordCount.jar:flink-dist_2.11-1.12.2.jar:job.graph:flink-conf.yaml:
    appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

    // set Flink on YARN internal configuration values
    //    _FLINK_DIST_JAR -> YarnLocalResourceDescriptor{
    //        key=flink-dist_2.11-1.12.2.jar,
    //        path=file:/Users/sysadmin/.flink/application_1615446205104_0011/flink-dist_2.11-1.12.2.jar,
    //        size=114224188, modificationTime=1615718665000, visibility=APPLICATION, type=FILE }
    appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());

    // _APP_ID -> application_1615446205104_0011
    appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());

    // _CLIENT_HOME_DIR -> file:/Users/sysadmin
    appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());

    // _CLIENT_SHIP_FILES -> the YarnLocalResourceDescriptor entries for
    //    log4j.properties               (size 2620,  modificationTime 1615718651000)
    //    SocketWindowWordCount.jar      (size 14708, modificationTime 1615718656000)
    //    flink-conf.yaml                (size 775,   modificationTime 1615718773000,
    //        path .../application_1615446205104_0011-flink-conf.yaml3338435768180930238.tmp)
    // all under file:/Users/sysadmin/.flink/application_1615446205104_0011, visibility APPLICATION, type FILE
    appMasterEnv.put(
            YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
            encodeYarnLocalResourceDescriptorListToString(
                    fileUploader.getEnvShipResourceList()));

    // _FLINK_YARN_FILES -> file:/Users/sysadmin/.flink/application_1615446205104_0011
    appMasterEnv.put(
            YarnConfigKeys.FLINK_YARN_FILES,
            fileUploader.getApplicationDir().toUri().toString());

    // HADOOP_USER_NAME -> sysadmin
    // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
    appMasterEnv.put(
            YarnConfigKeys.ENV_HADOOP_USER_NAME,
            UserGroupInformation.getCurrentUser().getUserName());

    if (localizedKeytabPath != null) {
        appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
        String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
        appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
        if (remotePathKeytab != null) {
            appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
        }
    }

    // To support Yarn Secure Integration Test Scenario
    if (remoteYarnSiteXmlPath != null) {
        appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
    }
    if (remoteKrb5Path != null) {
        appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
    }

    // set classpath from YARN configuration; the resulting environment:
    //    "_APP_ID" -> "application_1615446205104_0011"
    //    "_FLINK_YARN_FILES" -> "file:/Users/sysadmin/.flink/application_1615446205104_0011"
    //    "HADOOP_USER_NAME" -> "sysadmin"
    //    "_FLINK_CLASSPATH" -> ":SocketWindowWordCount.jar:flink-dist_2.11-1.12.2.jar:job.graph:flink-conf.yaml:"
    //    "_CLIENT_SHIP_FILES" -> the three YarnLocalResourceDescriptor entries listed above
    //    "_FLINK_DIST_JAR" -> "YarnLocalResourceDescriptor{key=flink-dist_2.11-1.12.2.jar, path=file:/Users/sysadmin/.flink/application_1615446205104_0011/flink-dist_2.11-1.12.2.jar, size=114224188, modificationTime=1615718665000, visibility=APPLICATION, type=FILE}"
    //    "_CLIENT_HOME_DIR" -> "file:/Users/sysadmin"
    //    "CLASSPATH" -> ":SocketWindowWordCount.jar:flink-dist_2.11-1.12.2.jar:job.graph:flink-conf.yaml::$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*"
    Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);

    // hand the environment to the amContainer
    amContainer.setEnvironment(appMasterEnv);

    // Set up resource type requirements for ApplicationMaster
    //    capability = { memory-mb: 1600 Mi, vcores: 1 }
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(clusterSpecification.getMasterMemoryMB());
    capability.setVirtualCores(
            flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

    // application name: "Flink per-job cluster" unless a custom name was set
    final String customApplicationName = customName != null ? customName : applicationName;

    appContext.setApplicationName(customApplicationName);
    appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);

    // Set priority for application; observed value here: -1 (i.e. not set)
    int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
    if (priorityNum >= 0) {
        Priority priority = Priority.newInstance(priorityNum);
        appContext.setPriority(priority);
    }

    if (yarnQueue != null) {
        // set the target queue
        appContext.setQueue(yarnQueue);
    }

    setApplicationNodeLabel(appContext);
    setApplicationTags(appContext);

    // add a hook to clean up in case deployment fails
    Thread deploymentFailureHook =
            new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
    Runtime.getRuntime().addShutdownHook(deploymentFailureHook);

    LOG.info("Submitting application master " + appId);
    // amContainer at this point:
    //    localResources (size 5):
    //        "flink-conf.yaml" -> file:/Users/sysadmin/.flink/application_1615446205104_0011/application_1615446205104_0011-flink-conf.yaml3338435768180930238.tmp, size 775, timestamp 1615718773000, type FILE, visibility APPLICATION
    //        "flink-dist_2.11-1.12.2.jar" -> file:/Users/sysadmin/.flink/application_1615446205104_0011/flink-dist_2.11-1.12.2.jar, size 114224188, timestamp 1615718665000, type FILE, visibility APPLICATION
    //        "log4j.properties" -> file:/Users/sysadmin/.flink/application_1615446205104_0011/log4j.properties, size 2620, timestamp 1615718651000, type FILE, visibility APPLICATION
    //        "job.graph" -> file:/Users/sysadmin/.flink/application_1615446205104_0011/application_1615446205104_00114338987264188416150.tmp, size 47295, timestamp 1615718765000, type FILE, visibility APPLICATION
    //        "SocketWindowWordCount.jar" -> file:/Users/sysadmin/.flink/application_1615446205104_0011/SocketWindowWordCount.jar, size 14708, timestamp 1615718656000, type FILE, visibility APPLICATION
    //    tokens = null, tokensConf = null, serviceData = null
    //    environment (size 8): identical to the environment dump above
    //    commands (size 1):
    //        "$JAVA_HOME/bin/java -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file="<LOG_DIR>/jobmanager.log" -Dlog4j.configuration=file:log4j.properties -Dlog4j.configurationFile=file:log4j.properties org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b 1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"
    //    applicationACLS = null, containerRetryContext = null

    // submit the application
    yarnClient.submitApplication(appContext);

    LOG.info("Waiting for the cluster to be allocated");
    final long startTime = System.currentTimeMillis();
    ApplicationReport report;
    YarnApplicationState lastAppState = YarnApplicationState.NEW;
    loop:
    while (true) {
        try {
            report = yarnClient.getApplicationReport(appId);
        } catch (IOException e) {
            throw new YarnDeploymentException("Failed to deploy the cluster.", e);
        }
        YarnApplicationState appState = report.getYarnApplicationState();
        LOG.debug("Application State: {}", appState);
        switch (appState) {
            case FAILED:
            case KILLED:
                throw new YarnDeploymentException(
                        "The YARN application unexpectedly switched to state "
                                + appState
                                + " during deployment. \n"
                                + "Diagnostics from YARN: "
                                + report.getDiagnostics()
                                + "\n"
                                + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                + "yarn logs -applicationId "
                                + appId);
                // break ..
            case RUNNING:
                LOG.info("YARN application has been deployed successfully.");
                break loop;
            case FINISHED:
                LOG.info("YARN application has been finished successfully.");
                break loop;
            default:
                if (appState != lastAppState) {
                    LOG.info("Deploying cluster, current state " + appState);
                }
                if (System.currentTimeMillis() - startTime > 60000) {
                    LOG.info(
                            "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                }
        }
        lastAppState = appState;
        Thread.sleep(250);
    }

    // since deployment was successful, remove the hook
    ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
    return report;
}
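Back in deployInternal, this ApplicationReport is fed into setClusterEntrypointInfoToConfig, and the returned provider lazily builds a RestClusterClient. A hypothetical caller-side sketch (getClusterClient, getClusterId and getWebInterfaceURL are part of the Flink 1.12 client API; the surrounding code is assumed, not taken from the source):

// Hypothetical usage of the ClusterClientProvider returned by deployInternal.
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
System.out.println("ApplicationId : " + clusterClient.getClusterId());
System.out.println("Web UI        : " + clusterClient.getWebInterfaceURL());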

3. Manifest of Submitted Files

3.1. Submitted package manifest

The files uploaded for the application, and their purposes:

  • SocketWindowWordCount.jar : the user program jar
  • log4j.properties : logging configuration
  • flink-dist_2.11-1.12.2.jar : the Flink runtime (flink-dist) jar
  • application_1615446205104_00132401475399357854657.tmp : a binary file; per the startAppMaster walkthrough above, this is the serialized JobGraph, registered under the key job.graph
  • application_1615446205104_0013-flink-conf.yaml7045906712053828949.tmp : the effective Flink configuration

The contents of application_1615446205104_0013-flink-conf.yaml7045906712053828949.tmp are as follows:

taskmanager.memory.process.size: 1728m
internal.jobgraph-path: job.graph
jobmanager.execution.failover-strategy: region
high-availability.cluster-id: application_1615446205104_0013
jobmanager.rpc.address: localhost
execution.target: yarn-per-job
jobmanager.memory.process.size: 1600m
jobmanager.rpc.port: 6123
execution.savepoint.ignore-unclaimed-state: false
execution.attached: true
internal.cluster.execution-mode: NORMAL
execution.shutdown-on-attached-exit: false
pipeline.jars: file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar
parallelism.default: 1
taskmanager.numberOfTaskSlots: 1
pipeline.classpaths:
$internal.deployment.config-dir: /opt/tools/flink-1.12.2/conf
$internal.yarn.log-config-file: /opt/tools/flink-1.12.2/conf/log4j.properties
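Since startAppMaster serialized the JobGraph with a plain ObjectOutputStream, the binary .tmp file from section 3.1 can be read back symmetrically. A minimal verification sketch (assumes the Flink runtime jars are on the classpath and the file has been copied locally; the class name is made up for illustration):

import java.io.FileInputStream;
import java.io.ObjectInputStream;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class ReadJobGraph {
    public static void main(String[] args) throws Exception {
        // args[0]: path to the downloaded job graph file, e.g.
        // application_1615446205104_00132401475399357854657.tmp
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(args[0]))) {
            JobGraph jobGraph = (JobGraph) in.readObject();
            System.out.println(jobGraph.getJobID() + " : " + jobGraph.getName());
        }
    }
}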

3.2. Program manifest on HDFS

  • application_1615446205104_0004
BoYi-Pro:.flink sysadmin$ hadoop fs -ls /user/sysadmin/.flink/application_1615446205104_0004
Found 7 items
-rw-r--r--   1 sysadmin supergroup      14708 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/SocketWindowWordCount.jar
-rw-r--r--   1 sysadmin supergroup        775 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/application_1615446205104_0004-flink-conf.yaml6535310756280875271.tmp
-rw-r--r--   1 sysadmin supergroup      47295 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/application_1615446205104_00041541924063981626988.tmp
-rw-r--r--   1 sysadmin supergroup  114224188 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/flink-dist_2.11-1.12.2.jar
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib
-rw-r--r--   1 sysadmin supergroup       2620 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/log4j.properties
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins
  • application_1615446205104_0004/lib
BoYi-Pro:.flink sysadmin$ hadoop fs -ls /user/sysadmin/.flink/application_1615446205104_0004/lib
Found 10 items
-rw-r--r--   1 sysadmin supergroup      91745 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/flink-csv-1.12.2.jar
-rw-r--r--   1 sysadmin supergroup     137004 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/flink-json-1.12.2.jar
-rw-r--r--   1 sysadmin supergroup   59381853 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar
-rw-r--r--   1 sysadmin supergroup    7709741 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/flink-shaded-zookeeper-3.4.14.jar
-rw-r--r--   1 sysadmin supergroup   40316352 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/flink-table-blink_2.11-1.12.2.jar
-rw-r--r--   1 sysadmin supergroup   36149872 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/flink-table_2.11-1.12.2.jar
-rw-r--r--   1 sysadmin supergroup      67114 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/log4j-1.2-api-2.12.1.jar
-rw-r--r--   1 sysadmin supergroup     276771 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/log4j-api-2.12.1.jar
-rw-r--r--   1 sysadmin supergroup    1674433 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/log4j-core-2.12.1.jar
-rw-r--r--   1 sysadmin supergroup      23518 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/lib/log4j-slf4j-impl-2.12.1.jar
  • application_1615446205104_0004/plugins
BoYi-Pro:.flink sysadmin$ hadoop fs -ls /user/sysadmin/.flink/application_1615446205104_0004/plugins
Found 10 items
-rw-r--r--   1 sysadmin supergroup       8196 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/.DS_Store
-rw-r--r--   1 sysadmin supergroup        654 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/README.txt
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/external-resource-gpu
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/metrics-datadog
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/metrics-graphite
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/metrics-influx
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/metrics-jmx
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/metrics-prometheus
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/metrics-slf4j
drwx------   - sysadmin supergroup          0 2021-03-11 18:20 /user/sysadmin/.flink/application_1615446205104_0004/plugins/metrics-statsd
