celery的初次使用

基本步骤:

  • 选择并且安装一个消息中间件(Broker)

  • 安装 Celery 并且创建第一个任务

  • 运行职程(Worker)以及调用任务

  • 跟踪任务的情况以及返回值

应用

创建第一个 Celery 实例程序,把创建 Celery 程序成为 Celery 应用,创建的第一个实例程序可能需要包含 Celery 中执行操作的所有入口点,例如创建任务、管理职程(Worker)等,所以必须要导入 Celery 模块。

首先创建 tasks.py:

#  tasks.pyfrom celery import Celery#  app = Celery('tasks', broker='amqp://guest@localhost//')
app = Celery('tasks', broker='redis://127.0.0.1/2')@app.taskdef add(x, y):return x + y

第一个参数为当前模块的名称,只有在 __main__ 模块中定义任务时才会生产名称。

第二个参数为中间人(Broker)的链接 URL ,实例中使用的 RabbitMQ(Celery默认使用的也是RabbitMQ)。

更多相关的 Celery 中间人(Broker)的选择方案,可查阅官方文档。例如,对于 RabbitMQ 可以写为 amqp://localhost ,使用 Redis 可以写为 redis://localhost。

创建了一个名称为 add 的任务,返回的俩个数字的和。

运行 Celery 职程(Worker)服务

现在可以使用 worker 参数进行执行刚刚创建职程(Worker):

celery -A tasks worker -l info

关于 Celery 可用的命令完整列表,可以通过以下命令进行查看:

celery worker --help

也可以通过以下命令查看一些 Celery 帮助选项:

celery help

调用任务

需要调用我们创建的实例任务,可以通过 delay() 进行调用。

delay()apply_async() 的快捷方法,可以更好的控制任务的执行:

可以在交互式shell环境中运行下列代码

from tasks import addadd.delay(4, 4)

该任务已经有职程(Worker)开始处理,可以通过控制台输出的日志进行查看执行情况。

调用任务会返回一个 AsyncResult 的实例,用于检测任务的状态,等待任务完成获取返回值(如果任务执行失败,会抛出异常)。默认这个功能是不开启的,如果开启则需要配置 Celery 的结果后端。

执行上面代码可能遇到的问题:

新的报错

ValueError: not enough values to unpack (expected 3, got 0)
  • 安装
pip install eventlet
  • 然后启动worker的时候加一个参数,如下:(执行任务)
celery worker -A <mymodule> -l INFO
  • 替换它
celery -A <mymodule> worker -l info -P eventlet

保存结果

如果您需要跟踪任务的状态,Celery 需要在某处存储任务的状态信息。Celery 内置了一些后端结果:SQLAlchemy/Django ORM、Memcached、Redis、 RPC (RabbitMQ/AMQP)以及自定义的后端结果存储中间件。

本次实例,使用 RPC 作为结果后端,将状态信息作为临时消息回传。后端通过 backend 参数指定给 Celery(或者通过配置模块中的 result_backend 选项设定):

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

可以使用Redis作为Celery结果后端,使用RabbitMQ作为中间人(Broker)可以使用以下配置(这种组合比较流行):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

现在已经配置结果后端,重新调用执行任务。会得到调用任务后返回的一个 AsyncResult 实例:

>>> result = add.delay(4, 4)

ready() 可以检测是否已经处理完毕:

>>> result.ready()
False

整个任务执行过程为异步的,如果一直等待任务完成,会将异步调用转换为同步调用:

>>> result.get(timeout=1)8

如果任务出现异常,get() 会再次引发异常,可以通过 propagate 参数进行覆盖:

>>> result.get(propagate=False)

如果任务出现异常,可以通过以下命令进行回溯:

>>> result.traceback

配置

Celery 像家用电器一样,不需要任何配置,开箱即用。它有一个输入和输出,输入端必须连接中间人(Broker),输出端可以连接到结果后端。如果仔细观察一些家用电器,会发现有很多到按钮,这就是配置。

大多数情况下,使用默认的配置就可以满足,也可以按需配置。

可以直接在程序中进行配置,也可以通过配置模块进行专门配置。例如,通过 task_serializer 选项可以指定序列化的方式:

app.conf.task_serializer = 'json'

如果需要配置多个选项,可以通过 update 进行配置:

