Flink作为新一代的大数据处理引擎,不仅是业内公认的最好的流处理引擎,而且具备机器学习等多种强大计算功能,用户只需根据业务逻辑开发一套代码,无论是全量数据还是增量数据,亦或者实时处理,一套方案即可全部解决。K8S是业内最流行的容器编排工具,与docker容器技术结合,可以提供比Yarn与Mesos更强大的集群资源管理功能,成为容器云的主要解决方案之一。如果能将两者结合,无疑是双剑合璧,对生产效能有着巨大的提升。本文将介绍目前为止,Flink On K8S的最前沿实现方案。

Flink集群架构

如下图所示,Flink集群中一个 JobManger 和若干个TaskManager。由 Client 提交任务给 JobManager,JobManager再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的JVM进程。

Client是提交Job的客户端,可以是运行在任何机器上(与JobManager 环境连通即可),也可以运行在容器中。提交Job后,Client可以结束进程(Streaming的任务),也可以不结束并等待结果返回。

JobManager主要负责调度Job并协调Task做checkpoint。从Client处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以Task粒度调度到各个TaskManager上去执行。

TaskManager在启动的时候就设置好了槽位数(Slot),每个slot能启动一个Task,Task为线程。从JobManager处接收需要部署的Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

可以看到Flink的任务调度是多线程模型,并且不同Job/Task混合在一个 TaskManager 进程中。

目前在K8S中执行Flink任务的方式有两种,一种是Standalone,一种是原生模式。

Standalone模式

在K8S中启动Flink集群

Flink on Kubernetes 的架构如图所示,Flink 任务在 Kubernetes 上运行的步骤有:

  • 首先往 Kubernetes 集群提交了资源描述文件后,会启动 Master 和 Worker 的 container。
  • Master Container 中会启动 Flink Master Process,包含 Flink-Container ResourceManager、JobManager 和 Program Runner。
  • Worker Container 会启动 TaskManager,并向负责资源管理的 ResourceManager 进行注册,注册完成之后,由 JobManager 将具体的任务分给 Container,再由 Container 去执行。
  • 需要说明的是,Master Container 与Worker Container是用一个镜像启动的,只是启动参数不一样,如下图所示,两个deployment文件的image都是flink:latest。

计算任务可以以Session模式与Per-Job模式运行提交:

  • Session模式:先启动一个Flink集群,然后向该集群提交任务,所有任务共用JobManager。任务提交速度快,适合频繁提交运行的短时间任务。
  • Per-Job模式:每提交一个任务,单独启动一个集群运行该任务,运行结束集群被删除,资源也被释放。任务启动较慢,适合于长时间运行的大型任务。

Session 模式

在Session模式下,需要先启动一个Flink集群,然后向该集群提交任务,主要步骤为:先将集群配置定义为ConfigMap、然后通过官方资源描述文件分别启动JobManager与一定数量的TaskManager,最后在flink客户端向这个启动的Flink集群中提交任务。

定义ConfigMap

对于 JobManager 和 TaskManager 运行过程中需要的一些配置文件,如:flink-conf.yaml、hdfs-site.xml、core-site.xml,可以通过flink-configuration-configmap.yaml文件将它们定义为 ConfigMap 来实现配置的传递和读取。如果使用默认配置,这一步则不需要。

kubectl create -f flink-configuration-configmap.yaml

启动JobManager

JobManager 的执行过程分为两步:

  • 首先,JobManager 通过 Deployment 进行描述,保证 1 个副本的 Container 运行 JobManager,可以定义一个标签,例如 flink-jobmanager。
kubectl create -f jobmanager-deployment.yaml
  • 其次,还需要定义一个JobManager Service,通过 service name 和 port 暴露 JobManager 服务,通过标签选择对应的 pods。
kubectl create -f jobmanager-service.yaml

启动TaskManager

TaskManager 也是通过 Deployment 来进行描述,保证 n 个副本的 Container 运行 TaskManager,同时也需要定义一个标签,例如 flink-taskmanager。

kubectl create -f taskmanager-deployment.yaml

提交任务

提交服务是通过请求JobManager Service实现的,如果从K8S集群外部请求该Service,需要对外暴露端口

