按照上篇文章《解锁云原生 AI 技能 | 在 Kubernetes 上构建机器学习系统》搭建了一套 Kubeflow Pipelines 之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于 Kubeflow Pipelines 的机器学习工作流。

准备工作

机器学习工作流是一个任务驱动的流程,同时也是数据驱动的流程,这里涉及到数据的导入和准备、模型训练 Checkpoint 的导出评估、到最终模型的导出。这就需要分布式存储作为传输的媒介,此处使用 NAS 作为分布式存储。

  • 创建分布式存储,这里以 NAS 为例。此处 NFS_SERVER_IP 需要替换成真实 NAS 服务器地址
  1. 创建阿里云 NAS 服务,可以参考文档
  2. 需要在 NFS Server 中创建 /data
# mkdir -p /nfs
# mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
# mkdir -p /data
# cd /
# umount /nfs
  1. 创建对应的 Persistent Volume
# cat nfs-pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:name: user-susanlabels:user-susan: pipelines
spec:persistentVolumeReclaimPolicy: Retaincapacity:storage: 10GiaccessModes:- ReadWriteManynfs:server: NFS_SERVER_IPpath: "/data"# kubectl create -f nfs-pv.yaml
  1. 创建 Persistent Volume Claim
# cat nfs-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:name: user-susanannotations:description: "this is the mnist demo"owner: Tom
spec:accessModes:- ReadWriteManyresources:requests:storage: 5Giselector:matchLabels:user-susan: pipelines
# kubectl create -f nfs-pvc.yaml

开发 Pipeline

由于 Kubeflow Pipelines 提供的例子都是依赖于 Google 的存储服务,这导致国内的用户无法真正体验 Pipelines 的能力。为此,阿里云容器服务团队提供了基于 NAS 存储训练 MNIST 模型的例子,方便您在阿里云上使用和学习 Kubeflow Pipelines。具体步骤分 3 步:

  • (1) 下载数据
  • (2) 利用 TensorFlow 进行模型训练
  • (3) 模型导出

在这 3 个步骤中,后一个步骤都依赖于前一个步骤而完成。
Kubeflow Pipelines 中可以用 Python 代码描述这样一个流程, 完整代码可以查看 standalone_pipeline.py。
我们在例子中使用了基于开源项目 Arena 的 arena_op ,这是对于 Kubeflow 默认的 container_op 封装,它能够实现对于分布式训练 MPI 和 PS 模式的无缝衔接,另外也支持使用 GPU 和 RDMA 等异构设备和分布式存储的简单接入,同时方便从 git 源同步代码,是一个比较实用的工具 API。

