为了梳理学习dflow时遇到的知识点,我决定开这一个系列记录自己的学习过程。当然了,最好是去看 官方教程 和 文档
本文,我们将阅读教程中slices这一节,并在最后写一个应用。

文章目录

  • 阅读原教程
    • 两个OP
    • OP的使用:slice功能
    • 工作流
  • 浅试Slice功能
    • 问题阐述
    • 传统的脚本
    • dflow v1--non slice
    • dflow v2--slice_para
    • dflow v3--slice_artifact

阅读原教程

两个OP

我们首先看源码中的第一个OP

from typing import Listfrom dflow import Step, Workflow, argo_range
from dflow.python import OP, OPIO, Artifact, OPIOSign, PythonOPTemplate, Slicesclass Hello(OP):def __init__(self):pass@classmethoddef get_input_sign(cls):return OPIOSign({'filename': str})@classmethoddef get_output_sign(cls):return OPIOSign({'out_art': Artifact(str)})@OP.exec_sign_checkdef execute(self,op_in: OPIO,) -> OPIO:file_num = int(op_in["filename"].split('.')[0][1:])open(op_in["filename"], "w").write("Hello" * file_num)op_out = OPIO({'out_art': op_in["filename"]})return op_out

结合上文,我们知道,所谓OP就是一个操作手册,撰写OP需要遵循特定的模板,而该模板可以简化为下面这个普通的python函数。

def aaa(in_msg: str, in_artifact: Artifact):op_in = {"in_msg": in_msg, "in_artifact": in_artifact}op_out = execute(op_in)out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]return out_msg, out_artifact

我们对照这两个代码块,首先读前两个类方法:hello OP的输入是filename,输出是Artifact类型,初始化该类型需要一个str
下面我们看execute,首先是接收了文件名,然后从文件名中获取file_num,然后新建一个文件,往里写Hello,并重复file_num次,最后把该文件名对应的文件作为输出
结合下文,filename的形式是这样的:

"filename": [f"f{x}.txt" for x in range(10)]

单个的filename是这样的:

f"f{x}.txt" for x in range(10)

所以x是一个数字

f"f{x}.txt"

execute第一句话就是对该文件名进行处理

op_in["filename"].split('.')[0][1:]

下面我们看第二个OP

class Check(OP):def __init__(self):pass@classmethoddef get_input_sign(cls):return OPIOSign({'filename': Artifact(List[str])})@classmethoddef get_output_sign(cls):return OPIOSign()@OP.exec_sign_checkdef execute(self,op_in: OPIO,) -> OPIO:print(op_in["filename"])return OPIO()

再重复一下
所谓OP就是一个操作手册,撰写OP需要遵循特定的模板,而该模板可以简化为下面这个普通的python函数。

def aaa(in_msg: str, in_artifact: Artifact):op_in = {"in_msg": in_msg, "in_artifact": in_artifact}op_out = execute(op_in)out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]return out_msg, out_artifact

我们对照这两个代码块看这个OP
该OP输入是Artifact类型,初始化该类型,需要往Artifact里塞一个list,list里面是文件路径
也就是说,输入是大包裹,里面有一群小包裹
输出是一个空的,啥都没有
execute就是把输入的包裹打印出来

OP的使用:slice功能

该功能是template附属的功能,如果说,template是工人,OP是操作手册,那么slice就相当于:工人重复某个操作手册n次(并行)。
下面是使用方法:

hello = Step("hello",PythonOPTemplate(Hello, image="python:3.8",slices=Slices("{{item}}",input_parameter=["filename"],output_artifact=["out_art"])),parameters={"filename": [f"f{x}.txt" for x in range(10)]},with_param=argo_range(10))

这里的item应该是一个全局变量,应该是slice中的某个item,是一个int

整个step的输入:

parameters={"filename": [f"f{x}.txt" for x in range(10)]}

slice中每步的输入输出是:

input_parameter=["filename"],
output_artifact=["out_art"]

当然了,还可以是其他输入输出,下面是slice的源码:

之所以选择上面两个,是因为我们的hello OP只需要这两个
下面我们对用check step 检查hello OP的结果

check = Step("check",PythonOPTemplate(Check, image="python:3.8"),artifacts={"filename": hello.outputs.artifacts["out_art"]},)