kubectl port-forward service/flink-jobmanager 8081:8081

然后通过flink命令的m参数,指定服务的地址,即可向刚创建的集群中提交任务了。

./bin/flink run -d -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar

删除集群

直接利用K8S的命令行工具或者API删除前面创建的资源对象即可

kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f flink-configuration-configmap.yaml

Flink on Kubernetes–交互原理

整个交互的流程比较简单,用户往 Kubernetes 集群提交定义好的资源描述文件即可,例如 deployment、configmap、service 等描述。后续的事情就交给 Kubernetes 集群自动完成。Kubernetes 集群会按照定义好的描述来启动 pod,运行用户程序。各个组件的具体工作如下:

  • Service: 通过标签(label selector)找到 job manager 的 pod 暴露服务。
  • Deployment:保证 n 个副本的 container 运行 JM/TM,应用升级策略。
  • ConfigMap:在每个 pod 上通过挂载 /etc/flink 目录,包含 flink-conf.yaml 内容。

Per-Job模式

在官方的Per Job模式下,需要先将用户代码都打到镜像里面,然后根据该镜像来部署一个flink集群运行用户代码,即Flink job cluster。所以主要分为两步:创建镜像与部署Flink job cluster。

创建镜像

在flink/flink-container/docker目录下有一个build.sh脚本,可以根据指定版本的基础镜像去构建你的job镜像,成功后会输出 “Successfully tagged topspeed:latest” 的提示。

sh build.sh --from-release --flink-version 1.7.0 --hadoop-version 2.8 --scala-version 2.11 --job-jar ~/flink/flink-1.7.1/examples/streaming/TopSpeedWindowing.jar --image-name topspeed

镜像构建完成后,可以上传到 hub.docker.com 上,也可以上传到你们项目组的内部Registry。

docker tag topspeed zkb555/topspeedwindowing
docker push zkb555/topspeedwindowing

部署Flink job cluster

在镜像上传之后,可以根据该镜像部署Flink job cluster。

# 启动Servive
kubectl create -f job-cluster-service.yaml
# 启动JobManager
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing FLINK_JOB_PARALLELISM=3 envsubst < job-cluster-job.yaml.template | kubectl create -f –
# 启动TaskManager
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB_PARALLELISM=4 envsubst < task-manager-deployment.yaml.template | kubectl create -f -

参数说明:

  1. FLINK_DOCKER_IMAGE_NAME - 镜像名称(默认:flink-job:latest)
  2. FLINK_JOB - 要执行的Flink任务名称(默认:none)
  3. DEFAULT_PARALLELISM - Flink任务的默认并行度 (默认: 1)
  4. FLINK_JOB_ARGUMENTS - 其他任务参数;
  5. SAVEPOINT_OPTIONS - Savepoint选项 (default: none)

这种方式比较笨重,如果业务逻辑的变动涉及代码的修改,都需要重新生成镜像,非常麻烦,在生产环境提交一个新任务重新打镜像是不切实际的。一种更好的替代方案是将你的业务代码放到NFS或者HDFS上,然后在启动容器时通过挂载或者将jar包下载到容器内的方式执行你的Flink代码,代码位置通过启动参数传入。

需要注意的是Standalone模式需要在任务启动时就确定TaskManager的数量,暂且不能像Yarn一样,可以在任务启动时申请动态资源。然而很多时候任务需要多少个TaskManager事先并不知道,TaskManager设置少了,任务可能跑不起来,多了又会造成资源浪费,需要在任务启动时才能确定需要多少个TaskMananger,为了支持任务启动时实时动态申请资源的功能,就有了下面介绍的原生模式, 这意味着Flink任务可以直接向K8s集群申请资源。

原生模式

原生模式提供了与K8S更好的集成,在Flink 1.9以上版本内置了K8S的客户端,Flink的可以直接向K8S申请计算资源,集群资源得到了更高效的利用。这点与同Flink on Yarn/Mesos一样。

