本例中使用非守护进程模式,与守护进程模式的比较会在后面介绍

Kafka集群的启用只需要通过./kafka-server-start.sh ../config/server.properties来进行,可以说是非常的简单,但是这个脚本之中又作了哪些事情配置了哪些配置呢,我们一步步来看看。

在kafka-server-start.sh脚本中会首先对输入的参数做合法校验,并设置当前该脚本所在文件目录为基础目录。如果没有通过环境变量$KAFKA_LOG4J_OPTS指定log4j配置文件的目录即将基础目录平级的config目录下的log4j.properties文件作为配置文件。如果没有配置KAFKA_HEAP_OPTS环境变量的则会默认设置JVM参数最大内存及初始内存都为1G(-Xmx1G -Xms1G),并配置环境变量EXTRA_ARGS为 "-name kafkaServer -loggc"在守护进程模式下为“-daemon -name kafkaServer -loggc"

注:源脚本中为变量EXTRA_ARGS赋值的语句为${EXTRA_ARGS-'-name kafkaServer -loggc'},该句意为如果EXTRA_ARGS为定义的话则返回单引号部分内容

随后执行 kafka-run-class.sh 脚本在非守护进程下并且没有配置额外参数时该命令可重写为:

./kafka-run-class.sh -name kafkaServer -loggc kafka.Kafka ../config/server.proerpties

下面来看 Kafka-run-class.sh脚本部分内容。

kafka-run-class.sh脚本是所有kafka启动脚本的终点,对其进行逐行分析。