@dsl.pipeline(name='pipeline to run jobs',description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',dropout='0.9',model_version='1',commit='f097575656f927d86d99dd64931042e1a9003cb2'):"""A pipeline for end to end machine learning workflow."""data=["user-susan:/training"]gpus=1
# 1. prepare dataprepare_data = arena.standalone_job_op(name="prepare-data",image="byrnedo/alpine-curl",data=data,command="mkdir -p /training/dataset/mnist && \cd /training/dataset/mnist && \curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")# 2. downalod source code and train the modelstrain = arena.standalone_job_op(name="train",image="tensorflow/tensorflow:1.11.0-gpu-py3",sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",env=["GIT_SYNC_REV=%s" % (commit)],gpus=gpus,data=data,command='''echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \--max_steps 500 --data_dir /training/dataset/mnist \--log_dir /training/output/mnist  --learning_rate %s \--dropout %s''' % (prepare_data.output, learning_rate, dropout),metrics=["Train-accuracy:PERCENTAGE"])# 3. export the modelexport_model = arena.standalone_job_op(name="export-model",image="tensorflow/tensorflow:1.11.0-py3",sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",env=["GIT_SYNC_REV=%s" % (commit)],data=data,command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

Kubeflow Pipelines 会将上面的代码转化成一个有向无环图 (DAG), 其中的每一个节点就是 Component (组件),而 Component (组件)之间的连线代表它们之间的依赖关系。从 Pipelines UI 可以看到 DAG 图:

首先具体理解一下数据准备的部分,这里我们提供了 arena.standalone_job_op 的 Python API, 需要指定该步骤的名称: name; 需要使用的容器镜像: image; 要使用的数据以及其对应到容器内部的挂载目录: data。
这里的 data 是一个数组格式, 如 data=[“user-susan:/training”],表示可以挂载到多个数据。 其中 user-susan 是之前创建的 Persistent Volume Claim, 而 /training 为容器内部的挂载目录。

prepare_data = arena.standalone_job_op(name="prepare-data",image="byrnedo/alpine-curl",data=data,command="mkdir -p /training/dataset/mnist && \cd /training/dataset/mnist && \curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")

而上述步骤实际上是从指定地址利用 curl 下载数据到分布式存储对应的目录 /training/dataset/mnist,请注意这里的 /training 为分布式存储的根目录,类似大家熟悉的根 mount 点;而 /training/dataset/mnist 是子目录。其实后面的步骤可以通过使用同样的根 mount 点,读到数据,进行运算。
第二步是利用下载到分布式存储的数据,并通过 git 指定固定 commit id 下载代码,并进行模型训练。

train = arena.standalone_job_op(name="train",image="tensorflow/tensorflow:1.11.0-gpu-py3",sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",env=["GIT_SYNC_REV=%s" % (commit)],gpus=gpus,data=data,command='''echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \--max_steps 500 --data_dir /training/dataset/mnist \--log_dir /training/output/mnist  --learning_rate %s \--dropout %s''' % (prepare_data.output, learning_rate, dropout),metrics=["Train-accuracy:PERCENTAGE"])

可以看到这个步骤比数据准备要相对复杂一点,除了和第一步骤中的 name, image, data 和 command 一样需要指定之外,在模型训练步骤中,还需要指定:

  • 获取代码的方式: 从可重现实验的角度来看,对于运行试验代码的追本溯源,是非常重要的一环。可以在 API 调用时指定 sync_source 的 git 代码源,同时通过设定 envGIT_SYNC_REV 指定训练代码的 commit id;
  • gpu: 默认为 0,就是不使用 GPU;如果为大于 0 的整数值,就代表该步骤需要这个数量的 GPU 数;
  • metrics: 同样是从可重现和可比较的实验目的出发,用户可以将需要的一系列指标导出,并且通过 Pipelines UI 进行直观的显示和比较。具体使用方法分为两步:1. 在调用 API 时以数组的形式指定要收集指标的 metrics name 和指标的展示格式 PERCENTAGE 或者是 RAW,比如 metrics=["Train-accuracy:PERCENTAGE"]。 2. 由于 Pipelines 默认会从 stdout 日志中收集指标,你需要在真正运行的模型代码中输出 {metrics name}={value} 或者 {metrics name}:{value}, 可以参考具体样例代码。


值得注意的是:

在本步骤中指定了和 prepare_data 相同的 data 参数 [“user-susan:/training”],就可以在训练代码中读到对应的数据,比如 --data_dir /training/dataset/mnist
另外由于该步骤依赖于 prepare_data,可以在方法中通过指定 prepare_data.output 表示两个步骤的依赖关系。

最后 export_model 是基于 train 训练产生的 checkpoint,生成训练模型:

export_model = arena.standalone_job_op(name="export-model",image="tensorflow/tensorflow:1.11.0-py3",sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",env=["GIT_SYNC_REV=%s" % (commit)],data=data,command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

export_model 和第二步 train 类似,甚至要更为简单,它只是从 git 同步模型导出代码并且利用共享目录 /training/output/mnist 中的 checkpoint 执行模型导出。
整个工作流程看起来还是很直观的, 下面就可以定义一个 Python 方法将整个流程贯穿在一起:

@dsl.pipeline(name='pipeline to run jobs',description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',dropout='0.9',model_version='1',commit='f097575656f927d86d99dd64931042e1a9003cb2'):

@dsl.pipeline 是表示工作流的装饰器,这个装饰器中需要定义两个属性,分别是 namedescription
入口方法 sample_pipeline 中定义了 4 个参数: learning_rate, dropout, model_versioncommit, 分别可以在上面的 trainexport_model 阶段使用。这里的参数的值实际上是 dsl.PipelineParam 类型,定义成 dsl.PipelineParam 的目的在于可以通过 Kubeflow Pipelines 的原生 UI 将其转换成输入表单,表单的关键字是参数名称,而默认值为参数的值。值得注意的是,这里的 dsl.PipelineParam 对应值实际上只能是字符串和数字型;而数组和 map,以及自定义类型都是无法通过转型进行变换的。

实际上,这些参数都可以在用户提交工作流时进行覆盖,以下就是提交工作流对应的 UI:

提交 Pipeline

您可以在自己的 Kubernetes 内将前面开发工作流的 Python DSL 提交到 Kubeflow Pipelines 服务中, 实际提交代码很简单:

KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888"import kfp.compiler as compilercompiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')client = kfp.Client(host=KFP_SERVICE)try:experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).idexcept:experiment_id = client.create_experiment(EXPERIMENT_NAME).idrun = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',params={'learning_rate':learning_rate,'dropout':dropout,'model_version':model_version,'commit':commit})

利用 compiler.compile 将 Python 代码编译成执行引擎 (Argo) 识别的 DAG 配置文件;
通过 Kubeflow Pipeline 的客户端创建或者找到已有的实验,并且提交之前编译出的 DAG 配置文件。

在集群内准备一个 python3 的环境,并且安装 Kubeflow Pipelines SDK:

# kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity
# kubectl  exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash

登录到 Python3 的环境后,执行如下命令,连续提交两个不同参数的任务:

# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
# curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py
# python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
# python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3

查看运行结果

登录到 Kubeflow Pipelines 的 UI: https://{pipeline地址}/pipeline/#/experiments, 比如:

https://11.124.285.171/pipeline/#/experiments


点击 Compare runs 按钮,可以比较两个实验的输入、花费的时间和精度等一系列指标。让实验可追溯是让实验可重现的第一步,而利用 Kubeflow Pipelines 本身的实验管理能力则是开启实验可重现的第一步。

总结

实现一个可以运行的 Kubeflow Pipeline 需要的步骤是:

  1. 构建 Pipeline (流水线)中需要的最小执行单元 Component (组件),如果是利用原生定义的 dsl.container_ops, 需要构建两部分代码:
  • 构建运行时代码:通常是为每个步骤构建容器镜像,作为 Pipelines 和真正执行业务逻辑代码之间的适配器。它所做的事情为获取 Pipelines 上下文的输入参数,调用业务逻辑代码,并且将需要传递到下个步骤的输出按照 Pipelines 的规则放到容器内的指定位置,由底层工作流组件负责传递。 这样产生的结果是运行时代码与业务逻辑代码会耦合在一起。可以参考 Kubeflow Pipelines 的例子;
  • 构建客户端代码:这个步骤通常是长成下面的样子, 熟悉 Kubernetes 的朋友会发现这个步骤实际上就是在编写 Pod Spec:
container_op = dsl.ContainerOp(name=name,image='<train-image>',arguments=['--input_dir', input_dir,'--output_dir', output_dir,'--model_name', model_name,'--model_version', model_version,'--epochs', epochs],file_outputs={'output': '/output.txt'})
container_op.add_volume(k8s_client.V1Volume(host_path=k8s_client.V1HostPathVolumeSource(path=persistent_volume_path),name=persistent_volume_name))
container_op.add_volume_mount(k8s_client.V1VolumeMount(mount_path=persistent_volume_path,name=persistent_volume_name))

利用原生定义的 dsl.container_ops 的好处在于灵活,由于开放了和 Pipelines 的交互接口,用户可以在 container_ops 这个层面做许多事情。但是它的问题在于:

  • 复用度低。每个 Component 都需要构建镜像和开发运行时代码;
  • 复杂度高。使用者需要了解 Kubernetes 的概念,比如 resource limit, PVC, node selector 等一系列概念;
  • 支持分布式训练困难。由于 container_op 为单容器操作,如果需要支持分布式训练就需要在 container_ops 中提交和管理类似 TFJob 的任务。这里会带来复杂度和安全性的双重挑战,复杂度比较好理解,安全性是说提交 TFJob 这类任务的权限会需要开放额外的权限给 Pipeline 的开发者。

另一种方式是使用 arena_op 这种可以重用的 Component API,它使用通用运行时代码,可以免去重复构建运行时代码的工作;同时利用通用一套的 arena_op API 简化用户的使用;也支持 Parameter Server 和 MPI 等场景。建议您使用这种方式编译 Pipelines。

  1. 将构建好的 Component (组件)拼接成 Pipeline (流水线);
  2. 将 Pipeline (流水线)编译成 Argo 的执行引擎 (Argo) 识别的 DAG 配置文件, 并提交 DAG 配置文件到 Kubeflow Pipelines, 利用 Kubeflow Pipelines 自身的 UI 查看流程结果。

解锁云原生 AI 技能 - 开发你的机器学习工作流相关推荐

  1. 解锁云原生 AI 技能|在 Kubernetes 上构建机器学习系统

    本系列将利用阿里云容器服务,帮助您上手 Kubeflow Pipelines. 介绍 机器学习的工程复杂度,除了来自于常见的软件开发问题外,还和机器学习数据驱动的特点相关.而这就带来了其工作流程链路更 ...

  2. 云原生AI开发平台架构及核心优势

    导读: 云原生AI开发平台以云原生容器服务为基础架构调用云资源,配合大数据计算.人工智能计算以及分布式计算框架,服务于企业的数据分析.模型训练以及AI应用部署需求.借助容器.微服务.无服务器等云原生优 ...

  3. 艾瑞咨询:2021年中国云原生AI开发平台白皮书

    国内人工智能技术成熟度持续提升.服务种类不断丰富,在企业经营管理各环节的价值已得到市场的初步验证,然而当前国内甲方企业进行人工智能应用仍然面临着一系列问题. 云原生AI开发平台融合了成熟的人工智能开发 ...

  4. 深入云原生 AI:基于 Alluxio 数据缓存的大规模深度学习训练性能优化

    作者 | 车漾(阿里云高级技术专家).顾荣(南京大学 副研究员) 导读:Alluxio 项目诞生于 UC Berkeley AMP 实验室,自开源以来经过 7 年的不断开发迭代,支撑大数据处理场景的数 ...

  5. 理清逻辑,确保云原生时代应用开发的全生命周期安全

    摘要:正所谓"道高一尺魔高一丈",不断的攻防对抗中,安全设计者也面临很大的挑战.华为云MVP毛哲文是一个偏向于安全防守的安全工程师,他认为,"攻防之间要做到平衡,知己知彼 ...

  6. 云原生下的开发测试实践

    本文整理自阿里巴巴技术专家金戟和砧木的视频分享<云原生下的开发测试>. 测试环境管理之困与阿里巴巴的解决之道 查看全文 http://www.taodudu.cc/news/show-42 ...

  7. 华为云解锁云原生数据库发展新动能

    摘要:如流水般源源不断的数据都存放在哪里?云原生数据库到底是什么?企业基于云原生数据库如何随取随用,实现从 "上好云" 到 "用好云" 的跨越发展? 本文分享自 ...

  8. 【云原生AI】Fluid + JindoFS 助力微博海量小文件模型训练速度提升 18 倍

    简介: 深度学习平台在微博社交业务扮演着重要的角色.计算存储分离架构下,微博深度学习平台在数据访问与调度方面存在性能低效的问题.本文将介绍微博内部设计实现的一套全新的基于 Fluid(内含 Jindo ...

  9. 毫末智行 Fluid 实践:云原生 AI 让汽车变得“更聪明”

    简介:毫末机器学习训练场景对于数据读取有较高的性能要求,且对于元数据和数据缓存的精细化控制要求较高, 通过 Fluid + JindoRuntime 的缓存能力可以灵活地缓存 OSS 训练文件进行元数 ...

最新文章

  1. JSP内置对象-out
  2. C 语言Hello world
  3. Linux下去掉Windows文件的^M
  4. Output Operations on DStreams
  5. 深度探索C++ 对象模型(2)-类的对象的内存大小
  6. wxWidgets:wxURI类用法
  7. java内存泄漏总结
  8. 买了套房28年后才想起来,房价从33万涨到600万!上门却发现住着人?
  9. oracle 删除jobs日志,修改Oracle的Job Scheduler 日志级别及删除运行日志
  10. jq+layui之保存修改的表状态
  11. mysql忘记root密码怎么办,几个简单操作步骤找回mysql密码
  12. 兄弟打印机内存已满清零方法_兄弟打印机清零方法大全
  13. 计算机打印不了测试纸,win10系统打印机无法打印测试页的解决办法
  14. mysql 生成一张全年日期表:工作日、周末、节假日
  15. vue 加headers_(vue.js)axios interceptors 拦截器中添加headers 属性
  16. SpringCloud分布式架构演进
  17. 谷歌ai人工智能叫什么_谷歌正在通过AI策展和内置订阅全面革新Google新闻
  18. Typora中的emoji图标标签
  19. 算法题 子数组之和为零
  20. 15天精读掌握《高德纳_具体数学》 第7天 2019.6.3

热门文章

  1. 线程池之工作项,等待项,计时项 (存在疑问???)
  2. ANSI,ASCII,UNICODE
  3. 8、MySQL注释:单行注释和多行注释
  4. ACM入门之【约数】
  5. 【AC Saber】双指针
  6. 建基小型计算机,建碁AOpenminiITX小型化平台应用(44页)-原创力文档
  7. reactor线程模型_从TCP服务器到I/O模型,带你学习Netty
  8. 推荐 7 个牛哄哄 Spring Cloud 实战项目
  9. 推荐 19 个 github 超牛逼项目!
  10. 22道Java面试题,看看你会了多少?