可以看到,该OP接收的是单个的artifacts,结合check OP的定义,我们看到,这是一个大包裹,里面是一堆小包裹,而hello OP的step输出就是一堆小包裹。

工作流

wf = Workflow("slices")
wf.add(hello)
wf.add(check)
wf.submit()

我们定义好了两个OP,然后定义了使用两个OP的step,最后将他们组装即可。

浅试Slice功能

问题阐述

我们有6个xyz文件在in_dir目录里

每个文件第二行有能量信息

我们需要做的事情是,打开文件,读取能量数据。将能量数据和文件名称、文件路径对应起来,存到同一个dataframe里面。

传统的脚本

import os
import pandas as pdroot = r'H:\dflow\dflow_gym\dummy_slice\in_dir'
dump = r'H:\dflow\dflow_gym\dummy_slice'
e_list = []
name_list = []
path_list = []
for a_file in os.listdir(root):a_path = os.path.join(root, a_file)with open(a_path, 'r') as f:f.readline()a_e = float((f.readline()).split()[1])e_list.append(a_e)name_list.append(a_file)path_list.append(a_path)
info = {'e': e_list, 'name': name_list, 'xyz_path': path_list}
info_df = pd.DataFrame(info)
os.chdir(dump)
info_df.to_pickle('info.pickle')

我们把这段脚本打包成一个函数:
(涉及路径的时候加上path,更规范,不容易出错)

import os
import pandas as pd
from pathlib import Pathdef get_e(src_root: str, dump_root: str):e_list = []name_list = []path_list = []for a_file in os.listdir(Path(src_root)):a_path = os.path.join(Path(src_root), a_file)with open(a_path, 'r') as f:f.readline()a_e = float((f.readline()).split()[1])e_list.append(a_e)name_list.append(a_file)path_list.append(a_path)info = {'e': e_list, 'name': name_list, 'xyz_path': path_list}info_df = pd.DataFrame(info)os.chdir(Path(dump_root))info_df.to_pickle('info.pickle')

dflow v1–non slice

下面我们把这段函数改成一个template,外包step和workflow
我们对照一下template的模板:

def aaa(in_msg: str, in_artifact: Artifact):op_in = {"in_msg": in_msg, "in_artifact": in_artifact}op_out = execute(op_in)out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]return out_msg, out_artifact

运行该函数不需要指令,需要包裹,即in_artifact
该函数没有输出指令,但是输出了一个包裹info.pickle,我们需要把info.pickle下载下来。

流程大概是,我们上传in_dir,在云端处理信息以后,把info.pickle下载到指定文件夹。
我们“借鉴”一下源码中example里面的文件上传和下载,相对路径即可。
下面是dflow第一版:

import os
from pathlib import Path
from dflow import Step, Workflow, download_artifact, upload_artifact
from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,upload_packages)
import pandas as pdif "__file__" in locals():upload_packages.append(__file__)class step1(OP):def __init__(self):pass@classmethoddef get_input_sign(cls):return OPIOSign({'in_dir': Artifact(Path),})@classmethoddef get_output_sign(cls):return OPIOSign({'info': Artifact(Path),})@OP.exec_sign_checkdef execute(self,op_in: OPIO,) -> OPIO:e_list = []name_list = []path_list = []cwd_ = os.getcwd()os.chdir(op_in['in_dir'])for a_file in os.listdir('./'):with open(a_file, 'r') as f:f.readline()a_e = float((f.readline()).split()[1])e_list.append(a_e)name_list.append(a_file)path_list.append(os.path.abspath(a_file))info = {'e': e_list, 'name': name_list, 'xyz_path': path_list}info_df = pd.DataFrame(info)os.chdir(cwd_)info_df.to_pickle(r'info.pickle')op_out = OPIO({"info": Path(r'info.pickle'),})return op_out

对比二者不难发现,其实就是把程序输入换成了op_in['in_dir'],输出给套了个壳。
下面我们给这个OP套上template,step,workflow的壳。