if [ $# -lt 1 ];
thenecho "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"exit 1
fi

第一段是判断输入参数,如果输入参数的数量小于1,提示用法结束。

# CYGINW == 1 if Cygwin is detected, else 0.
if [[ $(uname -a) =~ "CYGWIN" ]]; thenCYGWIN=1
elseCYGWIN=0
fiif [ -z "$INCLUDE_TEST_JARS" ]; thenINCLUDE_TEST_JARS=false
fi

其次判断是否是Cygwin的模拟UNIX环境,以及是否包括了测试的JAR包;


# Exclude jars not necessary for running commands.
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
should_include_file() {if [ "$INCLUDE_TEST_JARS" = true ]; thenreturn 0fifile=$1if [ -z "$(echo "$file" | egrep "$regex")" ] ; thenreturn 0elsereturn 1fi
}

声明函数should_include_file()函数,用来排除非必须的jar包(如果未引入测试jar选项直接返回0(真),否则判断输入的参数中是否有符合条件的包,没有返回0(真)否则返回1(假))。

base_dir=$(dirname $0)/..if [ -z "$SCALA_VERSION" ]; thenSCALA_VERSION=2.11.12
fiif [ -z "$SCALA_BINARY_VERSION" ]; thenSCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')
fi

设置基础目录为当前目录的上级目录,设置SCALA默认版本为2..11.12,SCALA二进制版本号为SCALA版本的前两位默认为2.11.

shopt -s nullglob  #意为启用nullglobfor dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
doCLASSPATH="$CLASSPATH:$dir/*"
done

将core/build/下符合SCALA版本的包引入到环境变量中

for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
done

将examples/build/libs/目录下所有以kafka-examples开头的非test、test-doc etc.等包导入到环境变量

if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; thenclients_lib_dir=$(dirname $0)/../clients/build/libsstreams_lib_dir=$(dirname $0)/../streams/build/libsrocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
elseclients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libsstreams_lib_dir=$clients_lib_dirrocksdb_lib_dir=$streams_lib_dir
fifor file in "$clients_lib_dir"/kafka-clients*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
donefor file in "$streams_lib_dir"/kafka-streams*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
doneif [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; thenfor file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fidone
elseVERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix numberfor file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fidone
fifor file in "$rocksdb_lib_dir"/rocksdb*.jar;
doCLASSPATH="$CLASSPATH":"$file"
donefor file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
donefor dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
doCLASSPATH="$CLASSPATH:$dir/*"
donefor cc_pkg in "api" "transforms" "runtime" "file" "json" "tools" "basic-auth-extension"
dofor file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fidoneif [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; thenCLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"fi
done# classpath addition for release
for file in "$base_dir"/libs/*;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
donefor file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
done
shopt -u nullglob

这一段相对来说比较简单,只有当使用者是通过源码包进行自己编译使用时才会引入以上环境变量,如果使用二进制包安装Kafka时,该段代码可以直接略过。其中大致内容为根据环境变量更新classpath的值。

if [ -z "$CLASSPATH" ] ; thenecho "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"exit 1
fi

如果环境变量为空则提示报错并退出。

# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; thenKAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
fi# JMX port to use
if [  $JMX_PORT ]; thenKAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi

指定JVM相关参数

# Log4j settings
if [ -z "$KAFKA_LOG4J_OPTS" ]; then# Log to console. This is a tool.LOG4J_DIR="$base_dir/config/tools-log4j.properties"# If Cygwin is detected, LOG4J_DIR is converted to Windows format.(( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
else# create logs directoryif [ ! -d "$LOG_DIR" ]; thenmkdir -p "$LOG_DIR"fi
fi# If Cygwin is detected, LOG_DIR is converted to Windows format.
(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"

如果未指定Log4j目录则去config目录下加载tools-log4j.properties文件,如果检测到为模拟UNIX环境则将LINUX路径转换为WINDOWS环境下目录格式,如果目录不存在则创建。同时将KAFKA中关于LOg的配置参数也转换为win格式。

# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; thenKAFKA_OPTS=""
fi# Set Debug options if enabled
if [ "x$KAFKA_DEBUG" != "x" ]; then# Use default portsDEFAULT_JAVA_DEBUG_PORT="5005"if [ -z "$JAVA_DEBUG_PORT" ]; thenJAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"fi# Use the defaults if JAVA_DEBUG_OPTS was not setDEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"if [ -z "$JAVA_DEBUG_OPTS" ]; thenJAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"fiecho "Enabling Java debug options: $JAVA_DEBUG_OPTS"KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi

如果调试选项被配置则加载JAVA调试选项到KAFKA启动参数里面。否则到此行为止该选项参数仍为空。


# Which java to use
if [ -z "$JAVA_HOME" ]; thenJAVA="java"
elseJAVA="$JAVA_HOME/bin/java"
fi# Memory options
if [ -z "$KAFKA_HEAP_OPTS" ]; thenKAFKA_HEAP_OPTS="-Xmx256M"
fi# JVM performance options
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; thenKAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
fi

指定可执行路径以及jvm 参数

# version option
for args in "$@" ; doif [ "$args" = "--version" ]; thenexec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "kafka.utils.VersionInfo"fi
done

遍历输入参数如果有--version的话则 调用kafka.utils.VersionInfo类输出INfo信息,该类的具体分析见下一节。

while [ $# -gt 0 ]; doCOMMAND=$1case $COMMAND in-name)DAEMON_NAME=$2CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.outshift 2;;-loggc)if [ -z "$KAFKA_GC_LOG_OPTS" ]; thenGC_LOG_ENABLED="true"fishift;;-daemon)DAEMON_MODE="true"shift;;*)break;;esac
done

检查输入参数中是否使用-name参数指定执行类,使用-loggc参数打印gc 日志,指定-daemon参数指定守护进程模式。本例中该段结束后各个变量的值为:

(./kafka-run-class.sh -name kafkaServer -loggc kafka.Kafka ../config/server.proerpties)

DAEMON_NAME=kafkaServer
LOG_DIR=~/logs/
CONSOLE_OUTPUT_FILE=~/logs/kafkaServer.out
GC_LOG_ENABLED=true
DAEMON_MODE=true# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; thenGC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
给GCLOG文件变量赋值。默认为kafkaServer-gc.log# The first segment of the version number, which is '1' for releases before Java 9# it then becomes '9', '10', ...# Some examples of the first line of `java --version`:# 8 -> java version "1.8.0_152"# 9.0.4 -> java version "9.0.4"# 10 -> java version "10" 2018-03-20# 10.0.1 -> java version "10.0.1" 2018-04-17# We need to match to the end of the line to prevent sed from printing the characters that do not matchJAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; thenKAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"elseKAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"fi
fi# Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank)
# Syntax used on the right side is native Bash string manipulation; for more details see
# http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal"
CLASSPATH=${CLASSPATH#:}# If Cygwin is detected, classpath is converted to Windows format.
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")

判断JAVA版本以及对jvm参数根据版本做调整,如果为模拟UNIX环境还需要将环境变量的格式做格式化。

最后部署的命令在非守护进程模式下为

exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"

$JAVA

$JAVA_HOME/bin/java

$KAFKA_HEAP_OPTS

-Xmx256M

$KAFKA_JVM_PERFORMANCE_OPTS

-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true

$KAFKA_GC_LOG_OPTS

-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M

$KAFKA_JMX_OPTS

-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false

$KAFKA_LOG4J_OPTS

-Dlog4j.configuration=file:../config/tools-log4j.properties

$KAFKA_OPTS

-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT

$@

kafka.Kafka ../config/server.proerpties

kafka(1.1版本)源码阅读记——深入kafka-server-start.sh脚本相关推荐

  1. ClickHouse源码阅读(0000 1001) —— CK Server对SQL的处理

    在上篇文章中提到:服务端*Handler接收到client端SQL后会进行处理,今天具体来分析一下处理过程.会介绍一下大概的处理流程.以TCPHandler为例: dbms/programs/serv ...

  2. sfm三维重建源码_OpenMVG源码阅读小记

    "读一份好源码,就是和许多智慧的人谈话". 本文记录了笔者学习 openMVG 开源软件的一些初步经验和心得.如果你对计算机视觉和摄影测量有兴趣,需要用到相关技术,这篇文章正好就是 ...

  3. Kafka源码阅读-Controller(二)管理brokers

    上一篇kafka源码(一)correspond to/explain Kafka设计解析(二) 中的3.2.3.3.以前一直用kafka 0.8.2.x,那时候redis开始风靡,hadoop方兴未艾 ...

  4. Darknet源码阅读【吐血整理,持续更新中】

    github地址 https://github.com/BBuf/Darknet Darknet源码阅读 Darknet是一个较为轻型的完全基于C与CUDA的开源深度学习框架,其主要特点就是容易安装, ...

  5. 源码阅读:AFNetworking(十六)——UIWebView+AFNetworking

    该文章阅读的AFNetworking的版本为3.2.0. 这个分类提供了对请求周期进行控制的方法,包括进度监控.成功和失败的回调. 1.接口文件 1.1.属性 /**网络会话管理者对象*/ @prop ...

  6. 源码阅读:SDWebImage(六)——SDWebImageCoderHelper

    该文章阅读的SDWebImage的版本为4.3.3. 这个类提供了四个方法,这四个方法可分为两类,一类是动图处理,一类是图像方向处理. 1.私有函数 先来看一下这个类里的两个函数 /**这个函数是计算 ...

  7. 源码阅读:AFNetworking(八)——AFAutoPurgingImageCache

    该文章阅读的AFNetworking的版本为3.2.0. AFAutoPurgingImageCache该类是用来管理内存中图片的缓存. 1.接口文件 1.1.AFImageCache协议 这个协议定 ...

  8. webpack源码阅读——npm脚本运行webpack与命令行输入webpack的区别

    原文地址:webpack源码阅读--npm脚本执行webpack与命令行输入webpack执行的区别 如有错误,欢迎指正! webpack是目前被大家广为使用的模块打包器.从命令行输入webpack或 ...

  9. 【SeaJS】【3】seajs.data相关的源码阅读

    在SeaJS官网上推荐了源码阅读顺序,本文并没有采用这个顺序,而是按个人习惯以调试官方示例的方式进行源码阅读.早期版本作者玉伯使用了几个闭包形式,本文源码版本为2.1.1,它的编码方式个人认为更加脚本 ...

最新文章

  1. python turtle画画 30排以内_Python竟能画这么漂亮的花,帅呆了(代码分享)
  2. 机器学习实战源码数据集
  3. HDOJ 4883 TIANKENG’s restaurant
  4. 7.12固定信息认证
  5. Linux下的字符处理命令之tr命令详解
  6. 个人控件/对象命名规范(慢慢更新)
  7. leetcode981. 基于时间的键值存储(treemap)
  8. mysql 分段执行_19个MySQL优化技巧,索引优化这样做最有效!
  9. 国际图形学大会(SIGGRAPH)2017届主席竞选答辩是如此场景
  10. 机器人学中的状态估计 中文版_机器人学——学习笔记18(Minpulator Traj Planning Example)...
  11. java md5加密解密类_Java实现MD5加密解密类
  12. 【C/C++】__stdcall、__cdcel和__fastcall定义与区别
  13. 神经网络测试样本的选择,如何测试神经网络模型
  14. 跨界融合 | 零数科技正式成为上海现代服务业联合会会员单位
  15. 1062: 最大公约数 Python
  16. day14课后总结app
  17. app营销应该这样做(读书笔记)
  18. 一篇文了解电商直播的优势和平台
  19. Db2 insert got DSNISGRT:500A abend
  20. “我们的开源项目”发起人、息壤开源社区共同创始人——程旭文专访

热门文章

  1. 光谱仪工作过程及重要参数定义
  2. (附源码)spring boot网上求职招聘系统 毕业设计 081201
  3. 为什么阿里巴巴不喜欢招聘应届生?
  4. RTK基站加入3D扼流圈天线,对卫星导航有什么助益?
  5. Eudemon 200   Eudemon 200
  6. 简析在React Native中如何适配iPhoneX
  7. 网络分析与网络数据集—网络分析的实际应用
  8. chi2inv函数 matlab_matlab常用命令
  9. 单缝衍射的相对光强分布matlab代码
  10. mysql数据库添加角色_用户系统 – MySQL数据库中的多个角色