Celery 应用:Application

Celery 在使用前必须要进行实例化,实例化之后被称为应用程序(简称 app)。 应用程序是线程安全的,可以配置多个不同配置、组件、任务的 Celery 程序在同一个进程中运行。 创建一个实例:

from celery import Celery
app = Celery()
app
<Celery __main__ at 0x250bbc65eb8>

最后一行显示的内容包含 app 类的 Celery 名称,当前主模块 (main) 以及对象的内存地址(0x250bbc65eb8)。

celery的主名称:Main Name

其中主模块的名称是比较重要的。 使用 Celery 进行发送任务消息时,消息内容不会含有源代码信息,只有执行任务的名称。这与主机名在互联网上的工作类似:每一个职程(Worker)通过任务名称与实际的功能进行对应,这样的方式被称为 任务注册 。 每当定义任务时,该任务也将添加到本地注册表中:

@app.task
... def add(x, y):
...     return x + y
...
add
<@task: __main__.add of __main__ at 0x250bbc65eb8>
add.name
'__main__.add'
app.tasks['__main__.add']
<@task: __main__.add of __main__ at 0x250bbc65eb8>

在上面的实例中 __main__再次出现,这是因为 Celery 无法检测到该函数属于哪个模块时,自动会使用主模块的名称作为任务名称的开头。 这只是一组有限实例中的问题:

  • 如果在其中定义任务的模块作为程序运行。

  • 如果应用程序是在python shell(repl)中创建的。

例如,这里的 tasks 模块还用于用 app.worker_main() 启动一个职程(Worker):

tasks.py

from celery import Celery
app = Celery()@app.task
def add(x, y): return x + yif __name__ == '__main__':app.worker_main()

当模块执行时,任务名以 __main__ 开头,被另外一个模块导入时,例如调用一个任务时,任务的名称将以 tasks (模块真实的名称)进行开头命名:

>>> from tasks import add>>> add.nametasks.add

也可以设置主模块的名称:

>>> app = Celery('tasks')>>> app.main'tasks'>>> @app.task... def add(x, y):... return x + y>>> add.nametasks.add

celery配置:Configuration

您可以设置几个选项来更改 Celery 的工作方式。这些选项可以直接在app实例上设置,也可以使用专用的配置模块。

你可以设置几个选项来改变 Celery 的工作方式。这些选项可以直接在应用程序实例上设置,也可以使用专用的配置模块。

可以通过配置几个选项来进行配置 Celery,该配置通过直接在程序实例中配置,也可以通过专用的配置模块进行配置。 配置如下 app.conf

>>> app.conf.timezone
'Europe/London'

也可以直接设置对应的值:

>>> app.conf.enable_utc = True

也可以通过 update 进行一次设置多个键值:

>>> app.conf.update(... enable_utc=True,... timezone='Europe/London',...)

配置对象由多个字典组成,按以下顺序进行查询:

  1. 在运行时所做的更改。

  2. 配置模块(如果有的话)

  3. 默认配置(celery.app.defaults) 也通过 app.add_defaults() 进行配置新的默认配置。

config_from_object

config_from_object() 可以从配置对象中进行加载配置。

加载的内容可以为配置模块、也可以为用于属性的对象。

注意config_from_object() 在进行调用时会恢复默认的配置,如果需要设置其他的配置,建议在调用完毕之后进行操作。

案例1:使用模块名称

app.config_from_object() 可以从一个 python 模块中(包含属性名称)进行加载,例如:celeryconfigmyproj.config.celerymyproj.config:CeleryConfig

from celery import Celeryapp = Celery()app.config_from_object('celeryconfig')

其中 celeryconfig 模块内容如下:

celeryconfig.pyenable_utc = Truetimezone = 'Europe/London'

导入 celeryconfig 程序就可以应用。

案例2:配置模块对象

可以配置模块对象,但不建议这样用。

import celeryconfigfrom celery import Celeryapp = Celery()app.config_from_object(celeryconfig)

案例3:使用配置类/对象

from celery import Celeryapp = Celery()class Config:enable_utc = Truetimezone = 'Europe/London'app.config_from_object(Config)# or using the fully qualified name of the object:# app.config_from_object('module:Config')

config_from_envvar

app.config_from_envvar() 可以从环境变量获取信息进行配置, 例如,从名称为 CELERY_CONFIG_MODULE 的环境变量中加载配置:

import osfrom celery import Celery#: Set default configuration module nameos.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')app = Celery()app.config_from_envvar('CELERY_CONFIG_MODULE')

