聊聊flink的KvStateRegistryGateway
为什么80%的码农都做不了架构师?>>>
序
本文主要研究一下flink的KvStateRegistryGateway
KvStateRegistryGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java
public interface KvStateRegistryGateway {/*** Notifies that queryable state has been registered.** @param jobId identifying the job for which to register a key value state* @param jobVertexId JobVertexID the KvState instance belongs to.* @param keyGroupRange Key group range the KvState instance belongs to.* @param registrationName Name under which the KvState has been registered.* @param kvStateId ID of the registered KvState instance.* @param kvStateServerAddress Server address where to find the KvState instance.* @return Future acknowledge if the key-value state has been registered*/CompletableFuture<Acknowledge> notifyKvStateRegistered(final JobID jobId,final JobVertexID jobVertexId,final KeyGroupRange keyGroupRange,final String registrationName,final KvStateID kvStateId,final InetSocketAddress kvStateServerAddress);/*** Notifies that queryable state has been unregistered.** @param jobId identifying the job for which to unregister a key value state* @param jobVertexId JobVertexID the KvState instance belongs to.* @param keyGroupRange Key group index the KvState instance belongs to.* @param registrationName Name under which the KvState has been registered.* @return Future acknowledge if the key-value state has been unregistered*/CompletableFuture<Acknowledge> notifyKvStateUnregistered(final JobID jobId,final JobVertexID jobVertexId,final KeyGroupRange keyGroupRange,final String registrationName);
}
- KvStateRegistryGateway接口定义了notifyKvStateRegistered、notifyKvStateUnregistered两个方法;JobMaster实现了这两个方法
JobMaster
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {/** Default names for Flink's distributed components. */public static final String JOB_MANAGER_NAME = "jobmanager";public static final String ARCHIVE_NAME = "archive";// ------------------------------------------------------------------------private final JobMasterConfiguration jobMasterConfiguration;private final ResourceID resourceId;private final JobGraph jobGraph;private final Time rpcTimeout;private final HighAvailabilityServices highAvailabilityServices;private final BlobServer blobServer;private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;private final ScheduledExecutorService scheduledExecutorService;private final OnCompletionActions jobCompletionActions;private final FatalErrorHandler fatalErrorHandler;private final ClassLoader userCodeLoader;private final SlotPool slotPool;private final SlotPoolGateway slotPoolGateway;private final RestartStrategy restartStrategy;// --------- BackPressure --------private final BackPressureStatsTracker backPressureStatsTracker;// --------- ResourceManager --------private final LeaderRetrievalService resourceManagerLeaderRetriever;// --------- TaskManagers --------private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;// -------- Mutable fields ---------private ExecutionGraph executionGraph;@Nullableprivate JobManagerJobStatusListener jobStatusListener;@Nullableprivate JobManagerJobMetricGroup jobManagerJobMetricGroup;@Nullableprivate String lastInternalSavepoint;@Nullableprivate ResourceManagerAddress resourceManagerAddress;@Nullableprivate ResourceManagerConnection resourceManagerConnection;@Nullableprivate EstablishedResourceManagerConnection establishedResourceManagerConnection;//......@Overridepublic CompletableFuture<Acknowledge> notifyKvStateRegistered(final JobID jobId,final JobVertexID jobVertexId,final KeyGroupRange keyGroupRange,final String registrationName,final KvStateID kvStateId,final InetSocketAddress kvStateServerAddress) {if (jobGraph.getJobID().equals(jobId)) {if (log.isDebugEnabled()) {log.debug("Key value state registered for job {} under name {}.",jobGraph.getJobID(), registrationName);}try {executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);return CompletableFuture.completedFuture(Acknowledge.get());} catch (Exception e) {log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);return FutureUtils.completedExceptionally(e);}} else {if (log.isDebugEnabled()) {log.debug("Notification about key-value state registration for unknown job {} received.", jobId);}return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));}}@Overridepublic CompletableFuture<Acknowledge> notifyKvStateUnregistered(JobID jobId,JobVertexID jobVertexId,KeyGroupRange keyGroupRange,String registrationName) {if (jobGraph.getJobID().equals(jobId)) {if (log.isDebugEnabled()) {log.debug("Key value state unregistered for job {} under name {}.",jobGraph.getJobID(), registrationName);}try {executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(jobVertexId, keyGroupRange, registrationName);return CompletableFuture.completedFuture(Acknowledge.get());} catch (Exception e) {log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);return FutureUtils.completedExceptionally(e);}} else {if (log.isDebugEnabled()) {log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);}return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));}}//......
}
- JobMaster的notifyKvStateRegistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered;notifyKvStateUnregistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered
KvStateLocationRegistry
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
public class KvStateLocationRegistry {/** JobID this coordinator belongs to. */private final JobID jobId;/** Job vertices for determining parallelism per key. */private final Map<JobVertexID, ExecutionJobVertex> jobVertices;/*** Location info keyed by registration name. The name needs to be unique* per JobID, i.e. two operators cannot register KvState with the same* name.*/private final Map<String, KvStateLocation> lookupTable = new HashMap<>();/*** Creates the registry for the job.** @param jobId JobID this coordinator belongs to.* @param jobVertices Job vertices map of all vertices of this job.*/public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {this.jobId = Preconditions.checkNotNull(jobId, "JobID");this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");}/*** Returns the {@link KvStateLocation} for the registered KvState instance* or <code>null</code> if no location information is available.** @param registrationName Name under which the KvState instance is registered.* @return Location information or <code>null</code>.*/public KvStateLocation getKvStateLocation(String registrationName) {return lookupTable.get(registrationName);}/*** Notifies the registry about a registered KvState instance.** @param jobVertexId JobVertexID the KvState instance belongs to* @param keyGroupRange Key group range the KvState instance belongs to* @param registrationName Name under which the KvState has been registered* @param kvStateId ID of the registered KvState instance* @param kvStateServerAddress Server address where to find the KvState instance** @throws IllegalArgumentException If JobVertexID does not belong to job* @throws IllegalArgumentException If state has been registered with same* name by another operator.* @throws IndexOutOfBoundsException If key group index is out of bounds.*/public void notifyKvStateRegistered(JobVertexID jobVertexId,KeyGroupRange keyGroupRange,String registrationName,KvStateID kvStateId,InetSocketAddress kvStateServerAddress) {KvStateLocation location = lookupTable.get(registrationName);if (location == null) {// First registration for this operator, create the location infoExecutionJobVertex vertex = jobVertices.get(jobVertexId);if (vertex != null) {int parallelism = vertex.getMaxParallelism();location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);lookupTable.put(registrationName, location);} else {throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);}}// Duplicated name if vertex IDs don't matchif (!location.getJobVertexId().equals(jobVertexId)) {IllegalStateException duplicate = new IllegalStateException("Registration name clash. KvState with name '" + registrationName +"' has already been registered by another operator (" +location.getJobVertexId() + ").");ExecutionJobVertex vertex = jobVertices.get(jobVertexId);if (vertex != null) {vertex.fail(new SuppressRestartsException(duplicate));}throw duplicate;}location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);}/*** Notifies the registry about an unregistered KvState instance.** @param jobVertexId JobVertexID the KvState instance belongs to* @param keyGroupRange Key group index the KvState instance belongs to* @param registrationName Name under which the KvState has been registered* @throws IllegalArgumentException If another operator registered the state instance* @throws IllegalArgumentException If the registration name is not known*/public void notifyKvStateUnregistered(JobVertexID jobVertexId,KeyGroupRange keyGroupRange,String registrationName) {KvStateLocation location = lookupTable.get(registrationName);if (location != null) {// Duplicate name if vertex IDs don't matchif (!location.getJobVertexId().equals(jobVertexId)) {throw new IllegalArgumentException("Another operator (" +location.getJobVertexId() + ") registered the KvState " +"under '" + registrationName + "'.");}location.unregisterKvState(keyGroupRange);if (location.getNumRegisteredKeyGroups() == 0) {lookupTable.remove(registrationName);}} else {throw new IllegalArgumentException("Unknown registration name '" +registrationName + "'. " + "Probably registration/unregistration race.");}}}
- KvStateLocationRegistry的构造器要求传入jobId及jobVertices;它有一个属性为lookupTable,存储了registrationName与KvStateLocation的映射关系
- notifyKvStateRegistered方法在lookupTable查找不到对应的KvStateLocation的时候会创建一个KvStateLocation并存放入lookupTable,最后调用location.registerKvState方法
- notifyKvStateUnregistere方法在lookupTable查找对应KvStateLocation的时候会触发location.unregisterKvState,然后将该KvStateLocation从lookupTable中移除
小结
- KvStateRegistryGateway接口定义了notifyKvStateRegistered、notifyKvStateUnregistered两个方法;JobMaster实现了这两个方法
- JobMaster的notifyKvStateRegistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered;notifyKvStateUnregistered方法主要是触发executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered
- KvStateLocationRegistry的构造器要求传入jobId及jobVertices;它有一个属性为lookupTable,存储了registrationName与KvStateLocation的映射关系;notifyKvStateRegistered方法在lookupTable查找不到对应的KvStateLocation的时候会创建一个KvStateLocation并存放入lookupTable,最后调用location.registerKvState方法;notifyKvStateUnregistere方法在lookupTable查找对应KvStateLocation的时候会触发location.unregisterKvState,然后将该KvStateLocation从lookupTable中移除
doc
- KvStateRegistryGateway
转载于:https://my.oschina.net/go4it/blog/3024329
聊聊flink的KvStateRegistryGateway相关推荐
- 聊聊flink的FsStateBackend
序 本文主要研究一下flink的FsStateBackend StateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/r ...
- 聊聊flink的HistoryServer
为什么80%的码农都做不了架构师?>>> 序 本文主要研究一下flink的HistoryServer HistoryServer flink-1.7.2/flink-runti ...
- 聊聊flink的TimeCharacteristic
为什么80%的码农都做不了架构师?>>> 序 本文主要研究一下flink的TimeCharacteristic TimeCharacteristic flink-streami ...
- 聊聊flink JobManager的heap大小设置
序 本文主要研究一下flink JobManager的heap大小设置 JobManagerOptions flink-core-1.7.1-sources.jar!/org/apache/flink ...
- 聊聊flink的InternalTimeServiceManager
序 本文主要研究一下flink的InternalTimeServiceManager InternalTimeServiceManager flink-streaming-java_2.11-1.7. ...
- 聊聊flink的StateTtlConfig
序 本文主要研究一下flink的StateTtlConfig 实例 import org.apache.flink.api.common.state.StateTtlConfig; import or ...
- 聊聊flink的AscendingTimestampExtractor
序 本文主要研究一下flink的AscendingTimestampExtractor AscendingTimestampExtractor flink-streaming-java_2.11-1. ...
- 聊聊flink的CheckpointScheduler
序 本文主要研究一下flink的CheckpointScheduler CheckpointCoordinatorDeActivator flink-runtime_2.11-1.7.0-source ...
- 聊聊flink的NetworkEnvironmentConfiguration
序 本文主要研究一下flink的NetworkEnvironmentConfiguration NetworkEnvironmentConfiguration flink-1.7.2/flink-ru ...
- 聊聊flink Table的groupBy操作
序 本文主要研究一下flink Table的groupBy操作 Table.groupBy flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/t ...
最新文章
- keil c51的内部RAM(idata)动态内存管理程序(转)
- web前端面试题:20道做完信心嫉妒膨胀的测试题
- centos6 python 安装 sqlite 解决 No module named ‘_sqlite3′
- h5实现网页内容跟随窗口大小移动_h5页面能流行于各大社交平台必定有其原因的...
- 2.0、Android Studio编写你的应用
- 软件oem要注意什么_租房软件有哪些 租房有什么需要注意的地方
- python轮胎缺陷检测_基于深度学习的轮胎缺陷无损检测与分类技术研究
- matlab二叉树期权定价,二叉树期权定价模型
- linux 关键字搜索文件
- 融入动画技术的交互应用优秀作业推荐
- Eel+VUE python GUI编程
- python python中max()函数的用法
- 为什么使用use strict可以节约你的时间
- Jenkins邮件通知模板(Git修改版)
- 苹果或将为iPhone 13全系配备LiDAR
- (二进制枚举+思维)1625 夹克爷发红包
- 微服务-分布式事务seata
- [windows]修改本机host配置
- 模型评估方法(混淆矩阵)
- 04_星仔带你学Java之流程语句(顺序结构、选择结构、循环结构、控制循环结构语句)