Celery源码阅读 result
一、ResultBase
Result基类,parent用于某个任务是一条任务链中的子任务时标记父任务。
class ResultBase:"""Base class for results."""#: Parent result (if part of a chain)parent = None
二、AsyncResult
继承自ResultBase的Result类,task默认使用AsyncResult作为结果类。
1. 属性
def __init__(self, id, backend=None,task_name=None, # deprecatedapp=None, parent=None):if id is None:raise ValueError(f'AsyncResult requires valid id, not {type(id)}')self.app = app_or_default(app or self.app)self.id = idself.backend = backend or self.app.backendself.parent = parentself.on_ready = promise(self._on_fulfilled, weak=True)self._cache = Noneself._ignored = False
- app:app,若未指定则使用默认app。
- id:result对应的task的id。
- backend:结果后端,未指定则使用app绑定的结果后端。
- parent:父任务对应的Result对象。
- on_ready:
- _cache:用于保存result对应的元任务。
- _ignored:若为Ture则忽略任务结果。
2. property
- ignored,对应_ignored,若为True忽略任务结果。
@property
def ignored(self):"""If True, task result retrieval is disabled."""if hasattr(self, '_ignored'):return self._ignoredreturn False@ignored.setter
def ignored(self, value):"""Enable/disable task result retrieval."""self._ignored = value
观察get函数,可以发现当ignored为True时直接返回。
def get(self, timeout=None, propagate=True, interval=0.5,no_ack=True, follow_parents=True, callback=None, on_message=None,on_interval=None, disable_sync_subtasks=True,EXCEPTION_STATES=states.EXCEPTION_STATES,PROPAGATE_STATES=states.PROPAGATE_STATES):if self.ignored:return....
- graph,返回以当前节点为根的有向无环子图。
@cached_property
def graph(self):return self.build_graph()
查看build_graph方法:
def build_graph(self, intermediate=False, formatter=None):graph = DependencyGraph(formatter=formatter or GraphFormatter(root=self.id, shape='oval'),)for parent, node in self.iterdeps(intermediate=intermediate):graph.add_arc(node)if parent:graph.add_edge(parent, node)return graph
build_graph生成了一个DependencyGraph,DependencyGraph是一个有向无环图,通过拓扑排序确定任务的处理顺序。graph的根是当前result节点,通过iterdeps遍历图,最终生成以当前节点为根的子图。
查看iterdeps方法:
def iterdeps(self, intermediate=False):stack = deque([(None, self)])while stack:parent, node = stack.popleft()yield parent, nodeif node.ready():stack.extend((node, child) for child in node.children or [])else:if not intermediate:raise IncompleteStream()
iterdeps通过层次遍历的方式遍历以当前result 节点为根的result子图。生成器中的每一项是当前节点和当前节点的父节点组成的元组。
- supports_native_join,结果后端是否支持本地联接。
@property
def supports_native_join(self):return self.backend.supports_native_join
- children,返回当前任务的子任务。
@property
def children(self):return self._get_task_meta().get('children')def _get_task_meta(self):if self._cache is None:return self._maybe_set_cache(self.backend.get_task_meta(self.id))return self._cachedef _maybe_set_cache(self, meta):if meta:state = meta['status']if state in states.READY_STATES:d = self._set_cache(self.backend.meta_from_decoded(meta))self.on_ready(self)return dreturn metadef _get_task_meta(self):if self._cache is None:return self._maybe_set_cache(self.backend.get_task_meta(self.id))return self._cachedef _set_cache(self, d):children = d.get('children')if children:d['children'] = [result_from_tuple(child, self.app) for child in children]self._cache = dreturn d
_get_task_meta方法首先检查是否设置了缓存,如果没有设置则调用结果后端通过task id获取task,并通过_maybe_set_cache将任务进行缓存。_set_cache调用result_from_tuple()通过当前任务的子任务获取到子任务对应的AsyncResult对象或者GroupResult对象。
三、ResultSet(后续更新)
Celery源码阅读 result相关推荐
- 应用监控CAT之cat-client源码阅读(一)
CAT 由大众点评开发的,基于 Java 的实时应用监控平台,包括实时应用监控,业务监控.对于及时发现线上问题非常有用.(不知道大家有没有在用) 应用自然是最初级的,用完之后,还想了解下其背后的原理, ...
- 源码阅读:SDWebImage(六)——SDWebImageCoderHelper
该文章阅读的SDWebImage的版本为4.3.3. 这个类提供了四个方法,这四个方法可分为两类,一类是动图处理,一类是图像方向处理. 1.私有函数 先来看一下这个类里的两个函数 /**这个函数是计算 ...
- celery源码分析-定时任务
celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的定时任务与Django配置 celery也可以执行定时任务来执行相关操作,ce ...
- celery源码分析-Task的初始化与发送任务
celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的任务发送 在Django项目中使用了装饰器来包装待执行任务, from cel ...
- celery源码分析:multi命令分析
celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery简介 celery是一款异步任务框架,基于AMQP协议的任务调度框架.使用的场景 ...
- 24 UsageEnvironment使用环境抽象基类——Live555源码阅读(三)UsageEnvironment
24 UsageEnvironment使用环境抽象基类--Live555源码阅读(三)UsageEnvironment 24 UsageEnvironment使用环境抽象基类--Live555源码阅读 ...
- 源码阅读:AFNetworking(八)——AFAutoPurgingImageCache
该文章阅读的AFNetworking的版本为3.2.0. AFAutoPurgingImageCache该类是用来管理内存中图片的缓存. 1.接口文件 1.1.AFImageCache协议 这个协议定 ...
- 【Dubbo源码阅读系列】之远程服务调用(上)
今天打算来讲一讲 Dubbo 服务远程调用.笔者在开始看 Dubbo 远程服务相关源码的时候,看的有点迷糊.后来慢慢明白 Dubbo 远程服务的调用的本质就是动态代理模式的一种实现.本地消费者无须知道 ...
- TiDB 源码阅读系列文章(六)Select 语句概览
在先前的 TiDB 源码阅读系列文章(四) 中,我们介绍了 Insert 语句,想必大家已经了解了 TiDB 是如何写入数据,本篇文章介绍一下 Select 语句是如何执行.相比 Insert,Sel ...
最新文章
- Pliops XDP(Extreme Data Processor)数据库存储设计的新型加速硬件
- php mysql 删除数据库,MySQL——删除数据库
- mysql的如何输入dateadd_mysql中date_add()函数的使用?
- LeetCode 147. 对链表进行插入排序(链表)
- Android简单通讯录从list取数据并显示 eclipse开发
- UVAPOJ离散概率与数学期望入门练习[4]
- java 链表逆转_java 实现单链表逆转详解及实例代码
- mysql mybatis 主键id_MyBatis+MySQL 返回插入的主键ID
- S3cCTF-gyy-Writeup
- linux下开源电子设计软件
- 【开发心得】json解析报错Uncaught SyntaxError: Unexpected identifier的解决方法
- C#数据库教程2-ADO.NET常用SQL语句
- Kernel, tainted, 被污染的实例
- 这可能是最简单,精炼,有效的magisk 安装教程,附boot.img 提取方法
- 清理winsxs的小工具
- excel只显示公式,不显示结果
- 90后程序员职场报告:月薪普遍过万 超七成有房有车 女性程序员不足一成
- Android 应用进程保活APP常驻内存研究方案
- 王者荣耀天赐语音包怎么获得?天赐语音包获取方法介绍
- python3新式类_Python中新式类与经典类的区别详析