推荐阅读

Helm3(K8S 资源对象管理工具)视频教程:https://edu.csdn.net/course/detail/32506
Helm3(K8S 资源对象管理工具)博客专栏:https://blog.csdn.net/xzk9381/category_10895812.html

本文原文链接:https://blog.csdn.net/xzk9381/article/details/109535377,转载请注明出处。如有发现文章中的任何问题,欢迎评论区留言。

本文完整介绍了在K8S中搭建 Kafka 和 Zookeeper 集群,并且通过修改镜像,实现扩容 Pod 时自动扩展 brokerID 和 zookeeper 集群信息,无需手动干预。

一、服务版本信息:

  • Kafka:v2.13-2.6.0
  • Zookeeper:v3.6.2
  • Kubernetes:v1.18.4

二、制作 Zookeeper 镜像

Zookeeper 使用的是 docker hub 中提供的官方镜像,使用如下命令可以直接下载:

docker pull zookeeper:3.6.2

由于官方镜像中使用的启动脚本不适用于我们公司内部使用,所以对其中的 docker-entrypoint.sh 脚本和 Dockerfile 进行了一些修改。

1. 修改 docker-entrypoint.sh 脚本

修改后的 docker-entrypoint.sh 脚本如下(原脚本内容可参考:https://github.com/31z4/zookeeper-docker/tree/2373492c6f8e74d3c1167726b19babe8ac7055dd/3.6.2):

#!/bin/bashset -eHOST=$(hostname -s)
DOMAIN=$(hostname -d)
CLIENT_PORT=2181
SERVER_PORT=2888
ELECTION_PORT=3888function createConfig(){if [[ ! -f "$ZOO_CONF_DIR/${HOST}/zoo.cfg" ]]; thenmkdir -p $ZOO_CONF_DIR/${HOST}mkdir -p $ZOO_DATA_DIR/${HOST}mkdir -p $ZOO_DATA_LOG_DIR/${HOST}CONFIG="$ZOO_CONF_DIR/${HOST}/zoo.cfg"{echo "dataDir=$ZOO_DATA_DIR/${HOST}"echo "dataLogDir=$ZOO_DATA_LOG_DIR/${HOST}"echo "tickTime=$ZOO_TICK_TIME"echo "initLimit=$ZOO_INIT_LIMIT"echo "syncLimit=$ZOO_SYNC_LIMIT"echo "autopurge.snapRetainCount=$ZOO_AUTOPURGE_SNAPRETAINCOUNT"echo "autopurge.purgeInterval=$ZOO_AUTOPURGE_PURGEINTERVAL"echo "maxClientCnxns=$ZOO_MAX_CLIENT_CNXNS"echo "standaloneEnabled=$ZOO_STANDALONE_ENABLED"echo "admin.enableServer=$ZOO_ADMINSERVER_ENABLED"} >> ${CONFIG}if [[ -n $ZOO_4LW_COMMANDS_WHITELIST ]]; thenecho "4lw.commands.whitelist=$ZOO_4LW_COMMANDS_WHITELIST" >> ${CONFIG}fifor cfg_extra_entry in $ZOO_CFG_EXTRA; doecho "$cfg_extra_entry" >> ${CONFIG}donefi
}function getHostNum(){if [[ $HOST =~ (.*)-([0-9]+)$ ]]; thenNAME=${BASH_REMATCH[1]}ORD=${BASH_REMATCH[2]}elseecho "Fialed to parse name and ordinal of Pod"exit 1fi
}function createID(){ID_FILE="$ZOO_DATA_DIR/${HOST}/myid"MY_ID=$((ORD+1))echo $MY_ID > $ID_FILE
}function addServer(){for (( i=1; i<=$SERVERS; i++ ))dos="server.$i=$NAME-$((i-1)).$DOMAIN:$SERVER_PORT:$ELECTION_PORT;$CLIENT_PORT"[[ $(grep "$s" $ZOO_CONF_DIR/${HOST}/zoo.cfg) ]] || echo $s >> $ZOO_CONF_DIR/${HOST}/zoo.cfgdone
}function userPerm(){if [[ "$1" = 'zkServer.sh' && "$(id -u)" = '0' ]]; thenchown -R zookeeper "$ZOO_DATA_DIR" "$ZOO_DATA_LOG_DIR" "$ZOO_LOG_DIR" "$ZOO_CONF_DIR"exec gosu zookeeper "$0" "$@"fi
}function startZK(){/apache-zookeeper-3.6.2-bin/bin/zkServer.sh --config "$ZOO_CONF_DIR/$(hostname -s)" start-foreground
}createConfig
getHostNum
createID
addServer
userPerm
startZK

2. 修改 Dockerfile

我这里对于 Dockerfile 的改动很小,只是将原来的 ENTRYPOINT 配置项注释掉,CMD 配置项更改为由 docker-entrypoint.sh 启动:

FROM openjdk:11-jre-slimENV ZOO_CONF_DIR=/conf \ZOO_DATA_DIR=/data \ZOO_DATA_LOG_DIR=/datalog \ZOO_LOG_DIR=/logs \ZOO_TICK_TIME=2000 \ZOO_INIT_LIMIT=5 \ZOO_SYNC_LIMIT=2 \ZOO_AUTOPURGE_PURGEINTERVAL=0 \ZOO_AUTOPURGE_SNAPRETAINCOUNT=3 \ZOO_MAX_CLIENT_CNXNS=60 \ZOO_STANDALONE_ENABLED=true \ZOO_ADMINSERVER_ENABLED=true# Add a user with an explicit UID/GID and create necessary directories
RUN set -eux; \groupadd -r zookeeper --gid=1000; \useradd -r -g zookeeper --uid=1000 zookeeper; \mkdir -p "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR"; \chown zookeeper:zookeeper "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR"# Install required packges
RUN set -eux; \apt-get update; \DEBIAN_FRONTEND=noninteractive \apt-get install -y --no-install-recommends \ca-certificates \dirmngr \gosu \gnupg \netcat \wget; \rm -rf /var/lib/apt/lists/*; \
# Verify that gosu binary worksgosu nobody trueARG GPG_KEY=BBE7232D7991050B54C8EA0ADC08637CA615D22C
ARG SHORT_DISTRO_NAME=zookeeper-3.6.2
ARG DISTRO_NAME=apache-zookeeper-3.6.2-bin# Download Apache Zookeeper, verify its PGP signature, untar and clean up
RUN set -eux; \ddist() { \local f="$1"; shift; \local distFile="$1"; shift; \local success=; \local distUrl=; \for distUrl in \'https://www.apache.org/dyn/closer.cgi?action=download&filename=' \https://www-us.apache.org/dist/ \https://www.apache.org/dist/ \https://archive.apache.org/dist/ \; do \if wget -q -O "$f" "$distUrl$distFile" && [ -s "$f" ]; then \success=1; \break; \fi; \done; \[ -n "$success" ]; \}; \ddist "$DISTRO_NAME.tar.gz" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz"; \ddist "$DISTRO_NAME.tar.gz.asc" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz.asc"; \export GNUPGHOME="$(mktemp -d)"; \gpg --keyserver ha.pool.sks-keyservers.net --recv-key "$GPG_KEY" || \gpg --keyserver pgp.mit.edu --recv-keys "$GPG_KEY" || \gpg --keyserver keyserver.pgp.com --recv-keys "$GPG_KEY"; \gpg --batch --verify "$DISTRO_NAME.tar.gz.asc" "$DISTRO_NAME.tar.gz"; \tar -zxf "$DISTRO_NAME.tar.gz"; \mv "$DISTRO_NAME/conf/"* "$ZOO_CONF_DIR"; \rm -rf "$GNUPGHOME" "$DISTRO_NAME.tar.gz" "$DISTRO_NAME.tar.gz.asc"; \chown -R zookeeper:zookeeper "/$DISTRO_NAME"WORKDIR $DISTRO_NAME
VOLUME ["$ZOO_DATA_DIR", "$ZOO_DATA_LOG_DIR", "$ZOO_LOG_DIR"]EXPOSE 2181 2888 3888 8080ENV PATH=$PATH:/$DISTRO_NAME/bin \ZOOCFGDIR=$ZOO_CONF_DIRCOPY docker-entrypoint.sh /# 将 ENTRYPOINT 内容注释
# ENTRYPOINT ["/docker-entrypoint.sh"]# 将原 CMD 注释,并新增下面的配置
# CMD ["zkServer.sh", "start-foreground"]
CMD ["/docker-entrypoint.sh"]

3. 打包镜像并上传私服

在 Dockerfile 的根目录下,使用如下命令打包镜像,并修改 tag

docker build --tag 10.16.12.204/ops/zookeeper:custom-v3.6.2 -f Dockerfile .

上传至镜像仓库:

docker push 10.16.12.204/ops/zookeeper:custom-v3.6.2

三、制作 Kafka 镜像

制作 Kafka 镜像是基于 docker hub 中 wurstmeister 制作的镜像,原镜像文件可使用如下命令下载:

docker pull wurstmeister/kafka:2.13-2.6.0

这个镜像中使用 start-kafka.sh 脚本来初始化 Kafka 的配置并启动,但是其中有些内容不符合在 K8S 中部署的需求,所以对该脚本进行修改。

1. 修改 start-kafka.sh 脚本

原始的 start-kafka.sh 脚本内容可到 https://github.com/wurstmeister/kafka-docker 中查看。修改后的内容如下:

#!/bin/bash -e# Allow specific kafka versions to perform any unique bootstrap operations
OVERRIDE_FILE="/opt/overrides/${KAFKA_VERSION}.sh"
if [[ -x "$OVERRIDE_FILE" ]]; thenecho "Executing override file $OVERRIDE_FILE"eval "$OVERRIDE_FILE"
fi# Store original IFS config, so we can restore it at various stages
ORIG_IFS=$IFSif [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; thenecho "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"exit 1
fiif [[ -z "$KAFKA_PORT" ]]; thenexport KAFKA_PORT=9092
ficreate-topics.sh &
unset KAFKA_CREATE_TOPICSif [[ -z "$KAFKA_BROKER_ID" ]]; thenif [[ -n "$BROKER_ID_COMMAND" ]]; thenKAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")export KAFKA_BROKER_IDelseexport KAFKA_BROKER_ID=-1fi
fiif [[ -z "$KAFKA_LOG_DIRS" ]]; thenexport KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
fiif [[ -n "$KAFKA_HEAP_OPTS" ]]; thensed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"unset KAFKA_HEAP_OPTS
fiif [[ -n "$HOSTNAME_COMMAND" ]]; thenHOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND")# Replace any occurences of _{HOSTNAME_COMMAND} with the valueIFS=$'\n'for VAR in $(env); doif [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{HOSTNAME_COMMAND}" ]]; theneval "export ${VAR//_\{HOSTNAME_COMMAND\}/$HOSTNAME_VALUE}"fidoneIFS=$ORIG_IFS
fiif [[ -n "$PORT_COMMAND" ]]; thenPORT_VALUE=$(eval "$PORT_COMMAND")# Replace any occurences of _{PORT_COMMAND} with the valueIFS=$'\n'for VAR in $(env); doif [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{PORT_COMMAND}" ]]; theneval "export ${VAR//_\{PORT_COMMAND\}/$PORT_VALUE}"fidoneIFS=$ORIG_IFS
fiif [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; thenKAFKA_BROKER_RACK=$(eval "$RACK_COMMAND")export KAFKA_BROKER_RACK
fiif [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; thenif [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; thenecho "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"exit 1elif [[ -z "$HOSTNAME_VALUE" ]]; thenecho "ERROR: No listener or advertised hostname configuration provided in environment."echo "       Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"exit 1fiexport KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
fiecho "" >> "$KAFKA_HOME/config/server.properties"(function updateConfig() {key=$1value=$2file=$3echo "[Configuring] '$key' in '$file'"if grep -E -q "^#?$key=" "$file"; thensed -r -i "s@^#?$key=.*@$key=$value@g" "$file" elseecho "$key=$value" >> "$file"fi}# KAFKA_VERSION + KAFKA_HOME + grep -rohe KAFKA[A-Z0-0_]* /opt/kafka/bin | sort | uniq | tr '\n' '|'EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_DEBUG|KAFKA_GC_LOG_OPTS|KAFKA_HEAP_OPTS|KAFKA_JMX_OPTS|KAFKA_JVM_PERFORMANCE_OPTS|KAFKA_LOG|KAFKA_OPTS|"IFS=$'\n'for VAR in $(env)doenv_var=$(echo "$VAR" | cut -d= -f1)if [[ "$EXCLUSIONS" = *"|$env_var|"* ]]; thenecho "Excluding $env_var from broker config"continuefiif [[ $env_var =~ ^KAFKA_ ]]; thenkafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .)updateConfig "$kafka_name" "${!env_var}" "$KAFKA_HOME/config/server.properties"fiif [[ $env_var =~ ^LOG4J_ ]]; thenlog4j_name=$(echo "$env_var" | tr '[:upper:]' '[:lower:]' | tr _ .)updateConfig "$log4j_name" "${!env_var}" "$KAFKA_HOME/config/log4j.properties"fidonePODNAME=$(hostname -s | awk -F'-' 'OFS="-"{$NF="";print}' |sed 's/-$//g')for ((i=0;i<$SERVERS;i++))doBOOTSTRAP_SERVERS+="$PODNAME-$i.$(hostname -d):${KAFKA_PORT},"doneBOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS%?}echo ${BOOTSTRAP_SERVERS} > /opt/log.txtsed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/consumer.propertiessed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/producer.properties
)if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; theneval "$CUSTOM_INIT_SCRIPT"
fiexec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"

2. 修改 Dockerfile

Dockerfile 未做其他修改,只是将修改后的 start-kafka.sh 脚本添加到镜像中,并使用 bash 环境来执行脚本(否则会有些命令无法执行):

FROM wurstmeister/kafka:2.13-2.6.0ADD start-kafka.sh /CMD ["bash","start-kafka.sh"]

3. 打包镜像并上传私服

使用如下命令重新打包镜像并修改 tag:

docker build --tag 10.16.12.204/ops/kafka:custom-v2.13-2.6.0 -f Dockerfile .

将镜像上传至镜像仓库:

docker push 10.16.12.204/ops/kafka:custom-v2.13-2.6.0

四、创建命名空间

整个 Kafka 和 Zookeeper 集群都要在同一个命名空间下,所以使用如下 yaml 文件创建 ns-kafka 命名空间:

---
apiVersion: v1
kind: Namespace
metadata:name: ns-kafkalabels:name: ns-kafka

五、创建 Secret

Kubelet 到镜像仓库中拉取镜像需要进行验证,所以创建一个用于验证 Harbor 仓库的 Secret:

kubectl create secret docker-registry harbor-secret --namespace=ns-kafka --docker-server=http://10.16.12.204 --docker-username=admin --docker-password=Harbor12345

六、创建 PV 和 PVC

在此次搭建集群的过程中,计划让 Kafka 集群和 Zookeeper 集群使用同一个 PV。在前面定义 Pod 初始化脚本时可以看到,Kafka 和 Zookeeper 中的数据目录以及日志目录,都是在以自己主机名命名的目录下,所以即便使用同一个 PV,也可以对目录进行区分。创建 PV 和 PVC 的 yaml 文件内容如下:

---
apiVersion: v1
kind: PersistentVolume
metadata:name: kafka-data-pv
spec:accessModes:- ReadWriteManycapacity:storage: 500Gilocal:path: /opt/ops_ceph_data/kafka_datanodeAffinity:required:nodeSelectorTerms:- matchExpressions:- key: kafka-clusteroperator: Invalues:- "true"persistentVolumeReclaimPolicy: Retain
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:name: kafka-data-pvcnamespace: ns-kafka
spec:accessModes:- ReadWriteManyresources:requests:storage: 500Gi

需要声明的一点是,我当前使用的存储是 cephfs,并将其挂载到 K8S 的各个节点的 /opt/ops_ceph_data 目录下,所以在创建 PV 的时候使用的存储类型是 local。

七、创建 Labels

由于上面创建 PV 时指定的存储类型是 local,这个 PV 只能在满足指定 Label 的节点中进行调度,所以为集群中的所有节点添加一个 label:

for i in 1 2 3 4 5; do kubectl label nodes k8s-node${i} kafka-cluster=true; done

八、创建 Zookeeper 集群

1. 创建 Service

创建用于 Zookeeper 与其他节点通信的 Service,yaml 文件内容如下:

---
apiVersion: v1
kind: Service
metadata:name: zk-inner-servicenamespace: ns-kafkalabels:app: zk
spec:selector:app: zkclusterIP: Noneports:- name: serverport: 2888- name: leader-electionport: 3888
---
apiVersion: v1
kind: Service
metadata:name: zk-client-servicenamespace: ns-kafkalabels:app: zk
spec:selector:app: zktype: NodePortports:- name: clientport: 2181nodePort: 31811

2. 创建 StatefulSet

Zookeeper 属于有状态服务,所以要使用 StatefulSet 来部署,yaml 文件内容如下:

---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: zknamespace: ns-kafka
spec:selector:matchLabels:app: zkserviceName: "zk-inner-service"replicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: Paralleltemplate:metadata:labels:app: zkspec:containers:- name: zkimagePullPolicy: Alwaysimage: 10.16.12.204/ops/zookeeper:custom-v3.6.2resources:requests:memory: "500Mi"cpu: "0.5"ports:- containerPort: 2181name: client- containerPort: 2888name: server- containerPort: 3888name: leader-electionenv:- name: SERVERSvalue: "3"- name: ZOO_CONF_DIRvalue: /opt/conf- name: ZOO_DATA_DIRvalue: /opt/data- name: ZOO_DATA_LOG_DIRvalue: /opt/data_logvolumeMounts:- name: zookeeper-datamountPath: /opt/datasubPath: zookeeper-cluster-data/data- name: zookeeper-datamountPath: /opt/data_logsubPath: zookeeper-cluster-data/data_log- name: data-confmountPath: /etc/localtimeimagePullSecrets:- name: harbor-secretvolumes:- name: zookeeper-datapersistentVolumeClaim:claimName: kafka-data-pvc- name: data-confhostPath:path: /usr/share/zoneinfo/Asia/Shanghai

3. 验证集群状态

集群搭建完成后,查看 zookeeper 各个节点当前的状态,使用如下命令:

[@k8s-master1 /]# for i in  0 1 2; do kubectl exec -it zk-$i  -n ns-kafka -- zkServer.sh --config /opt/conf/zk-$i status; done
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-0/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-1/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-2/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

可以看到当前集群中是一个 leader,两个follower。接下来验证集群各个节点的消息同步,首先在 zk-0 节点上创建一个信息:

[@k8s-master1 /]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] create /testMessage Hello
Created /testMessage

在其他两个节点上查看这条消息:

[@k8s-master1 /]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get /testMessage
Hello[@k8s-master1 /]# kubectl exec -it zk-2 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get /testMessage
Hello

可以正常看到消息,代表集群当前运行正常。

本文原文链接:https://blog.csdn.net/xzk9381/article/details/109535377,转载请注明出处。如有发现文章中的任何问题,欢迎评论区留言。

九、创建 Kafka 集群

1. 创建 Service

创建用于 Kafka 通信的 Service,yaml 文件内容如下:

---
apiVersion: v1
kind: Service
metadata:name: kafka-servicenamespace: ns-kafkalabels:app: kafka
spec:ports:- port: 9092name: serverclusterIP: Noneselector:app: kafka

2. 创建 StatefulSet

Kafka 属于有状态服务,所以要使用 StatefulSet 来部署,yaml 文件内容如下:

---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafkanamespace: ns-kafka
spec:selector:matchLabels:app: kafkaserviceName: "kafka-service"replicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: Paralleltemplate:metadata:labels:app: kafkaspec:imagePullSecrets:- name: harbor-secretcontainers:- name: kafkaimagePullPolicy: Alwaysimage: 10.16.12.204/ops/kafka:custom-v2.13-2.6.0resources:requests:memory: "500Mi"cpu: "0.5"env:- name: SERVERS                      # 要确保 SERVERS 设置的值与副本数一致value: "3"- name: KAFKA_LISTENERSvalue: "PLAINTEXT://:9092"- name: KAFKA_ZOOKEEPER_CONNECT      # 设置 Zookeeper 连接地址value: "zk-inner-service.ns-kafka.svc.cluster.local:2181"- name: KAFKA_PORTvalue: "9092"- name: KAFKA_MESSAGE_MAX_BYTESvalue: "20000000"- name: BROKER_ID_COMMAND            # 这个变量用于在容器内部生成一个 broker idvalue: "hostname | awk -F'-' '{print $NF}'"volumeMounts:- name: kafka-log                    # 只需要将 kafka 的 log 目录持久化存储mountPath: /kafkasubPath: kafka-cluster-log- name: data-confmountPath: /etc/localtimevolumes:- name: kafka-logpersistentVolumeClaim:claimName: kafka-data-pvc- name: data-confhostPath:path: /usr/share/zoneinfo/Asia/Shanghai

3. 验证集群状态

3.1 在 Zookeeper 中查看 broker
[@k8s-master1 ~]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
Connecting to localhost:2181[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper][zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics][zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0, 1, 2][zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074102"}[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-1.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074079"}[zk: localhost:2181(CONNECTED) 5] get /brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-2.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074009"}

可以看到 3 个 broker 都已经在 zookeeper 中注册了。

3.2 创建 Topic

在 kafka-0 节点中创建一个名为 Message 的 topic,3个分区,3个副本:

[@k8s-master1 ~]# kubectl exec -it kafka-0 -n ns-kafka -- /bin/bash
bash-4.4# kafka-topics.sh --create --topic Message --zookeeper zk-inner-service.ns-kafka.svc.cluster.local:2181 --partitions 3 --replication-factor 3
Created topic Message.

在 zk-1 节点中查看是否存在这个 Topic:

[@k8s-master1 ~]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[Message]

可以看到 Zookeeper 中已经存在这个 Topic 了。

3.3 模拟生产者和消费者

首先 在 kafka-1 上模拟生产者向 Message 中写入消息:

[@k8s-master1 ~]# kubectl exec -it kafka-1 -n ns-kafka -- /bin/bash
bash-4.4# kafka-console-producer.sh --topic Message --broker-list kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092
>This is a test message
>Welcome to Kafka

然后在 kafka-2 中模拟消费者消费这些信息:

[@k8s-master1 ~]# kubectl exec -it kafka-2 -n ns-kafka -- /bin/bash
bash-4.4# kafka-console-consumer.sh --topic Message --bootstrap-server kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092 --from-beginningThis is a test message
Welcome to Kafka

可以正常生产消息和消费消息,代表 Kafka 集群运行正常。

十、FAQ

1. 如何在 yaml 文件中指定要创建的 Topic

在 yaml 文件中指定如下 env,即可在 Pod 启动自动创建 Topic:

env:- name: KAFKA_CREATE_TOPICSvalue: "Topic1:1:3,Topic2:1:1:compact"

上面的内容代表 Topic1 会有 1 个分区,3个副本,Topic2 会有 1 个分区,1 个副本并且副本的 cleanup.policy 设置为 compact。

自动创建 Topic 一定要设置 KAFKA_CREATE_TOPICS 变量,然后会由 create_topic.sh 脚本(镜像中存在)根据变量内容自动创建。

2. 为 Topic 设置的 compaction 不生效

可参考网址:https://github.com/wurstmeister/kafka-docker/wiki#topic-compaction-does-not-work

本文原文链接:https://blog.csdn.net/xzk9381/article/details/109535377,转载请注明出处。如有发现文章中的任何问题,欢迎评论区留言。

Kubernetes 搭建 Kafka 和 Zookeeper 集群(完整教程)相关推荐

  1. ZooKeeper1 利用虚拟机搭建自己的ZooKeeper集群

    前言:       前段时间自己参考网上的文章,梳理了一下基于分布式环境部署的业务系统在解决数据一致性问题上的方案,其中有一个方案是使用ZooKeeper,加之在大数据处理中,ZooKeeper确实起 ...

  2. kafka 、 zookeeper 集群(一)

    一.入门 1.简介 Kafka is a distributed,partitioned,replicated commit logservice.它提供了类似于JMS的特性,但是在设计实现上完全不同 ...

  3. linux下kafka与zookeeper集群部署

    *********************************配置主机名,通过主机名连接机器********************************* 比如说,已经有了三台主机 1,在li ...

  4. linux安装zookeeper集群保姆教程,包括集群启停脚本

    三台机器安装zookeeper集群 注意事项:安装前三台机器一定要保证时钟同步 说明: 我这里是有3台服务器,分别hostname为node01.node02.node03. 三台机器已经配置好了ss ...

  5. Kafka与zookeeper集群配置安装(Zk集群至少三台)

    1.上传解包 tar -zxvf kafka_2.12-2.7.1.tgz -C /usr/hadoop/ 2.Kafka配置 主要的配置项: broker.id=1log.dirs=/usr/had ...

  6. STORM_0001_用vmware拷贝出三个相同的ubuntu搭建小的zookeeper集群

    第一次配置zookeeper的集群 因为想运行storm必须搭建集群 在自己的电脑上拷贝了自己的ubuntu虚拟机 采用的是vmware给虚拟机分配的地址 三个机器的配置基本上一样除了myid这个文件 ...

  7. Kafka、zookeeper集群一键启动、一键关闭

    在学习大数据的过程中,经常用到集群,软件的启停一个一个点击很麻烦 1.Kafka的一键启动和一键关闭(onekey路径下) 目录/export中新建onekey文件夹 mkdir onekey 进入文 ...

  8. kubernetes搭建 二、Etcd集群

    etcd需要在master和node上都部署,首先在master上部署,然后把相关的二进制文件和程序拷贝到node上再修改即可 1.下载二进制包然后解压 wget https://github.com ...

  9. 理解和实现分布式TensorFlow集群完整教程

    手把手教你搭建分布式集群,进入生产环境的TensorFlow 分布式TensorFlow简介 前一篇<分布式TensorFlow集群local server使用详解>我们介绍了分布式Ten ...

最新文章

  1. llist对象两个属性相乘在相加_Java8使用stream实现list中对象属性的合并(去重并求和)...
  2. docker启动远程管理接口
  3. 鼠标点上去怎么那向上html,鼠标上去文字上移.html
  4. iOS开发之ReplayKit框架学习
  5. kafka配置文件 中文乱码_Kafka学习之配置文件详解
  6. mysql实战38 | 都说InnoDB好,那还要不要使用Memory引擎?
  7. 随机森林筛选特征Matlab,特征筛选(随机森林)
  8. 【转载】git生成SSH步骤
  9. 难道现在是保险业的高速发展期?
  10. Linux下出现command not found的解决办法
  11. 什么是编解码器codec
  12. visio一分二的箭头_visio双箭头怎么画? visio2013绘制双箭头直线的教程
  13. 【DNSPOD】利用DNSPod实现动态域名解析【DDNS】
  14. Linux Swap 介绍
  15. iol植入手术过程_卢奕教授专访:领略Phaco+IOL+Express植入术
  16. 旷视科技——身份验证、活体检测、人脸比对
  17. 钢条切割(记忆型递归)dp
  18. docker mysql 启动命令_Mac 下使用 Docker mysql 运行的容器如果挂载了/var/lib/mysql 不能启动容器...
  19. css适配iphonex底部安全区
  20. 廖雪峰Java高级架构师第一期

热门文章

  1. 多元高斯分布的边缘概率和条件概率
  2. 矩阵(二维数组)的性质在算法求解中的应用
  3. Python Tricks(二十)—— 阶乘的极简实现
  4. Python 面向对象 —— 特殊函数(setattr、getattr、hasattr)
  5. Tricks(二十六) —— 妙用关键字参数(keyword arguments)创建字典
  6. 江苏高考成绩什么时候可以查询2021,2021年江苏高考成绩什么时候公布出来,几月几号几点钟可以查询...
  7. 遥感分类误差矩阵_遥感卫星影像之分类精度评价
  8. WPS怎么统计相同名称的数据_群发邮件平台的数据统计怎么用
  9. 精通python工资高吗-2020年10月程序员工资再创新高
  10. python学习手册-Python学习手册(第4版) 中文版.pdf