This article takes a look at Flink's RestartStrategies.

RestartStrategies

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

```java
@PublicEvolving
public class RestartStrategies {

    /**
     * Generates NoRestartStrategyConfiguration.
     *
     * @return NoRestartStrategyConfiguration
     */
    public static RestartStrategyConfiguration noRestart() {
        return new NoRestartStrategyConfiguration();
    }

    public static RestartStrategyConfiguration fallBackRestart() {
        return new FallbackRestartStrategyConfiguration();
    }

    /**
     * Generates a FixedDelayRestartStrategyConfiguration.
     *
     * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
     * @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
     * @return FixedDelayRestartStrategy
     */
    public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, long delayBetweenAttempts) {
        return fixedDelayRestart(restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS));
    }

    /**
     * Generates a FixedDelayRestartStrategyConfiguration.
     *
     * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
     * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy
     * @return FixedDelayRestartStrategy
     */
    public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayInterval) {
        return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval);
    }

    /**
     * Generates a FailureRateRestartStrategyConfiguration.
     *
     * @param failureRate Maximum number of restarts in given interval {@code failureInterval} before failing a job
     * @param failureInterval Time interval for failures
     * @param delayInterval Delay in-between restart attempts
     */
    public static FailureRateRestartStrategyConfiguration failureRateRestart(
            int failureRate, Time failureInterval, Time delayInterval) {
        return new FailureRateRestartStrategyConfiguration(failureRate, failureInterval, delayInterval);
    }

    //......
}
```
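  • RestartStrategies provides the static methods noRestart, fallBackRestart, fixedDelayRestart and failureRateRestart for building a RestartStrategyConfiguration; the fixedDelayRestart(int, long) overload interprets its delay as milliseconds and delegates to the Time-based overload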
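To make the overloading concrete, here is a minimal, self-contained sketch of the same static-factory style. It is not Flink code: `RestartConfigsSketch` and its `Config` interface are hypothetical, and `java.time.Duration` stands in for Flink's `Time`. What it illustrates is that the `long` overload treats its argument as milliseconds and simply forwards to the interval-based overload.

```java
import java.time.Duration;

// Hypothetical, simplified model of the static-factory style of RestartStrategies.
// Config and java.time.Duration stand in for Flink's RestartStrategyConfiguration and Time;
// none of these names are Flink API.
final class RestartConfigsSketch {
    private RestartConfigsSketch() {}

    interface Config {
        String getDescription();
    }

    static Config noRestart() {
        return () -> "Restart deactivated.";
    }

    static Config fallBackRestart() {
        return () -> "Cluster level default restart strategy";
    }

    // Raw-long overload: the value is taken as milliseconds and forwarded to the Duration variant,
    // mirroring how fixedDelayRestart(int, long) wraps its argument in Time.of(..., MILLISECONDS).
    static Config fixedDelayRestart(int restartAttempts, long delayBetweenAttemptsMillis) {
        return fixedDelayRestart(restartAttempts, Duration.ofMillis(delayBetweenAttemptsMillis));
    }

    static Config fixedDelayRestart(int restartAttempts, Duration delayInterval) {
        return () -> "Restart with fixed delay (" + delayInterval.toMillis() + " ms). #"
                + restartAttempts + " restart attempts.";
    }
}
```

In a real Flink 1.7 job, the object returned by `RestartStrategies` is handed to the environment, for example `env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))` on a `StreamExecutionEnvironment`.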

RestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

```java
    public abstract static class RestartStrategyConfiguration implements Serializable {
        private static final long serialVersionUID = 6285853591578313960L;

        private RestartStrategyConfiguration() {}

        /**
         * Returns a description which is shown in the web interface.
         *
         * @return Description of the restart strategy
         */
        public abstract String getDescription();
    }
```
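  • RestartStrategyConfiguration is an abstract, Serializable class that declares the abstract method getDescription (the text shown in the web interface). Its constructor is private, so it can only be subclassed from inside RestartStrategies; its subclasses are NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration and FallbackRestartStrategyConfiguration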
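The private constructor is what keeps this hierarchy closed: a private member of a nested class is accessible anywhere inside the enclosing top-level class, so only classes declared inside `RestartStrategies` can call `super()`. The hypothetical `ClosedHierarchySketch` below reproduces the pattern with two subclasses and depends on nothing from Flink.

```java
import java.io.Serializable;

// Hypothetical model of the closed hierarchy: an abstract base with a private constructor,
// nested in an outer class. Only classes nested inside ClosedHierarchySketch can call Config().
final class ClosedHierarchySketch {
    private ClosedHierarchySketch() {}

    abstract static class Config implements Serializable {
        private static final long serialVersionUID = 1L;

        private Config() {} // accessible only within ClosedHierarchySketch

        // Human-readable description, in the spirit of getDescription() for the web UI.
        abstract String getDescription();
    }

    static final class NoRestart extends Config {
        private static final long serialVersionUID = 1L;

        @Override
        String getDescription() {
            return "Restart deactivated.";
        }
    }

    static final class Fallback extends Config {
        private static final long serialVersionUID = 1L;

        @Override
        String getDescription() {
            return "Cluster level default restart strategy";
        }
    }
}

// class Outside extends ClosedHierarchySketch.Config {}  // would not compile: Config() has private access
```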

NoRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

```java
    public static final class NoRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = -5894362702943349962L;

        @Override
        public String getDescription() {
            return "Restart deactivated.";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return o instanceof NoRestartStrategyConfiguration;
        }

        @Override
        public int hashCode() {
            return Objects.hash();
        }
    }
```
  • NoRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the no-restart strategy; it has no fields, so any two instances are equal

FixedDelayRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

```java
    public static final class FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = 4149870149673363190L;

        private final int restartAttempts;
        private final Time delayBetweenAttemptsInterval;

        FixedDelayRestartStrategyConfiguration(int restartAttempts, Time delayBetweenAttemptsInterval) {
            this.restartAttempts = restartAttempts;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        }

        public int getRestartAttempts() {
            return restartAttempts;
        }

        public Time getDelayBetweenAttemptsInterval() {
            return delayBetweenAttemptsInterval;
        }

        @Override
        public int hashCode() {
            int result = restartAttempts;
            result = 31 * result + (delayBetweenAttemptsInterval != null ? delayBetweenAttemptsInterval.hashCode() : 0);
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof FixedDelayRestartStrategyConfiguration) {
                FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj;

                return restartAttempts == other.restartAttempts
                    && delayBetweenAttemptsInterval.equals(other.delayBetweenAttemptsInterval);
            } else {
                return false;
            }
        }

        @Override
        public String getDescription() {
            return "Restart with fixed delay (" + delayBetweenAttemptsInterval + "). #"
                + restartAttempts + " restart attempts.";
        }
    }
```
  • FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the fixed-delay restart strategy; it has two fields, restartAttempts and delayBetweenAttemptsInterval
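One detail in the quoted class: hashCode tolerates a null delayBetweenAttemptsInterval, but equals calls `delayBetweenAttemptsInterval.equals(...)` directly and would throw a NullPointerException in that case. The hypothetical value class below models the same two fields with `java.time.Duration` in place of Flink's `Time` and uses `Objects.equals`/`Objects.hashCode` so both methods treat null the same way. It is a sketch of the value-object pattern, not a patch for Flink.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical value-class model of FixedDelayRestartStrategyConfiguration (Duration instead of Flink's Time).
final class FixedDelayConfigSketch {
    private final int restartAttempts;
    private final Duration delayBetweenAttempts;

    FixedDelayConfigSketch(int restartAttempts, Duration delayBetweenAttempts) {
        this.restartAttempts = restartAttempts;
        this.delayBetweenAttempts = delayBetweenAttempts;
    }

    int getRestartAttempts() {
        return restartAttempts;
    }

    Duration getDelayBetweenAttempts() {
        return delayBetweenAttempts;
    }

    String getDescription() {
        return "Restart with fixed delay (" + delayBetweenAttempts.toMillis() + " ms). #"
                + restartAttempts + " restart attempts.";
    }

    // equals and hashCode use the same fields and are both null-safe.
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof FixedDelayConfigSketch)) {
            return false;
        }
        FixedDelayConfigSketch other = (FixedDelayConfigSketch) obj;
        return restartAttempts == other.restartAttempts
                && Objects.equals(delayBetweenAttempts, other.delayBetweenAttempts);
    }

    @Override
    public int hashCode() {
        return 31 * restartAttempts + Objects.hashCode(delayBetweenAttempts);
    }
}
```

Because `Duration` equality compares the length of time, `Duration.ofSeconds(10)` and `Duration.ofMillis(10_000)` produce equal configurations in this sketch.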

FailureRateRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

```java
    public static final class FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = 1195028697539661739L;

        private final int maxFailureRate;

        private final Time failureInterval;
        private final Time delayBetweenAttemptsInterval;

        public FailureRateRestartStrategyConfiguration(int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) {
            this.maxFailureRate = maxFailureRate;
            this.failureInterval = failureInterval;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        }

        public int getMaxFailureRate() {
            return maxFailureRate;
        }

        public Time getFailureInterval() {
            return failureInterval;
        }

        public Time getDelayBetweenAttemptsInterval() {
            return delayBetweenAttemptsInterval;
        }

        @Override
        public String getDescription() {
            return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
                + " and fixed delay " + delayBetweenAttemptsInterval.toString();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
            return maxFailureRate == that.maxFailureRate &&
                Objects.equals(failureInterval, that.failureInterval) &&
                Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
        }

        @Override
        public int hashCode() {
            return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval);
        }
    }
```
  • FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the failure-rate restart strategy; it has three fields, maxFailureRate, failureInterval and delayBetweenAttemptsInterval

FallbackRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

```java
    public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = -4441787204284085544L;

        @Override
        public String getDescription() {
            return "Cluster level default restart strategy";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return o instanceof FallbackRestartStrategyConfiguration;
        }

        @Override
        public int hashCode() {
            return Objects.hash();
        }
    }
```
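  • FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the cluster-level default restart strategy, i.e. the job does not pick a strategy itself and defers to the one configured for the cluster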

RestartStrategyResolving

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java

```java
public final class RestartStrategyResolving {

    /**
     * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
     * The resolving strategy is as follows:
     * <ol>
     *     <li>Strategy set within job graph.</li>
     *     <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing
     *     is enabled.</li>
     *     <li>If no strategy was set on client and server side and checkpointing was enabled then
     *     {@link FixedDelayRestartStrategy} is used</li>
     * </ol>
     *
     * @param clientConfiguration restart configuration given within the job graph
     * @param serverStrategyFactory default server side strategy factory
     * @param isCheckpointingEnabled if checkpointing was enabled for the job
     * @return resolved strategy
     */
    public static RestartStrategy resolve(
            RestartStrategies.RestartStrategyConfiguration clientConfiguration,
            RestartStrategyFactory serverStrategyFactory,
            boolean isCheckpointingEnabled) {

        final RestartStrategy clientSideRestartStrategy =
            RestartStrategyFactory.createRestartStrategy(clientConfiguration);

        if (clientSideRestartStrategy != null) {
            return clientSideRestartStrategy;
        } else {
            if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
                return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
                    .createRestartStrategy(isCheckpointingEnabled);
            } else {
                return serverStrategyFactory.createRestartStrategy();
            }
        }
    }

    private RestartStrategyResolving() {
    }
}
```
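  • RestartStrategyResolving provides a single static method, resolve. It first turns the client-side RestartStrategies.RestartStrategyConfiguration into a RestartStrategy via RestartStrategyFactory.createRestartStrategy and returns it if that result is not null; otherwise it falls back to the server-side RestartStrategyFactory, passing isCheckpointingEnabled along when that factory is a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory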
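The resolution order fits in a few lines. In this hypothetical sketch, strategies are plain strings, `ServerFactory` stands in for `RestartStrategyFactory`, and `NO_OR_FIXED` mimics what the class name `NoOrFixedIfCheckpointingEnabledRestartStrategyFactory` and the Javadoc above suggest: a fixed-delay strategy when checkpointing is on, no restart otherwise. The default attempts and delay that Flink's factory uses are not shown in the quoted source, so the sketch leaves them out.

```java
// Hypothetical model of RestartStrategyResolving.resolve; strategies are represented by their names.
final class ResolveSketch {
    private ResolveSketch() {}

    // Stand-in for the server-side factory. Ordinary factories can ignore the flag;
    // only the "no or fixed" variant in Flink receives it.
    interface ServerFactory {
        String create(boolean checkpointingEnabled);
    }

    // Models NoOrFixedIfCheckpointingEnabledRestartStrategyFactory (defaults omitted).
    static final ServerFactory NO_OR_FIXED =
            enabled -> enabled ? "FixedDelayRestartStrategy" : "NoRestartStrategy";

    static String resolve(String clientSideStrategy, ServerFactory server, boolean checkpointingEnabled) {
        // 1. A strategy derived from the job's own configuration wins.
        if (clientSideStrategy != null) {
            return clientSideStrategy;
        }
        // 2. Otherwise the cluster-level factory decides.
        return server.create(checkpointingEnabled);
    }
}
```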

RestartStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java

```java
public interface RestartStrategy {

    /**
     * True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
     *
     * @return true if restart is possible, otherwise false
     */
    boolean canRestart();

    /**
     * Called by the ExecutionGraph to eventually trigger a full recovery.
     * The recovery must be triggered on the given callback object, and may be delayed
     * with the help of the given scheduled executor.
     *
     * <p>The thread that calls this method is not supposed to block/sleep.
     *
     * @param restarter The hook to restart the ExecutionGraph
     * @param executor An scheduled executor to delay the restart
     */
    void restart(RestartCallback restarter, ScheduledExecutor executor);
}
```
  • RestartStrategy defines two methods, canRestart and restart; its implementations include NoRestartStrategy, FixedDelayRestartStrategy and FailureRateRestartStrategy
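The Javadoc implies a simple contract: the caller asks `canRestart()` first and, if allowed, calls `restart(...)`, which must schedule the recovery instead of blocking. The sketch below models that contract with JDK types. `RestartCallbackSketch`, `RestartStrategySketch` and `FailureHandlerSketch` are hypothetical, `ScheduledExecutorService` replaces Flink's `ScheduledExecutor`, and how the ExecutionGraph actually handles the "cannot restart" branch is assumed rather than taken from the source.

```java
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical stand-ins for Flink's RestartCallback, RestartStrategy and ScheduledExecutor;
// the JDK's ScheduledExecutorService replaces Flink's own ScheduledExecutor interface.
interface RestartCallbackSketch {
    void triggerFullRecovery();
}

interface RestartStrategySketch {
    boolean canRestart();

    // Must not block or sleep: schedule the recovery on the executor and return.
    void restart(RestartCallbackSketch restarter, ScheduledExecutorService executor);
}

final class FailureHandlerSketch {
    private FailureHandlerSketch() {}

    // Assumed shape of the caller's logic after a failure.
    // Returns true if a restart was scheduled, false if the job should fail for good.
    static boolean onFailure(RestartStrategySketch strategy,
                             RestartCallbackSketch restarter,
                             ScheduledExecutorService executor) {
        if (strategy.canRestart()) {
            strategy.restart(restarter, executor);
            return true;
        }
        return false;
    }
}
```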

NoRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java

```java
public class NoRestartStrategy implements RestartStrategy {

    @Override
    public boolean canRestart() {
        return false;
    }

    @Override
    public void restart(RestartCallback restarter, ScheduledExecutor executor) {
        throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
    }

    /**
     * Creates a NoRestartStrategyFactory instance.
     *
     * @param configuration Configuration object which is ignored
     * @return NoRestartStrategyFactory instance
     */
    public static NoRestartStrategyFactory createFactory(Configuration configuration) {
        return new NoRestartStrategyFactory();
    }

    @Override
    public String toString() {
        return "NoRestartStrategy";
    }

    public static class NoRestartStrategyFactory extends RestartStrategyFactory {

        private static final long serialVersionUID = -1809462525812787862L;

        @Override
        public RestartStrategy createRestartStrategy() {
            return new NoRestartStrategy();
        }
    }
}
```
  • NoRestartStrategy implements RestartStrategy; its canRestart method always returns false and its restart method throws UnsupportedOperationException

FixedDelayRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java

```java
public class FixedDelayRestartStrategy implements RestartStrategy {

    private final int maxNumberRestartAttempts;
    private final long delayBetweenRestartAttempts;
    private int currentRestartAttempt;

    public FixedDelayRestartStrategy(
            int maxNumberRestartAttempts,
            long delayBetweenRestartAttempts) {

        Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive.");
        Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");

        this.maxNumberRestartAttempts = maxNumberRestartAttempts;
        this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
        currentRestartAttempt = 0;
    }

    public int getCurrentRestartAttempt() {
        return currentRestartAttempt;
    }

    @Override
    public boolean canRestart() {
        return currentRestartAttempt < maxNumberRestartAttempts;
    }

    @Override
    public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
        currentRestartAttempt++;

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                restarter.triggerFullRecovery();
            }
        }, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a FixedDelayRestartStrategy from the given Configuration.
     *
     * @param configuration Configuration containing the parameter values for the restart strategy
     * @return Initialized instance of FixedDelayRestartStrategy
     * @throws Exception
     */
    public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
        int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);

        String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);

        long delay;

        try {
            delay = Duration.apply(delayString).toMillis();
        } catch (NumberFormatException nfe) {
            throw new Exception("Invalid config value for " +
                ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
                ". Value must be a valid duration (such as '100 milli' or '10 s')");
        }

        return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
    }

    @Override
    public String toString() {
        return "FixedDelayRestartStrategy(" +
            "maxNumberRestartAttempts=" + maxNumberRestartAttempts +
            ", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
            ')';
    }

    public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {

        private static final long serialVersionUID = 6642934067762271950L;

        private final int maxAttempts;
        private final long delay;

        public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
            this.maxAttempts = maxAttempts;
            this.delay = delay;
        }

        @Override
        public RestartStrategy createRestartStrategy() {
            return new FixedDelayRestartStrategy(maxAttempts, delay);
        }
    }
}
```
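  • FixedDelayRestartStrategy implements RestartStrategy. Its canRestart method returns true while currentRestartAttempt is below maxNumberRestartAttempts; its restart method increments currentRestartAttempt and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayBetweenRestartAttempts milliseconds. Its createFactory method reads the number of attempts (default 1) and the delay string from the Configuration and parses the delay with Scala's Duration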
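A stripped-down version of the counter and delay logic, together with its factory, might look like this. `FixedDelaySketch` and its nested `Factory` are hypothetical names; `Runnable` stands in for `RestartCallback` and the JDK `ScheduledExecutorService` for Flink's `ScheduledExecutor`.

```java
import java.io.Serializable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of FixedDelayRestartStrategy and its factory.
// Runnable stands in for RestartCallback, the JDK ScheduledExecutorService for Flink's ScheduledExecutor.
final class FixedDelaySketch {
    private final int maxNumberRestartAttempts;
    private final long delayBetweenRestartAttemptsMillis;
    private int currentRestartAttempt; // mutable, per-strategy state

    FixedDelaySketch(int maxNumberRestartAttempts, long delayBetweenRestartAttemptsMillis) {
        if (maxNumberRestartAttempts < 0) {
            throw new IllegalArgumentException("Maximum number of restart attempts must not be negative.");
        }
        if (delayBetweenRestartAttemptsMillis < 0) {
            throw new IllegalArgumentException("Delay between restart attempts must not be negative.");
        }
        this.maxNumberRestartAttempts = maxNumberRestartAttempts;
        this.delayBetweenRestartAttemptsMillis = delayBetweenRestartAttemptsMillis;
    }

    int getCurrentRestartAttempt() {
        return currentRestartAttempt;
    }

    boolean canRestart() {
        return currentRestartAttempt < maxNumberRestartAttempts;
    }

    // Counts the attempt right away, then runs the recovery after the fixed delay without blocking the caller.
    void restart(Runnable triggerFullRecovery, ScheduledExecutorService executor) {
        currentRestartAttempt++;
        executor.schedule(triggerFullRecovery, delayBetweenRestartAttemptsMillis, TimeUnit.MILLISECONDS);
    }

    // The serializable factory carries only configuration; each call yields a fresh strategy starting at 0.
    static final class Factory implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int maxAttempts;
        private final long delayMillis;

        Factory(int maxAttempts, long delayMillis) {
            this.maxAttempts = maxAttempts;
            this.delayMillis = delayMillis;
        }

        FixedDelaySketch createRestartStrategy() {
            return new FixedDelaySketch(maxAttempts, delayMillis);
        }
    }
}
```

Keeping strategy and factory apart is useful because the strategy is stateful (`currentRestartAttempt`) while the factory holds only configuration; the quoted `FixedDelayRestartStrategyFactory` likewise builds a new `FixedDelayRestartStrategy` on every `createRestartStrategy()` call.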

FailureRateRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java

```java
public class FailureRateRestartStrategy implements RestartStrategy {

    private final Time failuresInterval;
    private final Time delayInterval;
    private final int maxFailuresPerInterval;
    private final ArrayDeque<Long> restartTimestampsDeque;

    public FailureRateRestartStrategy(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
        Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
        Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
        Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
        Preconditions.checkArgument(failuresInterval.getSize() > 0, "Failures interval must be greater than 0 ms.");
        Preconditions.checkArgument(delayInterval.getSize() >= 0, "Delay interval must be at least 0 ms.");

        this.failuresInterval = failuresInterval;
        this.delayInterval = delayInterval;
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.restartTimestampsDeque = new ArrayDeque<>(maxFailuresPerInterval);
    }

    @Override
    public boolean canRestart() {
        if (isRestartTimestampsQueueFull()) {
            Long now = System.currentTimeMillis();
            Long earliestFailure = restartTimestampsDeque.peek();

            return (now - earliestFailure) > failuresInterval.toMilliseconds();
        } else {
            return true;
        }
    }

    @Override
    public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
        if (isRestartTimestampsQueueFull()) {
            restartTimestampsDeque.remove();
        }
        restartTimestampsDeque.add(System.currentTimeMillis());

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                restarter.triggerFullRecovery();
            }
        }, delayInterval.getSize(), delayInterval.getUnit());
    }

    private boolean isRestartTimestampsQueueFull() {
        return restartTimestampsDeque.size() >= maxFailuresPerInterval;
    }

    @Override
    public String toString() {
        return "FailureRateRestartStrategy(" +
            "failuresInterval=" + failuresInterval +
            "delayInterval=" + delayInterval +
            "maxFailuresPerInterval=" + maxFailuresPerInterval +
            ")";
    }

    public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
        int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
        String failuresIntervalString = configuration.getString(
            ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString()
        );
        String timeoutString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
        String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString);

        Duration failuresInterval = Duration.apply(failuresIntervalString);
        Duration delay = Duration.apply(delayString);

        return new FailureRateRestartStrategyFactory(
            maxFailuresPerInterval,
            Time.milliseconds(failuresInterval.toMillis()),
            Time.milliseconds(delay.toMillis()));
    }

    public static class FailureRateRestartStrategyFactory extends RestartStrategyFactory {
        private static final long serialVersionUID = -373724639430960480L;

        private final int maxFailuresPerInterval;
        private final Time failuresInterval;
        private final Time delayInterval;

        public FailureRateRestartStrategyFactory(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
            this.maxFailuresPerInterval = maxFailuresPerInterval;
            this.failuresInterval = Preconditions.checkNotNull(failuresInterval);
            this.delayInterval = Preconditions.checkNotNull(delayInterval);
        }

        @Override
        public RestartStrategy createRestartStrategy() {
            return new FailureRateRestartStrategy(maxFailuresPerInterval, failuresInterval, delayInterval);
        }
    }
}
```
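  • FailureRateRestartStrategy implements RestartStrategy. Its canRestart method returns true while restartTimestampsDeque holds fewer than maxFailuresPerInterval entries; once the deque is full, it returns true only if more than failuresInterval has elapsed since the earliest recorded restart (earliestFailure). Its restart method removes the oldest timestamp if the deque is full, appends the current time, and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayInterval. Its createFactory method defaults to 1 failure per interval, a 1-minute interval, and a delay equal to AkkaOptions.WATCH_HEARTBEAT_INTERVAL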
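The sliding-window check is easiest to follow with a controllable clock. In this hypothetical `FailureRateSketch`, times are plain milliseconds and a `LongSupplier` replaces `System.currentTimeMillis()`; the rest follows the quoted logic, including the strict `>` comparison, so a failure exactly `failuresInterval` after the earliest recorded one is still refused.

```java
import java.util.ArrayDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of FailureRateRestartStrategy. Times are plain milliseconds, and an
// injectable LongSupplier clock replaces System.currentTimeMillis() so the window can be exercised.
final class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long failuresIntervalMillis;
    private final long delayMillis;
    private final LongSupplier clock;
    // Timestamps of the most recent restarts, oldest first; at most maxFailuresPerInterval entries.
    private final ArrayDeque<Long> restartTimestamps;

    FailureRateSketch(int maxFailuresPerInterval, long failuresIntervalMillis, long delayMillis, LongSupplier clock) {
        if (maxFailuresPerInterval <= 0) {
            throw new IllegalArgumentException("maxFailuresPerInterval must be greater than 0");
        }
        if (failuresIntervalMillis <= 0) {
            throw new IllegalArgumentException("failures interval must be greater than 0 ms");
        }
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delay must be at least 0 ms");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failuresIntervalMillis = failuresIntervalMillis;
        this.delayMillis = delayMillis;
        this.clock = clock;
        this.restartTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
    }

    private boolean isFull() {
        return restartTimestamps.size() >= maxFailuresPerInterval;
    }

    boolean canRestart() {
        if (!isFull()) {
            return true;
        }
        // Full window: allow another restart only once the oldest one lies more than the interval back.
        long earliest = restartTimestamps.peek();
        return clock.getAsLong() - earliest > failuresIntervalMillis;
    }

    void restart(Runnable triggerFullRecovery, ScheduledExecutorService executor) {
        if (isFull()) {
            restartTimestamps.remove(); // evict the oldest restart
        }
        restartTimestamps.add(clock.getAsLong());
        executor.schedule(triggerFullRecovery, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

With `maxFailuresPerInterval = 2` and a 60 s interval, restarts at t = 0 and t = 1 s fill the window; the next one is allowed from t = 60.001 s on, at which point the t = 0 entry is evicted and the window is measured from t = 1 s.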

Summary

  • RestartStrategies provides the static methods noRestart, fallBackRestart, fixedDelayRestart and failureRateRestart for building a RestartStrategyConfiguration
  • RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription; its subclasses are NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration and FallbackRestartStrategyConfiguration
  • RestartStrategyResolving provides a static method, resolve, which turns a RestartStrategies.RestartStrategyConfiguration into a RestartStrategy via RestartStrategyFactory and falls back to the server-side factory when the client configuration yields none; RestartStrategy defines the two methods canRestart and restart, and its implementations include NoRestartStrategy, FixedDelayRestartStrategy and FailureRateRestartStrategy

doc

  • Restart Strategies
