综述

同task,job结束时,并不会返回job finish 的信息,也是在DagScheduler.schedule_running_job() 轮询时,通过calculate_job_status() 方法获取job的状态。
获取job已经finish的信息后,会返还资源和进行相关环境清理。

执行步骤

  1. dag_scheduler.py:当调度完task后,执行calculate_job_status,计算job当前的状态。执行calculate_job_progress,计算job当前的完成进度。
    在${job_log_dir}/fate_flow_schedule.log 输出日志
[INFO] [2021-07-26 08:20:47,782] [1:140259369826048] - dag_scheduler.py[line:310]: Job 202107260820309976351 status is success, calculate by task status list: ['success']
  1. dag_scheduler.py:根据job 状态和进度变化,同步信息,save model等
    下面这段代码没执行,逻辑上有点问题
            if int(new_progress) - job.f_progress > 0:job.f_progress = new_progressFederatedScheduler.sync_job(job=job, update_fields=["progress"])cls.update_job_on_initiator(initiator_job=job, update_fields=["progress"])

存疑
当task状态改变时TaskScheduler.schedule 返回的initiator_tasks_group 中的task状态未改变
结果导致 DAGScheduler.schedule_running_job 中 total, finished_count = cls.calculate_job_progress(tasks_status=tasks_status)
无改变,不会触发 后续流程,会多跑一遍。
建议在task schedule结束之后,再query一遍task的信息,更新状态。

            if new_job_status != job.f_status:job.f_status = new_job_statusif EndStatus.contains(job.f_status):FederatedScheduler.save_pipelined_model(job=job)FederatedScheduler.sync_job_status(job=job)cls.update_job_on_initiator(initiator_job=job, update_fields=["status"])

job 状态变换,且job 已经finish,调用FederatedScheduler.save_pipelined_model
调用链 FederatedScheduler.save_pipelined_model() -> api_utils.federated_coordination_on_http() 这里endpoint 是model-> fate_flow_server通过flask -> party_app.save_pipelined_model() -> JobController.save_pipelined_model
在${job_log_dir}/fate_flow_schedule.log 输出日志

[INFO] [2021-07-26 08:20:47,783] [1:140259369826048] - federated_scheduler.py[line:78]: try to save job 202107260820309976351 pipelined model
[INFO] [2021-07-26 08:20:47,791] [1:140259119585024] - job_controller.py[line:300]: job 202107260820309976351 on local 0 start to save pipeline
[INFO] [2021-07-26 08:20:47,800] [1:140259119585024] - job_controller.py[line:340]: job 202107260820309976351 on local 0 save pipeline successfully
[INFO] [2021-07-26 08:20:47,804] [1:140259369826048] - federated_scheduler.py[line:81]: save job 202107260820309976351 pipelined model success

注意:如果是predict,因为使用的是已经存在的model,这里不会再存储
模型存储的路径在
/data/projects/fate/model_local_cache/KaTeX parse error: Expected 'EOF', got '#' at position 7: {role}#̲{party}#KaTeX parse error: Expected 'EOF', got '#' at position 17: …component_name}#̲model/{jobid}
/data/projects/fate/model_local_cache/local#0#local-0#model/202107260820309976351

  1. dag_scheduler.py:save完成,同步信息执行FederatedScheduler.sync_job_status(job=job) 调用链FederatedScheduler.sync_job_status() ->-api_utils.federated_coordination_on_http() 这里endpoint 是status -> fate_flow_server通过flask -> party_app.job_status -> JobController.update_job_status -> ResourceManager.return_job_resource
    在update_job_status中,判断job处于finish 状态,回收资源。
    对应在${job_log_dir}/fate_flow_schedule.log 输出日志
[INFO] [2021-07-26 08:20:47,804] [1:140259369826048] - federated_scheduler.py[line:68]: job 202107260820309976351 is success, sync to all party
[INFO] [2021-07-26 08:20:47,810] [1:140259119585024] - job_saver.py[line:45]: try to update job 202107260820309976351 status to success
[INFO] [2021-07-26 08:20:47,830] [1:140259119585024] - job_saver.py[line:48]: update job 202107260820309976351 status successfully
[INFO] [2021-07-26 08:20:47,869] [1:140259119585024] - resource_manager.py[line:175]: return job 202107260820309976351 resource(cores 4 memory 0) on local 0 successfully, remaining cores: 20 remaining memory: 0
[INFO] [2021-07-26 08:20:47,874] [1:140259369826048] - federated_scheduler.py[line:71]: sync job 202107260820309976351 status success to all party success
  1. dag_scheduler.py:update_job_on_initiator() ,
    调用链 update_job_on_initiator -> JobSaver.update_job_status() -> JobSaver.update_status -> JobSaver.update_entity_table (由于是local,jobstatus 已经更新,前一步失败,这里不会执行)-> JobSaver.update_job() -> JobSaver.update_entity_table
    对应在${job_log_dir}/fate_flow_schedule.log 输出日志
