全栈工程师开发手册 (作者:栾鹏)
一站式云原生机器学习平台


前言:cube是开源的云原生机器学习平台,目前包含特征平台,支持在/离线特征;数据源管理,支持结构数据和媒体标注数据管理;在线开发,在线的vscode/jupyter代码开发;在线镜像调试,支持免dockerfile,增量构建;任务流编排,在线拖拉拽;开放的模板框架,支持tf/pytorch/spark/ray/horovod/kaldi等分布式训练任务;task的单节点debug,分布式任务的批量优先级调度,聚合日志;任务运行资源监控,报警;定时调度,支持补录,忽略,重试,依赖,并发限制,定时任务算力的智能修正;nni,katib,ray的超参搜索;多集群多资源组,算力统筹,联邦调度;tf/pytorch/onnx模型的推理服务,serverless流量管控,tensorrt gpu推理加速,依据gpu利用率/qps等指标的 hpa能力,虚拟化gpu,虚拟显存等服务化能力。 目前开源到github:https://github.com/tencentmusic/cube-studio

Argo

业界有很多的调度解决方案,机器学习平台底层依赖argo框架,先来了解一下argo,整体结构如下:

argo定制了一个k8s自定义资源workflows.argoproj.io,我们通过这类资源可以定义自己的一个dag。argo的核心就是这个workflow,依据结构图,我们可以看出,当我们通过k8s创建了一个workflow,或者通过argo cli创建了一个workflow,这相当于一个dag或者一个pipeline。workflow controller监听到此创建事件,就会解析dag的结构,根据结构编排dag中的每一个task,并依据顺序进行单步或者并发执行。每个task为一个k8s pod。该pod包含wait container和一个main container。其中main 容器就是你的业务容器,真正运行你的代码的地方。wait容器是启动控制容器,负责控制启动你的业务容器和监听业务容器是否完成。从下面的图可以看到在k8s中每个task运行时包含这两个容器的。

这些argo的架构配置由configmap:workflow-controller-configmap 和 workflow-controller-parameters进行管理。

main业务容器的日志由业务自己把控,下面我们来看看wait控制容器的日志,通过这个日志就能很清楚的了解一个task示例运行时的过程。

看日志我们看到wait控制容器,使用docker进行启动一个main容器,并打印了容器需要的所有配置,这里可以校验你的配置参数是否正确。main容器启动后拿到main容器的id,就启动监控程序直到main容器完成。main完成后打包main的log成压缩文件到minio,然后识别task是否有输出parameters或artifacts,有artifacts的话就通过docker cp将文件复制出来也打包放到minio,最后杀掉main容器,回收内存。

workflow.argoproj.io/v1alpha1

workflow是argo项目封装的一个k8s crd。我们只需要创建一个workflow就可以创建运行一次pipeline。一个pipeline相当于一个dag,其中每个节点是一个task,这些task组成有向无环的任务流。每个task可以理解为一个pod。下面我们来看看这个workflow crd是怎么使用的?先来看看需要配置的参数。

WorkflowSpec:Arguments:parameters:       参数列表,下面的变量可以使用的参数可选项Entrypoint:          workflow入口的templatename:inputs:             workflow的输入parameters:       使用上面的哪个参数表outputs:            workflow输出artifacts:        输出到对象存储parameters:       输出变量imagePullSecrets: [] 默认拉取秘钥parallelism: 3       默认并行task数目nodeSelector: {}     默认node选择serviceAccountName:  task携带的k8s账户Sidecars:Volumes:            全局可用挂载卷Templates:           所有的task节点。包含dag,container几种类型# 每种类型的template都包含的字段- name: 'xx'         名称inputs: []         输入metadata: {}       元数据nodeSelector: {}   选择的机器outputs: {}        输出volumes: []        挂载卷affinity: {}       亲密度# 下面的是特殊字段,根据节点类型不同而配置不同的字段container:         业务容器化任务sidecars:          业务容器的伴随容器daemon:            dag守护进程,容器就绪后,pipline向后流入dag:               pipeline结构resource:          创建k8s资源类型steps: [step]      条件执行步骤