做好以下准备工作就可以从你的flink客户端直接提交flink任务到K8S集群。

  • KubeConfig, 位于 ~/.kube/config,需要具备查看、创建与删除pod与service对象的权限,可以在K8S客户端通过 kubectl auth can-i <list|create|edit|delete> pods来验证;
  • Kubernetes开启DNS服务;
  • 一个Kubernetes账户,需要具备创建与删除pod的权限。

原生模式同样支持Session模式玉Per-job两种方式提交任务。

原生Session模式

与Standalone模式中的Session模式类似,还是分为两步,先启动一个集群,然后向集群提交任务。可以通过运行kubernetes-session.sh文件来启动一个集群

./bin/kubernetes-session.sh

或者通过一些超参数来对集群进行设置

./bin/kubernetes-session.sh \-Dkubernetes.cluster-id=<ClusterId> \-Dtaskmanager.memory.process.size=4096m \-Dkubernetes.taskmanager.cpu=2 \-Dtaskmanager.numberOfTaskSlots=4 \-Dresourcemanager.taskmanager-timeout=3600000

然后在flink客户端,通过flink命令提交任务

./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=<ClusterId> examples/streaming/WindowJoin.jar

原生Session cluster的创建流程为:

  • Flink客户端先通过K8S的ApiServer提交cluster的描述信息,包括ConfigMap spec, Job Manager Service spec, Job Manager Deployment spec等;
  • K8S接收到这些信息后就会拉取镜像、挂载卷轴来启动Flink master,这时候Dispatcher 与KubernetesResourceManager也会被启动,从而可以接受Flink job;

  • 当用户通过Flink客户端提交一个job时,客户端就会生成这个job的job graph,并与这个job的jar包一起提交到Dispatcher,然后就会生成这个job的JobMaster;
  • 最后JobMaster会向KubernetesResourceManager申请slot来执行这个job graph,如果集群中slot数量不够,KubernetesResourceManager会启动新的TaskManager pod并将它注册到集群中。

原生Per-Job模式

目前尚处于实验阶段,在Flink 1.11版本中才支持。

官方的使用方式也是与前面Standalone-Per-Cluster模式类似,先创建一个包含用户jar的用于启动Flink Master的docker image,然后在客户端通过flink命令根据该image提交任务,从而创建一个运行该任务的独立集群。

./bin/flink run -d -e kubernetes-per-job -Dkubernetes.cluster-id=<ClusterId>-Dtaskmanager.container.image=<your image>-Dtaskmanager.memory.process.size=4096m \-Dkubernetes.taskmanager.cpu=2 \-Dtaskmanager.numberOfTaskSlots=4 \-Dresourcemanager.taskmanager-timeout=3600000-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"

Per-Job模式的运行过程与Session模式的不同点在于Flink Master的启动,其他步骤都一样。Flink Master Deployment里面已经有Flink任务的jar包,在启动Flink Master时Cluster Entrypoint就会运行该jar包的main函数产生job graph,并将该job graph与jar包提交给Dispatcher。

当然这种方式的缺点与Standalone-Per-Cluster一样,每个用户jar都需要一个单独的镜像,实际还是建议将用户jar放在外部,在运行时挂载或者下载到容器中。

总结

本文介绍了Flink on K8S的各种方案,鉴于不需要事先指定taskmanager数量,原生模式相对于Standalone模式更有优势,但目前尚处于实验阶段。他们两者都支持Session模式与Per-Job模式,至于选择哪种,看你实际的任务类型。如果式以频繁提交的短期任务,如批处理为主,则适合Session模式,如果以长期运行的流式任务为主,则适合用Per-Job模式。

