本文主要研究一下flink JobManager的heap大小设置

JobManagerOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {//....../*** JVM heap size for the JobManager with memory size.*/@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =key("jobmanager.heap.size").defaultValue("1024m").withDescription("JVM heap size for the JobManager.");/*** JVM heap size (in megabytes) for the JobManager.* @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}*/@Deprecatedpublic static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =key("jobmanager.heap.mb").defaultValue(1024).withDescription("JVM heap size (in megabytes) for the JobManager.");//......
}
复制代码
  • jobmanager.heap.size配置用于指定JobManager的大小,默认是1024m;jobmanager.heap.mb配置已经被废弃

ConfigurationUtils

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java

public class ConfigurationUtils {private static final String[] EMPTY = new String[0];/*** Get job manager's heap memory. This method will check the new key* {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and* the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.** @param configuration the configuration object* @return the memory size of job manager's heap memory.*/public static MemorySize getJobManagerHeapMemory(Configuration configuration) {if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));} else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");} else {//use default valuereturn MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));}}//......
}
复制代码
  • ConfigurationUtils的getJobManagerHeapMemory方法从Configuration中读取配置,然后解析为MemorySize

MemorySize

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

@PublicEvolving
public class MemorySize implements java.io.Serializable {private static final long serialVersionUID = 1L;// ------------------------------------------------------------------------/** The memory size, in bytes. */private final long bytes;/*** Constructs a new MemorySize.** @param bytes The size, in bytes. Must be zero or larger.*/public MemorySize(long bytes) {checkArgument(bytes >= 0, "bytes must be >= 0");this.bytes = bytes;}// ------------------------------------------------------------------------/*** Gets the memory size in bytes.*/public long getBytes() {return bytes;}/*** Gets the memory size in Kibibytes (= 1024 bytes).*/public long getKibiBytes() {return bytes >> 10;}/*** Gets the memory size in Mebibytes (= 1024 Kibibytes).*/public int getMebiBytes() {return (int) (bytes >> 20);}/*** Gets the memory size in Gibibytes (= 1024 Mebibytes).*/public long getGibiBytes() {return bytes >> 30;}/*** Gets the memory size in Tebibytes (= 1024 Gibibytes).*/public long getTebiBytes() {return bytes >> 40;}// ------------------------------------------------------------------------@Overridepublic int hashCode() {return (int) (bytes ^ (bytes >>> 32));}@Overridepublic boolean equals(Object obj) {return obj == this ||(obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes);}@Overridepublic String toString() {return bytes + " bytes";}// ------------------------------------------------------------------------//  Parsing// ------------------------------------------------------------------------/*** Parses the given string as as MemorySize.** @param text The string to parse* @return The parsed MemorySize** @throws IllegalArgumentException Thrown, if the expression cannot be parsed.*/public static MemorySize parse(String text) throws IllegalArgumentException {return new MemorySize(parseBytes(text));}/*** Parses the given string with a default unit.** @param text The string to parse.* @param defaultUnit specify the default unit.* @return The parsed MemorySize.** @throws IllegalArgumentException Thrown, if the expression cannot be parsed.*/public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException {if (!hasUnit(text)) {return parse(text + defaultUnit.getUnits()[0]);}return parse(text);}/*** Parses the given string as bytes.* The supported expressions are listed under {@link MemorySize}.** @param text The string to parse* @return The parsed size, in bytes.** @throws IllegalArgumentException Thrown, if the expression cannot be parsed.*/public static long parseBytes(String text) throws IllegalArgumentException {checkNotNull(text, "text");final String trimmed = text.trim();checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");final int len = trimmed.length();int pos = 0;char current;while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {pos++;}final String number = trimmed.substring(0, pos);final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);if (number.isEmpty()) {throw new NumberFormatException("text does not start with a number");}final long value;try {value = Long.parseLong(number); // this throws a NumberFormatException on overflow}catch (NumberFormatException e) {throw new IllegalArgumentException("The value '" + number +"' cannot be re represented as 64bit number (numeric overflow).");}final long multiplier;if (unit.isEmpty()) {multiplier = 1L;}else {if (matchesAny(unit, BYTES)) {multiplier = 1L;}else if (matchesAny(unit, KILO_BYTES)) {multiplier = 1024L;}else if (matchesAny(unit, MEGA_BYTES)) {multiplier = 1024L * 1024L;}else if (matchesAny(unit, GIGA_BYTES)) {multiplier = 1024L * 1024L * 1024L;}else if (matchesAny(unit, TERA_BYTES)) {multiplier = 1024L * 1024L * 1024L * 1024L;}else {throw new IllegalArgumentException("Memory size unit '" + unit +"' does not match any of the recognized units: " + MemoryUnit.getAllUnits());}}final long result = value * multiplier;// check for overflowif (result / multiplier != value) {throw new IllegalArgumentException("The value '" + text +"' cannot be re represented as 64bit number of bytes (numeric overflow).");}return result;}private static boolean matchesAny(String str, MemoryUnit unit) {for (String s : unit.getUnits()) {if (s.equals(str)) {return true;}}return false;}//......
}
复制代码
  • MemorySize内部有个bytes字段,以bytes为单位,之后提供了getBytes、getKibiBytes、getMebiBytes、getGibiBytes、getTebiBytes方法用于快速换算;parse静态方法用于从文本中解析并创建MemorySize,其中parse方法可接收MemoryUnit参数用于文本中没有MemoryUnit时才使用的默认的MemoryUnit,最后都是调用的parseBytes方法