先来了解一下字段的概念和使用

  • Template: 表示一个pipeline的一个节点(node、步骤)。可以是一个dag的定义,或者一个task、一批task
  • Step:一个task
  • Steps:一批task
  • Entrypoint: dag的入口节点
  • DAG:一个pipeline,就是一批task和他们之间的前后运行关系
  • Workflow Template:argo定义的k8s crd,命名空间内的workflow模板
  • Cluster Workflow Template:argo定义的k8s crd, 一个集群的workflow模板
  • Inputs:task的输入参数,可以是变量parameters或者artifacts
  • Outputs: task的输出,parameters或者artifacts 由任务产生输出
  • Parameters: 参数,可以是objects, strings, booleans, arrays
  • Artifacts: 对象存储系统的对象,相当于文件,在minio上
  • Artifact Repository: Artifact存储的地方
  • Executor:执行container的方法(docker, Kubelet, Kubernetes API, pns)
  • Workflow Service Account:每个task携带的k8s身份。用来控制task对集群的控制能力。

随着版本的迭代,k8s的更多字段被支持,imagePullSecrets、nodeSelector等

workflow本身属于argo项目,并不归属于kubeflow,只是kubeflow使用argo作为基础组件来使用。workflow更详细的字段解释可以参考:https://argoproj.github.io/argo/fields/

Template

pipeline中可以包含多种类型的任务节点。下面我们来主要看几种。

流转类节点类型(task流转):

Steps:用来控制什么条件下任务怎么流转

  • name: 名称
  • template: 被调节点名称(可以循环调用自己)
  • when:条件
  • 参数传递:{{steps.generate-parameter.outputs.parameters.hello-param}}

DAG:用来控制上下游关系,不控制流转条件。结构类似如下,通过配置包含的task和依赖的上游task,形成了

hdfs-train-data-fetch -> model-train -> model-evaluation的任务流

- dag:tasks:- arguments: {}name: hdfs-train-data-fetchtemplate: hdfs-train-data-fetch- arguments: {}dependencies:- model-trainname: model-evaluationtemplate: model-evaluation- arguments: {}dependencies:- hdfs-train-data-fetchname: model-traintemplate: model-train

任务运行类节点类型

Container类:可以直接配置k8s中pod container的参数

    container:args: []env: []image: xxximagePullPolicy: Alwaysname: "xx"resources: {}volumeMounts: []

Script类:

  • image:
  • command:
  • source: python代码,产生template名称

Resource类:

  • action: create
  • manifest: |

SUSPEND类:

  • duration: "20s”

其他高级配置

Pod 回收策略:OnPodCompletion,OnPodSuccess,OnWorkflowCompletion,OnWorkflowSuccess

task节点类型: Pod, Steps, StepGroup, DAG, TaskGroup, Retry, Skipped, Suspend

task节点状态:Pending,Running,Succeeded,Skipped,Failed,Error,Omitted

input/output

输入输出支持两种类型:Parameter和Artifacts。parameter和argument在理解上容易有混淆,我们可以理解parameter为定义参数,argument为调用参数。

Parameter:

下面是使用parameter的示例。task1业务容器编写/tmp/param.txt文件,并将文件内容定义为输出参数。在steps的task2中引入该变量并传给task2的输入参数task2-param,最后由task2进行打印。

- steps:- - name: task1template: task1- - name: task2template: task2arguments:parameters:- name: task2-paramvalue: "{{steps.task1.outputs.parameters.task1-param}}"- name: task1outputs:parameters:- name: task1-paramvalueFrom:path: /tmp/param.txt   # 具体内容又业务容器自己写进去- name: task2inputs:parameters:- name: task2-paramcontainer:command: ["echo","{{inputs.parameters.message}}"]

Artifacts:

artifacts和parameters在参数传递中地位相同,只不过在使用artifacts作为输出前,系统会把配置的path文件打包并上传到对象存储服务器上,作为输入的时候会先从对象存储上把文件下载下来解压到input的path中。这样业务容器识别的就是文件,并不会感知对象存储文件的存在。

