Flink 1.15 Source Code

  • Flink 1.15 Source Code
  • Flink entry class (bin/flink)
  • parseAndRun(args)
    • A. run (per-job & yarn-session)
      • 1. ProgramOptions.create (wraps the arguments)
        • 1.1. ProgramOptions
      • 2. getJobJarAndDependencies (returns the usable jars)
        • 2.1. getJobJarAndDependencies
      • 3. executeProgram
        • 3.1. ClientUtils.executeProgram
        • 3.2. invokeInteractiveModeForExecution
        • 3.3. callMainMethod
    • B. parseAndRun (run-application)
      • 1. ApplicationClusterDeployer
        • 1.1. ApplicationDeployer (note the Javadoc)
        • 1.2. ClusterSpecification
        • 1.3. clusterDescriptor.deployApplicationCluster
          • 1.3.1. return deployInternal(...)
            • 1.3.1.1 getCurrentFreeClusterResources
            • 1.3.1.2 startAppMaster (long; take it slowly)

Flink entry class (bin/flink)

"org.apache.flink.client.cli.CliFrontend"  // the main class that bin/flink launches

    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
    // 1. locate the configuration directory via System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR)
    final String configurationDirectory = getConfigurationDirectoryFromEnv();
    // 2. load the global configuration (flink-conf.yaml)
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);
    // 3. load the custom command lines used to parse the CLI arguments
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);
    int retCode = 31;
    try {
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // parse the arguments and run the requested action
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
    } finally {
        System.exit(retCode);
    }

parseAndRun(args)

    // check that at least an action was given
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }
    // the first argument selects the submission action; the rest are its parameters
    String action = args[0];
    String[] params = Arrays.copyOfRange(args, 1, args.length);
    switch (action) {
        case ACTION_RUN: // 'run' covers the '-t yarn-per-job' and '-t yarn-session' submission modes
            run(params);
            return 0;
        case ACTION_RUN_APPLICATION: // 'run-application' is the '-t yarn-application' mode
            runApplication(params);
            return 0;
        ...
    }

A. run (per-job & yarn-session)

    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");
        // add the run options: SAVEPOINT_PATH_OPTION, SAVEPOINT_ALLOW_NON_RESTORED_OPTION, SAVEPOINT_RESTORE_MODE
        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        // merge the 'commandOptions' definitions with the 'args' from the command line
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        // evaluate help flag: "Show the help message for the CLI Frontend or the action."
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }

        // pick the custom command line that is active for these arguments
        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        // wrap the program arguments
        final ProgramOptions programOptions = ProgramOptions.create(commandLine);

        // collect the usable jars
        final List<URL> jobJars = getJobJarAndDependencies(programOptions);

        // assemble all effective settings into a single Configuration
        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            executeProgram(effectiveConfiguration, program);
        }
    }

1. ProgramOptions.create (wraps the arguments)

    public static ProgramOptions create(CommandLine line) throws CliArgsException {
        if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
            return createPythonProgramOptions(line);
        } else {
            // returns the program options, e.g. entry class, jar, parallelism
            return new ProgramOptions(line);
        }
    }

1.1. ProgramOptions

    protected ProgramOptions(CommandLine line) throws CliArgsException {
        super(line);
        ...
        this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
    }

2. getJobJarAndDependencies (returns the usable jars)

    private List<URL> getJobJarAndDependencies(ProgramOptions programOptions)
            throws CliArgsException {
        // the entry class
        String entryPointClass = programOptions.getEntryPointClassName();
        // the path of the job jar
        String jarFilePath = programOptions.getJarFilePath();
        try {
            File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
            return PackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);
        } ...
    }

2.1. getJobJarAndDependencies

    public static List<URL> getJobJarAndDependencies(
            File jarFile, @Nullable String entryPointClassName) throws ProgramInvocationException {
        URL jarFileUrl = loadJarFile(jarFile);

        List<File> extractedTempLibraries =
                jarFileUrl == null
                        ? Collections.emptyList()
                        : extractContainedLibraries(jarFileUrl);

        List<URL> libs = new ArrayList<URL>(extractedTempLibraries.size() + 1);

        if (jarFileUrl != null) {
            libs.add(jarFileUrl);
        }
        for (File tmpLib : extractedTempLibraries) {
            try {
                libs.add(tmpLib.getAbsoluteFile().toURI().toURL());
            } catch (MalformedURLException e) {
                throw new RuntimeException("URL is invalid. This should not happen.", e);
            }
        }

        if (isPython(entryPointClassName)) {
            libs.add(PackagedProgramUtils.getPythonJar());
        }

        return libs;
    }
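extractContainedLibraries (called above but not shown) unpacks jars nested under lib/ inside the user's fat jar into temp files, which is why the returned list can hold more than the job jar itself. A self-contained illustration of the selection rule it applies (standalone JDK-only example, not Flink code):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.jar.JarEntry;
    import java.util.jar.JarFile;

    public class NestedJarScan {
        // Lists entries like 'lib/foo.jar' inside a fat jar, i.e. the candidates that
        // extractContainedLibraries would copy to temp files and add to the classpath.
        static List<String> listNestedJars(String fatJarPath) throws IOException {
            List<String> nested = new ArrayList<>();
            try (JarFile jar = new JarFile(fatJarPath)) {
                Enumeration<JarEntry> entries = jar.entries();
                while (entries.hasMoreElements()) {
                    JarEntry e = entries.nextElement();
                    if (!e.isDirectory()
                            && e.getName().startsWith("lib/")
                            && e.getName().endsWith(".jar")) {
                        nested.add(e.getName());
                    }
                }
            }
            return nested;
        }

        public static void main(String[] args) throws IOException {
            System.out.println(listNestedJars(args[0]));
        }
    }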

3. executeProgram

    protected void executeProgram(final Configuration configuration, final PackagedProgram program)
            throws ProgramInvocationException {
        ClientUtils.executeProgram(
                new DefaultExecutorServiceLoader(), configuration, program, false, false);
    }

3.1. ClientUtils.executeProgram

    public static void executeProgram(...) throws ProgramInvocationException {
        ...
        try {
            program.invokeInteractiveModeForExecution();
        } ...
    }

3.2. invokeInteractiveModeForExecution

    public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
        ...
        try {
            callMainMethod(mainClass, args);
        } ...
    }

3.3. callMainMethod

    private static void callMainMethod(Class<?> entryClass, String[] args)
            throws ProgramInvocationException {
        Method mainMethod;
        ...
        try {
            mainMethod = entryClass.getMethod("main", String[].class);
        } ...
        try {
            // reflectively invoke main() of the entry class inside the submitted jar
            mainMethod.invoke(null, (Object) args);
        } ...
    }
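Putting branch A together: the client never talks to a cluster at this point; it reflects into the user jar and runs main(), and submission only happens when the user code calls env.execute() against the effective configuration. A minimal programmatic sketch of the same pipeline (jar path, entry class and arguments are made up; assumes flink-clients on the classpath):

    PackagedProgram program =
            PackagedProgram.newBuilder()
                    .setJarFile(new File("/path/to/my-job.jar"))      // illustrative path
                    .setEntryPointClassName("com.example.MyJob")      // illustrative class
                    .setArguments("--input", "data.txt")
                    .build();
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), new Configuration(), program, false, false);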

B. parseAndRun (run-application)

    // package org.apache.flink.client.cli
    protected void runApplication(String[] args) throws Exception {
        LOG.info("Running 'run-application' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRunApplication(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        // key part: ApplicationClusterDeployer implements ApplicationDeployer
        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        final ProgramOptions programOptions;
        final Configuration effectiveConfiguration;

        // No need to set a jarFile path for Pyflink job.
        if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
            programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.emptyList());
        } else {
            programOptions = new ProgramOptions(commandLine);
            programOptions.validate();
            final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.singletonList(uri.toString()));
        }

        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
        deployer.run(effectiveConfiguration, applicationConfiguration);
    }

1. ApplicationClusterDeployer

// package org.apache.flink.client.deployment.application.cli;
public class ApplicationClusterDeployer implements ApplicationDeployer {

    private static final Logger LOG = LoggerFactory.getLogger(ApplicationClusterDeployer.class);

    private final ClusterClientServiceLoader clientServiceLoader;

    public ApplicationClusterDeployer(final ClusterClientServiceLoader clientServiceLoader) {
        this.clientServiceLoader = checkNotNull(clientServiceLoader);
    }

    public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        checkNotNull(configuration);
        checkNotNull(applicationConfiguration);

        LOG.info("Submitting application in 'Application Mode'.");

        // obtain the ClusterClientFactory; it carries what clients need to create on the cluster
        final ClusterClientFactory<ClusterID> clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        // the clusterDescriptor describes the client used to communicate with the cluster
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clientFactory.createClusterDescriptor(configuration)) {
            // derive the basic resources the cluster should start with
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);
            // deploy the application
            clusterDescriptor.deployApplicationCluster(
                    clusterSpecification, applicationConfiguration);
        }
    }
}
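How getClusterClientFactory ends up at YarnClusterClientFactory: DefaultClusterClientServiceLoader walks Java's ServiceLoader and keeps the single factory whose isCompatibleWith(configuration) matches the configured execution.target ("yarn-application" here). A simplified sketch of that lookup (assumed shape; the real method carries more validation and logging):

    public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(
            final Configuration configuration) {
        final ServiceLoader<ClusterClientFactory> loader =
                ServiceLoader.load(ClusterClientFactory.class);
        final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
        for (ClusterClientFactory factory : loader) {
            // YarnClusterClientFactory answers true when execution.target=yarn-application
            if (factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
            }
        }
        if (compatibleFactories.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one matching ClusterClientFactory, found "
                            + compatibleFactories.size());
        }
        return (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
    }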

1.1. ApplicationDeployer (note the Javadoc)

// package org.apache.flink.client.deployment;
@Internal
public interface ApplicationDeployer {

    /**
     * Submits a user program for execution and runs the main user method on the cluster.
     *
     * @param configuration the configuration containing all the necessary information about
     *     submitting the user program.
     * @param applicationConfiguration an {@link ApplicationConfiguration} specific to the
     *     application to be executed.
     */
    <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception;
}

1.2. ClusterSpecification

// package org.apache.flink.client.deployment;
public final class ClusterSpecification {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int slotsPerTaskManager;

    private ClusterSpecification(
            int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) {
        this.masterMemoryMB = masterMemoryMB;
        this.taskManagerMemoryMB = taskManagerMemoryMB;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    public int getMasterMemoryMB() {
        return masterMemoryMB;
    }

    public int getTaskManagerMemoryMB() {
        return taskManagerMemoryMB;
    }

    public int getSlotsPerTaskManager() {
        return slotsPerTaskManager;
    }

    @Override
    public String toString() {
        return "ClusterSpecification{"
                + "masterMemoryMB=" + masterMemoryMB
                + ", taskManagerMemoryMB=" + taskManagerMemoryMB
                + ", slotsPerTaskManager=" + slotsPerTaskManager
                + '}';
    }

    /** Builder for the {@link ClusterSpecification} instance. */
    public static class ClusterSpecificationBuilder {
        private int masterMemoryMB = 768;
        private int taskManagerMemoryMB = 1024;
        private int slotsPerTaskManager = 1;

        public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) {
            this.masterMemoryMB = masterMemoryMB;
            return this;
        }

        public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB) {
            this.taskManagerMemoryMB = taskManagerMemoryMB;
            return this;
        }

        public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager) {
            this.slotsPerTaskManager = slotsPerTaskManager;
            return this;
        }

        public ClusterSpecification createClusterSpecification() {
            return new ClusterSpecification(
                    masterMemoryMB, taskManagerMemoryMB, slotsPerTaskManager);
        }
    }
}
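A quick usage example of the builder above (the values are illustrative, not recommendations; unset fields keep the defaults 768/1024/1 shown in the source):

    ClusterSpecification spec =
            new ClusterSpecification.ClusterSpecificationBuilder()
                    .setMasterMemoryMB(1024)
                    .setTaskManagerMemoryMB(2048)
                    .setSlotsPerTaskManager(2)
                    .createClusterSpecification();
    // spec.toString() ->
    // ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, slotsPerTaskManager=2}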

1.3. clusterDescriptor.deployApplicationCluster

    // package org.apache.flink.yarn;
    @Override
    public ClusterClientProvider<ApplicationId> deployApplicationCluster(
            final ClusterSpecification clusterSpecification,
            final ApplicationConfiguration applicationConfiguration)
            throws ClusterDeploymentException {
        checkNotNull(clusterSpecification);
        checkNotNull(applicationConfiguration);

        final YarnDeploymentTarget deploymentTarget =
                YarnDeploymentTarget.fromConfig(flinkConfiguration);
        if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
            throw new ClusterDeploymentException(
                    "Couldn't deploy Yarn Application Cluster."
                            + " Expected deployment.target="
                            + YarnDeploymentTarget.APPLICATION.getName()
                            + " but actual one was \""
                            + deploymentTarget.getName()
                            + "\"");
        }

        applicationConfiguration.applyToConfiguration(flinkConfiguration);

        // No need to do pipelineJars validation if it is a PyFlink job.
        if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
                || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) {
            final List<String> pipelineJars =
                    flinkConfiguration
                            .getOptional(PipelineOptions.JARS)
                            .orElse(Collections.emptyList());
            Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
        }

        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink Application Cluster",
                    YarnApplicationClusterEntryPoint.class.getName(),
                    null,
                    false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
        }
    }
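Two preconditions stand out above: execution.target must be yarn-application, and for non-PyFlink jobs pipeline.jars must contain exactly one jar. A sketch of a Configuration that passes both checks (the jar path is made up):

    Configuration conf = new Configuration();
    // YarnDeploymentTarget.APPLICATION.getName() is "yarn-application"
    conf.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
    conf.set(PipelineOptions.JARS,
            Collections.singletonList("local:///opt/flink/usrlib/my-job.jar")); // illustrative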

1.3.1. return deployInternal(...)

    // package org.apache.flink.yarn;
    private ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached)
            throws Exception {
        ...
        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------
        checkYarnQueues(yarnClient);

        // ------------------ Check if the YARN ClusterClient has the requested resources
        // register a new application with YARN's ResourceManager
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        // YARN's response to the registration (resource limits etc.)
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

        // the maximum resources a single allocation may get
        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            // query the currently free cluster resources: see 1.3.1.1
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException(
                    "Could not retrieve information about free cluster resources.", e);
        }
        ...
        final ClusterSpecification validClusterSpecification;
        // here the resources for the master and the TaskManagers are fixed
        try {
            validClusterSpecification =
                    validateClusterResources(
                            clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }
        ...
        // start the ApplicationMaster: see 1.3.1.2
        ApplicationReport report =
                startAppMaster(
                        flinkConfiguration,
                        applicationName,
                        yarnClusterEntrypoint,
                        jobGraph,
                        yarnClient,
                        yarnApplication,
                        validClusterSpecification);
        ...
    }

1.3.1.1 getCurrentFreeClusterResources

    // package org.apache.flink.yarn;
    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient)
            throws YarnException, IOException {
        // the currently running nodes
        List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
        int totalFreeMemory = 0;
        int containerLimit = 0;
        int[] nodeManagersFree = new int[nodes.size()];
        // collect the memory figures per node
        for (int i = 0; i < nodes.size(); i++) {
            NodeReport rep = nodes.get(i);
            int free =
                    rep.getCapability().getMemory()
                            - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
            nodeManagersFree[i] = free;
            totalFreeMemory += free;
            if (free > containerLimit) {
                containerLimit = free;
            }
        }
        // returns: total free memory, largest free chunk on a single node, free memory per node
        return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
    }
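
A worked pass through that loop, with illustrative numbers (MB): three RUNNING NodeManagers reporting capability/used of 8192/2048, 8192/6144 and 4096/0 leave per-node free memory of 6144, 2048 and 4096, so the method returns totalFreeMemory = 12288, containerLimit = 6144 (the largest single-node chunk) and nodeManagersFree = [6144, 2048, 4096].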

1.3.1.2 startAppMaster (long; take it slowly)

    // package org.apache.flink.yarn;
    private ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)
            throws Exception {
        ...
        final FileSystem fs = FileSystem.get(yarnConfiguration);
        ...
        ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

        final List<Path> providedLibDirs =
                Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);

        Path stagingDirPath = getStagingDir(fs);
        FileSystem stagingDirFs = stagingDirPath.getFileSystem(yarnConfiguration);
        final YarnApplicationFileUploader fileUploader =
                YarnApplicationFileUploader.from(
                        stagingDirFs,
                        stagingDirPath,
                        providedLibDirs,
                        appContext.getApplicationId(),
                        getFileReplication());

        // The files need to be shipped and added to classpath.
        Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
        for (File file : shipFiles) {
            systemShipFiles.add(file.getAbsoluteFile());
        }

        final String logConfigFilePath =
                configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
        if (logConfigFilePath != null) {
            systemShipFiles.add(new File(logConfigFilePath));
        }

        // Set-up ApplicationSubmissionContext for the application
        final ApplicationId appId = appContext.getApplicationId();

        // ------------------ Add Zookeeper namespace to local flinkConfiguration ------
        setHAClusterIdIfNotSet(configuration, appId);

        if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
            // activate re-execution of failed applications
            appContext.setMaxAppAttempts(
                    configuration.getInteger(
                            YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
            activateHighAvailabilitySupport(appContext);
        } else {
            // set number of application retries to 1 in the default case
            appContext.setMaxAppAttempts(
                    configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
        }

        final Set<Path> userJarFiles = new HashSet<>();
        if (jobGraph != null) {
            userJarFiles.addAll(
                    jobGraph.getUserJars().stream()
                            .map(f -> f.toUri())
                            .map(Path::new)
                            .collect(Collectors.toSet()));
        }

        final List<URI> jarUrls =
                ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
        if (jarUrls != null
                && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
            userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
        }

        // only for per job mode
        if (jobGraph != null) {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    jobGraph.getUserArtifacts().entrySet()) {
                // only upload local files
                if (!Utils.isRemotePath(entry.getValue().filePath)) {
                    Path localPath = new Path(entry.getValue().filePath);
                    Tuple2<Path, Long> remoteFileInfo =
                            fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
                    jobGraph.setUserArtifactRemotePath(
                            entry.getKey(), remoteFileInfo.f0.toString());
                }
            }
            jobGraph.writeUserArtifactEntriesToConfiguration();
        }

        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            addLibFoldersToShipFiles(systemShipFiles);
        }

        // Register all files in provided lib dirs as local resources with public visibility
        // and upload the remaining dependencies as local resources with APPLICATION visibility.
        final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
        final List<String> uploadedDependencies =
                fileUploader.registerMultipleLocalResources(
                        systemShipFiles.stream()
                                .map(e -> new Path(e.toURI()))
                                .collect(Collectors.toSet()),
                        Path.CUR_DIR,
                        LocalResourceType.FILE);
        systemClassPaths.addAll(uploadedDependencies);

        // upload and register ship-only files
        // Plugin files only need to be shipped and should not be added to classpath.
        if (providedLibDirs == null || providedLibDirs.isEmpty()) {
            Set<File> shipOnlyFiles = new HashSet<>();
            addPluginsFoldersToShipFiles(shipOnlyFiles);
            fileUploader.registerMultipleLocalResources(
                    shipOnlyFiles.stream()
                            .map(e -> new Path(e.toURI()))
                            .collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.FILE);
        }

        if (!shipArchives.isEmpty()) {
            fileUploader.registerMultipleLocalResources(
                    shipArchives.stream()
                            .map(e -> new Path(e.toURI()))
                            .collect(Collectors.toSet()),
                    Path.CUR_DIR,
                    LocalResourceType.ARCHIVE);
        }

        // only for application mode
        // Python jar file only needs to be shipped and should not be added to classpath.
        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)
                && PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
            fileUploader.registerMultipleLocalResources(
                    Collections.singletonList(
                            new Path(PackagedProgramUtils.getPythonJar().toURI())),
                    ConfigConstants.DEFAULT_FLINK_OPT_DIR,
                    LocalResourceType.FILE);
        }

        // Upload and register user jars
        final List<String> userClassPaths =
                fileUploader.registerMultipleLocalResources(
                        userJarFiles,
                        userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
                                ? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
                                : Path.CUR_DIR,
                        LocalResourceType.FILE);

        // usrlib will be automatically shipped if it exists.
        if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
            final Set<File> usrLibShipFiles = new HashSet<>();
            addUsrLibFolderToShipFiles(usrLibShipFiles);
            final List<String> usrLibClassPaths =
                    fileUploader.registerMultipleLocalResources(
                            usrLibShipFiles.stream()
                                    .map(e -> new Path(e.toURI()))
                                    .collect(Collectors.toSet()),
                            Path.CUR_DIR,
                            LocalResourceType.FILE);
            userClassPaths.addAll(usrLibClassPaths);
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
            systemClassPaths.addAll(userClassPaths);
        }

        // normalize classpath by sorting
        Collections.sort(systemClassPaths);
        Collections.sort(userClassPaths);

        // classpath assembler
        StringBuilder classPathBuilder = new StringBuilder();
        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }
        for (String classPath : systemClassPaths) {
            classPathBuilder.append(classPath).append(File.pathSeparator);
        }

        // Setup jar for ApplicationMaster
        final YarnLocalResourceDescriptor localResourceDescFlinkJar =
                fileUploader.uploadFlinkDist(flinkJarPath);
        classPathBuilder
                .append(localResourceDescFlinkJar.getResourceKey())
                .append(File.pathSeparator);

        // write job graph to tmp file and add it to local resource
        // TODO: server use user main method to generate job graph
        if (jobGraph != null) {
            File tmpJobGraphFile = null;
            try {
                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
                        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                    obOutput.writeObject(jobGraph);
                }
                final String jobGraphFilename = "job.graph";
                configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
                fileUploader.registerSingleLocalResource(
                        jobGraphFilename,
                        new Path(tmpJobGraphFile.toURI()),
                        "",
                        LocalResourceType.FILE,
                        true,
                        false);
                classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
            } catch (Exception e) {
                LOG.warn("Add job graph to local resource fail.");
                throw e;
            } finally {
                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
                    LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
                }
            }
        }

        // Upload the flink configuration
        // write out configuration file
        File tmpConfigurationFile = null;
        try {
            tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);

            // remove localhost bind hosts as they render production clusters unusable
            removeLocalhostBindHostSetting(configuration, JobManagerOptions.BIND_HOST);
            removeLocalhostBindHostSetting(configuration, TaskManagerOptions.BIND_HOST);
            // this setting is unconditionally overridden anyway, so we remove it for clarity
            configuration.removeConfig(TaskManagerOptions.HOST);

            BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

            String flinkConfigKey = "flink-conf.yaml";
            fileUploader.registerSingleLocalResource(
                    flinkConfigKey,
                    new Path(tmpConfigurationFile.getAbsolutePath()),
                    "",
                    LocalResourceType.FILE,
                    true,
                    true);
            classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
        } finally {
            if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
                LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
            }
        }

        if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
            for (String userClassPath : userClassPaths) {
                classPathBuilder.append(userClassPath).append(File.pathSeparator);
            }
        }

        // To support Yarn Secure Integration Test Scenario
        // In Integration test setup, the Yarn containers created by YarnMiniCluster does not have
        // the Yarn site XML and KRB5 configuration files. We are adding these files as container
        // local resources for the container applications (JM/TMs) to have proper secure cluster setup
        Path remoteYarnSiteXmlPath = null;
        if (System.getenv("IN_TESTS") != null) {
            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
            LOG.info(
                    "Adding Yarn configuration {} to the AM container local resource bucket",
                    f.getAbsolutePath());
            Path yarnSitePath = new Path(f.getAbsolutePath());
            remoteYarnSiteXmlPath =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.YARN_SITE_FILE_NAME,
                                    yarnSitePath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            if (System.getProperty("java.security.krb5.conf") != null) {
                configuration.set(
                        SecurityOptions.KERBEROS_KRB5_PATH,
                        System.getProperty("java.security.krb5.conf"));
            }
        }

        Path remoteKrb5Path = null;
        boolean hasKrb5 = false;
        String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
        if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
            final File krb5 = new File(krb5Config);
            LOG.info(
                    "Adding KRB5 configuration {} to the AM container local resource bucket",
                    krb5.getAbsolutePath());
            final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
            remoteKrb5Path =
                    fileUploader
                            .registerSingleLocalResource(
                                    Utils.KRB5_FILE_NAME,
                                    krb5ConfPath,
                                    "",
                                    LocalResourceType.FILE,
                                    false,
                                    false)
                            .getPath();
            hasKrb5 = true;
        }

        Path remotePathKeytab = null;
        String localizedKeytabPath = null;
        String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        if (keytab != null) {
            boolean localizeKeytab =
                    flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
            localizedKeytabPath =
                    flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            if (localizeKeytab) {
                // Localize the keytab to YARN containers via local resource.
                LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
                remotePathKeytab =
                        fileUploader
                                .registerSingleLocalResource(
                                        localizedKeytabPath,
                                        new Path(keytab),
                                        "",
                                        LocalResourceType.FILE,
                                        false,
                                        false)
                                .getPath();
            } else {
                // Assume Keytab is pre-installed in the container.
                localizedKeytabPath =
                        flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
            }
        }

        final JobManagerProcessSpec processSpec =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
        final ContainerLaunchContext amContainer =
                setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);

        // setup security tokens
        if (UserGroupInformation.isSecurityEnabled()) {
            // set HDFS delegation tokens when security is enabled
            LOG.info("Adding delegation token to the AM container.");
            final List<Path> pathsToObtainToken = new ArrayList<>();
            boolean fetchToken =
                    configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
            if (fetchToken) {
                List<Path> yarnAccessList =
                        ConfigUtils.decodeListFromConfig(
                                configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
                pathsToObtainToken.addAll(yarnAccessList);
                pathsToObtainToken.addAll(fileUploader.getRemotePaths());
            }
            Utils.setTokensFor(amContainer, pathsToObtainToken, yarnConfiguration, fetchToken);
        }

        amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
        fileUploader.close();

        // Setup CLASSPATH and environment variables for ApplicationMaster
        final Map<String, String> appMasterEnv = new HashMap<>();
        // set user specified app master environment variables
        appMasterEnv.putAll(
                ConfigurationUtils.getPrefixedKeyValuePairs(
                        ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
        // set Flink app class path
        appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

        // Set FLINK_OPT_DIR to `opt` folder under working dir in container
        appMasterEnv.put(
                ENV_FLINK_OPT_DIR, Path.CUR_DIR + "/" + ConfigConstants.DEFAULT_FLINK_OPT_DIR);

        // set Flink on YARN internal configuration values
        appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
        appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
        appMasterEnv.put(
                YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
                encodeYarnLocalResourceDescriptorListToString(
                        fileUploader.getEnvShipResourceList()));
        appMasterEnv.put(
                YarnConfigKeys.FLINK_YARN_FILES,
                fileUploader.getApplicationDir().toUri().toString());

        // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
        appMasterEnv.put(
                YarnConfigKeys.ENV_HADOOP_USER_NAME,
                UserGroupInformation.getCurrentUser().getUserName());

        if (localizedKeytabPath != null) {
            appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
            String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
            appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
            if (remotePathKeytab != null) {
                appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
            }
        }

        // To support Yarn Secure Integration Test Scenario
        if (remoteYarnSiteXmlPath != null) {
            appMasterEnv.put(
                    YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
        }
        if (remoteKrb5Path != null) {
            appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
        }

        // set classpath from YARN configuration
        Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);

        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(
                flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

        final String customApplicationName = customName != null ? customName : applicationName;

        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        // Set priority for application
        int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
        if (priorityNum >= 0) {
            Priority priority = Priority.newInstance(priorityNum);
            appContext.setPriority(priority);
        }

        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }

        setApplicationNodeLabel(appContext);
        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook =
                new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;
        YarnApplicationState lastAppState = YarnApplicationState.NEW;
        loop:
        while (true) {
            try {
                report = yarnClient.getApplicationReport(appId);
            } catch (IOException e) {
                throw new YarnDeploymentException("Failed to deploy the cluster.", e);
            }
            YarnApplicationState appState = report.getYarnApplicationState();
            LOG.debug("Application State: {}", appState);
            switch (appState) {
                case FAILED:
                case KILLED:
                    throw new YarnDeploymentException(
                            "The YARN application unexpectedly switched to state "
                                    + appState
                                    + " during deployment. \n"
                                    + "Diagnostics from YARN: "
                                    + report.getDiagnostics()
                                    + "\n"
                                    + "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
                                    + "yarn logs -applicationId "
                                    + appId);
                    // break ..
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                case FINISHED:
                    LOG.info("YARN application has been finished successfully.");
                    break loop;
                default:
                    if (appState != lastAppState) {
                        LOG.info("Deploying cluster, current state " + appState);
                    }
                    if (System.currentTimeMillis() - startTime > 60000) {
                        LOG.info(
                                "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                    }
            }
            lastAppState = appState;
            Thread.sleep(250);
        }

        // since deployment was successful, remove the hook
        ShutdownHookUtil.removeShutdownHook(
                deploymentFailureHook, getClass().getSimpleName(), LOG);
        return report;
    }
