

  • linux 系统环境 :linux系统采用的是Centos 8 、安装linux软件包管理工具yum
  • 安装docker-ce
yum install -y docker-ce
  • 启动docker服务
systemctl start docker
  • 查看docker运行状态
systemctl status docker


[root@INMS-T ~]# systemctl status docker
* docker.service - Docker Application Container EngineLoaded: loaded (/usr/lib/systemd/system/docker.service; enabled; vendor preset: disabled)Active: active (running) since Tue 2021-08-31 16:54:22 CST; 3h 12min leftDocs: https://docs.docker.comMain PID: 1339 (dockerd)Tasks: 34Memory: 198.2MCGroup: /system.slice/docker.service`-1339 /usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock
  • 安装kubernetes集群

省略 。。

  • 安装istio 网关访问控制组件

省略 。。

  • 查看kubernetes运行状态
systemctl status kubelet
[root@INMS-T ~]# systemctl status kubelet
* kubelet.service - kubelet: The Kubernetes Node AgentLoaded: loaded (/usr/lib/systemd/system/kubelet.service; enabled; vendor preset: disabled)Drop-In: /etc/systemd/system/kubelet.service.d`-10-kubeadm.confActive: active (running) since Tue 2021-08-31 16:54:00 CST; 3h 5min leftDocs: http://kubernetes.io/docs/Main PID: 880 (kubelet)Tasks: 17 (limit: 62498)Memory: 161.7MCGroup: /system.slice/kubelet.service`-880 /var/lib/minikube/binaries/v1.21.2/kubelet --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf --cgroup-driver=systemd --config=/var/lib/kubelet/config.yaml --container-runtime=docker --hostname-override>8<D4><C2> 31 13:48:26 INMS-T kubelet[880]: I0831 13:48:26.257187     880 scope.go:111] "RemoveContainer" containerID="6e76783c081c87e40fddbadb2ad8c22b8ebed5794bcaa3921f3a2b532c5622e3"
8<D4><C2> 31 13:48:27 INMS-T kubelet[880]: I0831 13:48:27.017515     880 scope.go:111] "RemoveContainer" containerID="1c4b10aa4887dd7027c90a81f0ebff18bc5c6dfb6f29fcaadd1f5480f231533e"

二、搭建Flink 会话集群

1、构建Flink 镜像

编写dockerfile,Flink镜像采用源码包: openjava8 、flink 1.3.2 、scala 2.12