app.conf.update(task_serializer='json',accept_content=['json'], # Ignore other contentresult_serializer='json',timezone='Europe/Oslo',enable_utc=True,)

针对大型的项目,建议使用专用配置模块,进行针对 Celery 配置。不建议使用硬编码,建议将所有的配置项集中化配置。集中化配置可以像系统管理员一样,当系统发生故障时可针对其进行微调。

可以通过 app.config_from_object() 进行加载配置模块:

app.config_from_object('celeryconfig')

其中 celeryconfig 为配置模块的名称,这个是可以自定义修改的、

在上面的实例中,需要在同级目录下创建一个名为 celeryconfig.py 的文件,添加以下内容:

celeryconfig.pybroker_url = 'pyamqp://'result_backend = 'rpc://'task_serializer = 'json'result_serializer = 'json'accept_content = ['json']timezone = 'Europe/Oslo'enable_utc = True

可以通过以下命令来进行验证配置模块是否配置正确:

$ python -m celeryconfig

Celery 也可以设置任务执行错误时的专用队列中,这只是配置模块中一小部分,详细配置如下:

#  celeryconfig.pytask_routes = {'tasks.add': 'low-priority',}

Celery 也可以针对任务进行限速,以下为每分钟内允许执行的10个任务的配置:

#  celeryconfig.pytask_annotations = {'tasks.add': {'rate_limit': '10/m'}}

如果使用的是 RabbitMQ 或 Redis 的话,可以在运行时进行设置任务的速率:

$ celery -A tasks control rate_limit tasks.add 10/mworker@example.com: OKnew rate limit set successfully

celery的初次使用相关推荐

  1. celery任务:Tasks(一)

    celery任务:Tasks 任务是构建 Celery 应用程序的组成模块. 任务是从任何可调用创建的类,它有两种角色,一种角色定义了调用任务时发生的事情(发送消息),另外一种角色为职程(Worker ...

  2. python消息队列celery高可用_分布式消息队列-Celery

    怎么能不恨呢,在我发现自己是恶鬼的时候,在我最绝望最虚弱的时候,这个世上最该跟我在一起的人却用刀把我的心刺穿了 Celery 是 Distributed Task Queue,分布式任务队列.分布式决 ...

  3. Django通过celery 异步发送邮件 : django开发之天天生鲜项目知识总结【5】

    这里初次学习celery,只简单讲解一下如何使用celery 异步发送邮件,在以后的总结中还会,多次提到celery,因为后面很多任务都需要用到celery执行任务,后面再专门针对celery做具体的 ...

  4. python定时任务contrib_django+celery配置(定时任务+循环任务)

    下面介绍一下django+celery的配置做定时任务 1.首先介绍一下环境和版本 python==2.7 django == 1.8.1 celery == 3.1.23 django-celery ...

  5. django中使用celery简单介绍

    链客,专为开发者而生,有问必答! 此文章来自区块链技术社区,未经允许拒绝转载. 本章节我们重点在于实现,如何存储任务的结果. 我们将任务函数改为: from celery_demo.celery im ...

  6. python celery

    celery 一般用于做异步 和定时任务 不过听网上说 celery 坑还是蛮多的,特别定时任务,我们一般用来做定时任务,还有数据导入导出. celery 不支持 redis cluster 集群模式 ...

  7. celery源码分析-定时任务

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的定时任务与Django配置 celery也可以执行定时任务来执行相关操作,ce ...

  8. celery源码分析-Task的初始化与发送任务

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的任务发送 在Django项目中使用了装饰器来包装待执行任务, from cel ...

  9. celery源码分析-worker初始化分析(下)

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的worker启动 在上文中分析到了Hub类的初始化,接下来继续分析Pool类的 ...

最新文章

  1. 组合模式(Composite Pattern)
  2. Java 里的字符串处理类StringBuffer简介
  3. 公司年会在民俗文化村举行
  4. Win10/Win7小技巧:教你如何彻底关闭系统进程
  5. java 池化_溯本求源: JAVA线程池工作原理
  6. MyBatis框架学习笔记03:利用MyBatis实现关联查询
  7. 【Python】理解Python(1) - Python数据模型,is关键字,类型
  8. django使用Paginator分页展示数据
  9. python 持续集成_使用jenkins和Gitlab进行Python项目的持续集成
  10. SQL Server如何存储特殊字符、上标、下标
  11. 关于目标文件系统,文件过大的解决方法
  12. 负载均衡器 运行在2、3、4、7层之间的区别 [资料整理]
  13. win7电脑变身WiFi热点
  14. 《Cinema 4D + After Effects动态图形设计案例解析》——1.3 动态图形的应用领域
  15. ABAP 金额小写转大写
  16. c语言实现五子棋人人对战教程
  17. 2020-11-25博客营销及软文营销价值
  18. CSDN 2020 博客之星实时数据排名(Python 爬虫 + PyEcharts)
  19. base64编码解码js
  20. html流星雨页面,纯CSS流星雨背景的示例代码

热门文章

  1. 微信9年:张小龙指明方向,微信AI全面开放NLP能力
  2. 自动驾驶人的福音!Lyft公开Level 5部署平台Flexo细节
  3. 东大漆桂林、清华李涓子、复旦肖仰华等大牛确认出席CTA峰会!5月一起打卡杭州...
  4. 三摄正普及,四摄在路上?谷歌逆天AI算法,只做单摄虚化
  5. 与英特尔抢市场,英伟达的数据中心业务能增长到多大?
  6. 快来呀~120类萌狗祝你狗年大吉!
  7. 18个Java8日期处理的实践,太有用了!
  8. Spring MVC 到 Spring BOOT 的简化之路
  9. 时序预测竞赛之异常检测算法综述
  10. 错误率减半需要超过500倍算力!深度学习的未来,光靠烧钱能行吗?