def test_dflow_v1():wf = Workflow(name='test1')art_in = upload_artifact(Path('in_dir'))print(art_in)step = Step(name='step1',template=PythonOPTemplate(Parse, image="python_diy:3.8"),artifacts={'in_dir': art_in})wf.add(step)wf.submit()while wf.query_status() in ["Pending", "Running"]:time.sleep(1)assert(wf.query_status() == "Succeeded")step = wf.query_step(name="step1")[0]assert(step.phase == "Succeeded")download_artifact(artifact=step.outputs.artifacts["info"])if __name__ == '__main__':test_dflow_v1()

程序运行的时候会遇到如下几个问题:

  1. workflow和step的命名,不能有下划线,可以有中短线,而且不能大写开头
  2. 程序运行需要pandas库,直接upload会出现版本兼容问题,最好在镜像里提前打好,方法参见这篇
    上述这种实现是顺序读取列表

    我们只调用了一次step1就完成了3个文件的处理。
    下面我们用slice功能

dflow v2–slice_para

使用slice功能,我们需要把OP功能缩减成只读取单个文件的功能,然后重复调用。
因此我们需要调整OP的输入为:单个文件
输出为:e, name, path

import os
from pathlib import Path
import pandas as pd
from dflow import Step, Workflow, download_artifact, upload_artifact
from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,upload_packages)
import timeif "__file__" in locals():upload_packages.append(__file__)class getInfo(OP):def __init__(self):pass@classmethoddef get_input_sign(cls):return OPIOSign({'in_f': Artifact(Path),})@classmethoddef get_output_sign(cls):return OPIOSign({'e': float,'name': str,'xyz_path': Path,})@OP.exec_sign_checkdef execute(self,op_in: OPIO,) -> OPIO:with open(op_in['in_f'], 'r') as f:f.readline()a_e = float((f.readline()).split()[1])op_out = OPIO({"e": a_e,'name': os.path.basename(op_in['in_f']),'xyz_path':os.path.abspath(op_in['in_f'])})return op_out

另一方面,我们需要汇总所有的信息到 df 里面,所以增加一个OP

class getDF(OP):def __init__(self):pass@classmethoddef get_input_sign(cls):return OPIOSign({'in_es': List[float],'in_names': List[str],'in_paths': List[Path]})@classmethoddef get_output_sign(cls):return OPIOSign({'out_df': Artifact(Path)})@OP.exec_sign_checkdef execute(self,op_in: OPIO,) -> OPIO:info = pd.DataFrame({'e': op_in['in_es'], 'names': op_in['in_names'], 'xyz_paths': op_in['in_paths']})info.to_pickle('info_v2.pickle')op_out = OPIO({"out_df": Path('info_v2.pickle'),})return op_out

这两个OP功能加起来是原OP的功能
下面我们把壳套上,照着模板写:

def test_dflow_v2():wf = Workflow(name='test1')art_in = upload_artifact(Path('in_dir'))print(art_in)step1 = Step(name='step1',template=PythonOPTemplate(getInfo, image="python_diy:3.8", slices=Slices("{{item}}",input_parameter=['in_f'],output_parameter=['e', 'name', 'xyz_path'])),parameters = {'in_f': os.listdir(Path('in_dir'))},artifacts={'in_dir': art_in},with_param = argo_range(len(os.listdir(Path('in_dir')))),key='get-e-{{item}}')step2 = Step(name='step2',template=PythonOPTemplate(getDF, image='python_diy:3.8'),parameters={'in_es': step1.outputs.parameters['e'],'in_names': step1.outputs.parameters['name'],'in_paths': step1.outputs.parameters['xyz_path']})wf.add(step1)wf.add(step2)wf.submit()while wf.query_status() in ["Pending", "Running"]:time.sleep(1)assert(wf.query_status() == "Succeeded")step = wf.query_step(name="step2")[0]assert(step.phase == "Succeeded")download_artifact(artifact=step.outputs.artifacts['out_df'])if __name__ == '__main__':test_dflow_v2()

运行以后即为下图所示:

dflow v3–slice_artifact

注意到,上文在slice的时候是对parameter进行的slice,每个子节点都传了所有的文件。
但是我们理想状态是对文件进行slice,每个子任务处理一个文件,因此只需给OP传一个文件即可。
这个似乎没有很好的方法,只能在上传包裹的时候就完成切片,也就是上传一堆小包裹