- steps:- - name: task1template: task1- - name: task2template: task2arguments:artifacts:- name: task2-artifactsfrom: "{{steps.generate.outputs.artifacts.task1-artifact}}"- name: task1outputs:artifacts:- name: out-artifactpath: /tmp/hello_world.txts3:endpoint: s3.amazonaws.combucket: my-aws-bucket-namekey: path/in/bucket/my-input-artifact.txt- name: task2inputs:artifacts:- name: in-artifactpath: /tmp/input.txtcontainer:image: docker/whalesay:latestcommand: ["cat","/tmp/input.txt"]

file->minio

MinIO 是一个基于Apache License v2.0开源协议的对象存储服务。它兼容亚马逊S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等,而一个对象文件可以是任意大小,从几kb到最大5T不等。minio作为kubeflow的基础服务:对象存储,我们不太用操心。一般我们不会参与minio的开源改造,只需要关心如何部署使用它。

minio的kubernetes的部署参考:https://docs.min.io/cn/deploy-minio-on-kubernetes.html

minio客户端cli的使用参考:https://docs.min.io/cn/minio-client-complete-guide.html

minio的sdk(python)的api参考:https://docs.min.io/cn/python-client-api-reference.html

在机器学习平台中,集成了minio web ui,我们在这里可以使用和查看到kubeflow系统对minio的使用。

下图显示了workflow的输出文件是如何被转移到minio上的。除了用户定义的outputs,业务容器的代理也是默认被打包到minio上的。

WorkflowStatus

下面来看看workflow的status。

status:conditions: []phase:           # 综合状态startedAt: finishedAt:nodes: $pipeline_node_name1:children:- $pipeline_node_name2displayName: startedAt:finishedAt: id: name: phase: Running/SucceededtemplateName: type: DAG/POD

workflow中有任何操作改动,都会更新到status中。我们通过workflow_status就可以查看整个workflow和每个task的所有运行状况。但是如果遇到无法排除的特殊报错详情,还是要从workflow-operator中获取。

kubeflow pipeline(kfp)

kubeflow-pipelines是 Kubeflow 社区新开源的端到端的 ML/DL 工作流系统。实现了一个工作流模型。所谓工作流,或者称之为流水线(pipeline),可以将其当做一个有向无环图(DAG)。其中的每一个节点,在 kubeflow/pipelines 的语义下被称作组件(component)。组件在图中作为一个节点,其会处理真正的逻辑,比如预处理,数据清洗,模型训练等等。每一个组件负责的功能不同,但有一个共同点,即组件都是以 Docker 镜像的方式被打包,以容器的方式被运行的。

kubeflow-pipeline依赖argo workflow,是一个更加产品化的项目。更接近于普通用户。

pipeline架构分层

kubeflow-pipeline的架构图如下图所示。由上至下划分为4层。

第一层:用户交互层。如:kubeflow-pipeline提供了web ui,python sdk和cli命令行,都可以直接用来操作pipeline。在web界面上,我们可以上传workflow,管理pipeline的版本,配置定时运行,查询运行记录细节,和可视化输出结果。

第二层:API服务化层。如:除了pipeline 后端服务api,还有元数据的服务和对象存储服务。pipeline的后端通过kubernetes api server 来管理自定义资源,例如workflow。

第三层:引擎controller。这一层为k8s资源的controller控制器,通过监听k8s 资源的增减,来控制管理pod。例如workflow operator。此外还有tfjobs.kubeflow.org、pytorchjobs.kubeflow.org、experiments.kubeflow.org、scheduledworkflows.kubeflow.org、suggestions.kubeflow.org、trials.kubeflow.org、xgboostjobs.xgboostjob.kubeflow.org、viewers.kubeflow.org、workflows.argoproj.io、cronworkflows.argoproj.io等资源的控制器

第四层:pod驱动层。这一层又k8s进行控制,真正运行业务代码的层面,真正进行调度的层面。

组件概念解释