# Install dependencies
RUN set -ex; \apt-get update; \apt-get -y install libsnappy1v5 gettext-base libjemalloc-dev; \rm -rf /var/lib/apt/lists/*# Grab gosu for easy step-down from root
RUN set -ex; \wget -nv -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture)"; \wget -nv -O /usr/local/bin/gosu.asc "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture).asc"; \export GNUPGHOME="$(mktemp -d)"; \for server in ha.pool.sks-keyservers.net $(shuf -e \hkp://p80.pool.sks-keyservers.net:80 \keyserver.ubuntu.com \hkp://keyserver.ubuntu.com:80 \pgp.mit.edu) ; do \gpg --batch --keyserver "$server" --recv-keys B42F6819007F00F88E364FD4036A9C25BF357DD4 && break || : ; \done && \gpg --batch --verify /usr/local/bin/gosu.asc /usr/local/bin/gosu; \gpgconf --kill all; \rm -rf "$GNUPGHOME" /usr/local/bin/gosu.asc; \chmod +x /usr/local/bin/gosu; \gosu nobody true# Configure Flink version
ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz \FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz.asc \GPG_KEY=78A306590F1081CC6794DC7F62DAD618E07CF996 \CHECK_GPG=true# Prepare environment
ENV FLINK_HOME=/opt/flink
RUN groupadd --system --gid=9999 flink && \useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink
RUN set -ex; \wget -nv -O flink.tgz "$FLINK_TGZ_URL"; \\if [ "$CHECK_GPG" = "true" ]; then \wget -nv -O flink.tgz.asc "$FLINK_ASC_URL"; \export GNUPGHOME="$(mktemp -d)"; \for server in ha.pool.sks-keyservers.net $(shuf -e \hkp://p80.pool.sks-keyservers.net:80 \keyserver.ubuntu.com \hkp://keyserver.ubuntu.com:80 \pgp.mit.edu) ; do \gpg --batch --keyserver "$server" --recv-keys "$GPG_KEY" && break || : ; \done && \gpg --batch --verify flink.tgz.asc flink.tgz; \gpgconf --kill all; \rm -rf "$GNUPGHOME" flink.tgz.asc; \fi; \\tar -xf flink.tgz --strip-components=1; \rm flink.tgz; \\chown -R flink:flink .;# Configure container
COPY docker-entrypoint.sh /
COPY flink-console.sh /opt/flink/bin/
ENTRYPOINT ["/docker-entrypoint.sh"]   #入口启动脚本
EXPOSE 6123 8


#!/usr/bin/env bash###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#      http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  See the License for the specific language governing permissions and
# limitations under the License.
COMMAND_HISTORY_SERVER="history-server"# If unspecified, the hostname of the container is taken as the JobManager address
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"drop_privs_cmd() {if [ $(id -u) != 0 ]; then# Don't need to drop privs if EUID != 0returnelif [ -x /sbin/su-exec ]; then# Alpineecho su-exec flinkelse# Othersecho gosu flinkfi
}copy_plugins_if_required() {if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; thenreturn 0fiecho "Enabling required built-in plugins"for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); doecho "Linking ${target_plugin} to plugin directory"plugin_name=${target_plugin%.jar}mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; thenecho "Plugin ${target_plugin} does not exist. Exiting."exit 1elseln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"echo "Successfully enabled ${target_plugin}"fidone
}set_config_option() {local option=$1local value=$2# escape periods for usage in regular expressionslocal escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")# either override an existing entry, or append a new oneif grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; thensed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"elseecho "${option}: ${value}" >> "${CONF_FILE}"fi
}prepare_configuration() {set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}set_config_option blob.server.port 6124set_config_option query.server.port 6125TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-1}set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}if [ -n "${FLINK_PROPERTIES}" ]; thenecho "${FLINK_PROPERTIES}" >> "${CONF_FILE}"fienvsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}maybe_enable_jemalloc() {if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; thenexport LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.sofi
if [ "$1" = "help" ]; thenprintf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"printf "    Or $(basename "$0") help\n\n"printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"exit 0
elif [ "$1" = "jobmanager" ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"# exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start "${args[@]}"
elif [ "$1" = "jobmanagerv10" ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"#exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start "${args[@]}"elif [ "$1" = ${COMMAND_STANDALONE} ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; thenargs=("${args[@]:1}")echo "Starting History Server"exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; thenargs=("${args[@]:1}")echo "Starting Task Manager"#exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start "${args[@]}"a
elif [ "$1" = "taskmanagerv10" ]; thenargs=("${args[@]:1}")echo "Starting Task Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"#    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start "${args[@]}"fiargs=("${args[@]}")# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"

编写 flink-console.sh

#!/usr/bin/env bash
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#      http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|kubernetes-session|kubernetes-application|kubernetes-taskmanager) [args]"SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as arraybin=`dirname "$0"`
bin=`cd "$bin"; pwd`. "$bin"/config.shcase $SERVICE in(taskexecutor)CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner;;(historyserver)CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer;;(zookeeper)CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer;;(standalonesession)CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;;(standalonejob)CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;;(kubernetes-session)CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;;(kubernetes-application)CLASS_TO_RUN=org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint;;(kubernetes-taskmanager)CLASS_TO_RUN=org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;;(*)echo "Unknown service '${SERVICE}'. $USAGE."exit 1;;
esacFLINK_TM_CLASSPATH=`constructFlinkClassPath`if [ "$FLINK_IDENT_STRING" = "" ]; thenFLINK_IDENT_STRING="$USER"
mkdir -p "$FLINK_PID_DIR"
# The lock needs to be released after use because this script is started foreground
command -v flock >/dev/null 2>&1
if [[ ${flock_exist} -eq 0 ]]; thenexec 200<"$FLINK_PID_DIR"flock 200
# Remove the pid file when all the processes are dead
if [ -f "$pid" ]; thenall_dead=0while read each_pid; do# Check whether the process is still runningkill -0 $each_pid > /dev/null 2>&1[[ $? -eq 0 ]] && all_dead=1done < "$pid"[ ${all_dead} -eq 0 ] && rm $pid
id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-${id}-${HOSTNAME}"
out="${FLINK_LOG_PREFIX}.out"  #新增 STDOUT 文件输出log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")echo "Starting $SERVICE as a console application on host $HOSTNAME."# Add the current process id to pid file
echo $$ >> "$pid" 2>/dev/null# Release the lock because the java process runs in the foreground and would block other processes from modifying the pid file
[[ ${flock_exist} -eq 0 ]] &&  flock -u 200exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 200<&- 2>&1 < /dev/null

2、生成Flink docker 镜像

执行语句生成 Flink image

docker build -f ./flink-1.13-scala_2.12-java8 -t flink:1.13.2 .
Sending build context to Docker daemon  2.114MB
Step 1/15 : FROM openjdk:8-jre---> da343308793f
Step 2/15 : RUN set -ex;   apt-get update;   apt-get -y install libsnappy1v5 gettext-base libjemalloc-dev;   rm -rf /var/lib/apt/lists/*---> Using cache---> aad9db3002e7
Step 3/15 : ENV GOSU_VERSION 1.11---> Using cache---> 34ee8367ae28
Step 4/15 : RUN set -ex;   wget -nv -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture)";   wget -nv -O /usr/local/bin/gosu.asc "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture).asc";   export GNUPGHOME="$(mktemp -d)";   for server in ha.pool.sks-keyservers.net $(shuf -e                           hkp://p80.pool.sks-keyservers.net:80                           keyserver.ubuntu.com                           hkp://keyserver.ubuntu.com:80                           pgp.mit.edu) ; do       gpg --batch --keyserver "$server" --recv-keys B42F6819007F00F88E364FD4036A9C25BF357DD4 && break || : ;   done &&   gpg --batch --verify /usr/local/bin/gosu.asc /usr/local/bin/gosu;   gpgconf --kill all;   rm -rf "$GNUPGHOME" /usr/local/bin/gosu.asc;   chmod +x /usr/local/bin/gosu;   gosu nobody true---> Using cache---> ec528cf7b71a
Step 5/15 : ENV FLINK_TGZ_URL=https://www.apache.org/dyn/closer.cgi?action=download&filename=flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz     FLINK_ASC_URL=https://www.apache.org/dist/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz.asc     GPG_KEY=78A306590F1081CC6794DC7F62DAD618E07CF996     CHECK_GPG=true---> Using cache---> b020ef331780
Step 6/15 : ENV FLINK_HOME=/opt/flink---> Using cache---> e9a25984a5ac
Step 7/15 : ENV PATH=$FLINK_HOME/bin:$PATH---> Using cache---> 8a3fb6109932
Step 8/15 : RUN groupadd --system --gid=9999 flink &&     useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink---> Using cache---> cc208dfcf4c2
Step 9/15 : WORKDIR $FLINK_HOME---> Using cache---> b2773e788cd5
Step 10/15 : RUN set -ex;   wget -nv -O flink.tgz "$FLINK_TGZ_URL";     if [ "$CHECK_GPG" = "true" ]; then     wget -nv -O flink.tgz.asc "$FLINK_ASC_URL";     export GNUPGHOME="$(mktemp -d)";     for server in ha.pool.sks-keyservers.net $(shuf -e                             hkp://p80.pool.sks-keyservers.net:80                             keyserver.ubuntu.com                             hkp://keyserver.ubuntu.com:80                             pgp.mit.edu) ; do         gpg --batch --keyserver "$server" --recv-keys "$GPG_KEY" && break || : ;     done &&     gpg --batch --verify flink.tgz.asc flink.tgz;     gpgconf --kill all;     rm -rf "$GNUPGHOME" flink.tgz.asc;   fi;     tar -xf flink.tgz --strip-components=1;   rm flink.tgz;     chown -R flink:flink .;---> Using cache---> 0cebe1ff3a6b
Step 11/15 : COPY docker-entrypoint.sh /---> Using cache---> 89211b835ea6
Step 12/15 : COPY flink-console.sh /opt/flink/bin/---> Using cache---> b3dca2180744
Step 13/15 : ENTRYPOINT ["/docker-entrypoint.sh"]---> Using cache---> 7ed0e9daaecc
Step 14/15 : EXPOSE 6123 8081---> Using cache---> 92a85c820d14
Step 15/15 : CMD ["help"]---> Using cache---> ab7b8b26049b
Successfully built ab7b8b26049b
Successfully tagged flink:1.13.2

查看flink docker image

[root@INMS-T bin]# docker images flink:1.13.2
flink        1.13.2    ab7b8b26049b   38 hours ago   625MB
[root@INMS-T bin]# 

如需要 Flink on docker 请到官方连接查找 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/ops/

3、通过Kubernetes - configmap 构建flink环境变量配置


apiVersion: v1
kind: ConfigMap
metadata:name: flink-env-confignamespace: istio-app
data:JOB_MANAGER_RPC_ADDRESS: flink-jm-rpc-service.istio-appLOG: /opt/flink/log

执行命令生成 flink configmap

kubectl apply -f flink-env-config.yaml

查看configmap 列表是否有 flink-env-config

LOG: /opt/flink/log[root@INMS-T ~]# kubectl get cm -A
NAMESPACE         NAME                                  DATA   AGE
default           flink-env-config                      2      3d16h
default           istio-ca-root-cert                    1      3d16h
default           kube-root-ca.crt                      1      3d16h
istio-system      istio                                 2      3d16h
istio-system      istio-ca-root-cert                    1      3d16h
istio-system      istio-gateway-leader                  0      3d16h
istio-system      istio-leader                          0      3d16h
istio-system      istio-namespace-controller-election   0      3d16h
istio-system      istio-sidecar-injector                2      3d16h
istio-system      kube-root-ca.crt                      1      3d16h
kube-node-lease   kube-root-ca.crt                      1      3d16h
kube-public       cluster-info                          1      3d16h
kube-public       kube-root-ca.crt                      1      3d16h
kube-system       coredns                               1      3d16h
kube-system       extension-apiserver-authentication    6      3d16h
kube-system       kube-proxy                            2      3d16h
kube-system       kube-root-ca.crt                      1      3d16h
kube-system       kubeadm-config                        2      3d16h
kube-system       kubelet-config-1.21                   1      3d16h

查看 flink-env-config key、value是否正确

[root@INMS-T ~]# kubectl describe cm flink-env-config
Name:         flink-env-config
Namespace:    istio-app
Labels:       <none>
Annotations:  <none>Data
====Events:  <none>

4、生成 flink 的 job- task 通信服务

编写 yaml

[root@INMS-T ~]# cat  flink-rpc-service.yaml
apiVersion: v1
kind: Service
metadata:name: flink-jm-rpc-servicenamespace: istio-app
spec:type: ClusterIP ports:- protocol: TCPport: 6123targetPort: 6123name: rpc- protocol: TCPport: 6124targetPort: 6124name: blob- protocol: TCPport: 6125targetPort: 6125name: queryselector:app: flinkcomponent: jobmanager


kubectl apply -f flink-jm-rpc-service.yaml

5、生成Flink Job-manager 并启动

编写 Jobmanager.yaml

apiVersion: apps/v1
kind: Deployment
metadata:name: flink-jobmanagernamespace: istio-applabels:app: flink
spec:replicas: 1selector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: flink:1.13.2imagePullPolicy: IfNotPresentworkingDir:  /opt/flinkargs: ["jobmanagerv10"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob- containerPort: 6125name: query- containerPort: 8081name: uienv:- name: JOB_MANAGER_RPC_ADDRESSvalueFrom:configMapKeyRef:name: flink-env-configkey: JOB_MANAGER_RPC_ADDRESS- name: FLINK_LOG_DIRvalueFrom:configMapKeyRef:name: flink-env-configkey: LOG- name: TZvalue: Asia/Shanghai

执行 Jobmanager.yaml 生成jobmanager 并启动

kubectl apply -f flink-jobmanager.yaml
[root@INMS-T ~]# kubectl get pod -o wide
NAME                                   READY   STATUS    RESTARTS   AGE   IP           NODE     NOMINATED NODE   READINESS GATES
flink-jobmanager-86d8d7df5d-hdd2k      1/1     Running   0          84m   inms-t   <none>           <none>
flink-taskmanager-1-7cf4c7b8f-tg4f8    1/1     Running   0          84m   inms-t   <none>           <none>
flink-taskmanager-2-7949bdb4b4-lbwr7   1/1     Running   0          84m   inms-t   <none>           <none>



apiVersion: apps/v1
kind: Deployment
metadata:name: flink-taskmanager-1namespace: istio-applabels:app: flink
spec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: flink:1.13.2imagePullPolicy: IfNotPresentargs: ["taskmanagerv10"]ports:- containerPort: 6122name: rpc- containerPort: 6125name: queryenv:- name: JOB_MANAGER_RPC_ADDRESSvalueFrom:configMapKeyRef:name: flink-env-configkey: JOB_MANAGER_RPC_ADDRESS- name: FLINK_LOG_DIRvalueFrom:configMapKeyRef:name: flink-env-configkey: LOG- name: TZvalue: Asia/Shanghai

执行 taskmanager.yaml 生成taskmanager 并启动

kubectl apply -f flink-taskmanager.yaml
[root@INMS-T ~]# kubectl get pod -o wide
NAME                                   READY   STATUS    RESTARTS   AGE   IP           NODE     NOMINATED NODE   READINESS GATES
flink-jobmanager-86d8d7df5d-hdd2k      1/1     Running   0          84m   inms-t   <none>           <none>
flink-taskmanager-1-7cf4c7b8f-tg4f8    1/1     Running   0          84m   inms-t   <none>           <none>
flink-taskmanager-2-7949bdb4b4-lbwr7   1/1     Running   0          84m   inms-t   <none>           <none>



apiVersion: v1
kind: Service
metadata:name: flink-jobmanager-restnamespace: istio-app
spec:ports:- name: restport: 8081targetPort: 8081selector:app: flinkcomponent: jobmanager


kubectl apply -f flink-web-ui.yaml

8.添加istio 网关和虚拟服务


apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:name: flink-gatewaynamespace: istio-app
spec:selector:istio: ingressgateway # use istio default controllerservers:- port:number: 8081name: httpprotocol: HTTPhosts:- "*"
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:name: flink-vsnamespace: istio-app
spec:hosts:- "*"gateways:- flink-gatewayhttp:- route:- destination:host: flink-jobmanager-restport:number: 8081


kubectl apply -f flink-gateway-vs.yaml

9.暴露flink ui 服务

kubectl edit svc istio-ingressgateway  -n istio-system


可以看到1 个job 2个task
至此搭建kubernetes 上搭建 flink 集群完毕。


