This article takes a look at Flink's KvStateRegistryGateway.

KvStateRegistryGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/KvStateRegistryGateway.java

public interface KvStateRegistryGateway {

    /**
     * Notifies that queryable state has been registered.
     *
     * @param jobId    identifying the job for which to register a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group range the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @param kvStateId ID of the registered KvState instance.
     * @param kvStateServerAddress Server address where to find the KvState instance.
     * @return Future acknowledge if the key-value state has been registered
     */
    CompletableFuture<Acknowledge> notifyKvStateRegistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName,
        final KvStateID kvStateId,
        final InetSocketAddress kvStateServerAddress);

    /**
     * Notifies that queryable state has been unregistered.
     *
     * @param jobId    identifying the job for which to unregister a key value state
     * @param jobVertexId JobVertexID the KvState instance belongs to.
     * @param keyGroupRange Key group index the KvState instance belongs to.
     * @param registrationName Name under which the KvState has been registered.
     * @return Future acknowledge if the key-value state has been unregistered
     */
    CompletableFuture<Acknowledge> notifyKvStateUnregistered(
        final JobID jobId,
        final JobVertexID jobVertexId,
        final KeyGroupRange keyGroupRange,
        final String registrationName);
}
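  • The KvStateRegistryGateway interface declares two methods, notifyKvStateRegistered and notifyKvStateUnregistered. Both return a CompletableFuture<Acknowledge>. JobMaster implements both of them through JobMasterGateway.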
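To make the asynchronous contract more concrete, here is a minimal caller-side sketch. It does not use Flink classes: Ack, RegistryGateway, stubFor and report are hypothetical stand-ins invented for illustration, IDs are plain strings, and the key group range is reduced to two ints. It only shows how a caller might consume the returned future in both the success and the failure case.

```java
import java.util.concurrent.CompletableFuture;

/** Caller-side sketch; every type here is a hypothetical stand-in, not Flink API. */
final class GatewayCallerSketch {

    /** Stand-in for Flink's Acknowledge. */
    enum Ack { INSTANCE }

    /** Reduced gateway: IDs are strings, the key group range is a pair of ints. */
    interface RegistryGateway {
        CompletableFuture<Ack> notifyKvStateRegistered(
                String jobId, String vertexId, int startKeyGroup, int endKeyGroup,
                String registrationName);
    }

    /** A stub gateway that acknowledges one known job and fails every other one. */
    static RegistryGateway stubFor(String knownJobId) {
        return (jobId, vertexId, start, end, name) -> {
            if (knownJobId.equals(jobId)) {
                return CompletableFuture.completedFuture(Ack.INSTANCE);
            }
            CompletableFuture<Ack> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("unknown job " + jobId));
            return failed;
        };
    }

    /** Maps the reply to a string; handle() receives either the ack or the error. */
    static String report(RegistryGateway gateway, String jobId, String name) {
        return gateway
                .notifyKvStateRegistered(jobId, "vertex-1", 0, 3, name)
                .handle((ack, error) -> error == null
                        ? "registered " + name
                        : "registration of " + name + " failed: " + error.getMessage())
                .join();
    }

    public static void main(String[] args) {
        RegistryGateway gateway = stubFor("job-1");
        System.out.println(report(gateway, "job-1", "word-counts"));
        System.out.println(report(gateway, "job-2", "word-counts"));
    }
}
```

In this sketch the failure is carried by the future instead of being thrown at the call site, which is the same shape as the gateway methods above.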

JobMaster

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {

    /** Default names for Flink's distributed components. */
    public static final String JOB_MANAGER_NAME = "jobmanager";
    public static final String ARCHIVE_NAME = "archive";

    // ------------------------------------------------------------------------

    private final JobMasterConfiguration jobMasterConfiguration;

    private final ResourceID resourceId;

    private final JobGraph jobGraph;

    private final Time rpcTimeout;

    private final HighAvailabilityServices highAvailabilityServices;

    private final BlobServer blobServer;

    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;

    private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;

    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

    private final ScheduledExecutorService scheduledExecutorService;

    private final OnCompletionActions jobCompletionActions;

    private final FatalErrorHandler fatalErrorHandler;

    private final ClassLoader userCodeLoader;

    private final SlotPool slotPool;

    private final SlotPoolGateway slotPoolGateway;

    private final RestartStrategy restartStrategy;

    // --------- BackPressure --------

    private final BackPressureStatsTracker backPressureStatsTracker;

    // --------- ResourceManager --------

    private final LeaderRetrievalService resourceManagerLeaderRetriever;

    // --------- TaskManagers --------

    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;

    // -------- Mutable fields ---------

    private ExecutionGraph executionGraph;

    @Nullable
    private JobManagerJobStatusListener jobStatusListener;

    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;

    @Nullable
    private String lastInternalSavepoint;

    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    @Nullable
    private ResourceManagerConnection resourceManagerConnection;

    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    //......

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(
            final JobID jobId,
            final JobVertexID jobVertexId,
            final KeyGroupRange keyGroupRange,
            final String registrationName,
            final KvStateID kvStateId,
            final InetSocketAddress kvStateServerAddress) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state registered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
                    jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    @Override
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {
        if (jobGraph.getJobID().equals(jobId)) {
            if (log.isDebugEnabled()) {
                log.debug("Key value state unregistered for job {} under name {}.",
                    jobGraph.getJobID(), registrationName);
            }

            try {
                executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
                    jobVertexId, keyGroupRange, registrationName);

                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
                return FutureUtils.completedExceptionally(e);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
            }
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
        }
    }

    //......
}
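  • JobMaster's notifyKvStateRegistered first checks that the given jobId matches the ID of its own jobGraph, then delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered; notifyKvStateUnregistered does the same with executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered. On success both return a future completed with Acknowledge; if the registry throws, or the job ID does not match (FlinkJobNotFoundException), they return an exceptionally completed future.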
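The guard-and-delegate shape of these two methods can be sketched in isolation. The following is a simplified illustration, not Flink code: JobMasterSketch, Registry and Ack are hypothetical names, IDs are plain strings, and IllegalArgumentException stands in for FlinkJobNotFoundException. It assumes only what the source above shows: compare the job ID, call the synchronous registry, and turn any exception into a failed future.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch of the guard-and-delegate pattern; all names are hypothetical stand-ins. */
final class JobMasterSketch {

    /** Stand-in for Flink's Acknowledge. */
    enum Ack { INSTANCE }

    /** Stand-in for the synchronous KvStateLocationRegistry call. */
    interface Registry {
        void notifyKvStateRegistered(String vertexId, String registrationName);
    }

    private final String ownJobId;
    private final Registry registry;

    JobMasterSketch(String ownJobId, Registry registry) {
        this.ownJobId = ownJobId;
        this.registry = registry;
    }

    CompletableFuture<Ack> notifyKvStateRegistered(
            String jobId, String vertexId, String registrationName) {
        if (!ownJobId.equals(jobId)) {
            // Corresponds to the FlinkJobNotFoundException branch in the source.
            return failed(new IllegalArgumentException("Unknown job " + jobId));
        }
        try {
            registry.notifyKvStateRegistered(vertexId, registrationName);
            return CompletableFuture.completedFuture(Ack.INSTANCE);
        } catch (Exception e) {
            // The registry is synchronous, so its exceptions are wrapped into the future.
            return failed(e);
        }
    }

    /** Builds a future that is already completed exceptionally. */
    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    public static void main(String[] args) {
        JobMasterSketch master = new JobMasterSketch("job-1", (vertexId, name) -> { });
        System.out.println(
                master.notifyKvStateRegistered("job-1", "v1", "counts").join());
        System.out.println(
                master.notifyKvStateRegistered("job-2", "v1", "counts").isCompletedExceptionally());
    }
}
```

The caller never sees a thrown exception from the method itself; every outcome travels through the returned future.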

KvStateLocationRegistry

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java

public class KvStateLocationRegistry {

    /** JobID this coordinator belongs to. */
    private final JobID jobId;

    /** Job vertices for determining parallelism per key. */
    private final Map<JobVertexID, ExecutionJobVertex> jobVertices;

    /**
     * Location info keyed by registration name. The name needs to be unique
     * per JobID, i.e. two operators cannot register KvState with the same
     * name.
     */
    private final Map<String, KvStateLocation> lookupTable = new HashMap<>();

    /**
     * Creates the registry for the job.
     *
     * @param jobId       JobID this coordinator belongs to.
     * @param jobVertices Job vertices map of all vertices of this job.
     */
    public KvStateLocationRegistry(JobID jobId, Map<JobVertexID, ExecutionJobVertex> jobVertices) {
        this.jobId = Preconditions.checkNotNull(jobId, "JobID");
        this.jobVertices = Preconditions.checkNotNull(jobVertices, "Job vertices");
    }

    /**
     * Returns the {@link KvStateLocation} for the registered KvState instance
     * or <code>null</code> if no location information is available.
     *
     * @param registrationName Name under which the KvState instance is registered.
     * @return Location information or <code>null</code>.
     */
    public KvStateLocation getKvStateLocation(String registrationName) {
        return lookupTable.get(registrationName);
    }

    /**
     * Notifies the registry about a registered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group range the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @param kvStateId ID of the registered KvState instance
     * @param kvStateServerAddress Server address where to find the KvState instance
     *
     * @throws IllegalArgumentException If JobVertexID does not belong to job
     * @throws IllegalArgumentException If state has been registered with same
     * name by another operator.
     * @throws IndexOutOfBoundsException If key group index is out of bounds.
     */
    public void notifyKvStateRegistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName,
            KvStateID kvStateId,
            InetSocketAddress kvStateServerAddress) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location == null) {
            // First registration for this operator, create the location info
            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);

            if (vertex != null) {
                int parallelism = vertex.getMaxParallelism();
                location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
                lookupTable.put(registrationName, location);
            } else {
                throw new IllegalArgumentException("Unknown JobVertexID " + jobVertexId);
            }
        }

        // Duplicated name if vertex IDs don't match
        if (!location.getJobVertexId().equals(jobVertexId)) {
            IllegalStateException duplicate = new IllegalStateException(
                "Registration name clash. KvState with name '" + registrationName +
                "' has already been registered by another operator (" +
                location.getJobVertexId() + ").");

            ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
            if (vertex != null) {
                vertex.fail(new SuppressRestartsException(duplicate));
            }

            throw duplicate;
        }
        location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
    }

    /**
     * Notifies the registry about an unregistered KvState instance.
     *
     * @param jobVertexId JobVertexID the KvState instance belongs to
     * @param keyGroupRange Key group index the KvState instance belongs to
     * @param registrationName Name under which the KvState has been registered
     * @throws IllegalArgumentException If another operator registered the state instance
     * @throws IllegalArgumentException If the registration name is not known
     */
    public void notifyKvStateUnregistered(
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {

        KvStateLocation location = lookupTable.get(registrationName);

        if (location != null) {
            // Duplicate name if vertex IDs don't match
            if (!location.getJobVertexId().equals(jobVertexId)) {
                throw new IllegalArgumentException("Another operator (" +
                    location.getJobVertexId() + ") registered the KvState " +
                    "under '" + registrationName + "'.");
            }

            location.unregisterKvState(keyGroupRange);

            if (location.getNumRegisteredKeyGroups() == 0) {
                lookupTable.remove(registrationName);
            }
        } else {
            throw new IllegalArgumentException("Unknown registration name '" +
                registrationName + "'. " + "Probably registration/unregistration race.");
        }
    }
}
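  • The KvStateLocationRegistry constructor takes a jobId and the jobVertices map. The class keeps a lookupTable field that maps each registrationName to its KvStateLocation.
  • notifyKvStateRegistered creates a KvStateLocation (sized by the vertex's max parallelism) and puts it into lookupTable when no entry exists for the name yet; an unknown JobVertexID raises IllegalArgumentException. If an existing entry belongs to a different vertex, it fails that vertex with a SuppressRestartsException and throws IllegalStateException. Otherwise it finishes by calling location.registerKvState.
  • notifyKvStateUnregistered looks up the KvStateLocation in lookupTable and calls location.unregisterKvState on it. The entry is removed from lookupTable only once location.getNumRegisteredKeyGroups() reaches 0. An unknown registration name, or a name owned by another vertex, raises IllegalArgumentException.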
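The lookup-table bookkeeping described above can be sketched without any Flink dependency. This is a simplified illustration under stated assumptions: LocationRegistrySketch and its nested Location are hypothetical stand-ins, vertex IDs are strings, the unknown-vertex check is left out, and a plain counter replaces KvStateLocation's per-key-group tracking, so the sketch assumes registered ranges never overlap.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the lookup-table bookkeeping; a simplified stand-in, not Flink's class. */
final class LocationRegistrySketch {

    /** Stand-in for KvStateLocation: the owning vertex plus a key group counter. */
    private static final class Location {
        final String vertexId;
        int registeredKeyGroups;

        Location(String vertexId) {
            this.vertexId = vertexId;
        }
    }

    private final Map<String, Location> lookupTable = new HashMap<>();

    void register(String vertexId, String name, int startKeyGroup, int endKeyGroup) {
        // The first registration under this name creates the entry.
        Location location = lookupTable.computeIfAbsent(name, n -> new Location(vertexId));
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalStateException("Registration name clash for '" + name + "'");
        }
        // Assumption: ranges are inclusive and do not overlap earlier registrations.
        location.registeredKeyGroups += endKeyGroup - startKeyGroup + 1;
    }

    void unregister(String vertexId, String name, int startKeyGroup, int endKeyGroup) {
        Location location = lookupTable.get(name);
        if (location == null) {
            throw new IllegalArgumentException("Unknown registration name '" + name + "'");
        }
        if (!location.vertexId.equals(vertexId)) {
            throw new IllegalArgumentException("Another operator registered '" + name + "'");
        }
        location.registeredKeyGroups -= endKeyGroup - startKeyGroup + 1;
        // The entry disappears only after the last key group has been unregistered.
        if (location.registeredKeyGroups == 0) {
            lookupTable.remove(name);
        }
    }

    boolean isRegistered(String name) {
        return lookupTable.containsKey(name);
    }

    public static void main(String[] args) {
        LocationRegistrySketch registry = new LocationRegistrySketch();
        registry.register("v1", "counts", 0, 1);
        registry.register("v1", "counts", 2, 3);
        registry.unregister("v1", "counts", 0, 1);
        System.out.println(registry.isRegistered("counts")); // still true: key groups 2-3 remain
        registry.unregister("v1", "counts", 2, 3);
        System.out.println(registry.isRegistered("counts")); // false: last key group is gone
    }
}
```

The point the sketch is meant to show is that one registration name is shared by all parallel subtasks of an operator, so the entry has to outlive individual unregistrations until its key group count drops to zero.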

Summary

  • The KvStateRegistryGateway interface declares notifyKvStateRegistered and notifyKvStateUnregistered; JobMaster implements both.
  • JobMaster's notifyKvStateRegistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered, and notifyKvStateUnregistered delegates to executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered, in both cases only after checking that the job ID matches.
  • KvStateLocationRegistry is constructed from a jobId and the jobVertices map and keeps a lookupTable from registrationName to KvStateLocation. notifyKvStateRegistered creates and stores a KvStateLocation when the name is not yet present, then calls location.registerKvState. notifyKvStateUnregistered calls location.unregisterKvState and removes the entry from lookupTable once no key groups remain registered.

doc

  • KvStateRegistryGateway

Reposted from: https://my.oschina.net/go4it/blog/3024329