import os
from pathlib import Path
import pandas as pd
from dflow import Step, Workflow, download_artifact, upload_artifact, argo_range
from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate, Slices,upload_packages)
import time
from typing import Listif "__file__" in locals():upload_packages.append(__file__)class getInfo(OP):def __init__(self):pass@classmethoddef get_input_sign(cls):return OPIOSign({'in_f': Artifact(Path)})@classmethoddef get_output_sign(cls):return OPIOSign({'e': float,'name': str,'xyz_path': str,})@OP.exec_sign_checkdef execute(self,op_in: OPIO,) -> OPIO:with open(op_in['in_f'], 'r') as f:f.readline()a_e = float((f.readline()).split()[1])op_out = OPIO({"e": a_e,'name': os.path.basename(op_in['in_f']),'xyz_path': os.path.abspath(op_in['in_f'])})return op_outclass getDF(OP):def __init__(self):pass@classmethoddef get_input_sign(cls):return OPIOSign({'in_es': List[float],'in_names': List[str],'in_paths': List[str]})@classmethoddef get_output_sign(cls):return OPIOSign({'out_df': Artifact(Path)})@OP.exec_sign_checkdef execute(self,op_in: OPIO,) -> OPIO:info = pd.DataFrame({'e': op_in['in_es'], 'names': op_in['in_names'], 'xyz_paths': op_in['in_paths']})info.to_pickle('info_v2.pickle')op_out = OPIO({"out_df": Path('info_v2.pickle'),})return op_outdef test_dflow_v3():wf = Workflow(name='test1')upload_list = []for a_file in os.listdir('in_dir'):upload_list.append(Path(Path('in_dir')/a_file))art_in = upload_artifact(upload_list)print(art_in)step1 = Step(name='step1',template=PythonOPTemplate(getInfo, image="python_diy:3.8", slices=Slices("{{item}}",input_artifact=['in_f'],output_parameter=['e', 'name', 'xyz_path'])),artifacts={'in_f': art_in},with_param = argo_range(len(os.listdir(Path('in_dir')))),key='get-e-{{item}}')step2 = Step(name='step2',template=PythonOPTemplate(getDF, image='python_diy:3.8'),parameters={'in_es': step1.outputs.parameters['e'],'in_names': step1.outputs.parameters['name'],'in_paths': step1.outputs.parameters['xyz_path']})wf.add(step1)wf.add(step2)wf.submit()while wf.query_status() in ["Pending", "Running"]:time.sleep(1)assert(wf.query_status() == "Succeeded")step = wf.query_step(name="step2")[0]assert(step.phase == "Succeeded")download_artifact(artifact=step.outputs.artifacts['out_df'])if __name__ == '__main__':test_dflow_v3()

完整代码如上,需要注意的是:
os.path.abspath() 是返回一个str,而云端(linux)上的path和local(win)下面的path不一样
所以在拼info的时候要用str,而不是path

