聊聊flink JobManager的heap大小设置 1
序
本文主要研究一下flink JobManager的heap大小设置
JobManagerOptions
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java
@PublicEvolving
public class JobManagerOptions {//....../*** JVM heap size for the JobManager with memory size.*/@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =key("jobmanager.heap.size").defaultValue("1024m").withDescription("JVM heap size for the JobManager.");/*** JVM heap size (in megabytes) for the JobManager.* @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}*/@Deprecatedpublic static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =key("jobmanager.heap.mb").defaultValue(1024).withDescription("JVM heap size (in megabytes) for the JobManager.");//......
}
复制代码
- jobmanager.heap.size配置用于指定JobManager的大小,默认是1024m;jobmanager.heap.mb配置已经被废弃
ConfigurationUtils
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java
public class ConfigurationUtils {private static final String[] EMPTY = new String[0];/*** Get job manager's heap memory. This method will check the new key* {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and* the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.** @param configuration the configuration object* @return the memory size of job manager's heap memory.*/public static MemorySize getJobManagerHeapMemory(Configuration configuration) {if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));} else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");} else {//use default valuereturn MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));}}//......
}
复制代码
- ConfigurationUtils的getJobManagerHeapMemory方法从Configuration中读取配置,然后解析为MemorySize
MemorySize
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java
@PublicEvolving
public class MemorySize implements java.io.Serializable {private static final long serialVersionUID = 1L;// ------------------------------------------------------------------------/** The memory size, in bytes. */private final long bytes;/*** Constructs a new MemorySize.** @param bytes The size, in bytes. Must be zero or larger.*/public MemorySize(long bytes) {checkArgument(bytes >= 0, "bytes must be >= 0");this.bytes = bytes;}// ------------------------------------------------------------------------/*** Gets the memory size in bytes.*/public long getBytes() {return bytes;}/*** Gets the memory size in Kibibytes (= 1024 bytes).*/public long getKibiBytes() {return bytes >> 10;}/*** Gets the memory size in Mebibytes (= 1024 Kibibytes).*/public int getMebiBytes() {return (int) (bytes >> 20);}/*** Gets the memory size in Gibibytes (= 1024 Mebibytes).*/public long getGibiBytes() {return bytes >> 30;}/*** Gets the memory size in Tebibytes (= 1024 Gibibytes).*/public long getTebiBytes() {return bytes >> 40;}// ------------------------------------------------------------------------@Overridepublic int hashCode() {return (int) (bytes ^ (bytes >>> 32));}@Overridepublic boolean equals(Object obj) {return obj == this ||(obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes);}@Overridepublic String toString() {return bytes + " bytes";}// ------------------------------------------------------------------------// Parsing// ------------------------------------------------------------------------/*** Parses the given string as as MemorySize.** @param text The string to parse* @return The parsed MemorySize** @throws IllegalArgumentException Thrown, if the expression cannot be parsed.*/public static MemorySize parse(String text) throws IllegalArgumentException {return new MemorySize(parseBytes(text));}/*** Parses the given string with a default unit.** @param text The string to parse.* @param defaultUnit specify the default unit.* @return The parsed MemorySize.** @throws IllegalArgumentException Thrown, if the expression cannot be parsed.*/public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException {if (!hasUnit(text)) {return parse(text + defaultUnit.getUnits()[0]);}return parse(text);}/*** Parses the given string as bytes.* The supported expressions are listed under {@link MemorySize}.** @param text The string to parse* @return The parsed size, in bytes.** @throws IllegalArgumentException Thrown, if the expression cannot be parsed.*/public static long parseBytes(String text) throws IllegalArgumentException {checkNotNull(text, "text");final String trimmed = text.trim();checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");final int len = trimmed.length();int pos = 0;char current;while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {pos++;}final String number = trimmed.substring(0, pos);final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);if (number.isEmpty()) {throw new NumberFormatException("text does not start with a number");}final long value;try {value = Long.parseLong(number); // this throws a NumberFormatException on overflow}catch (NumberFormatException e) {throw new IllegalArgumentException("The value '" + number +"' cannot be re represented as 64bit number (numeric overflow).");}final long multiplier;if (unit.isEmpty()) {multiplier = 1L;}else {if (matchesAny(unit, BYTES)) {multiplier = 1L;}else if (matchesAny(unit, KILO_BYTES)) {multiplier = 1024L;}else if (matchesAny(unit, MEGA_BYTES)) {multiplier = 1024L * 1024L;}else if (matchesAny(unit, GIGA_BYTES)) {multiplier = 1024L * 1024L * 1024L;}else if (matchesAny(unit, TERA_BYTES)) {multiplier = 1024L * 1024L * 1024L * 1024L;}else {throw new IllegalArgumentException("Memory size unit '" + unit +"' does not match any of the recognized units: " + MemoryUnit.getAllUnits());}}final long result = value * multiplier;// check for overflowif (result / multiplier != value) {throw new IllegalArgumentException("The value '" + text +"' cannot be re represented as 64bit number of bytes (numeric overflow).");}return result;}private static boolean matchesAny(String str, MemoryUnit unit) {for (String s : unit.getUnits()) {if (s.equals(str)) {return true;}}return false;}//......
}
复制代码
- MemorySize内部有个bytes字段,以bytes为单位,之后提供了getBytes、getKibiBytes、getMebiBytes、getGibiBytes、getTebiBytes方法用于快速换算;parse静态方法用于从文本中解析并创建MemorySize,其中parse方法可接收MemoryUnit参数用于文本中没有MemoryUnit时才使用的默认的MemoryUnit,最后都是调用的parseBytes方法
MemoryUnit
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java
/*** Enum which defines memory unit, mostly used to parse value from configuration file.** <p>To make larger values more compact, the common size suffixes are supported:** <ul>* <li>q or 1b or 1bytes (bytes)* <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)* <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)* <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)* <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)* </ul>**/public enum MemoryUnit {BYTES(new String[] { "b", "bytes" }),KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),TERA_BYTES(new String[] { "t", "tb", "tebibytes" });private String[] units;MemoryUnit(String[] units) {this.units = units;}public String[] getUnits() {return units;}public static String getAllUnits() {return concatenateUnits(BYTES.getUnits(), KILO_BYTES.getUnits(), MEGA_BYTES.getUnits(), GIGA_BYTES.getUnits(), TERA_BYTES.getUnits());}public static boolean hasUnit(String text) {checkNotNull(text, "text");final String trimmed = text.trim();checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");final int len = trimmed.length();int pos = 0;char current;while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {pos++;}final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);return unit.length() > 0;}private static String concatenateUnits(final String[]... allUnits) {final StringBuilder builder = new StringBuilder(128);for (String[] units : allUnits) {builder.append('(');for (String unit : units) {builder.append(unit);builder.append(" | ");}builder.setLength(builder.length() - 3);builder.append(") / ");}builder.setLength(builder.length() - 3);return builder.toString();}}
复制代码
- MemoryUnit枚举定义了BYTES、KILO_BYTES、MEGA_BYTES、GIGA_BYTES、TERA_BYTES;它有units属性,是一个string数组,用于指定每类单位的文本标识,最后匹配时都是转换为小写来匹配
FlinkYarnSessionCli
flink-1.7.1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> {//......private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {if (cmd.hasOption(container.getOpt())) { // number of containers is required option!LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());}// TODO: The number of task manager should be deprecated soonfinal int numberTaskManagers;if (cmd.hasOption(container.getOpt())) {numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));} else {numberTaskManagers = 1;}// JobManager Memoryfinal int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();// Task Managers memoryfinal int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);return new ClusterSpecification.ClusterSpecificationBuilder().setMasterMemoryMB(jobManagerMemoryMB).setTaskManagerMemoryMB(taskManagerMemoryMB).setNumberTaskManagers(numberTaskManagers).setSlotsPerTaskManager(slotsPerTaskManager).createClusterSpecification();}//......
}
复制代码
- FlinkYarnSessionCli的createClusterSpecification方法使用到了ConfigurationUtils.getJobManagerHeapMemory(configuration)来读取jobManagerMemoryMB
config.sh
flink-1.7.1/flink-dist/src/main/flink-bin/bin/config.sh
//......DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary//......# Define FLINK_JM_HEAP if it is not already set
if [ -z "${FLINK_JM_HEAP}" ]; thenFLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
fi# Try read old config key, if new key not exists
if [ "${FLINK_JM_HEAP}" == 0 ]; thenFLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
fi//......if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; thenFLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")# Remove leading and ending double quotes (if present) of valueFLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
fiif [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; thenFLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")# Remove leading and ending double quotes (if present) of valueFLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
fi//......# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; thenJVM_ARGS=""
fi//......
复制代码
- config.sh首先判断环境变量FLINK_JM_HEAP是否有设置,没有的话,则从flink-conf.yaml中读取jobmanager.heap.size配置到FLINK_JM_HEAP;如果FLINK_JM_HEAP为0,则读取jobmanager.heap.mb的配置到FLINK_JM_HEAP_MB
- 如果没有设置FLINK_ENV_JAVA_OPTS,则从flink-conf.yaml中读取env.java.opts配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS,默认为空;如果没有设置FLINK_ENV_JAVA_OPTS_JM,则从flink-conf.yaml中读取env.java.opts.jobmanager配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS_JM,默认为空
- JVM_ARGS变量会被job及task manager使用,如果没有设置,则初始化为空;注意不要设置内存相关参数到JVM_ARGS,要使用flink-conf.yaml中的jobmanager.heap.size、taskmanager.heap.size来配置
jobmanager.sh
flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instancesif [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; thenecho $USAGEexit 1
fibin=`dirname "$0"`
bin=`cd "$bin"; pwd`. "$bin"/config.shENTRYPOINT=standalonesessionif [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; thenif [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; thenecho "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"elseflink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})fiif [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; thenecho "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."exit 1fiif [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; thenexport JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"fi# Add JobManager-specific JVM optionsexport FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"# Startup parametersargs=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")if [ ! -z $HOST ]; thenargs+=("--host")args+=("${HOST}")fiif [ ! -z $WEBUIPORT ]; thenargs+=("--webui-port")args+=("${WEBUIPORT}")fi
fiif [[ $STARTSTOP == "start-foreground" ]]; thenexec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
复制代码
- jobmanager.sh首先调用config.sh来初始化相关变量(
FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS
) - 如果FLINK_JM_HEAP值大于0,则解析到FLINK_JM_HEAP_MB变量;如果FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中;然后将FLINK_ENV_JAVA_OPTS_JM(
依据env.java.opts.jobmanager配置
)追加到FLINK_ENV_JAVA_OPTS(依据env.java.opts
)中 - jobmanager.sh最后调用flink-console.sh来启动相关类
flink-console.sh
flink-1.7.1/flink-dist/src/main/flink-bin/bin/flink-console.sh
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as arraybin=`dirname "$0"`
bin=`cd "$bin"; pwd`. "$bin"/config.shcase $SERVICE in(taskexecutor)CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner;;(historyserver)CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer;;(zookeeper)CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer;;(standalonesession)CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;;(standalonejob)CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint;;(*)echo "Unknown service '${SERVICE}'. $USAGE."exit 1;;
esacFLINK_TM_CLASSPATH=`constructFlinkClassPath`log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; thenif [ "$JAVA_VERSION" -lt 18 ]; thenJVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"fi
fiecho "Starting $SERVICE as a console application on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
复制代码
- flink-console.sh在java小于8版本时会给JVM_ARGS追加-XX:MaxPermSize=256m;之后使用JVM_ARGS及FLINK_ENV_JAVA_OPTS作为jvm参数启动CLASS_TO_RUN
小结
- jobmanager.heap.size配置用于指定JobManager的大小,默认是1024m;jobmanager.heap.mb配置已经被废弃;ConfigurationUtils的getJobManagerHeapMemory方法从Configuration中读取配置,然后解析为MemorySize;MemorySize内部有个bytes字段,以bytes为单位,之后提供了getBytes、getKibiBytes、getMebiBytes、getGibiBytes、getTebiBytes方法用于快速换算;parse静态方法用于从文本中解析并创建MemorySize,其中parse方法可接收MemoryUnit参数用于文本中没有MemoryUnit时才使用的默认的MemoryUnit,最后都是调用的parseBytes方法
- FlinkYarnSessionCli的createClusterSpecification方法使用到了ConfigurationUtils.getJobManagerHeapMemory(configuration)来读取jobManagerMemoryMB
- config.sh首先判断环境变量FLINK_JM_HEAP是否有设置,没有的话,则从flink-conf.yaml中读取jobmanager.heap.size配置到FLINK_JM_HEAP;如果FLINK_JM_HEAP为0,则读取jobmanager.heap.mb的配置到FLINK_JM_HEAP_MB;如果没有设置FLINK_ENV_JAVA_OPTS,则从flink-conf.yaml中读取env.java.opts配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS,默认为空;如果没有设置FLINK_ENV_JAVA_OPTS_JM,则从flink-conf.yaml中读取env.java.opts.jobmanager配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS_JM,默认为空;JVM_ARGS变量会被job及task manager使用,如果没有设置,则初始化为空;注意不要设置内存相关参数到JVM_ARGS,要使用flink-conf.yaml中的jobmanager.heap.size、taskmanager.heap.size来配置
- jobmanager.sh首先调用config.sh来初始化相关变量(
FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS
);如果FLINK_JM_HEAP值大于0,则解析到FLINK_JM_HEAP_MB变量,如果FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中;它会将FLINK_ENV_JAVA_OPTS_JM(依据env.java.opts.jobmanager配置
)追加到FLINK_ENV_JAVA_OPTS(依据env.java.opts
)中;jobmanager.sh最后调用flink-console.sh来启动相关类 - flink-console.sh在java小于8版本时会给JVM_ARGS追加-XX:MaxPermSize=256m;之后使用JVM_ARGS及FLINK_ENV_JAVA_OPTS作为jvm参数启动CLASS_TO_RUN
由此可见最后的jvm参数取决于JVM_ARGS及FLINK_ENV_JAVA_OPTS;其中注意不要设置内存相关参数到JVM_ARGS,因为jobmanager.sh在FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中,而FLINK_JM_HEAP_MB则取决于FLINK_JM_HEAP或者jobmanager.heap.size配置;FLINK_ENV_JAVA_OPTS的配置则取决于env.java.opts以及env.java.opts.jobmanager;因而要配置jobmanager的heap大小的话,可以指定FLINK_JM_HEAP环境变量(
比如FLINK_JM_HEAP=512m
),或者在flink-conf.yaml中指定jobmanager.heap.size
doc
- jobmanager.heap.size
聊聊flink JobManager的heap大小设置 1相关推荐
- 聊聊flink JobManager的heap大小设置
序 本文主要研究一下flink JobManager的heap大小设置 JobManagerOptions flink-core-1.7.1-sources.jar!/org/apache/flink ...
- Flink JobManager的HA原理分析
文章目录 前言 JobManager的HA切换通知 利用Zookeeper的领导选举与消息通知 引用 前言 在中心式管理的系统里,主节点如果只是单独服务部署的话,或多或少都会存在单点瓶颈(SPOF)问 ...
- 聊聊flink的FsStateBackend
序 本文主要研究一下flink的FsStateBackend StateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/r ...
- 聊聊flink TaskManager的managed memory
序 本文主要研究一下flink TaskManager的managed memory TaskManagerOptions flink-core-1.7.2-sources.jar!/org/apac ...
- mysql innodbuffer修改_mysql参数之innodb_buffer_pool_size大小设置
MYSQL无法启动报错日志,如下: 190801 11:24:29 mysqld_safe mysqld from pid file /var/run/mysql/mysql.pid ended 19 ...
- mysql栈空间大小_jvm 堆内存 栈内存 大小设置
Tomcat 的JVM 内存溢出问题的解决keyword: tomcat 的jvm 内存溢出问题的解决 近期在熟悉一个开发了有几年的项目,须要把数据库从mysql移植到oracle.首先把jdbc的连 ...
- 【Flink】 Flink JobManager HA 机制的扩展与实现
1.概述 转载:Flink 源码阅读笔记(21)- Flink JobManager HA 机制的扩展与实现 在 Flink 1.12 中,Flink on Kubernetes 的 Native 部 ...
- 【totti】sun和IBM虚拟机堆大小设置分析
在IBM的虚拟机官方指导文档中明确指出,禁止将虚拟机的最大值和最小值设置为相等,否则会导致以下2个后果 <1>极大的增加垃圾回收时间,影响系统性能 <2>造成系统中存在内存碎片 ...
- android控制台字体大小设置,Android studio 4.1 豆沙护眼色配置、字体大小设置、内存大小设置...
1.左边目录栏颜色配置: 2.代码编辑区域背景色设置 3.控制台背景色设置 4.菜单栏.工具栏.左边栏字体大小设置 6.修改内存大小.显示内存 例如:修改android-studio/bin/stud ...
- JVM内存 大小设置
Tomcat本身不能直接在计算机上运行,需要依赖于硬件基础之上的操作系统和一个Java虚拟机.Tomcat的内存溢出本质就是JVM内存溢出,所以在本文开始时,应该先对Java JVM有关内存方面的知识 ...
最新文章
- jps命令(Java Virtual Machine Process Status Tool)(转)
- Java与C++Socket通讯注意事项
- Eclipse 创建 Java 包
- ConvertUtil-数据类型转换的工具类
- 光流 | 光流算法对比:Farneback、Horn-Schunck、Lucas-Kanade、Lucas-Kanade derivative of Gaussian(附Matlab与C++代码)
- VTK:IO之ReadExodusData
- 在SAP Cloud Platform ABAP编程环境里打印系统变量
- hdu-1728(贪心bfs的灵活运用吧)
- 关于游戏排行榜设计开发的一些总结
- 关于返回二维数组排序后序号数组的问题求解
- IOS企业应用出现无法验证,需要网络连接以在这台iPad上验证。接入互联网并重试
- 【5G RRU专题】什么是PA非线性失真?
- python代码续航的方法_编写python高质量python代码的59个有效方法
- 《程序员的创世传说》第三节 魔王与2012
- 【Python】数据可视化-散点图绘制
- 牛客小白月赛10 A,B,C,D
- can的波特率计算(基于stm32开发)
- java 接受传感器的数据_java中调用第三方接口获取数据的方式
- Oracle INSERT ALL 语句介绍
- ESP32 CAM CameraWebServer示例测试
热门文章
- “宽带中国、智慧岳阳”项目启动
- EasyUI 1.4.4 DataGrid(大数据量) bufferview滚动时不加载下一页数据解决方案
- SQLite数据库学习小结——Frameworks层实现
- window下从python开始安装科学计算环境
- 深入理解jQuery插件开发(讲解jQuery开发的好文)
- cisco ASA
- Java6开发WebService进阶
- 拜托,面试别再问我TopK了!!!
- Makefile中创建一个以当前时间为文件夹名的文件
- Linux下多线程调试以及查看信息