MemoryUnit

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

   /***  Enum which defines memory unit, mostly used to parse value from configuration file.** <p>To make larger values more compact, the common size suffixes are supported:** <ul>*     <li>q or 1b or 1bytes (bytes)*     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)*     <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)*     <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)*     <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)* </ul>**/public enum MemoryUnit {BYTES(new String[] { "b", "bytes" }),KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),TERA_BYTES(new String[] { "t", "tb", "tebibytes" });private String[] units;MemoryUnit(String[] units) {this.units = units;}public String[] getUnits() {return units;}public static String getAllUnits() {return concatenateUnits(BYTES.getUnits(), KILO_BYTES.getUnits(), MEGA_BYTES.getUnits(), GIGA_BYTES.getUnits(), TERA_BYTES.getUnits());}public static boolean hasUnit(String text) {checkNotNull(text, "text");final String trimmed = text.trim();checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");final int len = trimmed.length();int pos = 0;char current;while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {pos++;}final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);return unit.length() > 0;}private static String concatenateUnits(final String[]... allUnits) {final StringBuilder builder = new StringBuilder(128);for (String[] units : allUnits) {builder.append('(');for (String unit : units) {builder.append(unit);builder.append(" | ");}builder.setLength(builder.length() - 3);builder.append(") / ");}builder.setLength(builder.length() - 3);return builder.toString();}}
复制代码
  • MemoryUnit枚举定义了BYTES、KILO_BYTES、MEGA_BYTES、GIGA_BYTES、TERA_BYTES;它有units属性,是一个string数组,用于指定每类单位的文本标识,最后匹配时都是转换为小写来匹配

FlinkYarnSessionCli

flink-1.7.1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java

public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> {//......private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {if (cmd.hasOption(container.getOpt())) { // number of containers is required option!LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());}// TODO: The number of task manager should be deprecated soonfinal int numberTaskManagers;if (cmd.hasOption(container.getOpt())) {numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));} else {numberTaskManagers = 1;}// JobManager Memoryfinal int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();// Task Managers memoryfinal int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);return new ClusterSpecification.ClusterSpecificationBuilder().setMasterMemoryMB(jobManagerMemoryMB).setTaskManagerMemoryMB(taskManagerMemoryMB).setNumberTaskManagers(numberTaskManagers).setSlotsPerTaskManager(slotsPerTaskManager).createClusterSpecification();}//......
}
复制代码
  • FlinkYarnSessionCli的createClusterSpecification方法使用到了ConfigurationUtils.getJobManagerHeapMemory(configuration)来读取jobManagerMemoryMB