下面来说一下结构图中组件的含义和功能。

  • Python SDK: 用于创建kubeflow pipelines组件的python 包,其中包含DSL compiler: 将Python代码转换成YAML静态配置文件。
  • Pipeline Web Server: pipeline的前端服务,它收集各种数据以显示相关视图:当前正在运行的pipeline列表,pipeline执行的历史记录,有关各个pipeline运行的调试信息和执行状态等。
  • Pipeline api-Service: pipeline的后端服务,调用K8S服务从YAML创建 pipeline(workflow)运行。
  • Kubernetes Resources: 创建CRD运行pipeline。
  • Machine Learning Metadata Service: 用于监视由Pipeline Service创建的Kubernetes资源,并将这些资源的状态持久化在ML元数据服务中(存储任务流容器之间的input/output数据交互)。
  • Artifact Storage: 用于存储Metadata和Artifact。Kubeflow Pipelines将元数据存储在MySQL数据库中,将工件存储在MINIO服务器或其他云存储等工件存储中。
  • Orchestration Controllers:任务编排,比如Argo Workflow Controller,它可以协调任务驱动的工作流。

元数据概念解释

我们先来看看元数据mysql中存储的字段。db名和密码都是自己部署时配置的。

字段包含default_experiments,experiments,jobs,pipeline_versions,pipelines,resource_references,run_details,run_metrics。kubeflow-pipeline平台支持pipeline多版本管理,并且每个pipeline都有一个默认版本,pipeline要运行的话需要绑定一个experiments,每次运行都是一个run实例,每个run实例都有一个run_details和run_metrics。

构建pipeline

要完成kubeflow pipeline的正常运转,需要我们我们完成编写代码、构建镜像、配置task,构建pipeline,配置定时调度,查看调度效果等过程。

1)构建镜像、编写代码

task采用容器运行,所以需要我们能够打包自己的镜像。平台侧可以封装公共环境镜像,在内网部署,只需要使用内网的docker仓库源就好。

在idc里面使用docker需要额外了解的几个操作步骤。

修改默认python版本rm /usr/bin/python
ln -s /usr/bin/python3.6 /usr/bin/python
rm /usr/bin/pip
ln -s /usr/bin/pip3 /usr/bin/pip
pip install pip --upgrade

当你需要连接idc的应用进行开发时,你也可以使用okteto本地开发idc应用,或者使用notebook在线开发。

2)构建task

代码构建的task有几种类型,也提供了几种方式。先来看看支持的主要的task类型

通过dsl ContainerOp进行创建,可以添加k8s container的诸多配置参数。如果需要有outputs功能,需要做专门配置,系统只是在task运行结束后,将此文件打包到minio上,具体的文件的内容,是由业务代码自己去掌控的。这个机器学习平台产品层主要使用的方式。并且在task中需要挂载的pvc,都自动设置为用户子目录,这样每个用户就只能操作个人子目录,保证数据隔离。