然后通过指定的环境变量进行配置使用的配置模块:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info

celery过滤配置:Censored configuration

如果您希望将配置作为调试信息或类似信息打印出来,那么您也可能希望过滤掉敏感信息,如密码和API密钥。 Celery 提供了集中打印配置信息工具,其中一个为 humanize()

>>> app.conf.humanize(with_defaults=False, censored=True)

该方法将配置信息转换为列表字符串返回,默认情况下,仅包含修改的键值,可以通过 with_defaults 参数进行包含默认的配置信息。 可以通过 table() 方法将返回结果转换为字典:

>>> app.conf.table(with_defaults=False, censored=True)

注意:Celery 不会删除所有的敏感配置信息,通过正则表达式来进行检索通常命名的信息,如果包含敏感信息的自定义配置,Celery 会标识为机密的名称来下进行命名秘钥。

如果命名中含有子字符串,将会进行过滤:

APITOKENKEYSECRETPASSSIGNATUREDATABASE

懒加载:Laziness

程序是懒加载的,在没有实际使用的情况下,是不会进行加载的。 创建一个 Celery 程序的流程如下:

  • 创建用于事件的逻辑时钟实例

  • 创建任务注册表

  • 将自身设置为当前应用程序(如果禁用 set_as_current 参数则不会)

  • 调用 app.on_init() 回调函数(默认情况下不执行任何操作)

    app.task() 装饰器不会在定义任务时创建任务,创建任务通常在使用该任务或应用程序完成后进行创建。

该实例会显示,在使用任务或访问属性之前,是如何创建任务的:

>>> @app.task>>> def add(x, y):... return x + y​>>> type(add)<class 'celery.local.PromiseProxy'>​>>> add.__evaluated__()False​>>> add # <-- causes repr(add) to happen<@task: __main__.add>​>>> add.__evaluated__()True

应用程序的终结可以通过调用 app.finalize() 显式执行,也可以通过访问 app.tasks 属性隐式执行。 完成创建对象将:

  • 复制必须在应用之间共享的任务

    默认情况下共享任务,如果装饰器的 shared 参数,该任务为私有任务。

  • 评估所有待处理的任务装饰器

  • 确保当前所有的人呢我都已经绑定到当前应用程序

    任务绑带到应用程序,便于从配置中获取配置信息。

打破链式操作:Breaking the chain

虽然可以依赖于当前设置的应用程序,但最佳做法是始终将应用程序实例传递给需要它的任何内容。 通常将这种做法称为 app chain,根据传递的应用程序创建一系列实例。 下面的这个实例实不可取的:

from celery import current_appclass Scheduler(object):def run(self):app = current_app

应该将 app 作为参数进行传递:

class Scheduler(object):​
def __init__(self, app):self.app = app

在celery内部实现中,使用 celery.app_or_default() 函数使得模块级别的 API 也能正常使用:

from celery.app import app_or_default​class Scheduler(object):def __init__(self, app=None):self.app = app_or_default(app)

在开发环境中,可以通过设置 CELERY_TRACE_APP 环境变量在应用实例链被打破时抛出一个异常:

$ CELERY_TRACE_APP=1 celery worker -l info

抽象任务:Abstract Tasks

使用 task() 装饰器创建的所有任务都将从应用程序继承 Task 类。 也可以通过 base 参数进行指定基类:

@app.task(base=OtherTask):def add(x, y):return x + y

创建自定义任务类,需要继承 celery.Task

from celery import Taskclass DebugTask(Task):​def __call__(self, *args, **kwargs):print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))return super(DebugTask, self).__call__(*args, **kwargs)

tip 如果重写 taks 中的 __call__ 函数,就必须要要调用 super() 函数,这样基本调用方法可以设置直接调用任务时使用的默认请求。 基类是比较特殊的,因为它还没有绑定到任何特定的应用程序。一旦任务绑定到应用程序,它将读取配置以设置默认值,等等。 要实现基类,需要使用 app.taks() 装饰器创建任务:

@app.task(base=DebugTask)def add(x, y):return x + y

也可以通过更改应用程序的 app.task() 属性来更改其默认基类:

>>> from celery import Celery, Task​>>> app = Celery()​>>> class MyBaseTask(Task):... queue = 'hipri'​>>> app.Task = MyBaseTask>>> app.Task<unbound MyBaseTask>​>>> @app.task... def add(x, y):... return x + y​>>> add<@task: __main__.add>​>>> add.__class__.mro()[<class add of <Celery __main__:0x1012b4410>>,<unbound MyBaseTask>,<unbound Task>,<type 'object'>]

