This article takes a look at the High Availability (HA) setup of the Flink JobManager.

Configuration

flink-conf.yaml

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: file:///share
  • high-availability can be set to NONE, zookeeper, or the FQN of a HighAvailabilityServicesFactory class; high-availability.zookeeper.quorum specifies the ZooKeeper peers; high-availability.zookeeper.path.root specifies the root node path in ZooKeeper; high-availability.cluster-id specifies the node name of the current cluster, which is placed under the root node; high-availability.storageDir specifies where the JobManager metadata is persisted. The same setup can also be built programmatically, as sketched below.
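These keys map to the typed ConfigOptions in the HighAvailabilityOptions class covered later in this article. A minimal sketch under that assumption (the class name HaConfigSketch is made up here, the values mirror the flink-conf.yaml above):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

public class HaConfigSketch {

    public static void main(String[] args) {
        // mirrors the flink-conf.yaml entries above
        Configuration config = new Configuration();
        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zookeeper:2181");
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "/flink");
        config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "/cluster_one");
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///share");

        System.out.println(config.getString(HighAvailabilityOptions.HA_MODE));       // zookeeper
        System.out.println(config.getString(HighAvailabilityOptions.HA_CLUSTER_ID)); // /cluster_one
    }
}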

masters file

localhost:8081
localhost:8082
  • The masters file lists the JobManager addresses, one per line (host and web UI port).

HighAvailabilityMode

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java

public enum HighAvailabilityMode {
    NONE(false),
    ZOOKEEPER(true),
    FACTORY_CLASS(true);

    private final boolean haActive;

    HighAvailabilityMode(boolean haActive) {
        this.haActive = haActive;
    }

    /**
     * Return the configured {@link HighAvailabilityMode}.
     *
     * @param config The config to parse
     * @return Configured recovery mode or {@link HighAvailabilityMode#NONE} if not
     * configured.
     */
    public static HighAvailabilityMode fromConfig(Configuration config) {
        String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);

        if (haMode == null) {
            return HighAvailabilityMode.NONE;
        } else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
            // Map old default to new default
            return HighAvailabilityMode.NONE;
        } else {
            try {
                return HighAvailabilityMode.valueOf(haMode.toUpperCase());
            } catch (IllegalArgumentException e) {
                return FACTORY_CLASS;
            }
        }
    }

    /**
     * Returns true if the defined recovery mode supports high availability.
     *
     * @param configuration Configuration which contains the recovery mode
     * @return true if high availability is supported by the recovery mode, otherwise false
     */
    public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
        HighAvailabilityMode mode = fromConfig(configuration);
        return mode.haActive;
    }
}
  • HighAvailabilityMode has three enum values: NONE, ZOOKEEPER, and FACTORY_CLASS; each carries a haActive flag indicating whether it supports high availability. fromConfig maps the high-availability setting to one of these values, treating any unrecognized value as a factory class name (FACTORY_CLASS), as sketched below.
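A minimal sketch of how fromConfig resolves the setting; the factory class name com.example.MyHaServicesFactory is made up purely for illustration, and the expected results are noted in the comments:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;

public class HaModeSketch {

    public static void main(String[] args) {
        Configuration config = new Configuration();

        // nothing configured: the default "NONE" maps to NONE (haActive = false)
        System.out.println(HighAvailabilityMode.fromConfig(config));                      // NONE
        System.out.println(HighAvailabilityMode.isHighAvailabilityModeActivated(config)); // false

        // "zookeeper" is upper-cased before the enum lookup
        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        System.out.println(HighAvailabilityMode.fromConfig(config));                      // ZOOKEEPER

        // any other value is treated as a factory class name
        config.setString(HighAvailabilityOptions.HA_MODE, "com.example.MyHaServicesFactory");
        System.out.println(HighAvailabilityMode.fromConfig(config));                      // FACTORY_CLASS
    }
}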

HighAvailabilityOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/HighAvailabilityOptions.java

@PublicEvolving
@ConfigGroups(groups = {
    @ConfigGroup(name = "HighAvailabilityZookeeper", keyPrefix = "high-availability.zookeeper")
})
public class HighAvailabilityOptions {

    // ------------------------------------------------------------------------
    //  Required High Availability Options
    // ------------------------------------------------------------------------

    /**
     * Defines high-availability mode used for the cluster execution.
     * A value of "NONE" signals no highly available setup.
     * To enable high-availability, set this mode to "ZOOKEEPER".
     * Can also be set to FQN of HighAvailability factory class.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
    public static final ConfigOption<String> HA_MODE =
        key("high-availability")
            .defaultValue("NONE")
            .withDeprecatedKeys("recovery.mode")
            .withDescription("Defines high-availability mode used for the cluster execution." +
                " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");

    /**
     * The ID of the Flink cluster, used to separate multiple Flink clusters
     * Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos.
     */
    public static final ConfigOption<String> HA_CLUSTER_ID =
        key("high-availability.cluster-id")
            .defaultValue("/default")
            .withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace")
            .withDescription("The ID of the Flink cluster, used to separate multiple Flink clusters from each other." +
                " Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.");

    /**
     * File system path (URI) where Flink persists metadata in high-availability setups.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
    public static final ConfigOption<String> HA_STORAGE_PATH =
        key("high-availability.storageDir")
            .noDefaultValue()
            .withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
            .withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");

    // ------------------------------------------------------------------------
    //  Recovery Options
    // ------------------------------------------------------------------------

    /**
     * Optional port (range) used by the job manager in high-availability mode.
     */
    public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
        key("high-availability.jobmanager.port")
            .defaultValue("0")
            .withDeprecatedKeys("recovery.jobmanager.port")
            .withDescription("Optional port (range) used by the job manager in high-availability mode.");

    /**
     * The time before a JobManager after a fail over recovers the current jobs.
     */
    public static final ConfigOption<String> HA_JOB_DELAY =
        key("high-availability.job.delay")
            .noDefaultValue()
            .withDeprecatedKeys("recovery.job.delay")
            .withDescription("The time before a JobManager after a fail over recovers the current jobs.");

    // ------------------------------------------------------------------------
    //  ZooKeeper Options
    // ------------------------------------------------------------------------

    /**
     * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
     */
    public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
        key("high-availability.zookeeper.quorum")
            .noDefaultValue()
            .withDeprecatedKeys("recovery.zookeeper.quorum")
            .withDescription("The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.");

    /**
     * The root path under which Flink stores its entries in ZooKeeper.
     */
    public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
        key("high-availability.zookeeper.path.root")
            .defaultValue("/flink")
            .withDeprecatedKeys("recovery.zookeeper.path.root")
            .withDescription("The root path under which Flink stores its entries in ZooKeeper.");

    public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
        key("high-availability.zookeeper.path.latch")
            .defaultValue("/leaderlatch")
            .withDeprecatedKeys("recovery.zookeeper.path.latch")
            .withDescription("Defines the znode of the leader latch which is used to elect the leader.");

    /** ZooKeeper root path (ZNode) for job graphs. */
    public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
        key("high-availability.zookeeper.path.jobgraphs")
            .defaultValue("/jobgraphs")
            .withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
            .withDescription("ZooKeeper root path (ZNode) for job graphs");

    public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
        key("high-availability.zookeeper.path.leader")
            .defaultValue("/leader")
            .withDeprecatedKeys("recovery.zookeeper.path.leader")
            .withDescription("Defines the znode of the leader which contains the URL to the leader and the current" +
                " leader session ID.");

    /** ZooKeeper root path (ZNode) for completed checkpoints. */
    public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
        key("high-availability.zookeeper.path.checkpoints")
            .defaultValue("/checkpoints")
            .withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
            .withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");

    /** ZooKeeper root path (ZNode) for checkpoint counters. */
    public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
        key("high-availability.zookeeper.path.checkpoint-counter")
            .defaultValue("/checkpoint-counter")
            .withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
            .withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");

    /** ZooKeeper root path (ZNode) for Mesos workers. */
    @PublicEvolving
    public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
        key("high-availability.zookeeper.path.mesos-workers")
            .defaultValue("/mesos-workers")
            .withDeprecatedKeys("recovery.zookeeper.path.mesos-workers")
            .withDescription(Description.builder()
                .text("The ZooKeeper root path for persisting the Mesos worker information.")
                .build());

    // ------------------------------------------------------------------------
    //  ZooKeeper Client Settings
    // ------------------------------------------------------------------------

    public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
        key("high-availability.zookeeper.client.session-timeout")
            .defaultValue(60000)
            .withDeprecatedKeys("recovery.zookeeper.client.session-timeout")
            .withDescription("Defines the session timeout for the ZooKeeper session in ms.");

    public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
        key("high-availability.zookeeper.client.connection-timeout")
            .defaultValue(15000)
            .withDeprecatedKeys("recovery.zookeeper.client.connection-timeout")
            .withDescription("Defines the connection timeout for ZooKeeper in ms.");

    public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
        key("high-availability.zookeeper.client.retry-wait")
            .defaultValue(5000)
            .withDeprecatedKeys("recovery.zookeeper.client.retry-wait")
            .withDescription("Defines the pause between consecutive retries in ms.");

    public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
        key("high-availability.zookeeper.client.max-retry-attempts")
            .defaultValue(3)
            .withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts")
            .withDescription("Defines the number of connection retries before the client gives up.");

    public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
        key("high-availability.zookeeper.path.running-registry")
            .defaultValue("/running_job_registry/");

    public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
        key("high-availability.zookeeper.client.acl")
            .defaultValue("open")
            .withDescription("Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be" +
                " set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use" +
                " SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).");

    // ------------------------------------------------------------------------

    /** Not intended to be instantiated. */
    private HighAvailabilityOptions() {}
}
  • HighAvailabilityOptions defines the high-availability configuration options: the core high-availability, high-availability.cluster-id and high-availability.storageDir options, plus the options prefixed with high-availability.zookeeper (quorum, znode paths, client timeouts); most of them keep the old recovery.* keys as deprecated aliases. A small sketch of how these options resolve follows.
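A small sketch of how these options resolve against an empty Configuration, including one of the deprecated recovery.* aliases; the class name HaOptionsSketch is made up, and the expected values are noted in the comments:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

public class HaOptionsSketch {

    public static void main(String[] args) {
        Configuration config = new Configuration();

        // defaults defined by HighAvailabilityOptions
        System.out.println(config.getString(HighAvailabilityOptions.HA_MODE));           // NONE
        System.out.println(config.getString(HighAvailabilityOptions.HA_CLUSTER_ID));     // /default
        System.out.println(config.getString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT)); // /flink

        // the deprecated key recovery.mode is still honored for high-availability
        config.setString("recovery.mode", "zookeeper");
        System.out.println(config.getString(HighAvailabilityOptions.HA_MODE));           // zookeeper
    }
}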

HighAvailabilityServicesUtils

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java

public class HighAvailabilityServicesUtils {

    public static HighAvailabilityServices createAvailableOrEmbeddedServices(
        Configuration config,
        Executor executor) throws Exception {
        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);

        switch (highAvailabilityMode) {
            case NONE:
                return new EmbeddedHaServices(executor);

            case ZOOKEEPER:
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

                return new ZooKeeperHaServices(
                    ZooKeeperUtils.startCuratorFramework(config),
                    executor,
                    config,
                    blobStoreService);

            case FACTORY_CLASS:
                return createCustomHAServices(config, executor);

            default:
                throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
        }
    }

    public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution) throws Exception {

        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);

        switch (highAvailabilityMode) {
            case NONE:
                final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

                final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    JobMaster.JOB_MANAGER_NAME,
                    addressResolution,
                    configuration);
                final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    ResourceManager.RESOURCE_MANAGER_NAME,
                    addressResolution,
                    configuration);
                final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    Dispatcher.DISPATCHER_NAME,
                    addressResolution,
                    configuration);

                final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
                    "%s must be set",
                    RestOptions.ADDRESS.key());
                final int port = configuration.getInteger(RestOptions.PORT);
                final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
                final String protocol = enableSSL ? "https://" : "http://";

                return new StandaloneHaServices(
                    resourceManagerRpcUrl,
                    dispatcherRpcUrl,
                    jobManagerRpcUrl,
                    String.format("%s%s:%s", protocol, address, port));

            case ZOOKEEPER:
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

                return new ZooKeeperHaServices(
                    ZooKeeperUtils.startCuratorFramework(configuration),
                    executor,
                    configuration,
                    blobStoreService);

            case FACTORY_CLASS:
                return createCustomHAServices(configuration, executor);

            default:
                throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
        }
    }

    /**
     * Returns the JobManager's hostname and port extracted from the given
     * {@link Configuration}.
     *
     * @param configuration Configuration to extract the JobManager's address from
     * @return The JobManager's hostname and port
     * @throws ConfigurationException if the JobManager's address cannot be extracted from the configuration
     */
    public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {

        final String hostname = configuration.getString(JobManagerOptions.ADDRESS);
        final int port = configuration.getInteger(JobManagerOptions.PORT);

        if (hostname == null) {
            throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS +
                "' is missing (hostname/address of JobManager to connect to).");
        }

        if (port <= 0 || port >= 65536) {
            throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
                "' (port of the JobManager actor system) : " + port +
                ".  it must be greater than 0 and less than 65536.");
        }

        return Tuple2.of(hostname, port);
    }

    private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);

        final HighAvailabilityServicesFactory highAvailabilityServicesFactory;

        try {
            highAvailabilityServicesFactory = InstantiationUtil.instantiate(
                haServicesClassName,
                HighAvailabilityServicesFactory.class,
                classLoader);
        } catch (Exception e) {
            throw new FlinkException(
                String.format("Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
                    haServicesClassName),
                e);
        }

        try {
            return highAvailabilityServicesFactory.createHAServices(config, executor);
        } catch (Exception e) {
            throw new FlinkException(
                String.format("Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
                    haServicesClassName),
                e);
        }
    }

    /**
     * Enum specifying whether address resolution should be tried or not when creating the
     * {@link HighAvailabilityServices}.
     */
    public enum AddressResolution {
        TRY_ADDRESS_RESOLUTION,
        NO_ADDRESS_RESOLUTION
    }
}
  • HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices and createCustomHAServices
  • createAvailableOrEmbeddedServices is mainly used by the MiniCluster; createHighAvailabilityServices is mainly used by ClusterEntrypoint: it creates StandaloneHaServices when highAvailabilityMode is NONE, ZooKeeperHaServices when it is ZOOKEEPER, and delegates to createCustomHAServices when it is FACTORY_CLASS (see the sketch after this list)
  • HighAvailabilityServicesUtils also provides the static getJobManagerAddress method, which extracts the JobManager's hostname and port from the configuration
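A minimal sketch of wiring createHighAvailabilityServices for a standalone (high-availability: NONE) setup; the host/port values and the class name HaServicesSketch are assumptions made for illustration:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;

public class HaServicesSketch {

    public static void main(String[] args) throws Exception {
        // hypothetical standalone (high-availability: NONE) configuration
        Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        config.setInteger(JobManagerOptions.PORT, 6123);
        config.setString(RestOptions.ADDRESS, "localhost");
        config.setInteger(RestOptions.PORT, 8081);

        ExecutorService executor = Executors.newSingleThreadExecutor();

        // with high-availability=NONE this resolves to StandaloneHaServices
        HighAvailabilityServices haServices =
            HighAvailabilityServicesUtils.createHighAvailabilityServices(
                config,
                executor,
                HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

        // prints (localhost,6123)
        System.out.println(HighAvailabilityServicesUtils.getJobManagerAddress(config));

        haServices.close();
        executor.shutdown();
    }
}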

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

/**
 * The HighAvailabilityServices give access to all services needed for a highly-available
 * setup. In particular, the services provide access to highly available storage and
 * registries, as well as distributed counters and leader election.
 *
 * <ul>
 *     <li>ResourceManager leader election and leader retrieval</li>
 *     <li>JobManager leader election and leader retrieval</li>
 *     <li>Persistence for checkpoint metadata</li>
 *     <li>Registering the latest completed checkpoint(s)</li>
 *     <li>Persistence for the BLOB store</li>
 *     <li>Registry that marks a job's status</li>
 *     <li>Naming of RPC endpoints</li>
 * </ul>
 */
public interface HighAvailabilityServices extends AutoCloseable {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /**
     * This UUID should be used when no proper leader election happens, but a simple
     * pre-configured leader is used. That is for example the case in non-highly-available
     * standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /**
     * This JobID should be used to identify the old JobManager when using the
     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
     * distinct JobID assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    /**
     * Gets the leader retriever for the cluster's resource manager.
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service
     * is not always accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @return Leader retrieval service to retrieve the job manager for the given job
     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by
     *                              a static leader retrieval service.
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /**
     * Gets the leader election service for the cluster's resource manager.
     *
     * @return Leader election service for the resource manager leader election
     */
    LeaderElectionService getResourceManagerLeaderElectionService();

    /**
     * Gets the leader election service for the cluster's dispatcher.
     *
     * @return Leader election service for the dispatcher leader election
     */
    LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job running the election.
     * @return Leader election service for the job manager leader election
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    LeaderElectionService getWebMonitorLeaderElectionService();

    /**
     * Gets the checkpoint recovery factory for the job manager
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /**
     * Gets the submitted job graph store for the job manager
     *
     * @return Submitted job graph store
     * @throws Exception if the submitted job graph store could not be created
     */
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

    /**
     * Gets the registry that holds information about whether jobs are currently running.
     *
     * @return Running job registry to retrieve running jobs
     */
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;

    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------

    /**
     * Closes the high availability services, releasing all resources.
     *
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores
     * (file systems, ZooKeeper, etc). Another instance of the high availability
     * services will be able to recover the job.
     *
     * <p>If an exception occurs during closing services, this method will attempt to
     * continue closing other services and report exceptions only after all services
     * have been attempted to be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;

    /**
     * Closes the high availability services (releasing all resources) and deletes
     * all data stored by these services in external stores.
     *
     * <p>After this method was called, the any job or session that was managed by
     * these high availability services will be unrecoverable.
     *
     * <p>If an exception occurs during cleanup, this method will attempt to
     * continue the cleanup and report exceptions only after all cleanup steps have
     * been attempted.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services
     *                   or cleaning up data stored by them.
     */
    void closeAndCleanupAllData() throws Exception;
}
  • HighAvailabilityServices defines the getters for the various services required for a highly-available setup (leader election/retrieval, checkpoint recovery, job graph store, running jobs registry, blob store), plus close and closeAndCleanupAllData for shutdown; a usage sketch of one of the leader retrievers follows.
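As a usage sketch of these getters, a caller could watch the ResourceManager leader roughly as follows; the class and method names here are made up, while LeaderRetrievalService and LeaderRetrievalListener are the actual runtime interfaces:

import java.util.UUID;

import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

public class LeaderRetrievalSketch {

    /** Starts watching the ResourceManager leader published through the HA services. */
    public static void watchResourceManagerLeader(HighAvailabilityServices haServices) throws Exception {
        LeaderRetrievalService retrievalService = haServices.getResourceManagerLeaderRetriever();

        retrievalService.start(new LeaderRetrievalListener() {
            @Override
            public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
                // invoked whenever a (new) leader address is published
                System.out.println("ResourceManager leader: " + leaderAddress + " (session " + leaderSessionID + ")");
            }

            @Override
            public void handleError(Exception exception) {
                exception.printStackTrace();
            }
        });
    }
}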

ZooKeeperHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

/**
 * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
 * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
 *
 * <pre>
 * /flink
 *      +/cluster_id_1/resource_manager_lock
 *      |            |
 *      |            +/job-id-1/job_manager_lock
 *      |            |         /checkpoints/latest
 *      |            |                     /latest-1
 *      |            |                     /latest-2
 *      |            |
 *      |            +/job-id-2/job_manager_lock
 *      |
 *      +/cluster_id_2/resource_manager_lock
 *                   |
 *                   +/job-id-1/job_manager_lock
 *                            |/checkpoints/latest
 *                            |            /latest-1
 *                            |/persisted_job_graph
 * </pre>
 *
 * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
 * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
 * accommodate specific permission.
 *
 * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
 * This "cluster" can be either a standalone or containerized Flink cluster, or it can be job
 * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
 *
 * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
 * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
 *
 * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
 * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
 * cluster and participate in the execution of the same set of jobs.
 */
public class ZooKeeperHaServices implements HighAvailabilityServices {

    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHaServices.class);

    private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";

    private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";

    private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

    private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";

    // ------------------------------------------------------------------------

    /** The ZooKeeper client to use */
    private final CuratorFramework client;

    /** The executor to run ZooKeeper callbacks on */
    private final Executor executor;

    /** The runtime configuration */
    private final Configuration configuration;

    /** The zookeeper based running jobs registry */
    private final RunningJobsRegistry runningJobsRegistry;

    /** Store for arbitrary blobs */
    private final BlobStoreService blobStoreService;

    public ZooKeeperHaServices(
            CuratorFramework client,
            Executor executor,
            Configuration configuration,
            BlobStoreService blobStoreService) {
        this.client = checkNotNull(client);
        this.executor = checkNotNull(executor);
        this.configuration = checkNotNull(configuration);
        this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);

        this.blobStoreService = checkNotNull(blobStoreService);
    }

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        return getJobManagerLeaderRetriever(jobID);
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
    }

    @Override
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
        return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
    }

    @Override
    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
        return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
    }

    @Override
    public RunningJobsRegistry getRunningJobsRegistry() {
        return runningJobsRegistry;
    }

    @Override
    public BlobStore createBlobStore() throws IOException {
        return blobStoreService;
    }

    // ------------------------------------------------------------------------
    //  Shutdown
    // ------------------------------------------------------------------------

    @Override
    public void close() throws Exception {
        Throwable exception = null;

        try {
            blobStoreService.close();
        } catch (Throwable t) {
            exception = t;
        }

        internalClose();

        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
        }
    }

    @Override
    public void closeAndCleanupAllData() throws Exception {
        LOG.info("Close and clean up all data for ZooKeeperHaServices.");

        Throwable exception = null;

        try {
            blobStoreService.closeAndCleanupAllData();
        } catch (Throwable t) {
            exception = t;
        }

        internalClose();

        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
        }
    }

    /**
     * Closes components which don't distinguish between close and closeAndCleanupAllData
     */
    private void internalClose() {
        client.close();
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private static String getPathForJobManager(final JobID jobID) {
        return "/" + jobID + JOB_MANAGER_LEADER_PATH;
    }
}
  • ZooKeeperHaServices implements the HighAvailabilityServices interface; it creates the required services through the various create methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService and ZooKeeperUtils.createSubmittedJobGraphs. The resulting znode layout is illustrated below.
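Note that the root path and cluster-id are applied as the Curator client's namespace by ZooKeeperUtils.startCuratorFramework, so the constants above are paths relative to that namespace. The helper below is only an illustrative sketch (not Flink code) of where a job's job_manager_lock node ends up with the configuration from the beginning of this article:

import org.apache.flink.api.common.JobID;

public class ZkPathSketch {

    // mirrors ZooKeeperHaServices.JOB_MANAGER_LEADER_PATH above
    private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

    /** Illustrates the full ZooKeeper path of a job's JobManager leader node. */
    public static String jobManagerLeaderPath(String root, String clusterId, JobID jobId) {
        // namespace (root + cluster-id) + relative path used by ZooKeeperHaServices
        return root + clusterId + "/" + jobId + JOB_MANAGER_LEADER_PATH;
    }

    public static void main(String[] args) {
        // e.g. /flink/cluster_one/<job-id>/job_manager_lock
        System.out.println(jobManagerLeaderPath("/flink", "/cluster_one", new JobID()));
    }
}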

JobClient.submitJob

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/client/JobClient.java

public class JobClient {

    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

    //......

    /**
     * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be
     * passed to {@code awaitJobResult} to get the result of the submission.
     * @return JobListeningContext which may be used to retrieve the JobExecutionResult via
     *             {@code awaitJobResult(JobListeningContext context)}.
     */
    public static JobListeningContext submitJob(
            ActorSystem actorSystem,
            Configuration config,
            HighAvailabilityServices highAvailabilityServices,
            JobGraph jobGraph,
            FiniteDuration timeout,
            boolean sysoutLogUpdates,
            ClassLoader classLoader) {

        checkNotNull(actorSystem, "The actorSystem must not be null.");
        checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
        checkNotNull(jobGraph, "The jobGraph must not be null.");
        checkNotNull(timeout, "The timeout must not be null.");

        // for this job, we create a proxy JobClientActor that deals with all communication with
        // the JobManager. It forwards the job submission, checks the success/failure responses, logs
        // update messages, watches for disconnect between client and JobManager, ...

        Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
            highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
            timeout,
            sysoutLogUpdates,
            config);

        ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

        Future<Object> submissionFuture = Patterns.ask(
            jobClientActor,
            new JobClientMessages.SubmitJobAndWait(jobGraph),
            new Timeout(AkkaUtils.INF_TIMEOUT()));

        return new JobListeningContext(
            jobGraph.getJobID(),
            submissionFuture,
            jobClientActor,
            timeout,
            classLoader,
            highAvailabilityServices);
    }

    //......
}
  • JobClient.submitJob, for example, uses highAvailabilityServices.getJobManagerLeaderRetriever to locate the address of the JobManager leader for submitting the job; a hypothetical usage sketch follows.
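A hypothetical wiring sketch, not taken from the Flink sources: the ActorSystem, Configuration, HighAvailabilityServices, JobGraph and ClassLoader are assumed to be created elsewhere, and JobClient.awaitJobResult is the blocking counterpart mentioned in the javadoc above:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;

import akka.actor.ActorSystem;
import scala.concurrent.duration.FiniteDuration;

public class SubmitJobSketch {

    /** Submits the given JobGraph via JobClient and blocks until the result is available. */
    public static JobExecutionResult submitAndWait(
            ActorSystem actorSystem,
            Configuration config,
            HighAvailabilityServices haServices,
            JobGraph jobGraph,
            ClassLoader classLoader) throws Exception {

        // submitJob wires haServices.getJobManagerLeaderRetriever(DEFAULT_JOB_ID)
        // into the JobSubmissionClientActor, so the client follows the current leader
        JobListeningContext context = JobClient.submitJob(
            actorSystem,
            config,
            haServices,
            jobGraph,
            new FiniteDuration(60, TimeUnit.SECONDS),
            false,          // sysoutLogUpdates
            classLoader);

        // blocking call that waits for the job result
        return JobClient.awaitJobResult(context);
    }
}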

Summary

  • HighAvailabilityMode has three enum values: NONE, ZOOKEEPER, and FACTORY_CLASS; each carries a haActive flag indicating whether it supports high availability; HighAvailabilityOptions defines the high-availability configuration options, including those prefixed with high-availability.zookeeper
  • HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices and createCustomHAServices; createAvailableOrEmbeddedServices is mainly used by the MiniCluster, while createHighAvailabilityServices is mainly used by ClusterEntrypoint: it creates StandaloneHaServices when highAvailabilityMode is NONE, ZooKeeperHaServices when it is ZOOKEEPER, and delegates to createCustomHAServices when it is FACTORY_CLASS
  • HighAvailabilityServices defines the getters for the various services required for a highly-available setup; ZooKeeperHaServices implements the HighAvailabilityServices interface and creates the required services through the various create methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService and ZooKeeperUtils.createSubmittedJobGraphs; JobClient.submitJob, for example, uses highAvailabilityServices.getJobManagerLeaderRetriever to locate the address of the JobManager leader for submitting the job

doc

  • JobManager High Availability (HA)