# dsl ContainerOp定义容器任务
def save_op(text):return dsl.ContainerOp(# 反复调用  同名container 会自动加数字分区name='save',image='library/bash:4.4.23',command=['sh', '-c'],arguments=['echo 00000000 > /tmp/results.txt'],file_outputs={'data': '/tmp/results.txt',})

通过dsl ResourceOp进行直接创建k8s的资源

CONTAINER_MANIFEST =
{"apiVersion": "v1","kind": "xx","metadata": {},"spec": {…}
}
op = dsl.ResourceOp(name='test-step',k8s_resource=CONTAINER_MANIFEST,action='create'
)

由于自定义资源的能力太广泛,机器学习平台并没有将此功能开放给用户。

此外还提供了

  • ContainerOp:容器组成的job
  • PipelineParam:表示可以传递的管道参数。job函数参数将是一个PipelineParam对象。
  • Component:返回job的修饰器
  • Pipeline:返回pipline的修饰器
  • python_component:将job元数据添加到函数对象
  • ResourceOp:可以直接操作k8s资源的job
  • VolumeOp:可以创建挂载的job
  • PipelineVolume:job之间传递数据的卷。add_pvolumes方式使用

除了上面直接定的方式,还有其他几种构建task的方式。

使用func_to_container_op直接将python函数转成task

@func_to_container_op(base_image, output_component_file, packages_to_install, extra_code)
def xxxx:return "111"

这样系统会把函数转为在指定镜像中之执行函数的task,默认的base_image为python3.7,系统会自动生成一个启动python文件,如下,会将用户函数封装在这个py文件中,并获取输出结果。

def xxxx:return '111'
def _serialize_str(str_value: str) -> str:if not isinstance(str_value, str):raise TypeError('Value \"{}\" has type \"{}\" instead of str.'.format(str(str_value), str(type(str_value))))return str_value
import argparse
_parser = argparse.ArgumentParser(prog=xxxx', description='')
_parser.add_argument(\"----output-paths\", dest=\"_output_paths\", type=str, nargs=1)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop(\"_output_paths\", [])
_outputs = xxxxx(**_parsed_args)
_outputs = [_outputs]
_output_serializers = [_serialize_str,
]
import os
for idx, output_file in enumerate(_output_files):try:os.makedirs(os.path.dirname(output_file))except OSError:passwith open(output_file, 'w') as f:f.write(_output_serializers[idx](_outputs[idx]))

这个pod的启动参数为

"args": ["----output-paths","/tmp/outputs/Output/data"   ],

通过load_component_from_file从本地文件加载task。也就是task的定义在外部文件中完成。yaml文件格式如下

name: xxxxx
description: xxxxxxxx
inputs:
- {name: Log dir URI}
outputs:
- {name: Log dir URI}
- {name: MLPipeline UI Metadata}
implementation:container:image: xxxxcommand: [python3, //program.py,--input1-path, <local file path for the Input 1 data>,--param1, <value of Param1 input>,--output1-path, <local file path for the Output 1 data>,]

可以通过sdk加载外部文件成task

task1 = kfp.components.load_component_from_file(your_file_path)

官方提供了一批文件定义的task

另外还提供了下面几种构建task的方式。

  • func_to_container_op:基于python函数构建task
  • load_component_from_file:从本地文件加载构建task
  • load_component_from_url:从url加载构建task,也就是远程文件

除了在创建时配置参数也可以在创建后配置参数。如果官方无法满足需求,自己改造一下python包就可以。

echo_task2 = create_op('bbbbb','echo_task2')\.set_retry(2)\.set_memory_request('1G').set_memory_limit('2G')\.set_cpu_request('1').set_cpu_limit('2')\.set_gpu_limit(1, 'nvidia')\.set_image_pull_policy('IfNotPresent')\.set_lifecycle()\.set_readiness_probe()\.set_liveness_probe()\.add_port()\.add_env_variable()\.add_volume_mount()

3)构建pipeline

配置上下游关系

@dsl.pipeline(name='xx',description='xx') # annotations
def my_pipeline():                        # 输入参数,默认值。入口点task2.after(task1)                    # 配置task2在task1后面task2.after(task0)                    # 配置task2在task0后面,一个task可以有多个上游task3.after(task1)                    # 配置task3在task1后面,一个task可以有多个下游task4 = make_task(task1.output)       # 使用某task的output,则自动排在该task的后面

配置并发执行

loop_args = [{'A_a': 1, 'B_b': 2}, {'A_a': 10, 'B_b': 20}]
with dsl.ParallelFor(loop_args,parallelism=None) as item:op1 = make_task()

配置条件执行

with dsl.Condition(task1.output == '11):op1 = make_task()

可以有多个起点,多个并发,多个上游,多个下游,不能成环。

公共配置项,用来为pipeline每个task做相同的处理,比如设置task某些参数的默认值,如果task没有提供就使用该默认值。比如提供统一的task pod变换操作,来避免在每个task都进行相同的处理。

# 添加task变换
def add_annotation(op):op.add_pod_annotation(name='key1', value='value1')op.add_pod_label(name='key1', value='value1')return op@dsl.pipeline(name='xx',description='xx')     # 将会是每个task容器的名称和annotations
def my_pipeline(): # 配置pipeline的公共默认配置。如果task进行了配置,就使用task的配置dsl.get_pipeline_conf().\set_image_pull_secrets([k8s_client.V1ObjectReference(name="hubsecret")]).\set_default_pod_node_selector(label_name="test",value="true").\set_image_pull_policy('IfNotPresent').\set_timeout(60). \set_parallelism(1).\add_op_transformer(add_annotation)    # 添加每个op的变换函数

4)编译上传运行

构建配置完pipeline,有几种方式编译上传运行。

python sdk的方式:dsl提供了编译函数,并且可以直接访问ml pipeline在线api,实现代码编译上传运行。

kfp.compiler: 编译相关
Compiler:                   编译python dsl 成yaml
build_docker_image:         基于Dockerfile构建镜像
build_python_component:     基于python函数构建镜像kfp.Client:   客户端函数
upload_pipeline_version:   上传新版本
create_experiment:         创建实验
run_pipeline:              运行管道

web方式:kubeflow提供了ml pipeline ui组件,我们可以将compiler后的yaml文件或tar文件,直接直接通过ui进行上传(upload pipeline),并且通过ui运行按钮触发运行(先create experiment 再 create run)。

无论是python sdk的client,还是wei ui的方式,都是在访问ml pipeline服务的在线接口,该服务提供的接口可以参考https://www.kubeflow.org/docs/pipelines/reference/api/kubeflow-pipeline-api-spec/

上线以后我们就可以在web ui上看到pipeline结构图了。

5)跟踪运行效果

run-pipeline以后也就会创建workflow。workflow-controller监听到该自定义资源的创建,就开始根据workflow的定义逐步启动task,我们就可以跟踪查看效果。有几种方式来看运行情况。

1、通过web ui来看效果。

在run详情里面,我们可以看到本地运行的pipeline运行情况。绿色为运行成功,红色为运行失败。

当为task设置了重试时,运行效果图如下,会先展开一个task,依次运行该task的每次重试。当所有重试都结束还不成功,task才标记为失败。

点击每个task都可以查看该task的运行详情。

  • Input和Ouput:是在task构建时为task配置的输入输出,artifacts或者parameters形式。
  • Visualizations:为task的训练效果。
  • ML Metadata:为当前task运行在pipeline平台数据库中记录的信息,也就是本地task run示例的元数据。
  • Volumes:为task启动使用的k8s挂载项。
  • Logs:为pod的日志。
  • Pod:为task启动的pod的yaml格式的信息。

2、通过k8s dashboard查看

既然task都是容器,那么就可以通过各种k8s pod开源管理软件进行管理。包括kubernetes dashboard/rancher等。或者通过kubectl命令行进行查看,这个就看个人习惯了。

task跟踪-outputs

用户配置的output artifacts都会自动打包上传到minio上。在output里面可以看到除了用户配置的artifacts还有业务容器的日志也会自动进行打包上传到minio上。注意:在output 里面main容器的log是task运行结束后打包上传到minio上的,在横向Logs菜单中的显示是实时拉取当前pod的日志,所以Logs菜单下支持实时查看日志,但是当pod被删除后,日志也不再能被拉取到。而在output中的main log,必须在task结束后才能看到,但是即使pod被删除也能看到main log。

task跟踪-Visualizations

要想能够在task输出后进行效果可视化,需要用户配置两个东西,

1、ui可视化元数据(如何显示图表的相关配置)。需用用户配置output包含{mlpipeline-ui-metadata: /mlpipeline-ui-metadata.json}(注意:json要严格正确),并且业务代码中将元数据写入该文件。这样生成的workflow中就会包含Outputs.artifacts: - {name: mlpipeline-ui-metadata, path: /mlpipeline-ui-metadata.json}

{"version": 1,"outputs": [{"storage": "inline","source": "# 大标题","type": "markdown"},{...}]
}

2、ui可视化数据(显示的数值,csv格式)。也就是用户要生成用于显示的真实的数值。目前只支持csv格式,这个csv要放在ui组件能够访问的地方。这样在运行完task,就可以在output里面看到ui元数据的json,

在Visualizations里面看到可视化效果了

目前支持的task的可视化图表类型支持:

  • Confusion matrix:混淆矩阵
  • Markdown
  • ROC curve
  • Table
  • TensorBoard
  • Web app:原生html

支持的数据文件存储类型包含:

  • Inline:数据内容直接在元数据中
  • http/https:下载csv
  • minio:打包压缩后的
  • tensorboard:目录

下面是几种mlpipeline-ui-metadata.json配置示例

{"outputs": [{"storage": "inline","source": "# 大标题","type": "markdown"},{"type": "web-app","storage": "inline","source": "<h1>Hello, World!</h1>"},{"type": "table","storage": "minio","format": "csv","header":["target", "predicted","count"],"source": "minio://public/pengluan/my-matrix.tgz"},{"type": "table","storage": "http","format": "csv","header":["target", "predicted","count"],"source": "http://kubeflow.music.woa.com/static/mnt/pengluan/test/my-matrix.csv"}]
}

可视化效果可以参考官网

另外需要专门说一下tensorboard。tensorboard为tf训练效果的查询,tf训练我们更倾向于训练中查看效果,而不是在训练后查看效果。并且为尽量满足多用户,多种模型,多次训练,实时离线都可以看的目的,我们推荐使用Jupyter-tensorboard的方案,集成方式为在jupyterlab中安装pip install tensorboard==2.2,新版本不太兼容Jupyter-tensorboard,然后再集成jupyter插件

jupyter labextension install jupyterlab_tensorboard

使用时,用户进入个人notebook,进入想查看的训练的目录

再点击tensorboard按钮,就会启动一个终端,这个终端会以当前目录为logdir参数传给tensorboard命令。然后就可以可视化该目录下和子目录下包含的训练效果数据了。

定时调度

当调试全部结束,我们确定pipeline和代码没有问题以后,就可以配置定时调度了。

Kubeflow Pipepline定时调度,是创建一个ScheduledWorkflow.kubeflow.org,由ml-pipeline-scheduledworkflow容器监听执行。

当然如果你觉的一些逻辑不符合你的使用,你完全可以全部自定义定时调度系统。cube机器学习平台没有使用ScheduledWorkflow,因为用户在使用中需要有很多自定义变量的需求,需要每次调度都有一些变量的值在变化,或者需要多用户,添加一些新字段。所以使用celery自己进行调度管理。

celery调度

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(backend-task result store)组成。

消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

celery调度框架比较成熟,可以在外网查询了解。

定时清理

为了能在pipeline的ui中实时看日志和更方便的日志回看,我们在pipeline中配置的task运行结束不自动删除,这样用户能反复回看日志。但是过期pod还是需要有清理机制,通过上面的celery,机器学习平台提供定时清理3天过期的workflow,tfjob,pytorchjob,xgbjob,以及超参训练的实验。

监控告警

调度系统都需要有个调度通知,在创建pipeline中,用户可以配置当期pipeline在什么样的状况下需要进行通知。比如任务开始运行时,挂起时,运行成功或者失败时。这些元数据会保存在数据库中,并且pipeline的创建者和运行者会通过label的形式被携带到task的pod上。

另外平台会启动watch组件,通过k8s api server,实时监听集群运行的任务,获取任务流的状态,根据用户的配置,和历史推送的情况,决定是否推送内容给用户。

另外需要注意的时,当pipeline开始运行时,pipeline状态转为Running,但是如果过程中遇到某个task pod挂起,整个pipeline的状态依然是Running,所以需要根据平台产品侧的设计来纠正某些状态,来进行报警。tfjob同样包含问题,如果部分worker挂起,整体的状态依然是Running。

cube云原生机器学习平台-架构(四)相关推荐

  1. cube云原生机器学习平台-架构(五)

    全栈工程师开发手册 (作者:栾鹏) 一站式云原生机器学习平台 前言:cube是开源的云原生机器学习平台,目前包含特征平台,支持在/离线特征:数据源管理,支持结构数据和媒体标注数据管理:在线开发,在线的 ...

  2. cube云原生机器学习平台-架构(三)

    全栈工程师开发手册 (作者:栾鹏) 一站式云原生机器学习平台 前言:cube是开源的云原生机器学习平台,目前包含特征平台,支持在/离线特征:数据源管理,支持结构数据和媒体标注数据管理:在线开发,在线的 ...

  3. cube云原生机器学习平台-架构(二)

    全栈工程师开发手册 (作者:栾鹏) 一站式云原生机器学习平台 前言:cube是开源的云原生机器学习平台,目前包含特征平台,支持在/离线特征:数据源管理,支持结构数据和媒体标注数据管理:在线开发,在线的 ...

  4. cube开源一站式云原生机器学习平台-架构(二)

    全栈工程师开发手册 (作者:栾鹏) 一站式云原生机器学习平台 前言:cube是开源的云原生机器学习平台,目前包含特征平台,支持在/离线特征:数据源管理,支持结构数据和媒体标注数据管理:在线开发,在线的 ...

  5. cube开源一站式云原生机器学习平台-架构(一)

    全栈工程师开发手册 (作者:栾鹏) 一站式云原生机器学习平台 前言:cube是开源的云原生机器学习平台,目前包含特征平台,支持在/离线特征:数据源管理,支持结构数据和媒体标注数据管理:在线开发,在线的 ...

  6. cube开源一站式云原生机器学习平台

    https://github.com/tencentmusic/cube-studio cube云原生机器学习平台-架构(一) cube云原生机器学习平台-架构(二) cube云原生机器学习平台-架构 ...

  7. cube一站式云原生机器学习平台-加速分布式任务的运行效率

    全栈工程师开发手册 (作者:栾鹏) 一站式云原生机器学习平台 前言:cube是tme开源的云原生机器学习平台,目前包含特征平台,支持在/离线特征:数据源管理,支持结构数据和媒体标注数据管理:在线开发, ...

  8. cube一站式云原生机器学习平台-推理服务的工程化加速

    全栈工程师开发手册 (作者:栾鹏) 一站式云原生机器学习平台 前言:cube是tme开源的云原生机器学习平台,目前包含特征平台,支持在/离线特征:数据源管理,支持结构数据和媒体标注数据管理:在线开发, ...

  9. cube开源一站式云原生机器学习平台-推理服务的工程化加速

    全栈工程师开发手册 (作者:栾鹏) 一站式云原生机器学习平台 前言:cube是tme开源的云原生机器学习平台,目前包含特征平台,支持在/离线特征:数据源管理,支持结构数据和媒体标注数据管理:在线开发, ...

最新文章

  1. 在 Python 中妙用短路机制
  2. banana pi BPI-R1 原理图下载地址:
  3. 05-JDBC学习手册:JDBC2.0 新特性之批量处理数据
  4. Android 2.2 带来全新拍照增强特性
  5. 为什么用java开发app_安卓开发为什么选择用Java语言
  6. chrome插件分享
  7. CXF发布RestFul WebService和SOAP WebService
  8. BigDecimal数据加法返回值接收
  9. Jquery ajax, Axios, Fetch区别之我见
  10. 单片机编程使用的c语言软件有哪些,单片机编程用什么语言_单片机编程语言推荐...
  11. 新手小白入门编程第1讲 计算机基础知识 JAVA基础知识
  12. 网络游戏外挂编写初级教程
  13. 树莓派学习笔记(通过网线连接)
  14. java translate_java – 当使用translate()方法时,JPanel中的Tit...
  15. Openwrt 下 L2TP搭建
  16. Android微信小尾巴,微信朋友圈小尾巴app
  17. Socket 套接字原理详解
  18. qst -sim 出现 license 错误
  19. Linda Rising:“你相信谁?”
  20. tikz中谐振子(弹簧)的绘制,以及声子色散关系的绘制

热门文章

  1. 苹果版App开发心得
  2. php禁用错误提示,php开启与关闭错误提示详解
  3. bundle java_Bundle类
  4. from injection shell sql to_FROM SQL INJECTION TO SHELL: POSTGRESQL EDITION
  5. background 覆盖 内容_web开发:利用background制作拉窗帘效果
  6. vue(vue-cli+vue-router)+babel+webpack项目搭建入门 (第二步)
  7. 【java笔记】map集合
  8. Tiny Jpeg Decoder (JPEG解码程序) 源代码分析 1:解码文件头
  9. NHK SHV 的 22.2 声道音频系统
  10. tnsnames.ora配置未生效_一文了解网络交换机的6种命令配置模式