[INFO] [2021-07-26 08:20:47,884] [1:140259369826048] - job_saver.py[line:45]: try to update job 202107260820309976351 status to success
[INFO] [2021-07-26 08:20:47,894] [1:140259369826048] - job_saver.py[line:56]: update job 202107260820309976351 status does not take effect
[INFO] [2021-07-26 08:20:47,894] [1:140259369826048] - job_saver.py[line:61]: try to update job 202107260820309976351
[WARNING] [2021-07-26 08:20:47,904] [1:140259369826048] - job_saver.py[line:66]: job 202107260820309976351 update does not take effect: {'job_id': '202107260820309976351', 'role': 'local', 'party_id': '0', 'status': 'success'}
  1. dag_scheduler.py:执行finish 执行stop_job
    DagScheduler.stop_job() -> FederatedScheduler.stop_job -> api_utils.federated_coordination_on_http() 这里endpoint 是stop -> fate_flow_server通过flask -> party_app.stop_job -> JobController.stop_jobs -> JobController.stop_job ->TaskController.stop_task -> JobController.update_job_status
    注意,因为job是拆解成一个个task执行,故执行进程只有task的,没有job的进程。stop的job的操作是依次遍历job下的task,进行stop_task。在正常流程下,由于所有task都stop之后,才会触发stop_job。故而这里stop_task 都是can not found。
    对应在${job_log_dir}/fate_flow_schedule.log 输出日志
[INFO] [2021-07-26 08:20:47,904] [1:140259369826048] - dag_scheduler.py[line:502]: Job 202107260820309976351 finished with success, do something...
[INFO] [2021-07-26 08:20:47,905] [1:140259369826048] - dag_scheduler.py[line:436]: request stop job 202107260820309976351 with success
[INFO] [2021-07-26 08:20:47,913] [1:140259369826048] - dag_scheduler.py[line:445]: request stop job 202107260820309976351 with success to all party
[INFO] [2021-07-26 08:20:47,913] [1:140259369826048] - federated_scheduler.py[line:88]: try to stop job 202107260820309976351
[INFO] [2021-07-26 08:20:47,937] [1:140259119585024] - job_utils.py[line:396]: try to stop job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:47,937] [1:140259119585024] - job_utils.py[line:399]: can not found job 202107260820309976351 task 202107260820309976351_upload_0 local 0 with success party status process pid:90
[INFO] [2021-07-26 08:20:47,939] [1:140259119585024] - job_utils.py[line:423]: start run subprocess to stop task session 202107260820309976351_upload_0_0_local_0
[INFO] [2021-07-26 08:20:47,939] [1:140259119585024] - job_utils.py[line:310]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop
[INFO] [2021-07-26 08:20:47,953] [1:140259119585024] - job_utils.py[line:333]: start process command: python3 /data/projects/fate/python/fate_flow/utils/session_utils.py -j 202107260820309976351_upload_0_0_local_0 --computing EGGROLL --federation EGGROLL --storage EGGROLL -c stop successfully, pid is 153
[INFO] [2021-07-26 08:20:47,954] [1:140259119585024] - task_controller.py[line:254]: job 202107260820309976351 task 202107260820309976351_upload_0 0 on local 0 process 90 kill success

然后调用 JobController.update_job_status 更新信息,不同于3中,这里执行失败,不会进行资源回收。

[INFO] [2021-07-26 08:20:47,984] [1:140259119585024] - job_saver.py[line:45]: try to update job 202107260820309976351 status to success
[INFO] [2021-07-26 08:20:47,991] [1:140259119585024] - job_saver.py[line:56]: update job 202107260820309976351 status does not take effect

返回成功信息

[INFO] [2021-07-26 08:20:47,997] [1:140259369826048] - federated_scheduler.py[line:92]: stop job 202107260820309976351 success
[INFO] [2021-07-26 08:20:47,997] [1:140259369826048] - dag_scheduler.py[line:448]: stop job 202107260820309976351 with success successfully
  1. dag_scheduler.py:执行finish 执行clean_job
    FederatedScheduler.clean_job() -> api_utils.federated_coordination_on_http() 这里endpoint 是clean -> fate_flow_server通过flask -> party_app.stop_job -> JobController.stop_jobs -> JobController.clean_job
    然后clean_job,只打了日志,啥也没干
    def clean_job(cls, job_id, role, party_id, roles):schedule_logger(job_id).info('Job {} on {} {} start to clean'.format(job_id, role, party_id))# todoschedule_logger(job_id).info('job {} on {} {} clean done'.format(job_id, role, party_id))

对应在${job_log_dir}/fate_flow_schedule.log 输出日志

[INFO] [2021-07-26 08:20:47,997] [1:140259369826048] - federated_scheduler.py[line:107]: try to clean job 202107260820309976351
[INFO] [2021-07-26 08:20:48,003] [1:140259119585024] - job_controller.py[line:344]: Job 202107260820309976351 on local 0 start to clean
[INFO] [2021-07-26 08:20:48,003] [1:140259119585024] - job_controller.py[line:346]: job 202107260820309976351 on local 0 clean done
[INFO] [2021-07-26 08:20:48,007] [1:140259369826048] - federated_scheduler.py[line:110]: clean job 202107260820309976351 success
  1. dag_scheduler.py: finish执行结束,输出日志