Flink On K8S终极实现方案相关推荐

  1. flink on k8s部署方案调研

    背景 flink是目前最常用的大数据流计算处理引擎,一共有local,standalone,yarn,kubernetes四种安装部署机构方案.通常使用yarn进行管理,随着k8s云计算体系越来越成熟 ...

  2. flink on k8s部署方案实践--详细步骤

    背景 Flink-operator极大的方便了我们管理 Flink 集群及其作业,我们只需要自定义yaml文件就可以做到. Flink 官方还未给出 flink-operator 方案,不过 Goog ...

  3. 【Flink】flink on k8s 部署方案

    1. HDFS部署 (k8s) hdfs的作用是用户保存flink的检查点与保持点 但是大数据集群目前还是建议单独使用CDH或者HDP部署 目前文中的这种hdfs on k8s方式做高可用不太方便 1 ...

  4. Uno 平台 一 WinUI终极跨平台方案(二)

    上篇<Uno 平台 一 WinUI终极跨平台方案(一)>简单介绍了Uno平台3.6版本的更新内容,大家评论里持怀疑态度的人很多,主要还是Uno平台非微软官方发布,咱们.net技术方向的已经 ...

  5. 【Flink on k8s】JConsole 远程监控 TaskManager

    1.概述 转载:[Flink on k8s]JConsole 远程监控 TaskManager 问题:如何定位 Flink 作业卡在哪里? 首先,配置文件 flink-conf.yaml 添加 Jav ...

  6. 唯品会基于Kubernetes(k8s)网络方案演进

    VIP PaaS在接近两年时间里,基于kubernetes主要经历四次网络方案的变迁: 1. kubernetes + flannel 2. 基于Docker libnetwork的网络定制 3. k ...

  7. 大规模 Web 数据采集的终极开源方案 - PulsarRPA

    简介 PulsarRPA (国内镜像)是大规模采集 Web 数据的终极开源方案,可满足几乎所有规模和性质的网络数据采集需要. 大规模提取 Web 数据非常困难.网站经常变化并且变得越来越复杂,这意味着 ...

  8. k8s网络架构图_唯品会基于Kubernetes(k8s)网络方案演进

    VIP PaaS在接近两年时间里,基于kubernetes主要经历四次网络方案的变迁: 1. kubernetes + flannel 2. 基于Docker libnetwork的网络定制 3. k ...

  9. flink在k8s上的部署和使用教程

    官网:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/python.html 架构 要了解一个系统,一般都是从架构开 ...

  10. Flink On K8s

    Flink 原生支持 Kubernetes # 本页面介绍了如何在Kubernetes 上本地部署 Flink . 开始 # 本入门部分将指导您在 Kubernetes 上设置功能齐全的 Flink ...

最新文章

  1. 性能领先,即训即用,快速部署,飞桨首次揭秘服务器端推理库
  2. pyhanlp 文本聚类
  3. 《百面机器学习算法工程师带你去面试》资料学习
  4. 请写一个java程序实现线程连接池功能_写一个java程序实现线程连接池的功能
  5. python3 在线加密_Python3对称加密算法AES、DES3实例详解
  6. VS2012手动关联xaml与CS文件
  7. 聚类算法实践——PCCA、SOM、Affinity Propagation
  8. 金融风控实战——反欺诈特征
  9. 球面坐标系与三角函数 Spherical Coordinates and Trigonometric Functions
  10. fms安装教程 linux_linux通过fms搭建流媒体服务器Flash Media Server
  11. vue使用element-ui打包后与本地测试样式不同问题,改变element-ui原有样式方法
  12. 【干货】ubuntu18.04+win7双系统+双硬盘安装总结
  13. java怎样写网页_java怎么写网站
  14. 重磅!上海出落户新政:双一流应届硕士可直接落户!
  15. Theme Studio(主题工作室)
  16. python微信群管理_利用python实现在微信群刷屏的方法
  17. Python那些让我疑惑许久的代码--2
  18. 使用you-get批量下载B站视频
  19. Codeforces Round #548 (Div. 2) C. Edgy Trees(并查集+快速幂)
  20. react native 极光推送 jpush-react-native

热门文章

  1. 浅谈中国古代服饰纹样中礼制精神的体现
  2. VMware 中Fedora系统连接网络问题!
  3. quartz mysql 表 集群配置_Quartz集群配置
  4. 同济大学Python程序设计基础 实验二:数据表示和处理
  5. 全球与中国SS-OCT激光市场深度研究分析报告
  6. 入门级概述光学相干层析(OCT)原理
  7. U盘装Win11教程 系统之家装机教程
  8. 【转】音视频工程中VGA线材的选材技巧
  9. FIFO读数据异常分析
  10. react在线浏览doc_如何实现 React 中的状态自动保存?