[Source Code Analysis] Celery, the Parallel Distributed Task Queue: Multiprocess Architecture and Model

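Table of Contents

  • [Source Code Analysis] Celery, the Parallel Distributed Task Queue: Multiprocess Architecture and Model
    • 0x00 Summary
    • 0x01 Consumer Component: the Pool bootstep
      • 1.1 bootsteps
    • 0x02 Pool Entry Point -- TaskPool
      • 2.1 Pool initialization
      • 2.2 Starting the pool: start
    • 0x03 Pool Implementation -- AsynPool
      • 3.1 Instantiation
      • 3.2 Setting up communication: queues
        • 3.2.1 _SimpleQueue
        • 3.2.2 Pipe
        • 3.2.3 Connection
      • 3.3 The pool base-class constructor
        • 3.3.1 Creating child processes
          • 3.3.1.1 The child process's work code
          • 3.3.1.2 Child-process abstraction --- WorkerProcess
            • 3.3.1.2.1 How WorkerProcess executes
            • 3.3.1.2.2 Base class BaseProcess
            • 3.3.1.2.3 Adding to the process list
          • 3.3.1.3 The fork procedure
        • 3.4.2 Auxiliary management: Supervisor
        • 3.3.3 Dispatching tasks to child processes ---- TaskHandler
        • 3.4.3 Handling what child processes return --- ResultHandler
      • 3.5 Mapping files to queues
      • 3.6 AsynPool: the overall result
    • 0xFF References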

0x00 Summary

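Celery is a simple, flexible and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling. Because Celery uses multiple processes to improve execution efficiency, this article gives a first look at Celery's multiprocess architecture and model.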

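In this article you will see what Celery had to think about, and which abstractions it built, in order to implement a multiprocess architecture, for example:

  • how Celery, as a whole system, integrates the multiprocess model to obtain a process pool;
  • how it instantiates different multiprocess models for different operating systems;
  • how it sets up communication between parent and child processes, and how it separates reading from writing;
  • how it spawns child processes, what logic the children run, and how a child process is abstracted;
  • how it helps manage the child processes;
  • how it dispatches tasks to the child processes;
  • how it handles what the child processes return.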

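Let's first sketch the rough logic so you have an overall picture, which will make the rest easier to follow. In the figure below, note that:

  • TaskHandler is the parent-side logic that dispatches tasks to the child processes;
  • ResultHandler is the parent-side logic that handles what the child processes return;
  • Supervisor is a helper handler that manages the pool;
  • Worker is the child process's business logic, _pool is the process pool, and ForkProcess (i.e. WorkerProcess) is the child-process abstraction; each child-process abstraction runs a Worker.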

Be sure to keep these logical concepts distinct.

+--------------------------+
| AsynPool                 |
|                          |
|                          |
|         ResultHandler  +------->  celery.concurrency.asynpool.ResultHandler
|                          |
|         Supervisor     +------->  billiard.pool.Supervisor
|                          |
|         TaskHandler    +------->  billiard.pool.TaskHandler
|                          |
|         TimeoutHandler +------->  billiard.pool.TimeoutHandler
|                          |
|         Worker         +------->  celery.concurrency.asynpool.Worker
|                          |
|         _pool +-----------------+--->  <ForkProcess(ForkPoolWorker-1, started daemon)>
+--------------------------+      |
                                  +--->  <ForkProcess(ForkPoolWorker-2, started daemon)>
                                  |
                                  +--->  <ForkProcess(ForkPoolWorker-3, started daemon)>
                                  |
                                  +--->  <ForkProcess(ForkPoolWorker-4, started daemon)>


The entry point to multiprocessing is the Consumer's pool step, so we start from how the Consumer component starts up.

0x01 Consumer Component: the Pool bootstep

First of all, starting the Consumer's Pool begins with bootsteps. This Bootstep is the worker's real execution engine.

The Pool bootstep adds another layer of wrapping because it has to set a scaling range, the so-called autoscaler. Once the pools have been built here, later tasks can simply be handed to the Pool.

1.1 bootsteps

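The code is in celery/worker/components.py. This is just an entry point, whose purpose is to:

  • perform various kinds of configuration;
  • bring in TaskPool, which is the worker's entry point to multiprocessing;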
class Pool(bootsteps.StartStopStep):

    def __init__(self, w, autoscale=None, **kwargs):
        w.pool = None
        if isinstance(autoscale, str):
            max_c, _, min_c = autoscale.partition(',')
            autoscale = [int(max_c), min_c and int(min_c) or 0]
        w.autoscale = autoscale
        if w.autoscale:
            w.max_concurrency, w.min_concurrency = w.autoscale
        super().__init__(w, **kwargs)

    def create(self, w):
        semaphore = None
        max_restarts = None
        # Is worker_pool one of the green pools ('eventlet', 'gevent')? The default is prefork.
        if w.app.conf.worker_pool in GREEN_POOLS:  # pragma: no cover
            ...  # (body not shown in this excerpt)
        # Use threads when the event loop is not used, or on Windows.
        threaded = not w.use_eventloop or IS_WINDOWS
        procs = w.min_concurrency          # minimum pool size (4 in the author's setup)
        w.process_task = w._process_task   # bind the worker's _process_task to process_task
        if not threaded:                   # when not using threads
            # LaxBoundedSemaphore provides atomic acquire/release, implemented with a queue
            semaphore = w.semaphore = LaxBoundedSemaphore(procs)
            # hand the semaphore's methods to the worker
            w._quick_acquire = w.semaphore.acquire
            w._quick_release = w.semaphore.release
            max_restarts = 100             # maximum number of restarts
            # check the class configuration to decide whether to replace process_task
            if w.pool_putlocks and w.pool_cls.uses_semaphore:
                w.process_task = w._process_task_sem  # replaced under the default configuration
        allow_restart = w.pool_restarts    # whether restarts are allowed
        # w.pool_cls defaults to prefork.TaskPool
        pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            max_memory_per_child=w.max_memory_per_child,
            timeout=w.time_limit,
            soft_timeout=w.soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            allow_restart=allow_restart,
            forking_enable=True,
            semaphore=semaphore,
            sched_strategy=self.optimization,
            app=w.app,
        )
        _set_task_join_will_block(pool.task_join_will_block)
        return pool
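The autoscale handling at the top of Pool.__init__ can be tried on its own. The sketch below copies only that parsing expression into a standalone function; parse_autoscale is a hypothetical name, not a Celery API. A string in the max,min form accepted by the worker's --autoscale option becomes a [max, min] list, and a missing minimum falls back to 0.

```python
def parse_autoscale(autoscale):
    """Hypothetical helper: same parsing as Pool.__init__ ('max,min' -> [max, min])."""
    if isinstance(autoscale, str):
        max_c, _, min_c = autoscale.partition(',')
        # An absent or empty minimum ('10' or '10,') falls back to 0.
        autoscale = [int(max_c), min_c and int(min_c) or 0]
    return autoscale


max_concurrency, min_concurrency = parse_autoscale('10,3')
print(max_concurrency, min_concurrency)  # 10 3
print(parse_autoscale('10'))             # [10, 0]
print(parse_autoscale(None))             # None
```

Non-string values pass through unchanged, which is why the bootstep can afterwards simply unpack w.autoscale when it is set.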

Here w.pool_cls is <class 'celery.concurrency.prefork.TaskPool'>. The logic is:

+-------------------------------+
| Pool(bootsteps.StartStopStep) |
|                               |
|                               |
|  celery/worker/components.py  |
+---------------+---------------+
                |
                |
                v
             __init__
                +
                |
                |
                v
              create
                +
                |
                v
       +--------+----------+
       |       TaskPool    |
       |                   |
       |            Pool +------->  celery.concurrency.asynpool.AsynPool
       |                   |
       |            app  +------->  Celery
       |                   |
       +-------------------+

0x02 Pool Entry Point -- TaskPool

TaskPool is the entry point to multiprocessing, and it is the same for all operating systems.

Since w.pool_cls is <class 'celery.concurrency.prefork.TaskPool'>, the code arrives at TaskPool. During initialization, instantiate first goes through the base class BasePool, located in celery/concurrency/base.py.

2.1 Pool initialization

The thing to note in __init__ is self.app = app: the Celery application itself is passed in at initialization.

class BasePool:
    """Task pool."""

    Timer = timer2.Timer
    _state = None
    _pool = None

    def __init__(self, limit=None, putlocks=True, forking_enable=True,
                 callbacks_propagate=(), app=None, **options):
        self.limit = limit
        self.putlocks = putlocks
        self.options = options
        self.forking_enable = forking_enable
        self.callbacks_propagate = callbacks_propagate
        self.app = app

2.2 Starting the pool: start

The Blueprint calls start:

class Blueprint:
    def start(self, parent):
        self.state = RUN
        if self.on_start:
            self.on_start()
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)

Since TaskPool does not define start, the call resolves to the start defined in its parent class BasePool:

class BasePool(object):
    """Task pool."""

    def start(self):
        self._does_debug = logger.isEnabledFor(logging.DEBUG)
        self.on_start()
        self._state = self.RUN

This calls on_start. Because the subclasses override on_start, it is the subclass version that runs. Again taking TaskPool as the example, its on_start is defined as:

class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""

    Pool = AsynPool
    BlockingPool = BlockingPool

    uses_semaphore = True
    write_stats = None

    def on_start(self):
        forking_enable(self.forking_enable)
        # BlockingPool when threads are used, otherwise AsynPool
        Pool = (self.BlockingPool if self.options.get('threads', True)
                else self.Pool)
        # create the Pool
        P = self._pool = Pool(processes=self.limit,
                              initializer=process_initializer,
                              on_process_exit=process_destructor,
                              enable_timeouts=True,
                              synack=False,
                              **self.options)

        # Create proxy methods: expose the pool's methods on TaskPool
        self.on_apply = P.apply_async
        self.maintain_pool = P.maintain_pool
        self.terminate_job = P.terminate_job
        self.grow = P.grow
        self.shrink = P.shrink
        self.flush = getattr(P, 'flush', None)  # FIXME add to billiard
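The start/on_start split is a template-method pattern: BasePool.start owns the state change, and each pool subclass only fills in on_start. The toy classes below (hypothetical and heavily simplified, not Celery's own) trace the call order from a bootstep's start down to TaskPool.on_start.

```python
class BasePool:
    """Toy version of celery.concurrency.base.BasePool (hypothetical)."""
    RUN = 1

    def __init__(self, limit=None, **options):
        self.limit = limit
        self.options = options
        self._state = None
        self.calls = []

    def start(self):
        # The base class owns the lifecycle; subclasses only provide on_start().
        self.calls.append('BasePool.start')
        self.on_start()
        self._state = self.RUN

    def on_start(self):
        pass


class TaskPool(BasePool):
    def on_start(self):
        self.calls.append('TaskPool.on_start')


class StartStopStep:
    """Toy bootstep: start() forwards to the object it created."""

    def __init__(self, obj):
        self.obj = obj

    def start(self, parent):
        self.obj.start()


pool = TaskPool(limit=4)
StartStopStep(pool).start(parent=None)
print(pool.calls)                   # ['BasePool.start', 'TaskPool.on_start']
print(pool._state == BasePool.RUN)  # True
```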

As you can see, on_start does three things:

  • decides from the options whether to use BlockingPool or AsynPool (billiard.pool.Pool and celery.concurrency.asynpool.AsynPool respectively);
  • creates the Pool;
  • creates the proxy methods;

On Windows the resulting _pool is <class 'billiard.pool.Pool'>; on macOS it is AsynPool.
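Taken together, the threaded flag computed in the Pool bootstep and the threads option read in TaskPool.on_start decide which implementation ends up in _pool. choose_pool below is a hypothetical summary of that decision that returns class paths as strings purely for illustration; it assumes the threads option reaches on_start unchanged, which is what threads=threaded in the bootstep's instantiate call arranges.

```python
import sys


def choose_pool(use_eventloop, is_windows=sys.platform == 'win32'):
    """Hypothetical summary of how the class behind _pool is chosen."""
    # Pool.create(): threads are used without an event loop, or on Windows.
    threaded = not use_eventloop or is_windows
    # TaskPool.on_start(): options['threads'] picks BlockingPool vs AsynPool.
    if threaded:
        return 'billiard.pool.Pool'                   # BlockingPool
    return 'celery.concurrency.asynpool.AsynPool'


print(choose_pool(use_eventloop=True, is_windows=False))  # celery.concurrency.asynpool.AsynPool
print(choose_pool(use_eventloop=True, is_windows=True))   # billiard.pool.Pool
```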

The logic at this point is:

+------------------------------+
| Pool(bootsteps.StartStopStep)|
+-------------+----------------+
              |
            1 | instantiate
              |
              v
      +-------+-----------+
      |    TaskPool       +------+  2 on_start
      |                   |      |
      |                   | <----+
      |                   |      +------+
      |       app   +----------> |celery|
      |                   |      +------+
      |                   |
      |                   |      +-----------+
      |      _pool  +----------> | AsynPool  |
      |                   |      +-----------+
      +-------------------+

0x03 Pool Implementation -- AsynPool

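_pool has a separate implementation depending on the operating system. Here you can see the setup of the pipes, the files (file descriptors), the queues, and the actual process pool itself.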

Assuming the system is macOS, we arrive at the AsynPool process pool, located in celery/concurrency/asynpool.py.

3.1 Instantiation

This step mainly instantiates the process pool Pool. This instantiation is the concrete implementation of prefork, and this Pool is in fact AsynPool.

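Specifically, it:

  • configures the scheduling strategy;

  • determines the number of processes, i.e. how many child processes to fork: by default the number of CPU cores, or the value of -c if it was specified on the command line;

  • creates a set of read and write pipes. Depending on the direction of flow and on whether the code runs in the parent or in the child, one end of each pipe is later closed; for a pipe that carries data from child to parent, for example, the parent closes the write end and the child closes the read end. The pipes are wrapped in an abstract data structure for easier management, and an instance of this structure provides two-way data transfer between the parent and a child that is about to be forked. One such instance is created per child process;

  • calls the base-class constructor, which is where the forking happens;

  • based on the child processes that were created, configures the mapping from file descriptors to queues;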

The code is:

class AsynPool(_pool.Pool):
    """AsyncIO Pool (no threads)."""

    ResultHandler = ResultHandler
    Worker = Worker

    def WorkerProcess(self, worker):
        worker = super().WorkerProcess(worker)
        worker.dead = False
        return worker

    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, proc_alive_timeout=None,
                 *args, **kwargs):
        self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
                                                   sched_strategy)
        # number of child processes to fork: the CPU count by default,
        # or the value of -c if it was given on the command line
        processes = self.cpu_count() if processes is None else processes
        self.synack = synack
        # create queue-pairs for all our processes in advance.
        # (this is where the read and write pipes are created)
        self._queues = {
            self.create_process_queues(): None for _ in range(processes)
        }

        # inqueue fileno -> process mapping
        self._fileno_to_inq = {}
        # outqueue fileno -> process mapping
        self._fileno_to_outq = {}
        # synqueue fileno -> process mapping
        self._fileno_to_synq = {}

        # We keep track of processes that haven't yet
        # sent a WORKER_UP message.  If a process fails to send
        # this message within _proc_alive_timeout we terminate it
        # and hope the next process will recover.
        self._proc_alive_timeout = (
            PROC_ALIVE_TIMEOUT if proc_alive_timeout is None
            else proc_alive_timeout
        )
        self._waiting_to_start = set()

        # denormalized set of all inqueues.
        self._all_inqueues = set()

        # Set of fds being written to (busy)
        self._active_writes = set()

        # Set of active co-routines currently writing jobs.
        self._active_writers = set()

        # Set of fds that are busy (executing task)
        self._busy_workers = set()
        self._mark_worker_as_available = self._busy_workers.discard

        # Holds jobs waiting to be written to child processes.
        self.outbound_buffer = deque()

        self.write_stats = Counter()

        super().__init__(processes, *args, **kwargs)  # call the base-class constructor

        for proc in self._pool:
            # create initial mappings, these will be updated
            # as processes are recycled, or found lost elsewhere.
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc

        self.on_soft_timeout = getattr(
            self._timeout_handler, 'on_soft_timeout', noop,
        )
        self.on_hard_timeout = getattr(
            self._timeout_handler, 'on_hard_timeout', noop,
        )
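The reason for keeping maps such as _fileno_to_outq is that an event loop only reports which file descriptor became readable; the parent then has to find out which child that fd belongs to. The sketch below reproduces that lookup with the standard selectors module, using plain os.pipe() pairs and worker-name strings as hypothetical stand-ins for billiard's queues and ForkProcess objects; Celery's own event hub is not used here (Unix only).

```python
import os
import selectors

# Hypothetical stand-ins: each "process" is just a name owning one outq pipe.
pipes = {}
fileno_to_outq = {}
for name in ('ForkPoolWorker-1', 'ForkPoolWorker-2'):
    r, w = os.pipe()
    pipes[name] = (r, w)
    fileno_to_outq[r] = name      # like self._fileno_to_outq[proc.outqR_fd] = proc

sel = selectors.DefaultSelector()
for fd in fileno_to_outq:
    sel.register(fd, selectors.EVENT_READ)

# Pretend ForkPoolWorker-2 writes a result for the parent.
os.write(pipes['ForkPoolWorker-2'][1], b'result')

# The selector only reports fds; the dict maps them back to processes.
ready = [fileno_to_outq[key.fd] for key, _ in sel.select(timeout=0)]
print(ready)  # ['ForkPoolWorker-2']

sel.close()
for r, w in pipes.values():
    os.close(r)
    os.close(w)
```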

3.2 Setting up communication: queues

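In the instantiation code, the creation of the queues deserves special attention, because the parent and child processes communicate through these queues.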

The code is:

self._queues = {
    self.create_process_queues(): None for _ in range(processes)
}

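This creates a set of read and write pipes. With 4 processes here, 4 groups are created; each group is a tuple holding two _SimpleQueue objects (the third element, synq, is None because synack is False), as shown below.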

self._queues = {dict: 4}
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
 (<billiard.queues._SimpleQueue object>, <billiard.queues._SimpleQueue object at 0x = {NoneType}
 __len__ = {int} 4

The method that builds the queues is shown below; it creates inq, outq and synq:

def create_process_queues(self):
    """Create new in, out, etc. queues, returned as a tuple."""
    # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
    # fd), otherwise it won't be possible to change the flags until
    # there's an actual reader/writer on the other side.
    inq = _SimpleQueue(wnonblock=True)
    outq = _SimpleQueue(rnonblock=True)
    synq = None
    if self.synack:
        synq = _SimpleQueue(wnonblock=True)
    return inq, outq, synq
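To see what the dictionary built from create_process_queues() looks like, here is a self-contained sketch that uses os.pipe() instead of billiard's _SimpleQueue (PipeQueue is a hypothetical helper, Unix only). It shows three points from the code above: the parent-side write end of inq and read end of outq are non-blocking, synq stays None when synack is False, and each tuple of queues is a dictionary key whose value is None until a process is attached.

```python
import os


class PipeQueue:
    """Hypothetical stand-in for billiard's _SimpleQueue: one simplex pipe (Unix)."""

    def __init__(self, rnonblock=False, wnonblock=False):
        self.rfd, self.wfd = os.pipe()
        # Set O_NONBLOCK right after creation, as the NOTE in the source asks.
        if rnonblock:
            os.set_blocking(self.rfd, False)
        if wnonblock:
            os.set_blocking(self.wfd, False)


def create_process_queues(synack=False):
    inq = PipeQueue(wnonblock=True)    # parent writes jobs without blocking
    outq = PipeQueue(rnonblock=True)   # parent reads results without blocking
    synq = PipeQueue(wnonblock=True) if synack else None
    return inq, outq, synq


processes = 4
queues = {create_process_queues(): None for _ in range(processes)}
inq, outq, synq = next(iter(queues))
print(len(queues))                                                # 4
print(os.get_blocking(inq.wfd), os.get_blocking(outq.rfd), synq)  # False False None
print(os.get_blocking(inq.rfd))   # True: the child's read of inq blocks until a job arrives
```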

3.2.1 _SimpleQueue

_SimpleQueue is a locked pipe, i.e. essentially a pipe. It is defined as:

class _SimpleQueue(object):
    '''
    Simplified Queue type -- really just a locked pipe
    '''

    def __init__(self, rnonblock=False, wnonblock=False, ctx=None):
        self._reader, self._writer = connection.Pipe(
            duplex=False, rnonblock=rnonblock, wnonblock=wnonblock,
        )
        self._poll = self._reader.poll
        self._rlock = self._wlock = None

Example variable values:

self._poll = {method} <bound method _ConnectionBase.poll of <billiard.connection.Connection
self = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7fc46ae049e8>
 _reader = {Connection} <billiard.connection.Connection object at 0x7fc46ae68c18>
 _writer = {Connection} <billiard.connection.Connection object at 0x7fc46ae726a0>
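The "locked pipe" idea can be sketched with the standard library alone: a simplex multiprocessing.Pipe(duplex=False) provides the reader/writer pair, poll() answers "is anything waiting?", and locks serialise concurrent readers or writers. This is a hypothetical illustration, not billiard's class; in particular, the thread locks are an assumption (the billiard code above leaves _rlock and _wlock as None).

```python
import multiprocessing
import threading


class SimpleQueue:
    """Hypothetical 'locked pipe' queue, modelled on the structure above."""

    def __init__(self):
        # duplex=False returns (receive-only end, send-only end).
        self._reader, self._writer = multiprocessing.Pipe(duplex=False)
        self._poll = self._reader.poll
        self._rlock = threading.Lock()   # assumption: plain thread locks
        self._wlock = threading.Lock()

    def put(self, obj):
        with self._wlock:
            self._writer.send(obj)

    def get(self):
        with self._rlock:
            return self._reader.recv()

    def empty(self):
        return not self._poll()


q = SimpleQueue()
q.put({'task': 'add', 'args': (2, 2)})
print(q.empty())  # False
print(q.get())    # {'task': 'add', 'args': (2, 2)}
print(q.empty())  # True
```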

3.2.2 Pipe

Above, the self._reader and self._writer of _SimpleQueue are created by Pipe, so let's look at it.

Pipe is defined as follows.

It simply creates two Connection objects and returns them to _SimpleQueue: one is the read end and the other is the write end.

if sys.platform != 'win32':

    def Pipe(duplex=True, rnonblock=False, wnonblock=False):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(not rnonblock)
            s2.setblocking(not wnonblock)
            c1 = Connection(detach(s1))
            c2 = Connection(detach(s2))
        else:
            fd1, fd2 = os.pipe()
            if rnonblock:
                setblocking(fd1, 0)
            if wnonblock:
                setblocking(fd2, 0)
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2
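The duplex=False branch is the one _SimpleQueue uses. The sketch below reproduces it with the standard library only: os.set_blocking stands in for billiard's own setblocking helper, and simplex_pipe is a hypothetical name (Unix only). It shows why the parent asks for non-blocking ends: reading an empty non-blocking pipe raises BlockingIOError immediately instead of stalling the event loop.

```python
import os


def simplex_pipe(rnonblock=False, wnonblock=False):
    """Hypothetical sketch of the duplex=False branch: one read fd, one write fd."""
    fd1, fd2 = os.pipe()           # fd1 is the read end, fd2 the write end
    if rnonblock:
        os.set_blocking(fd1, False)
    if wnonblock:
        os.set_blocking(fd2, False)
    return fd1, fd2


r, w = simplex_pipe(rnonblock=True)
try:
    os.read(r, 1)                  # nothing has been written yet
    would_block = False
except BlockingIOError:
    would_block = True             # the non-blocking reader returns at once
os.write(w, b'x')
data = os.read(r, 1)
os.close(r)
os.close(w)
print(would_block, data)           # True b'x'
```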

3.2.3 Connection

Connection comes up again here. Note that this is not Kombu's connection, but the multiprocessing library's own internal Connection definition:

class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

Connection is a connection class built on a file descriptor:

class _ConnectionBase(object):
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        if isinstance(handle, _SocketContainer):
            self._socket = handle.sock  # keep ref so not collected
            handle = handle.sock.fileno()
        handle = handle.__index__()
        self._handle = handle
        self._readable = readable
        self._writable = writable

The variables are now:

c1 = {Connection} <billiard.connection.Connection object at 0x7fc46ae68c18>
c2 = {Connection} <billiard.connection.Connection object at 0x7fc46ae726a0>
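A connection "based on a file descriptor" boils down to an integer handle plus framed reads and writes on it. The sketch below is a hypothetical, simplified FdConnection (not billiard's class, Unix only). Its framing, a 4-byte big-endian length header before each pickled payload, is an assumption modelled on how multiprocessing frames messages, and it is only meant for small messages.

```python
import os
import pickle
import struct


class FdConnection:
    """Hypothetical, simplified fd-based connection (Unix only)."""

    def __init__(self, handle, readable=True, writable=True):
        self._handle = handle.__index__()   # accept any int-like fd
        self._readable = readable
        self._writable = writable

    def fileno(self):
        return self._handle

    def send(self, obj):
        if not self._writable:
            raise OSError('connection is read-only')
        payload = pickle.dumps(obj)
        # Length header + payload in one write; fine for small messages.
        os.write(self._handle, struct.pack('!i', len(payload)) + payload)

    def _read_exact(self, n):
        buf = b''
        while len(buf) < n:
            chunk = os.read(self._handle, n - len(buf))
            if not chunk:
                raise EOFError
            buf += chunk
        return buf

    def recv(self):
        if not self._readable:
            raise OSError('connection is write-only')
        (size,) = struct.unpack('!i', self._read_exact(4))
        return pickle.loads(self._read_exact(size))


fd_r, fd_w = os.pipe()
reader = FdConnection(fd_r, writable=False)   # like c1 from Pipe(duplex=False)
writer = FdConnection(fd_w, readable=False)   # like c2
writer.send(('job', 42))
print(reader.recv())             # ('job', 42)
print(reader.fileno() == fd_r)   # True
```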

So AsynPool ends up as follows:

+-------------------------------+
| Pool(bootsteps.StartStopStep) |
+---------------+---------------+
                |
              1 | instantiate
                |
                v
      +---------+-----------+
      |      TaskPool       +-----+  2 on_start
      |                     |     |
      |                     | <---+
      |                     |      +------+
      |        app   +-----------> |celery|
      |                     |      +------+
      |                     |      +--------------------------+
      |       _pool  +-----------> | AsynPool                 |
      +---------------------+      |                          |
                                   |         _queues  +       |
                                   +------------------+-------+
                                                      |
                                                      +---->  (<_SimpleQueue>, <_SimpleQueue>)
                                                      |
                                                      +---->  (<_SimpleQueue>, <_SimpleQueue>)
                                                      |
                                                      +---->  (<_SimpleQueue>, <_SimpleQueue>)
                                                      |
                                                      +---->  (<_SimpleQueue>, <_SimpleQueue>)

        each _SimpleQueue above:

        +-----------------+        +----------------+
        |  _SimpleQueue   |        | Connection     |
        |                 |        |                |
        |      _reader +---------> |        _write  |
        |                 |        |        _read   |
        |                 |        |        _send   |
        |                 |        |        _recv   |
        |                 |        |        _handle |
        |                 |        +----------------+
        |      _poll   +--------->  _ConnectionBase.poll
        |                 |
        |                 |        +----------------+
        |      _writer +---------> | Connection     |
        |                 |        +----------------+
        +-----------------+


3.3 The pool base-class constructor

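Next comes AsynPool's base class, which is the Celery author's modified and wrapped version of Python multiprocessing. It sets up the various message-handling components and creates the child processes.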

Location: billiard/pool.py

The key work here is:

  • self._Process = self._ctx.Process sets it to <class 'billiard.context.ForkProcess'>;

  • creates one child process per configured process via _create_worker_process(i);

  • creates self._worker_handler = self.Supervisor(self);

  • creates TaskHandler, which dispatches tasks;

  • creates TimeoutHandler;

  • creates ResultHandler;

The code is:

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''

    def __init__(self, processes=None, initializer=None, initargs=(),
                 ..., **kwargs):
        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = Queue()
        self._cache = {}
        self._state = RUN
        .....
        self.readers = {}

        self._processes = self.cpu_count() if processes is None else processes
        self.max_restarts = max_restarts or round(self._processes * 100)
        self.restart_state = restart_state(max_restarts, max_restart_freq or 1)
        self._Process = self._ctx.Process
        self._pool = []
        self._poolctrl = {}
        self._on_ready_counters = {}
        self.putlocks = putlocks
        self._putlock = semaphore or LaxBoundedSemaphore(self._processes)
        for i in range(self._processes):
            self._create_worker_process(i)

        self._worker_handler = self.Supervisor(self)
        if threads:
            self._worker_handler.start()

        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue,
                                              self._pool,
                                              self._cache)
        if threads:
            self._task_handler.start()

        # Thread killing timedout jobs.
        if self.enable_timeouts:
            self._timeout_handler = self.TimeoutHandler(
                self._pool, self._cache,
                self.soft_timeout, self.timeout,
            )
            self._timeout_handler_mutex = Lock()
            self._timeout_handler_started = False
            self._start_timeout_handler()
            # If running without threads, we need to check for timeouts
            # while waiting for unfinished work at shutdown.
            if not threads:
                self.check_timeouts = self._timeout_handler.handle_event

        # Thread processing results in the outqueue.
        self._result_handler = self.create_result_handler()
        self.handle_result_event = self._result_handler.handle_event

        if threads:
            self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._pool, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler,
                  self._help_stuff_finish_args()),
            exitpriority=15,
        )

Let's analyze these one by one.

3.3.1 Creating child processes

The following code creates the child processes.

for i in range(self._processes):
    self._create_worker_process(i)

_create_worker_process mainly does the following:

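  • inq, outq, synq = self.get_process_queues() returns an abstraction over a set of read and write pipes. These pipes were created in advance (by self.create_process_queues() above) and are meant for the child that is about to be forked: the child listens for read events on this pipe abstraction and can also write data through the write pipe.

  • w, an instance of self.WorkerProcess, is an abstraction wrapping the forked child process. It makes the children easy to manage as a process pool: w records metadata about the forked child, such as its pid and the read/write fds of its pipes, and it is registered in the parent, which uses it to dispatch tasks;

  • the WorkerProcess instance is recorded in self._pool. This is important: it is how the parent knows which child processes exist;

  • w.start() contains the actual fork;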

The code is:

def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start()
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w
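The create-then-fork sequence can be reproduced in miniature with the standard library. This is a sketch under stated assumptions, not billiard's code: it uses multiprocessing's 'fork' context (Unix only) in place of billiard.context.ForkProcess, one pair of simplex pipes per child as its inq/outq, plain round-robin dispatch in place of TaskHandler and the scheduling strategy, and blocking reads in place of ResultHandler. worker_loop and run_pool are hypothetical names.

```python
import multiprocessing


def worker_loop(inq_reader, outq_writer):
    """Child side: a hypothetical, much simplified Worker.workloop."""
    while True:
        job = inq_reader.recv()
        if job is None:                 # sentinel: shut down
            break
        job_id, x = job
        outq_writer.send((job_id, x * x))


def run_pool(processes=2, jobs=range(6)):
    ctx = multiprocessing.get_context('fork')     # Unix only, like ForkProcess
    pool = []
    for _ in range(processes):
        inq_r, inq_w = ctx.Pipe(duplex=False)     # parent -> child
        outq_r, outq_w = ctx.Pipe(duplex=False)   # child -> parent
        p = ctx.Process(target=worker_loop, args=(inq_r, outq_w))
        p.name = p.name.replace('Process', 'PoolWorker')
        p.daemon = True
        p.start()                                 # the fork happens here
        # The parent keeps its ends, like inqW_fd / outqR_fd on the real w.
        pool.append((p, inq_w, outq_r))

    # Round-robin dispatch: a stand-in for TaskHandler and sched_strategy.
    for job_id in jobs:
        pool[job_id % processes][1].send((job_id, job_id))

    # Blocking reads: a stand-in for ResultHandler.
    results = {}
    for job_id in jobs:
        rid, value = pool[job_id % processes][2].recv()
        results[rid] = value

    for p, inq_w, _ in pool:
        inq_w.send(None)
        p.join()
    return results


if __name__ == '__main__':
    print(run_pool())  # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
```

Each child handles its jobs in order, so reading from the same child the job was sent to always yields that job's result; AsynPool instead learns which outq is readable from its event loop.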

Since self.WorkerProcess(self.Worker… appears here, let's introduce WorkerProcess and Worker in turn.

The simplified logic at this point:

    +----------------+
    |  StartStopStep |
    +-------+--------+
            |
            |   start
            |
            v
+-----------+-------------------+
|        BasePool               |
|   celery/concurrency/base.py  |
+-----------+-------------------+
            |
            |   start
            |
            v
+-----------+-------------------+
|        TaskPool               |
| celery/concurrency/prefork.py |
+-----------+-------------------+
            |
            |  on_start
            |
            v
+-----------+--------------------+
|        AsynPool                |
| celery/concurrency/asynpool.py |
+-----------+--------------------+
            |
            |
            v
   +--------+------------+
   |  class Pool(object) |
   |   billiard/pool.py  |
   +--------+------------+
            |
       +----+------+
       |           |
       v           v                          +----------------------+
   __init__    _create_worker_process  +--->  | class Worker(object) |
                                              +----------------------+
3.3.1.1 The child process's work code

Worker is the code that the child process runs. There are several implementations, for example:

celery.concurrency.asynpool.Worker and billiard/pool.Worker are both child-process work loops.

Let's look at billiard/pool.Worker as an example.

The main job of Worker's __init__ is to configure the various fds.

Here obj.inqW_fd = self.inq._writer.fileno() gets the fd from the queue's corresponding Connection:

class _ConnectionBase(object):
    _handle = None

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

The Worker definition is:

class Worker(object):

    def __init__(self, inq, outq, synq=None, initializer=None, initargs=(),
                 ...):
        ......
        self.max_memory_per_child = max_memory_per_child
        self._shutdown = sentinel
        self.inq, self.outq, self.synq = inq, outq, synq
        self.contribute_to_object(self)

    def contribute_to_object(self, obj):
        obj.inq, obj.outq, obj.synq = self.inq, self.outq, self.synq
        obj.inqW_fd = self.inq._writer.fileno()    # inqueue write fd
        obj.outqR_fd = self.outq._reader.fileno()  # outqueue read fd
        if self.synq:
            obj.synqR_fd = self.synq._reader.fileno()  # synqueue read fd
            obj.synqW_fd = self.synq._writer.fileno()  # synqueue write fd
            obj.send_syn_offset = _get_send_offset(self.synq._writer)
        else:
            obj.synqR_fd = obj.synqW_fd = obj._send_syn_offset = None
        obj._quick_put = self.inq._writer.send
        obj._quick_get = self.outq._reader.recv
        obj.send_job_offset = _get_send_offset(self.inq._writer)
        return obj

The variables are:

self = {Worker}
 initargs = {tuple: 2} (<Celery tasks at 0x7f8a0a70dd30>, )
 inq = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7f8a0b66aba8>
 inqW_fd = {int} 7
 max_memory_per_child = {NoneType} None
 maxtasks = {NoneType} None
 on_ready_counter = {Synchronized} <Synchronized wrapper for c_int(0)>
 outq = {_SimpleQueue} <billiard.queues._SimpleQueue object at 0x7f8a0b6844a8>
 outqR_fd = {int} 8
 sigprotection = {bool} False
 synq = {NoneType} None
 synqR_fd = {NoneType} None
 synqW_fd = {NoneType} None
 wrap_exception = {bool} True
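contribute_to_object is a small pattern: the Worker computes the parent-side file descriptors once and copies them onto any object it is given, first onto itself in __init__ and later onto the process wrapper in WorkerProcess. The sketch below isolates that idea; both classes are hypothetical and hold raw os.pipe() fds instead of billiard Connection objects (Unix only).

```python
import os


class Worker:
    """Hypothetical stand-in for billiard.pool.Worker, fds only."""

    def __init__(self):
        self.inq_r, self.inq_w = os.pipe()
        self.outq_r, self.outq_w = os.pipe()
        self.contribute_to_object(self)

    def contribute_to_object(self, obj):
        # Copy the parent-side fds onto whatever object is passed in,
        # so the process wrapper can expose them as attributes.
        obj.inqW_fd = self.inq_w      # parent writes jobs here
        obj.outqR_fd = self.outq_r    # parent reads results here
        return obj


class ProcessHandle:
    """Plays the role of the ForkProcess object that wraps the worker."""


w = Worker()
proc = w.contribute_to_object(ProcessHandle())
print(proc.inqW_fd == w.inqW_fd, proc.outqR_fd == w.outqR_fd)  # True True
```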

The simplified AsynPool logic is as follows.

Note in the figure below:

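Worker is the child process's business logic, _pool is the process pool, and ForkProcess (i.e. WorkerProcess) is the child-process abstraction; each child-process abstraction runs a Worker. Be sure to keep these logical concepts distinct.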

+--------------------------+
| AsynPool                 |
|                          |
|                          |
|         ResultHandler  +------->  celery.concurrency.asynpool.ResultHandler
|                          |
|         Supervisor     +------->  billiard.pool.Supervisor
|                          |
|         TaskHandler    +------->  billiard.pool.TaskHandler
|                          |
|         TimeoutHandler +------->  billiard.pool.TimeoutHandler
|                          |
|         Worker         +------->  celery.concurrency.asynpool.Worker
|                          |
|         _pool +-----------------+--->  <ForkProcess(ForkPoolWorker-1, started daemon)>
+--------------------------+      |
                                  +--->  <ForkProcess(ForkPoolWorker-2, started daemon)>
                                  |
                                  +--->  <ForkProcess(ForkPoolWorker-3, started daemon)>
                                  |
                                  +--->  <ForkProcess(ForkPoolWorker-4, started daemon)>


The detailed logic is as follows:

+-------------------------------+
| Pool(bootsteps.StartStopStep) |
+---------------+---------------+
                |
              1 | instantiate
                |
                v
      +---------+-----------+
      |      TaskPool       +-----+  2 on_start
      |                     |     |
      |                     | <---+
      |                     |      +------+
      |        app   +-----------> |celery|
      |                     |      +------+
      |                     |      +--------------------------+
      |       _pool  +-----------> | AsynPool                 |
      +---------------------+      |                          |
                                   |         _queues  +       |
                                   +------------------+-------+
                                                      |
                                                      +---->  (<_SimpleQueue>, <_SimpleQueue>)
                                                      |
                                                      +---->  (<_SimpleQueue>, <_SimpleQueue>)
                                                      |
                                                      +---->  (<_SimpleQueue>, <_SimpleQueue>)
                                                      |
                                                      +---->  (<_SimpleQueue>, <_SimpleQueue>)

        each _SimpleQueue above:

        +-----------------+        +----------------+
        |  _SimpleQueue   |        | Connection     |
        |                 |        |                |
        |      _reader +---------> |        _write  |
        |                 |        |        _read   |
        |                 |        |        _send   |
        |                 |        |        _recv   |
        |                 |        |        _handle +---->  {int} 8  <----+
        |                 |        +----------------+                     |
        |      _poll   +--------->  _ConnectionBase.poll                  |
        |                 |                                               |
        |                 |        +----------------+                     |
        |      _writer +---------> | Connection     |                     |
        |                 |        |                |                     |
        |                 |        |        _handle +---->  {int} 7  <--+ |
        |                 |        +----------------+                   | |
        +-----------------+                                             | |
                                                                        | |
        +----------------------+                                        | |
        | Worker     inq       |                                        | |
        |            outq      |                                        | |
        |            synq      |                                        | |
        |         inqW_fd  +--------------------------------------------+ |
        |         outqR_fd +----------------------------------------------+
        |         workloop     |
        |         after_fork   |
        +----------------------+


3.3.1.2 Child-process abstraction -- WorkerProcess

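WorkerProcess is an abstraction wrapping the forked child process. It makes the children easy to manage as a process pool: w records metadata about the forked child, such as its pid and the read/write fds of its pipes, and it is registered in the parent, which uses it to dispatch tasks.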

WorkerProcess wraps ForkProcess, which is defined as:

class ForkProcess(process.BaseProcess):
    _start_method = 'fork'

    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)
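_Popen is where the fork actually happens: popen_fork.Popen forks the interpreter and, in the child, runs the process object, which ends up calling its run(). Below is a hypothetical, stripped-down launcher built on os.fork and os.waitpid to show the shape of that mechanism (Unix only); the real Popen also handles sentinels, signals and bootstrapping, all omitted here.

```python
import os


class ForkPopen:
    """Hypothetical, stripped-down fork launcher (Unix only)."""

    def __init__(self, process_obj):
        self.returncode = None
        self.pid = os.fork()
        if self.pid == 0:                  # we are in the child
            code = 1
            try:
                process_obj.run()
                code = 0
            finally:
                os._exit(code)             # never fall back into the parent's code
        # the parent continues here, holding the child's pid

    def wait(self):
        _, status = os.waitpid(self.pid, 0)
        if os.WIFEXITED(status):
            self.returncode = os.WEXITSTATUS(status)
        else:
            self.returncode = -os.WTERMSIG(status)
        return self.returncode


class Task:
    def run(self):
        pass                               # the child's real work would go here


print(ForkPopen(Task()).wait())  # 0
```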
3.3.1.2.1 How WorkerProcess executes

WorkerProcess executes as follows:

def WorkerProcess(self, worker):
    worker = super().WorkerProcess(worker)
    worker.dead = False
    return worker

It first runs the base-class code, so what is ultimately returned is a ForkProcess:

def Process(self, *args, **kwds):
    return self._Process(*args, **kwds)

def WorkerProcess(self, worker):
    return worker.contribute_to_object(self.Process(target=worker))

In the self._Process(*args, **kwds) call, the relevant variables are:

self._Process = {type} <class 'billiard.context.ForkProcess'>
args = {tuple: 0} ()
kwds = {dict: 1} {'target': <celery.concurrency.asynpool.Worker object at 0x7f9c306326a0>}
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f9c30604da0>
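The key point of WorkerProcess is that the Worker instance is passed as target, so the Worker object itself is the callable the child runs. The standard library's multiprocessing.Process follows the same target/run contract as billiard's BaseProcess, so it can illustrate this without forking: calling run() directly executes the target in the current process. The Worker class here is a hypothetical stand-in.

```python
import multiprocessing


class Worker:
    """Hypothetical callable standing in for celery.concurrency.asynpool.Worker."""

    def __init__(self):
        self.calls = 0

    def __call__(self):
        # In billiard, this is where the child's work loop would start.
        self.calls += 1


worker = Worker()
proc = multiprocessing.Process(target=worker)
proc.run()            # run() in the current process just calls target()
print(worker.calls)   # 1
```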

This in turn reaches ForkProcess's base class, process.BaseProcess.

3.3.1.2.2 Base class BaseProcess

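The BaseProcess base class is shown below. Note that run is the child process's loop (it simply calls _target), and _target is the code the child process runs: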

 _target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>

It is defined as:

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    '''

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, daemon=None, **_kw):
        count = next(_process_counter)
        self._identity = _current_process._identity + (count, )
        self._config = _current_process._config.copy()
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = (
            name or type(self).__name__ + '-' +
            ':'.join(str(i) for i in self._identity)
        )
        if daemon is not None:
            self.daemon = daemon
        if _dangling is not None:
            _dangling.add(self)

        self._controlled_termination = False

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)

Once the base class is done, we have a ForkProcess:

self = {ForkProcess} <ForkProcess(ForkProcess-1, initial)>
 authkey = {AuthenticationString: 32} b''
 daemon = {bool} False
 exitcode = {NoneType} None
 ident = {NoneType} None
 name = {str} 'ForkProcess-1'
 pid = {NoneType} None
 _args = {tuple: 0} ()
 _authkey = {AuthenticationString: 32}
 _children = {set: 0} set()
 _config = {dict: 2} {'authkey': b'', 'semprefix': '/mp'}
 _counter = {count} count(2)
 _daemonic = {bool} False
 _identity = {tuple: 1} 1
 _kwargs = {dict: 0} {}
 _name = {str} 'ForkProcess-1'
 _parent_pid = {int} 14747
 _popen = {NoneType} None
 _start_method = {str} 'fork'
 _target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>
 _tempdir = {NoneType} None
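The default process name comes from type(self).__name__ and the _identity tuple, which is why the debugger shows 'ForkProcess-1'. The sketch copies that expression into a hypothetical helper, process_name, and then applies the w.name.replace('Process', 'PoolWorker') step from _create_worker_process, which yields the ForkPoolWorker-1 names seen in the process list.

```python
def process_name(cls_name, identity, name=None):
    """Hypothetical helper: the default-name expression from BaseProcess.__init__."""
    return name or cls_name + '-' + ':'.join(str(i) for i in identity)


default = process_name('ForkProcess', (1,))
print(default)                                   # ForkProcess-1
print(process_name('ForkProcess', (1, 2)))       # ForkProcess-1:2 (created by process 1)
# _create_worker_process then renames the process:
print(default.replace('Process', 'PoolWorker'))  # ForkPoolWorker-1
```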
3.3.1.2.3 Adding to the process list

After the child process object is created, self._pool.append(w) adds it to the parent's list of child processes, and its queues are then registered.

def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)  # execution has reached this line
    self._process_register_queues(w, (inq, outq, synq))  # ...and now this one
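The _queues dictionary created in __init__ acts as a set of pre-allocated slots: each new child takes a tuple whose value is still None, and registering it replaces None with the process object, which matches the _queues entry in the variable dump that follows. The sketch models this with strings in place of _SimpleQueue and ForkProcess objects; QueueSlots and its methods are hypothetical, not billiard's API.

```python
class QueueSlots:
    """Hypothetical model of AsynPool._queues: queue tuple -> owning process."""

    def __init__(self, processes):
        # Pre-create one slot per process; None means "no owner yet".
        self._queues = {('inq%d' % i, 'outq%d' % i, None): None
                        for i in range(processes)}

    def get_process_queues(self):
        # Take the first slot that has no owner yet.
        return next(q for q, owner in self._queues.items() if owner is None)

    def register(self, proc, queues):
        assert queues in self._queues   # only pre-created slots are valid
        self._queues[queues] = proc


slots = QueueSlots(processes=4)
for i in range(1, 5):
    q = slots.get_process_queues()
    slots.register('ForkPoolWorker-%d' % i, q)
print(slots._queues[('inq0', 'outq0', None)])  # ForkPoolWorker-1
print(None in slots._queues.values())          # False
```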

The variables at this point are as follows:

```
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f9ad36680f0>
 ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
 SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
 Supervisor = {type} <class 'billiard.pool.Supervisor'>
 TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
 TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
 Worker = {type} <class 'celery.concurrency.asynpool.Worker'>
 ......
 outbound_buffer = {deque: 0} deque([])
 readers = {dict: 0} {}
 restart_state = {restart_state} <billiard.common.restart_state object at 0x7f9ad3668e80>
 sched_strategy = {int} 4
 timers = {dict: 1} {<bound method Pool.maintain_pool of <celery.concurrency.asynpool.AsynPool object at 0x7f9ad36680f0>>: 5.0}
 write_stats = {Counter: 0} Counter()
 _Process = {type} <class 'billiard.context.ForkProcess'>
 _active_writers = {set: 0} set()
 _active_writes = {set: 0} set()
 _all_inqueues = {set: 0} set()
 _busy_workers = {set: 0} set()
 _cache = {dict: 0} {}
 _ctx = {ForkContext} <billiard.context.ForkContext object at 0x7f9ad27ad828>
 _fileno_to_inq = {dict: 0} {}
 _fileno_to_outq = {dict: 0} {}
 _fileno_to_synq = {dict: 0} {}
 _initargs = {tuple: 2} (<Celery myTest at 0x7f9ad270c128>, 'celery@me2koreademini')
 _inqueue = {NoneType} None
 _max_memory_per_child = {NoneType} None
 _maxtasksperchild = {NoneType} None
 _on_ready_counters = {dict: 0} {}
 _outqueue = {NoneType} None
 _poll_result = {NoneType} None
 _pool = {list: 1} [<ForkProcess(ForkPoolWorker-1, initial daemon)>]
 _poolctrl = {dict: 0} {}
 _proc_alive_timeout = {float} 4.0
 _processes = {int} 4
 _putlock = {LaxBoundedSemaphore} <LaxBoundedSemaphore at 0x7f9ad354db70 value:4 waiting:0>
 _queues = {dict: 4} {(<billiard.queues._SimpleQueue object at 0x7f9ad35acef0>, <billiard.queues._SimpleQueue object at 0x7f9ad3668160>, None): <ForkProcess(ForkPoolWorker-1, initial daemon)>, (<billiard.queues._SimpleQueue object at 0x7f9ad36684a8>, <billiard.queues._SimpleQu
 _quick_get = {NoneType} None
 _quick_put = {NoneType} None
 _state = {int} 0
 _taskqueue = {Queue} <queue.Queue object at 0x7f9ad2a30908>
 _waiting_to_start = {set: 0} set()
 _wrap_exception = {bool} True
sentinel = {NoneType} None
synq = {NoneType} None
```
3.3.1.3 The fork procedure

The actual fork happens inside w.start().

```python
def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start()  # execution is now here: the fork is about to happen
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w
```

start() looks like this:

```python
class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analogous to `threading.Thread`
    '''

    def start(self):
        '''
        Start child process
        '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
            'can only start a process object created by current process'
        _cleanup()
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel
        _children.add(self)
```

The important line here is self._popen = self._Popen(self). For ForkProcess, _Popen simply imports Popen from popen_fork and instantiates it:

```python
class ForkProcess(process.BaseProcess):
    _start_method = 'fork'

    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)
```
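start() never forks by itself. It delegates to self._Popen(self), and each Process subclass decides how a process is launched; for ForkProcess that means popen_fork.Popen. The standard library's multiprocessing.context.ForkProcess is written the same way.

Here is a fork-free sketch of this delegation (strategy) pattern. BaseProc, InlinePopen and InlineProcess are hypothetical illustration names, not billiard classes, and the "launcher" just runs the target in the current process.

```python
class BaseProc:
    """Hypothetical mini BaseProcess: start() delegates launching to _Popen."""

    def __init__(self, target):
        self._target = target
        self._popen = None

    @staticmethod
    def _Popen(process_obj):
        raise NotImplementedError("subclasses decide how to launch")

    def start(self):
        assert self._popen is None, "cannot start a process twice"
        # Called through the instance, the staticmethod receives `self`
        # as process_obj, just like ForkProcess._Popen(process_obj).
        self._popen = self._Popen(self)


class InlinePopen:
    """Hypothetical launcher: runs the target here and records its result."""

    def __init__(self, process_obj):
        self.returncode = process_obj._target()


class InlineProcess(BaseProc):
    _start_method = "inline"  # hypothetical start-method name

    @staticmethod
    def _Popen(process_obj):
        return InlinePopen(process_obj)


if __name__ == "__main__":
    p = InlineProcess(target=lambda: 0)
    p.start()
    print(p._popen.returncode)  # 0
```

With this design, supporting another start method (fork, spawn, forkserver) only takes another subclass with its own _Popen. start() itself does not change.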

Popen lives in /billiard/popen_fork.py. Its constructor calls _launch, which does the actual fork.

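At this point the mechanism becomes clear:

1. _launch first creates a pipe with os.pipe(), which yields a read end and a write end.
2. It then forks a child process with os.fork().
3. os.fork() returns 0 in the child and the child's pid in the parent, so checking whether self.pid is 0 lets the two processes run different logic: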

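  • The child closes the read end (parent_r), runs process_obj._bootstrap(), and then exits with os._exit(code).
  • The parent closes the write end (child_w) and keeps the read end's fd as self.sentinel.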
```python
# billiard/popen_fork.py
class Popen(object):
    method = 'fork'
    sentinel = None

    def __init__(self, process_obj):
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None
        self._launch(process_obj)

    def duplicate_for_child(self, fd):
        return fd

    def poll(self, flag=os.WNOHANG):
        if self.returncode is None:
            while True:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    # Child process not yet created. See #1731717
                    # e.errno == errno.ECHILD == 10
                    return None
                else:
                    break
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def _launch(self, process_obj):
        code = 1
        parent_r, child_w = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            try:
                os.close(parent_r)
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap()
            finally:
                os._exit(code)
        else:
            os.close(child_w)
            self.sentinel = parent_r
```
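The pipe is what makes sentinel useful. When the child exits, its copy of child_w is closed. Since the parent has already closed its own copy, no writer is left, and parent_r becomes readable with EOF. The parent can therefore wait for a child to exit with select(), alongside any other fds.

Below is a minimal POSIX-only sketch of that pattern using only os and select. A few assumptions apply:

- launch and wait_for_exit are hypothetical helper names.
- Unlike billiard's _bootstrap(), the target is any callable that returns an int exit code; None is treated as 0.

```python
import os
import select
import sys


def launch(target):
    """Hypothetical helper: fork a child running `target`; return (pid, sentinel_fd).

    Same shape as Popen._launch: pipe first, then fork; the child closes
    the read end, the parent closes the write end and keeps the read end.
    """
    sys.stdout.flush()  # avoid duplicating buffered output in the child
    sys.stderr.flush()
    parent_r, child_w = os.pipe()
    pid = os.fork()
    if pid == 0:  # child
        code = 1  # exit code used if target() raises
        try:
            os.close(parent_r)
            code = int(target() or 0)
        finally:
            os._exit(code)  # never fall back into the parent's code path
    os.close(child_w)  # parent
    return pid, parent_r


def wait_for_exit(pid, sentinel, timeout=5.0):
    """Hypothetical helper: wait until the sentinel reports EOF, then reap the child."""
    ready, _, _ = select.select([sentinel], [], [], timeout)
    if not ready:
        raise TimeoutError("child is still running")
    assert os.read(sentinel, 1) == b""  # EOF: the only writer has exited
    os.close(sentinel)
    _, status = os.waitpid(pid, 0)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    return os.WEXITSTATUS(status)


if __name__ == "__main__":
    pid, sentinel = launch(lambda: 3)
    print(wait_for_exit(pid, sentinel))  # 3
```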

3.4.2 Auxiliary management: Supervisor

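The Supervisor thread maintains the process pool periodically by calling pool._maintain_pool(), for example to decide whether the pool needs to be resized (note that this is the process pool, not a thread pool). As the code shows, body() works in three phases:

  • At startup it sets a temporary restart_state(10 * pool._processes, 1) and runs a burst of ten maintenance passes, 0.1 s apart.
  • It then restores the previous restart_state and keeps maintaining the pool every 0.8 s while both the thread and the pool are running.
  • If workers restart too often, RestartFreqExceeded is raised, and the Supervisor closes and joins the pool.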

```python
# billiard/pool.py
class Supervisor(PoolThread):

    def __init__(self, pool):
        self.pool = pool
        super(Supervisor, self).__init__()

    def body(self):
        time.sleep(0.8)
        pool = self.pool

        try:
            # do a burst at startup to verify that we can start
            # our pool processes, and in that time we lower
            # the max restart frequency.
            prev_state = pool.restart_state
            pool.restart_state = restart_state(10 * pool._processes, 1)
            for _ in range(10):
                if self._state == RUN and pool._state == RUN:
                    pool._maintain_pool()
                    time.sleep(0.1)

            # Keep maintaining workers until the cache gets drained, unless
            # the pool is terminated
            pool.restart_state = prev_state
            while self._state == RUN and pool._state == RUN:
                pool._maintain_pool()
                time.sleep(0.8)
        except RestartFreqExceeded:
            pool.close()
            pool.join()
            raise
```
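The startup burst relies on restart_state. The call restart_state(10 * pool._processes, 1) reads as "allow up to R restarts within T seconds", and exceeding that limit raises RestartFreqExceeded.

Below is a sketch of such a sliding-window limiter plus a toy maintenance pass. It is a simplified stand-in built on these assumptions:

- RestartState, FakeWorker and maintain_pool are hypothetical, and the local RestartFreqExceeded only borrows billiard's exception name.
- It is not billiard's restart_state, and it counts every replacement worker as a restart.

```python
import time
from collections import deque


class RestartFreqExceeded(Exception):
    """Local stand-in that borrows the name of billiard's exception."""


class RestartState:
    """Hypothetical limiter: at most max_r restarts in any max_t-second window."""

    def __init__(self, max_r, max_t, clock=time.monotonic):
        self.max_r, self.max_t, self.clock = max_r, max_t, clock
        self.stamps = deque()

    def step(self):
        now = self.clock()
        # forget restarts that are older than the window
        while self.stamps and now - self.stamps[0] > self.max_t:
            self.stamps.popleft()
        self.stamps.append(now)
        if len(self.stamps) > self.max_r:
            raise RestartFreqExceeded(
                f"{len(self.stamps)} restarts within {self.max_t}s")


class FakeWorker:
    """Hypothetical worker handle with a settable liveness flag."""

    def __init__(self, alive=True):
        self.alive = alive

    def is_alive(self):
        return self.alive


def maintain_pool(pool, size, start_worker, restarts):
    """Toy _maintain_pool: drop dead workers, start replacements up to `size`."""
    pool[:] = [w for w in pool if w.is_alive()]
    while len(pool) < size:
        restarts.step()  # may raise RestartFreqExceeded
        pool.append(start_worker())
```

Injecting the clock keeps the limiter deterministic in tests. A Supervisor-like loop would call maintain_pool periodically and shut the pool down when RestartFreqExceeded escapes.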

3.3.3 Dispatching tasks to child processes ---- TaskHandler

This class does the actual dispatching work: task messages are passed from the parent process to the child processes here.

When the TaskHandler was created earlier, the key points were:

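  • self._taskqueue is passed in, and task messages are later delivered through it. _taskqueue is just a plain queue.Queue used as a message buffer between the Celery Consumer/worker and the pool.

  • self._quick_put is passed in and stored as put. In the simplified code below, that is self._inqueue.put. In the process pool, it is the write end of the inqueue pipe (self._inqueue._writer.send), which is why the variable dump below shows put as a bound _ConnectionBase.send.

  • TaskHandler therefore calls put(task) to send messages into _inqueue, the pipe between the parent and its child processes. In other words, whenever the parent receives a task message, TaskHandler forwards it to the child processes through that pipe, and self._taskqueue is merely an intermediate buffer.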

The queues involved come from the following code:

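```python
# in Pool.__init__
self._taskqueue = Queue()

def _setup_queues(self):
    # NOTE: this is the queue.Queue-based variant; the process pool's
    # _setup_queues (shown in the ResultHandler section) uses
    # self._ctx.SimpleQueue() and self._inqueue._writer.send instead.
    self._inqueue = Queue()
    self._outqueue = Queue()
    self._quick_put = self._inqueue.put
    self._quick_get = self._outqueue.get

# back in Pool.__init__, once the queues exist
self._task_handler = self.TaskHandler(self._taskqueue,
                                      self._quick_put,
                                      self._outqueue,
                                      self._pool,
                                      self._cache)
```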

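So at initialization the variables are as follows. This dump was captured on Windows, which is why the workers are SpawnProcess objects and the pipe is a PipeConnection.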

```
outqueue = {SimpleQueue} <billiard.queues.SimpleQueue object at 0x000001B55131AE88>
pool = {list: 8} [<SpawnProcess(SpawnPoolWorker-1, started daemon)>, <SpawnProcess(SpawnPoolWorker-2, started daemon)>, <SpawnProcess(SpawnPoolWorker-3, started daemon)>, <SpawnProcess(SpawnPoolWorker-4, started daemon)>, <SpawnProcess(SpawnPoolWorker-5, started daemon)>, <SpawnProcess(SpawnPoolWorker-6, started daemon)>, <SpawnProcess(SpawnPoolWorker-7, started daemon)>, <SpawnProcess(SpawnPoolWorker-8, started daemon)>]
put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001B55131AF08>>
self = {TaskHandler} Unable to get repr for <class 'billiard.pool.TaskHandler'>
taskqueue = {Queue} <queue.Queue object at 0x000001B551334308>
```

A simplified version of TaskHandler:

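```python
# billiard/pool.py (simplified)
class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool, cache):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        self.cache = cache
        super(TaskHandler, self).__init__()

    def body(self):
        cache = self.cache
        taskqueue = self.taskqueue
        put = self.put

        # keep taking (taskseq, set_length) items until the None sentinel
        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            i = -1
            try:
                for i, task in enumerate(taskseq):
                    if self._state:  # handler is no longer running
                        break
                    put(task)  # send the task down the inqueue pipe
                else:
                    # the inner loop finished without break
                    if set_length:
                        set_length(i + 1)
                    continue
                break  # the inner loop broke out: stop the outer loop too
            except Exception:
                pass  # error handling omitted in this simplified version

        self.tell_others()

    def tell_others(self):
        outqueue = self.outqueue
        put = self.put
        pool = self.pool

        try:
            # tell result handler to finish when cache is empty
            outqueue.put(None)

            # tell workers there is no more work
            for p in pool:
                put(None)
        except Exception:
            pass  # error handling omitted in this simplified version

    def on_stop_not_started(self):
        self.tell_others()
```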
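The for ... else ... continue / break construct in body() is easy to misread:

  • The else clause runs only when the inner loop was not interrupted, so continue moves on to the next task sequence.
  • The trailing break is reached only when the inner loop broke out because the handler was stopped.

Below is a runnable, simplified sketch of the same loop on a plain thread. It rests on these assumptions:

- MiniTaskHandler is hypothetical.
- Its shutdown_requested flag stands in for PoolThread._state.
- A queue.Queue plays the role of the inqueue pipe.
- Error handling is left out.

```python
import queue
import threading


class MiniTaskHandler(threading.Thread):
    """Hypothetical, simplified TaskHandler: forwards tasks from taskqueue via put()."""

    def __init__(self, taskqueue, put, outqueue, n_workers):
        super().__init__(daemon=True)
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.n_workers = n_workers
        self.shutdown_requested = False  # stands in for PoolThread._state

    def run(self):
        # iter(get, None) keeps calling taskqueue.get() until it returns None
        for taskseq, set_length in iter(self.taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if self.shutdown_requested:
                    break
                self.put(task)
            else:
                # inner loop completed: report how many tasks were sent
                if set_length:
                    set_length(i + 1)
                continue
            break  # inner loop was interrupted: stop handling tasks
        self.tell_others()

    def tell_others(self):
        self.outqueue.put(None)  # tell the result side to finish
        for _ in range(self.n_workers):
            self.put(None)  # one "no more work" sentinel per worker
```

For example, two task sequences followed by a None produce all three tasks on the inqueue, then one None per worker.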

The logic at this point is:

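Note: the Worker scope in this diagram refers to celery/apps/worker.py. It belongs to Celery's own application logic and is not the child-process Worker (the same holds for the diagrams below). Celery has several classes with the same name, which is quite confusing.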

```
                       +
Consumer               |
               message |
                       v         strategy  +------------------------------------+
          +------------+------+            | strategies                         |
          | on_task_received  | <--------+ |                                    |
          |                   |            |[myTest.add : task_message_handler] |
          +------------+------+            +------------------------------------+
                       |
                       |
+------------------------------------------------------------------------------------+
strategy               |
                       |
                       |
                       v                Request [myTest.add]
          +------------+-------------+                       +---------------------+
          | task_message_handler     | <-------------------+ | create_request_cls  |
          |                          |                       |                     |
          +------------+-------------+                       +---------------------+
                       | _process_task_sem
                       |
+------------------------------------------------------------------------------------+
Worker                 | req[{Request} myTest.add]
                       v
              +--------+-----------+
              | WorkController     |
              |                    |
              |            pool +-------------------------+
              +--------+-----------+                      |
                       |                                  |
                       |               apply_async        v
           +-----------+----------+                   +---+-------------------+
           |{Request} myTest.add  | +---------------> | TaskPool              |
           +----------------------+                   +----+------------------+
                                          myTest.add       |
                                                           |
+------------------------------------------------------------------------------------+
                                                           |
                                                           v
                                                      +----+------------------+
                                                      | billiard.pool.Pool    |
                                                      +-------+---------------+
                                                              |
                                                              |
Pool              +---------------------------+               |
                  | TaskHandler               |               |
                  |                           |               |  self._taskqueue.put
                  |              _taskqueue   |  <------------+
                  |                           |
                  +---------------------------+
```


3.4.3 Handling results from child processes --- ResultHandler

The parent process uses ResultHandler to handle the results that child processes send back.

```python
# celery/concurrency/asynpool.py
def create_result_handler(self):
    return super().create_result_handler(
        fileno_to_outq=self._fileno_to_outq,
        on_process_alive=self.on_process_alive,
    )


class ResultHandler(_pool.ResultHandler):
    """Handles messages from the pool processes."""

    def __init__(self, *args, **kwargs):
        self.fileno_to_outq = kwargs.pop('fileno_to_outq')
        self.on_process_alive = kwargs.pop('on_process_alive')
        super().__init__(*args, **kwargs)
        # add our custom message handler
        self.state_handlers[WORKER_UP] = self.on_process_alive
```

The variables are:

```
ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
 daemon = {property} <property object at 0x7f847454d638>
  fdel = {NoneType} None
 exitcode = {property} <property object at 0x7f8475c9e8b8>
  fdel = {NoneType} None
  fset = {NoneType} None
 ident = {property} <property object at 0x7f847454d4f8>
  fdel = {NoneType} None
  fset = {NoneType} None
 name = {property} <property object at 0x7f847454d598>
  fdel = {NoneType} None
 _initialized = {bool} False
```

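The code is below. _process_result waits for messages with poll(timeout). With a non-zero timeout this blocks, whereas handle_event drives it with timeout 0, so each call handles at most one ready message without blocking.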

```python
def _process_result(self, timeout=1.0):
    poll = self.poll
    on_state_change = self.on_state_change

    while 1:
        try:
            ready, task = poll(timeout)
        except (IOError, EOFError):
            # simplified: the pipe is broken, so stop this coroutine
            raise CoroStop()

        if ready:
            on_state_change(task)
            if timeout != 0:  # blocking
                break
        else:
            break
        yield


def handle_event(self, fileno=None, events=None):
    if self._state == RUN:
        if self._it is None:
            self._it = self._process_result(0)  # non-blocking
        try:
            next(self._it)
        except (StopIteration, CoroStop):
            self._it = None
```

Here poll is _poll_result. It calls self._outqueue._reader.poll(timeout) and, if data is ready, reads it with self._quick_get().

So the handler waits on outqueue, the pipe through which child processes send their results out.

```python
def _setup_queues(self):
    self._inqueue = self._ctx.SimpleQueue()
    self._outqueue = self._ctx.SimpleQueue()
    self._quick_put = self._inqueue._writer.send
    self._quick_get = self._outqueue._reader.recv

    def _poll_result(timeout):
        if self._outqueue._reader.poll(timeout):
            return True, self._quick_get()
        return False, None
    self._poll_result = _poll_result
```
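Making _process_result a generator lets handle_event process results in small steps from an event loop. With timeout 0, each next() call handles at most one ready message, and the generator finishes (StopIteration) as soon as nothing is ready.

Below is a sketch that uses multiprocessing.Pipe in place of the SimpleQueue's _reader. The assumptions:

- make_poll_result, process_results and MiniResultHandler are hypothetical names.
- The I/O-error and CoroStop paths are omitted.

```python
from multiprocessing import Pipe


def make_poll_result(reader):
    """Hypothetical _poll_result-style helper around a pipe's read end."""
    def poll_result(timeout):
        # Connection.poll(timeout) is True if data arrives within `timeout` s
        if reader.poll(timeout):
            return True, reader.recv()
        return False, None
    return poll_result


def process_results(poll, on_state_change, timeout=0):
    """Generator shaped like _process_result (error handling omitted)."""
    while True:
        ready, task = poll(timeout)
        if ready:
            on_state_change(task)
            if timeout != 0:  # blocking mode: handle one message, then stop
                break
        else:
            break  # nothing ready: end this round
        yield


class MiniResultHandler:
    """Hypothetical non-blocking driver, modelled loosely on handle_event."""

    def __init__(self, poll, on_state_change):
        self.poll = poll
        self.on_state_change = on_state_change
        self._it = None

    def handle_event(self):
        if self._it is None:
            self._it = process_results(self.poll, self.on_state_change, 0)
        try:
            next(self._it)
        except StopIteration:
            self._it = None  # start a fresh generator on the next event


if __name__ == "__main__":
    reader, writer = Pipe(duplex=False)  # one-way: reader <- writer
    handler = MiniResultHandler(make_poll_result(reader), print)
    writer.send(("job", 1))
    handler.handle_event()  # prints ('job', 1)
```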

So the logic now looks like this:

```
                       +
Consumer               |
               message |
                       v         strategy  +------------------------------------+
          +------------+------+            | strategies                         |
          | on_task_received  | <--------+ |                                    |
          |                   |            |[myTest.add : task_message_handler] |
          +------------+------+            +------------------------------------+
                       |
                       |
+------------------------------------------------------------------------------------+
strategy               |
                       |
                       |
                       v                Request [myTest.add]
          +------------+-------------+                       +---------------------+
          | task_message_handler     | <-------------------+ | create_request_cls  |
          |                          |                       |                     |
          +------------+-------------+                       +---------------------+
                       | _process_task_sem
                       |
+------------------------------------------------------------------------------------+
Worker                 | req[{Request} myTest.add]
                       v
              +--------+-----------+
              | WorkController     |
              |                    |
              |            pool +-------------------------+
              +--------+-----------+                      |
                       |                                  |
                       |               apply_async        v
           +-----------+----------+                   +---+-------------------+
           |{Request} myTest.add  | +---------------> | TaskPool              |
           +----------------------+                   +----+------------------+
                                          myTest.add       |
                                                           |
+------------------------------------------------------------------------------------+
                                                           |
                                                           v
                                                      +----+------------------+
                                                      | billiard.pool.Pool    |
                                                      +-------+---------------+
                                                              |
                                                              |
Pool              +---------------------------+               |
                  | TaskHandler               |               |
                  |                           |               |  self._taskqueue.put
                  |              _taskqueue   |  <------------+
                  |                           |
                  +------------+--------------+
                               |
                               |  put(task)
                               |
                               |                       +------------------+
                               |                       |  ResultHandler   |
                               |                       +------------------+
                               |                                 ^
                               |                                 |
                               |                                 |
+------------------------------------------------------------------------------------+
                               |                                 |
Sub process                    |                                 |
                               v                                 +
                         self._inqueue                   self._outqueue
```


3.5 Configuring the file-to-queue mapping

Finally, once the child processes have been created, AsynPool maps file descriptors to the processes they belong to.

As the code shows, it sets up the outq and synq mappings, i.e. for each of these queue fds, which child process it points to.

The code:

```python
class AsynPool(_pool.Pool):
    """AsyncIO Pool (no threads)."""

    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, proc_alive_timeout=None,
                 *args, **kwargs):
        # ......
        super().__init__(processes, *args, **kwargs)

        for proc in self._pool:
            # create initial mappings, these will be updated
            # as processes are recycled, or found lost elsewhere.
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc
```

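After the fds are configured, the mappings look like the dump below. Note that _fileno_to_synq has a single entry keyed by None. Synack is off in this run (synack = False), so every process's synq is None and its synqW_fd is None too. Each loop iteration therefore overwrites the same None key, and only the last process (ForkPoolWorker-4) remains.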

```
self._fileno_to_outq = {dict: 4}
 8 = {ForkProcess} <ForkProcess(ForkPoolWorker-1, started daemon)>
 12 = {ForkProcess} <ForkProcess(ForkPoolWorker-2, started daemon)>
 16 = {ForkProcess} <ForkProcess(ForkPoolWorker-3, started daemon)>
 20 = {ForkProcess} <ForkProcess(ForkPoolWorker-4, started daemon)>
 __len__ = {int} 4
self._fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, started daemon)>}
```
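Keying these dictionaries by file descriptor means that when the event loop reports a readable fd, a single dictionary lookup tells the pool which child process the data came from.

Below is a minimal sketch with the standard selectors module. The assumptions:

- FakeProc and ready_processes are hypothetical.
- Each FakeProc only owns a pipe; there is no real child process.

```python
import os
import selectors


class FakeProc:
    """Hypothetical stand-in for a pool process that owns one result pipe."""

    def __init__(self, name):
        self.name = name
        # In the pool the child writes to outq and the parent reads outqR_fd.
        self.outqR_fd, self.outqW_fd = os.pipe()

    def close(self):
        os.close(self.outqR_fd)
        os.close(self.outqW_fd)


def ready_processes(fileno_to_outq, timeout=0):
    """Return the processes whose outq read end is readable, via the fd map."""
    with selectors.DefaultSelector() as sel:
        for fd in fileno_to_outq:
            sel.register(fd, selectors.EVENT_READ)
        return [fileno_to_outq[key.fd] for key, _ in sel.select(timeout)]


if __name__ == "__main__":
    procs = [FakeProc(f"worker-{i}") for i in range(3)]
    fileno_to_outq = {p.outqR_fd: p for p in procs}
    os.write(procs[1].outqW_fd, b"result")
    print([p.name for p in ready_processes(fileno_to_outq)])  # ['worker-1']
    for p in procs:
        p.close()
```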

3.6 AsynPool: the overall result

The final state of AsynPool is shown below. You can match its internal variables against the preceding sections.

```
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7fe44f664128>
 ResultHandler = {type} <class 'celery.concurrency.asynpool.ResultHandler'>
 SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
 Supervisor = {type} <class 'billiard.pool.Supervisor'>
 TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
 TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
 Worker = {type} <class 'celery.concurrency.asynpool.Worker'>
 allow_restart = {bool} False
 enable_timeouts = {bool} True
 lost_worker_timeout = {float} 10.0
 max_restarts = {int} 100
 on_process_down = {NoneType} None
 on_process_up = {NoneType} None
 on_timeout_cancel = {NoneType} None
 on_timeout_set = {NoneType} None
 outbound_buffer = {deque: 0} deque([])
 process_sentinels = {list: 4} [25, 27, 29, 31]
 putlocks = {bool} False
 readers = {dict: 0} {}
 restart_state = {restart_state} <billiard.common.restart_state object at 0x7fe44f6644a8>
 sched_strategy = {int} 4
 soft_timeout = {NoneType} None
 synack = {bool} False
 threads = {bool} False
 timeout = {NoneType} None
 timers = {dict: 1} {<bound method Pool.maintain_pool of <celery.concurrency.asynpool.AsynPool object at 0x7fe44f664128>>: 5.0}
 write_stats = {Counter: 0} Counter()
 _Process = {type} <class 'billiard.context.ForkProcess'>
 _active_writers = {set: 0} set()
 _active_writes = {set: 0} set()
 _all_inqueues = {set: 0} set()
 _busy_workers = {set: 0} set()
 _cache = {dict: 0} {}
 _ctx = {ForkContext} <billiard.context.ForkContext object at 0x7fe44e7ac7f0>
 _fileno_to_inq = {dict: 0} {}
 _fileno_to_outq = {dict: 4} {8: <ForkProcess(ForkPoolWorker-1, started daemon)>, 12: <ForkProcess(ForkPoolWorker-2, started daemon)>, 16: <ForkProcess(ForkPoolWorker-3, started daemon)>, 20: <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>}
 _fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>}
 _initargs = {tuple: 2} (<Celery myTest at 0x7fe44e61cb38>, 'celery@me2koreademini')
 _inqueue = {NoneType} None
 _max_memory_per_child = {NoneType} None
 _maxtasksperchild = {NoneType} None
 _on_ready_counters = {dict: 4} {14802: <Synchronized wrapper for c_int(0)>, 14803: <Synchronized wrapper for c_int(0)>, 14804: <Synchronized wrapper for c_int(0)>, 14806: <Synchronized wrapper for c_int(0)>}
 _outqueue = {NoneType} None
 _poll_result = {NoneType} None
 _pool = {list: 4} [<ForkProcess(ForkPoolWorker-1, started daemon)>, <ForkProcess(ForkPoolWorker-2, started daemon)>, <ForkProcess(ForkPoolWorker-3, started daemon)>, <ForkProcess(ForkPoolWorker-4, stopped[SIGABRT] daemon)>]
 _poolctrl = {dict: 4} {14802: None, 14803: None, 14804: None, 14806: None}
 _proc_alive_timeout = {float} 4.0
 _processes = {int} 4
 _putlock = {LaxBoundedSemaphore} <LaxBoundedSemaphore at 0x7fe44f54bf98 value:4 waiting:0>
 _queues = {dict: 4} {(<billiard.queues._SimpleQueue object at 0x7fe44f664160>, <billiard.queues._SimpleQueue object at 0x7fe44f664240>, None): <ForkProcess(ForkPoolWorker-1, started daemon)>, (<billiard.queues._SimpleQueue object at 0x7fe44f664550>, <billiard.queues._SimpleQu
 _quick_get = {NoneType} None
 _quick_put = {NoneType} None
 _result_handler = {ResultHandler} <ResultHandler(Thread-170, initial daemon)>
 _state = {int} 0
 _task_handler = {TaskHandler} <TaskHandler(Thread-168, initial daemon)>
 _taskqueue = {Queue} <queue.Queue object at 0x7fe44f664978>
 _terminate = {Finalize} <Finalize object, callback=_terminate_pool, args=(<queue.Queue object at 0x7fe44f664978>, None, None, [<ForkProcess(ForkPoolWorker-1, started daemon)>, <ForkProcess(ForkPoolWorker-2, started daemon)>, <ForkProcess(ForkPoolWorker-3, started daemon)>, <ForkP
 _timeout_handler = {TimeoutHandler} <TimeoutHandler(Thread-169, initial daemon)>
 _timeout_handler_mutex = {DummyLock} <kombu.asynchronous.semaphore.DummyLock object at 0x7fe44f6cb7b8>
 _timeout_handler_started = {bool} False
 _waiting_to_start = {set: 0} set()
 _worker_handler = {Supervisor} <Supervisor(Thread-151, initial daemon)>
 _wrap_exception = {bool} True
```

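Putting everything together, the final diagram of this article is shown below. Worker is the code that runs inside the child process, and ForkProcess is the child-process abstraction (only one is drawn here).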

```
    +------------------------------+                                                                     +----------------+
    | Pool(bootsteps.StartStopStep)|                                          +-----------------+        | Connection     |
    +-------------+----------------+                                          |  _SimpleQueue   |        |                |
                  |                                                           |                 |        |        _write  |
                  |                                                           |      _reader +---------> |        _read   |
                  |                                                           |                 |        |        _send   |
                1 | instantiate                                               |                 |        |        _recv   |
                  |                                                           |                 |        |        _handle+---> {int} 8  <-+
 2 on_start       |                                                           |                 |        +----------------+               |
                  |                                                           |      _poll   +--------->  _ConnectionBase.poll            |
+-------------+   |                                                           |                 |                                         |
|             |   |                                                           |                 |        +----------------+               |
|             |   v                                                           |      _writer +---------> | Connection     |               |
|         +---+---+-----------+                                               |                 |        |                |               |
|         |    TaskPool       |                                               +-------+---------+        |       _handle+----> {int} 7    |
|         |                   |      +------+                                         ^                  |                |               |
|         |       app   +----------> |celery|                                         |                  +----------------+      ^        |
|         |                   |      +------+                                         |                                          |        |
|         |                   |                                                       +                                          |        |
|         |                   |      +--------------------------+     +---->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
|         |      _pool  +----------> | AsynPool                 |     |                                                          |        |
|         |                   |      |                          |     |                                                          |        |
|         +---+---------------+      |               _queues +------->----->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
|             ^                      |                          |     |                                                          |        |
|             |                      |          _fileno_to_inq  |     |                                                          |        |
|             |                      |                          |     +---->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
+-------------+                      |         _fileno_to_outq +--+   |                                                          |        |
                                     |                          | |   |                                                          |        |
                                     |          _queues[queues] | |   +---->  (<_SimpleQueue>, <_SimpleQueue>)                   |        |
                                     |                       +  | |                                                              |        |
                                     |               _pool   |  | |  +----------------------+                                    |        |
                                     |                +      |  | |  |                      |                                    |        |
                                     +--------------------------+ |  | Worker     inq       |                                    |        |
                                                      |      |    |  |                      |                                    |        |
                                                      |      |    |  |            outq      |                                    |        |
                                       2.1 append(w)  |      |    |  |                      |                                    |        |
                                                      |      |    |  |            synq      |                                    |        |
                                                      v      |    |  |                      |                                    |        |
                                        +-------------+--+   |    |  |         inqW_fd +-----------------------------------------+        |
                                        |                | <-+    |  |                      |                                             |
                                        |  ForkProcess   |        |  |         outqR_fd  +------------------------------------------------+
                                        |                | <------+  |                      |
                                        |                |           |         workloop     |
                                        |     _target +------------> |                      |
                                        |                |           |        after_fork    |
                                        |                |           |                      |
                                        +----------------+           +----------------------+
```


0xFF References

Celery source code study (2): the multi-process model

celery source code analysis: worker initialization (part 2)

★★★★★★ Thoughts on life and technology ★★★★★★

WeChat official account: 罗西的思考