[INFO] [2021-07-26 08:20:48,007] [1:140259369826048] - dag_scheduler.py[line:505]: Job 202107260820309976351 finished with success, done

至此,job finish 结束,当然,调度器本轮调度也结束了,打出日志

[INFO] [2021-07-26 08:20:48,007] [1:140259369826048] - dag_scheduler.py[line:325]: finish scheduling job 202107260820309976351

FATE学习:跟着日志读源码(九)upload任务job finsih阶段相关推荐

  1. Flink跟着问题读源码 - SlidingEventTimeWindows接reduce结果数据倍增

    1.异常情况 使用 SlidingEventTimeWindows 窗口,后面直接一个reduce算子,数据会出现倍增情况. 代码: DataStreamSource<String> lo ...

  2. Spring读源码系列之AOP--03---aop底层基础类学习

    Spring读源码系列之AOP--03---aop底层基础类学习 引子 Spring AOP常用类解释 AopInfrastructureBean---免被AOP代理的标记接口 ProxyConfig ...

  3. 这样读源码,不牛X也难

    程序员在工作过程中,会遇到很多需要阅读源码的场景,比如技术预研.选择技术框架.接手以前的项目.review他人的代码.维护老产品等等.可以说,阅读源代码是程序员的基本功,这项基本功是否扎实,会在很大程 ...

  4. 微信读书vscode插件_跟我一起读源码 – 如何阅读开源代码

    阅读是最好的老师 在学习和提升编程技术的时候,通过阅读高质量的源码,来学习专家写的高质量的代码,是一种非常有效的提升自我的方式.程序员群体是一群乐于分享的群体,因此在互联网上有大量的高质量开源项目,阅 ...

  5. 读源码:PopupWindow

    读源码是为了了解并学习它的实现机制,并更好的运用它,如果在读源码之前已经知道它的怎么运用,这将会更容易理解源码.所以在这读源码开头我推荐阅读一下一位大神写的相关博文,浅显易懂,条理清晰: PopUpW ...

  6. 如何学习一个开源项目源码

    你有个任务,需要用到某个开源项目;或者老大交代你一个事情,让你去了解某个东西.怎么下手呢?如何开始呢?我的习惯是这样: 首先,查找和阅读该项目的博客和资料,通过google你能找到某个项目大体介绍的博 ...

  7. 京东CTO的笔记23种设计模式和5大读源码方法...!网友:这次稳了...

    大家都知道源码框架有23个设计模式,但是我们大多停留在概念层面,真实开发中很少应用到,也不知道如何落地!!!那有没有办法解决了? 我整理了 Mybatis 和 Spring 源码中使用了大量的设计模式 ...

  8. 我是怎么读源码的,授之以渔

    点击上方"视学算法",选择"设为星标" 做积极的人,而不是积极废人 作者 :youzhibing 链接 :https://www.cnblogs.com/you ...

  9. myisam怎么读_耗时半年,我成功“逆袭”,拿下美团offer(刷面试题+读源码+项目准备)...

    欢迎关注专栏[以架构赢天下]--每天持续分享Java相关知识点 以架构赢天下​zhuanlan.zhihu.com 以架构赢天下--持续分享Java相关知识点 每篇文章首发此专栏 欢迎各路Java程序 ...

最新文章

  1. 当思想与机器融合:脑机接口与人类的现在、困境与未来
  2. Shell 输入/输出重定向
  3. CTF--base64编码过程中隐藏信息解密脚本
  4. python多线程输出_萌新python多线程
  5. Ubuntu14.04安装mysql
  6. springboot (spring mvc)集成swagger
  7. 快速学习、快速决策、快速改变;突破自我
  8. as3实现(可以操纵的)真正的由惯性导致的漂移
  9. jsp % % ! 区别
  10. Java输出100以内质数
  11. 【群晖秘籍】群晖添加第三方套件,让可用功能更多更好(任性拓展)
  12. 2022年起重机械指挥特种作业证考试题库及答案
  13. Scrum如何拥抱变化
  14. android毛玻璃壁纸效果,【手机教程大赛】制作 毛玻璃效果 壁纸
  15. 点集的读入与输出操作
  16. K8S二进制环境搭建苹果电脑(M1芯片)
  17. 数据库中的三种完整性
  18. Android 屏幕适配 pt暴力适配
  19. 漏洞复现:通过CVE-2022-30190上线CS
  20. 前端Vue项目搭建过程概述

热门文章

  1. VEGAS中的项目工程文件打不开该如何解决?
  2. poj2236 Wireless Network
  3. (附源码)ssm校园拼车服务系统 毕业设计211633
  4. PACS医学影像工作站源码 采用VC++编程语言,提供先进的3D图像处理和高级算法开发
  5. 在vue项目中统一管理api
  6. 1.2 nuclei sdk gd32vf03 启动文件分析
  7. java界面初始化,初始化jsp页面方法
  8. 图的一些基本知识:关联矩阵、拉普拉斯矩阵
  9. day 01 - 1-2-Python 猜数字游戏
  10. 商品期货CTA策略系列文章 -- 主流量化CTA策略之趋势跟踪策略