config.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/config.sh

//......DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary//......# Define FLINK_JM_HEAP if it is not already set
if [ -z "${FLINK_JM_HEAP}" ]; thenFLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
fi# Try read old config key, if new key not exists
if [ "${FLINK_JM_HEAP}" == 0 ]; thenFLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
fi//......if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; thenFLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")# Remove leading and ending double quotes (if present) of valueFLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
fiif [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; thenFLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")# Remove leading and ending double quotes (if present) of valueFLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi//......# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; thenJVM_ARGS=""
fi//......
复制代码
  • config.sh首先判断环境变量FLINK_JM_HEAP是否有设置,没有的话,则从flink-conf.yaml中读取jobmanager.heap.size配置到FLINK_JM_HEAP;如果FLINK_JM_HEAP为0,则读取jobmanager.heap.mb的配置到FLINK_JM_HEAP_MB
  • 如果没有设置FLINK_ENV_JAVA_OPTS,则从flink-conf.yaml中读取env.java.opts配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS,默认为空;如果没有设置FLINK_ENV_JAVA_OPTS_JM,则从flink-conf.yaml中读取env.java.opts.jobmanager配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS_JM,默认为空
  • JVM_ARGS变量会被job及task manager使用,如果没有设置,则初始化为空;注意不要设置内存相关参数到JVM_ARGS,要使用flink-conf.yaml中的jobmanager.heap.size、taskmanager.heap.size来配置

jobmanager.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instancesif [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; thenecho $USAGEexit 1
fibin=`dirname "$0"`
bin=`cd "$bin"; pwd`. "$bin"/config.shENTRYPOINT=standalonesessionif [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; thenif [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; thenecho "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"elseflink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})fiif [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; thenecho "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."exit 1fiif [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; thenexport JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"fi# Add JobManager-specific JVM optionsexport FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"# Startup parametersargs=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")if [ ! -z $HOST ]; thenargs+=("--host")args+=("${HOST}")fiif [ ! -z $WEBUIPORT ]; thenargs+=("--webui-port")args+=("${WEBUIPORT}")fi
fiif [[ $STARTSTOP == "start-foreground" ]]; thenexec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
复制代码
  • jobmanager.sh首先调用config.sh来初始化相关变量(FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS)
  • 如果FLINK_JM_HEAP值大于0,则解析到FLINK_JM_HEAP_MB变量;如果FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中;然后将FLINK_ENV_JAVA_OPTS_JM(依据env.java.opts.jobmanager配置)追加到FLINK_ENV_JAVA_OPTS(依据env.java.opts)中
  • jobmanager.sh最后调用flink-console.sh来启动相关类

flink-console.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/flink-console.sh

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as arraybin=`dirname "$0"`
bin=`cd "$bin"; pwd`. "$bin"/config.shcase $SERVICE in(taskexecutor)CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner;;(historyserver)CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer;;(zookeeper)CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer;;(standalonesession)CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;;(standalonejob)CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint;;(*)echo "Unknown service '${SERVICE}'. $USAGE."exit 1;;
esacFLINK_TM_CLASSPATH=`constructFlinkClassPath`log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; thenif [ "$JAVA_VERSION" -lt 18 ]; thenJVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"fi
fiecho "Starting $SERVICE as a console application on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
复制代码
  • flink-console.sh在java小于8版本时会给JVM_ARGS追加-XX:MaxPermSize=256m;之后使用JVM_ARGS及FLINK_ENV_JAVA_OPTS作为jvm参数启动CLASS_TO_RUN

小结

  • jobmanager.heap.size配置用于指定JobManager的大小,默认是1024m;jobmanager.heap.mb配置已经被废弃;ConfigurationUtils的getJobManagerHeapMemory方法从Configuration中读取配置,然后解析为MemorySize;MemorySize内部有个bytes字段,以bytes为单位,之后提供了getBytes、getKibiBytes、getMebiBytes、getGibiBytes、getTebiBytes方法用于快速换算;parse静态方法用于从文本中解析并创建MemorySize,其中parse方法可接收MemoryUnit参数用于文本中没有MemoryUnit时才使用的默认的MemoryUnit,最后都是调用的parseBytes方法
  • FlinkYarnSessionCli的createClusterSpecification方法使用到了ConfigurationUtils.getJobManagerHeapMemory(configuration)来读取jobManagerMemoryMB
  • config.sh首先判断环境变量FLINK_JM_HEAP是否有设置,没有的话,则从flink-conf.yaml中读取jobmanager.heap.size配置到FLINK_JM_HEAP;如果FLINK_JM_HEAP为0,则读取jobmanager.heap.mb的配置到FLINK_JM_HEAP_MB;如果没有设置FLINK_ENV_JAVA_OPTS,则从flink-conf.yaml中读取env.java.opts配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS,默认为空;如果没有设置FLINK_ENV_JAVA_OPTS_JM,则从flink-conf.yaml中读取env.java.opts.jobmanager配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS_JM,默认为空;JVM_ARGS变量会被job及task manager使用,如果没有设置,则初始化为空;注意不要设置内存相关参数到JVM_ARGS,要使用flink-conf.yaml中的jobmanager.heap.size、taskmanager.heap.size来配置
  • jobmanager.sh首先调用config.sh来初始化相关变量(FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS);如果FLINK_JM_HEAP值大于0,则解析到FLINK_JM_HEAP_MB变量,如果FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中;它会将FLINK_ENV_JAVA_OPTS_JM(依据env.java.opts.jobmanager配置)追加到FLINK_ENV_JAVA_OPTS(依据env.java.opts)中;jobmanager.sh最后调用flink-console.sh来启动相关类
  • flink-console.sh在java小于8版本时会给JVM_ARGS追加-XX:MaxPermSize=256m;之后使用JVM_ARGS及FLINK_ENV_JAVA_OPTS作为jvm参数启动CLASS_TO_RUN

由此可见最后的jvm参数取决于JVM_ARGS及FLINK_ENV_JAVA_OPTS;其中注意不要设置内存相关参数到JVM_ARGS,因为jobmanager.sh在FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中,而FLINK_JM_HEAP_MB则取决于FLINK_JM_HEAP或者jobmanager.heap.size配置;FLINK_ENV_JAVA_OPTS的配置则取决于env.java.opts以及env.java.opts.jobmanager;因而要配置jobmanager的heap大小的话,可以指定FLINK_JM_HEAP环境变量(比如FLINK_JM_HEAP=512m),或者在flink-conf.yaml中指定jobmanager.heap.size

doc

  • jobmanager.heap.size

聊聊flink JobManager的heap大小设置 1相关推荐

  1. 聊聊flink JobManager的heap大小设置

    序 本文主要研究一下flink JobManager的heap大小设置 JobManagerOptions flink-core-1.7.1-sources.jar!/org/apache/flink ...

  2. Flink JobManager的HA原理分析

    文章目录 前言 JobManager的HA切换通知 利用Zookeeper的领导选举与消息通知 引用 前言 在中心式管理的系统里,主节点如果只是单独服务部署的话,或多或少都会存在单点瓶颈(SPOF)问 ...

  3. 聊聊flink的FsStateBackend

    序 本文主要研究一下flink的FsStateBackend StateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/r ...

  4. 聊聊flink TaskManager的managed memory

    序 本文主要研究一下flink TaskManager的managed memory TaskManagerOptions flink-core-1.7.2-sources.jar!/org/apac ...

  5. mysql innodbuffer修改_mysql参数之innodb_buffer_pool_size大小设置

    MYSQL无法启动报错日志,如下: 190801 11:24:29 mysqld_safe mysqld from pid file /var/run/mysql/mysql.pid ended 19 ...

  6. mysql栈空间大小_jvm 堆内存 栈内存 大小设置

    Tomcat 的JVM 内存溢出问题的解决keyword: tomcat 的jvm 内存溢出问题的解决 近期在熟悉一个开发了有几年的项目,须要把数据库从mysql移植到oracle.首先把jdbc的连 ...

  7. 【Flink】 Flink JobManager HA 机制的扩展与实现

    1.概述 转载:Flink 源码阅读笔记(21)- Flink JobManager HA 机制的扩展与实现 在 Flink 1.12 中,Flink on Kubernetes 的 Native 部 ...

  8. 【totti】sun和IBM虚拟机堆大小设置分析

    在IBM的虚拟机官方指导文档中明确指出,禁止将虚拟机的最大值和最小值设置为相等,否则会导致以下2个后果 <1>极大的增加垃圾回收时间,影响系统性能 <2>造成系统中存在内存碎片 ...

  9. android控制台字体大小设置,Android studio 4.1 豆沙护眼色配置、字体大小设置、内存大小设置...

    1.左边目录栏颜色配置: 2.代码编辑区域背景色设置 3.控制台背景色设置 4.菜单栏.工具栏.左边栏字体大小设置 6.修改内存大小.显示内存 例如:修改android-studio/bin/stud ...

  10. JVM内存 大小设置

    Tomcat本身不能直接在计算机上运行,需要依赖于硬件基础之上的操作系统和一个Java虚拟机.Tomcat的内存溢出本质就是JVM内存溢出,所以在本文开始时,应该先对Java JVM有关内存方面的知识 ...

最新文章

  1. jps命令(Java Virtual Machine Process Status Tool)(转)
  2. Java与C++Socket通讯注意事项
  3. Eclipse 创建 Java 包
  4. ConvertUtil-数据类型转换的工具类
  5. 光流 | 光流算法对比:Farneback、Horn-Schunck、Lucas-Kanade、Lucas-Kanade derivative of Gaussian(附Matlab与C++代码)
  6. VTK:IO之ReadExodusData
  7. 在SAP Cloud Platform ABAP编程环境里打印系统变量
  8. hdu-1728(贪心bfs的灵活运用吧)
  9. 关于游戏排行榜设计开发的一些总结
  10. 关于返回二维数组排序后序号数组的问题求解
  11. IOS企业应用出现无法验证,需要网络连接以在这台iPad上验证。接入互联网并重试
  12. 【5G RRU专题】什么是PA非线性失真?
  13. python代码续航的方法_编写python高质量python代码的59个有效方法
  14. 《程序员的创世传说》第三节 魔王与2012
  15. 【Python】数据可视化-散点图绘制
  16. 牛客小白月赛10 A,B,C,D
  17. can的波特率计算(基于stm32开发)
  18. java 接受传感器的数据_java中调用第三方接口获取数据的方式
  19. Oracle INSERT ALL 语句介绍
  20. ESP32 CAM CameraWebServer示例测试

热门文章

  1. “宽带中国、智慧岳阳”项目启动
  2. EasyUI 1.4.4 DataGrid(大数据量) bufferview滚动时不加载下一页数据解决方案
  3. SQLite数据库学习小结——Frameworks层实现
  4. window下从python开始安装科学计算环境
  5. 深入理解jQuery插件开发(讲解jQuery开发的好文)
  6. cisco ASA
  7. Java6开发WebService进阶
  8. 拜托,面试别再问我TopK了!!!
  9. Makefile中创建一个以当前时间为文件夹名的文件
  10. Linux下多线程调试以及查看信息