dflow入门2——Slices相关推荐

  1. 有没有python的班_【万字长文】别再报班了,一篇文章带你入门Python

    最近有许多小伙伴后台联系我,说目前想要学习Python,但是没有一份很好的资料入门.一方面的确现在市面上Python的资料过多,导致新手会不知如何选择,另一个问题很多资料内容也很杂,从1+1到深度学习 ...

  2. python基础论文_北大博士Python学习笔记,Python基础语法总结,一篇文章带你入门...

    image.png 网上现在Python学习资料有很多,但是很杂.很多初学Python的朋友就不知道该怎么去抉择,那些是自己当下所需要的. 刚好朋友是北大的博士,在IT行业也工作八年了.就把他学习Py ...

  3. 万字长文,一篇文章带你入门Python

    注释 Python中用#表示单行注释,#之后的同行的内容都会被注释掉. 很多人学习python,不知道从何学起. 很多人学习python,掌握了基本语法过后,不知道在哪里寻找案例上手. 很多已经做案例 ...

  4. 零基础入门深度学习(7) - 递归神经网络

    无论即将到来的是大数据时代还是人工智能时代,亦或是传统行业使用人工智能在云上处理大数据的时代,作为一个有理想有追求的程序员,不懂深度学习(Deep Learning)这个超热的技术,会不会感觉马上就o ...

  5. 簇的局部变量中布尔类型_GO语言入门(go的基本类型)

    本文节选自<go入门指南> 如果觉得文章太长,可以直接看末尾的总结. 常量 常量使用关键字 const 定义,用于存储不会改变的数据. 存储在常量中的数据类型只可以是布尔型.数字型(整数型 ...

  6. Spark入门阶段一之扫盲笔记

    介绍 spark是分布式并行数据处理框架 与mapreduce的区别: mapreduce通常将中间结果放在hdfs上,spark是基于内存并行大数据框架,中间结果放在内存,对于迭代数据spark效率 ...

  7. VideoCodec 入门篇 - 00 (编解码简介)

    目录 1.基本术语 (Basic Terminology) 1.1.图像 (Image) 1.2.像素 (Pixel) 1.3.颜色深度 (Color Depth) 1.4.分辨率 (Resoluti ...

  8. Python从放弃到入门,公众号历史文章爬取成pdf的项目实践与自主学习法

    这篇文章不谈江流所专研的营销与运营,而聊一聊技能学习之路,聊一聊Python这门最简单的编程语言该如何学习,我完成的第一个Python项目,将任意公众号的所有历史文章导出成PDF电子书. 或许我这个P ...

  9. python_pandas入门(by offical document/reference)/loc和iloc操作/dataframe插入操作/pandas读取无表头的文件/查找某一列是否有某个值

    文章目录 Pandas starter starter:学习第一步 pandas数据结构概念 十分钟了解pandas的基本特性 ten minutes to learn about the basci ...

  10. Python 数据科学入门教程:Matplotlib

    Matplotlib 入门教程 来源:Introduction to Matplotlib and basic line 译者:飞龙 协议:CC BY-NC-SA 4.0 在线阅读 PDF格式 EPU ...

最新文章

  1. 高校选课成绩管理系统
  2. 极市分享|第34期 袁源Jerry:机器学习及深度学习在自动驾驶中的应用
  3. import导入模块
  4. asp.net mvc3 网站退出系统后使用浏览器的回退按钮依然能返回到内容页的处理方法...
  5. java 安卓权限_java – Android运行时权限 – 如何实现
  6. 文都计算机考研辅导班哪个好,考研辅导班哪个好,海文还是文都?
  7. 2018-09-25 参考博客Hadoop
  8. python上传excel文件_POST上传的excel(xls)文件,如何直接读进pandas,避免写入到磁盘?...
  9. 骇基-黑客攻防实战入门⑴
  10. 着色近似算法——韦尔奇-鲍威尔(Welch-Powell)点着色算法
  11. CrossApp推出移动应用开发神器 CrossApp Style
  12. 2022年九款大数据数据分析软件工具推荐
  13. 【转载】Java并发编程:volatile关键字解析 by 海子
  14. Chemistry Methods | Image2SMILES+: Transformer-Based Molecular Optical Recognition Engine
  15. Python入门学习笔记第五章——if条件句~~~
  16. 执行this.$destory()指令后,原生DOM也没有响应的问题
  17. 如何剪辑视频,视频画中画制作教程分享
  18. Web超简单入门(附带项目的讲解)
  19. HEVC 常用之 CU、PU、TU分析
  20. 英国脱欧给云计算行业带来震动:六大典型场景解析

热门文章

  1. 基于WizFi220的Cosmo WiFi扩展板
  2. 户外运动手持GPS设备常识汇总
  3. axure中继器求和_Axure学习笔记-中继器
  4. linux win10虚拟内存,Win10虚拟内存设置多少合适?这样设置就最好!
  5. 虚拟内存怎么设置最好?虚拟内存设置多少合适
  6. yyds,Python爬虫从小白到Bigboss全套学习路线+视频+资料
  7. 如何进行app消息推送(push)
  8. c语言total用法,C语言 这个表达式怎么理解 新手请大神详述total += isalnum(ch[i])!=0;...
  9. linux安装程序时Cannot uninstall XXX. It is a distutils installed project and thus we cannot accurately
  10. 手动修改dns服务器设置,【当贝市场】怎样手动设置DNS服务器?教程如下