Celery 应用:Application相关推荐

  1. 魔坊APP项目-25-种植园,植物的状态改动、当果树种植以后在celery的异步任务中调整浇水的状态、客户端通过倒计时判断时间,显示浇水道具

    种植园 植物的状态改动 一.当果树种植以后在celery的异步任务中调整浇水的状态 在进行果树种植的时候, 在服务端设置当前果树到等待浇水的redis变量中.通过celery不断进行周期任务的处理, ...

  2. 魔方APP项目-06-用户注册,完成短信验证码的校验、基于Celery实现短信异步发送、用户登录,jwt登陆认证、服务端提供用户登录的API接口

    一.用户注册- 1.完成短信验证码的校验 application.utils.language.message,代码: class ErrorMessage():ok = "ok" ...

  3. Celery中文翻译-Application

    Celery在使用前必须实例化,称为application或app.app是线程安全的,具有不同配置.组件.task的多个Celery应用可以在同一个进程空间共存. # 创建Celery应用 > ...

  4. python定时任务contrib_django+celery配置(定时任务+循环任务)

    下面介绍一下django+celery的配置做定时任务 1.首先介绍一下环境和版本 python==2.7 django == 1.8.1 celery == 3.1.23 django-celery ...

  5. celery源码分析-定时任务

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的定时任务与Django配置 celery也可以执行定时任务来执行相关操作,ce ...

  6. celery源码分析-wroker初始化分析(上)

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery与Django的配合使用 首先,在安装有django的环境中创建一个django ...

  7. celery源码分析:multi命令分析

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery简介 celery是一款异步任务框架,基于AMQP协议的任务调度框架.使用的场景 ...

  8. Django + Nginx + Uwsgi + Celery + Rabbitmq 做一个高速响的应网站架构

    Django :渲染页面,站点访问控制,管理网站,网站框架 Nginx:ip访问管理,数据传输控制 Uwsgi:处理链接Diango和Nginx的传输协议,也可以看作是处理客户端数据和Django之间 ...

  9. celery 可视化_在Flask中使用Celery进行多任务分布执行

    关键字:Flask, Redis, RabbitMQ, Celery, Broker, Backend 前言 在后端服务器有时候需要处理耗时较长的任务,例如发送电子邮件,在处理这些任务时,这个线程就处 ...

最新文章

  1. Vs2010编译错误集
  2. TCP/UDP,SOCKET,HTTP,FTP协议简析
  3. script到底应该放在哪里
  4. Java 并发:Executor ExecutorService ThreadPoolExecutor
  5. VisDA-2020亚军技术方案分享
  6. idea 注释中 类 跳转_javaSE第一部分 数据类型、idea快捷键
  7. 迷宫问题 (dfs)
  8. 《南溪的python灵隐笔记》——tqdm的学习笔记
  9. .net学习笔记之协变和抗变(原创)
  10. haproxy实现mysql从库负载均衡
  11. SQL:postgresql中判断一个点是否落在指定区域
  12. 面试记录:冒泡排序都不会,大哥你会编程吗
  13. oracle11g和10的区别,同平台升级 oracle 10 到 oracle11g的一些考虑和实际操作
  14. 转帖:算法好学吗?——《大话数据结构》读者书评
  15. 百度SEO站群PTCMS全自动采集小说网站源码
  16. 生物信息相关国家自然科学基金汇总(持续更新中)
  17. elementUI 相同元素合并行
  18. python之数据库-表操作
  19. 知识图谱中的实体定义
  20. Chango的数学Shader世界(九)流体模拟-散度,梯度,二阶导与拉普拉斯

热门文章

  1. 万字长文总结机器学习的模型评估与调参 | 附代码下载
  2. ​MMIT冠军方案 | 用于行为识别的时间交错网络,商汤公开视频理解代码库
  3. 为什么校招面试中总被问“线程与进程的区别”?我该如何回答?
  4. 技术驰援抗疫一线, Python 线上峰会免费学!
  5. 作为一名程序员,数学到底对你有多重要?
  6. 数据为王的时代,如何用图谱挖掘商业数据背后的宝藏?
  7. 手机芯片谁是AI之王?高通、联发科均超华为
  8. 干货!这里有一份神经网络入门指导,请收下!
  9. 重磅 | 周志华最新论文:首个基于决策树集成的自动编码器,表现优于DNN
  10. MySQL模糊查询再也用不着 like+% 了!