celery 学习笔记 01-介绍

celery 是 python 中的常用的任务队列框架,经常用于异步调用、后台任务等工作。celery 本身以 python 写,但协议可在不同的语言中实现,其它语言也可以用 celery 执行相应的任务。在 web 应用,为提高系统响应速度,发送邮件、数据整理等需要长时间执行的任务,通常以异步任务的方式执行,这时就需要用到像 celery 类的框架。另一种常见的场景是大型系统的分布式处理,为了提升系统性能,各个组件通常以多个实例运行不同主机上,而组件之间的调用就需要用到 celery 这样的框架。使用 celery (或消息队列),有助于降低系统组件之间的耦合,有助于实现灰度发布、实现服务的分布式、实现水平扩展,最终提升系统健壮性和处理性能。

celery (和类似框架)的核心是任务队列。用户发起任务,celery 负责把任务排队和整理,然后交到任务执行器 worker 中。 worker 监视任务队列,获取新任务并执行。在 celery 内部,以消息机制协调各个组件工作,消息需要借助一个中间人 broker 进行,如下 ::

client → celery task → broker → celery worker↑                                   ↓←       ←        ←        ← result backend

client 发起任务时,一般是以异步方式(除非必要的同步 rpc ),获得一个任务的 id 并保存下来,后续可通过 id 到 result backend 中查询任务执行结果。broker 是第三方组件,可使用消息队列( rabbitmq 等)、redis、数据库等,只要能实现消息的存储和分发理论上都能使用。 worker 以线程或进程的形式运行,从 broker 中取任务执行,然后把结果保存到 result backend 。

目前 rabbitmq 的 broker 实现的功能最完备,在开发环境中也可以使用 sqlite 等比较方便的方式,但性能会很差,不能用在生产环境上。

另外需要注意的是,由于不同操作系统的进程模型的差异,celery 会在 windows 上产生一些配置方面的怪异问题。

celery 可直接通过 pip 安装,在 virtualenv 下,直接运行 ::

pip install celery

再安装 broker 所需要的驱动,例如使用 rabbitmq ,则安装 ::

pip install amqp

同时安装好 rabbitmq (建议通过 docker 安装,使用 rabbitmq:management 镜像,可在 15672 端口查看管理控制台)。

然后使用下面的代码示例(摘录来自: Ask Solem. “Celery Manual, Version 3.1“) ::

# hello.py
from celery import Celeryapp = Celery('hello', broker='amqp://guest:guest@localhost//')@app.task
def hello():return 'hello world'if __name__ == '__main__':r = hello.delay()

然后,启动 worker ::

celery -A hello worker --loglevel=info

client 执行任务 ::

python hello.py

app.task 装饰器标记一个函数为 celery 任务,client 用 delay 方法执行时。 delay 调用 apply_async() 进行异步执行, apply_async 还可配置如队列、countdown 等执行选项。 celery 返回一个 AsyncResult 对象,如果 result backend 配置正确,client 可暂时把对象中的任务 id 保存到数据库,后面再通过这个 id 获取异步执行的结果。

上面的简单例子是没有参数的,如果增加参数,如下 ::

# add.py
from celery import Celeryapp = Celery('add', broker='amqp://guest:guest@localhost//',backend='db+sqlite:///celery_result.db')@app.task
def add(x, y):return x+yif __name__ == '__main__':r = add.delay(1, 2)print(r.wait())

启动 worker ::

celery -A add worker --l info

调用 ::

python add.py

当任务结果用 amqp 保存时,结果只能取一次, 因此无法在后续调用中查询任务结果。这个例子用 sqlite 保存了任务执行结果,因此 client 可在 r.wait() 查询任务的结果、任务的状态等等很多信息,可把 r.id 保存到数据库,然后未来查询任务的 AsyncResult ::

r2 = app.AsyncResult(r.id)
print(r2.wait())
print(r2.successful())

add.py 中使用了两个参数 x y ,而 celery 需要通过 broker 传递这两个参数,这时需要对数据进行序列化,将 x y 对象转换为无结构的数据,然后 worker 接收到后再把数据还原为 x y 对象。 celery 内置的序列化方法包括 pickle 、 json 等等,如果对象比较复杂,需要自己定义序列化方法。

如果不想立即执行任务,而是把任务传递到其它地方,通过 celery 的 subtask 支持。 subtask 是对 task 的调用参数和执行选项的一个封装,如 ::

add.subtask((2,2), countdown=10)
add.s(2,2)

subtask 或 s 返回的是一个 task 的签名(celery.canvas.Signature),它可实现工作流、偏函数等效果。subtask 支持和 task 同样的调用方法,如 ::

s = add.s(2)    # subtask ,partial
s.delay(2)      # 发送消息开始异步执行

在 celery 工作流中组织 subtask 的方式有 group / chain / chord 等等, group 中任务并发执行,chain 中任务顺序执行,chord 中进行回调。而这些组织方式本身也是 subtask ,可嵌套使用 ::

# workflow.py
from celery import Celery, group, chainapp = Celery('add', broker='amqp://guest:guest@localhost//',backend='db+sqlite:///celery_result.db')@app.task
def add(x, y):return x+yif __name__ == '__main__':g = group((add.s(i, i) for i in range(10)))r = g.delay()print(r.get())c = chain(add.s(1, 2) | add.s(3))r2 = c.delay()print(r2.get())

celery 的任务调用通过网络发送任务的名字和参数,不发送任务代码, worker 收到任务后根据任务名和参数执行相应的代码。因此不同 worker 中的代码版本不一样时,会有不同的处理结果。如果 worker 中不能处理相应的任务名,就会报错。

转载于:https://www.cnblogs.com/fengyc/p/5655287.html

celery 学习笔记 01-介绍相关推荐

  1. HTML/CSS学习笔记01【概念介绍、基本标签】

    w3cschool菜鸟教程.CHM(腾讯微云):https://share.weiyun.com/c1FaX6ZD HTML/CSS学习笔记01[概念介绍.基本标签.表单标签][day01] HTML ...

  2. C#设计模式(学习笔记[01])

    C#设计模式(学习笔记[01])<?xml:namespace prefix = o ns = "urn:schemas-microsoft-com:office:office&quo ...

  3. 智能语音:好玩的语音控制是怎么实现的?学习笔记01

    智能语音:好玩的语音控制是怎么实现的?学习笔记01 智能音箱的技术架构 智能音箱主要涉及拾音.前端信号处理.语音识别.自然语言处理和语音合成等技术,现在一些产品甚至提供了声纹识别技术. 当然,智能音箱 ...

  4. C#学习笔记 01.01

    C#学习笔记 01.01 (学习视频来自bilibili的传智播客赵老师基础教学视频) 服务器与客户端的区别 服务器其实本质上还是一个电脑,只是加装了很多的硬盘,从而实现对数据的大规模存储. 而客户端 ...

  5. 大数据Hadoop教程-学习笔记01【大数据导论与Linux基础】

    视频教程:哔哩哔哩网站:黑马大数据Hadoop入门视频教程,总时长:14:22:04 教程资源:https://pan.baidu.com/s/1WYgyI3KgbzKzFD639lA-_g,提取码: ...

  6. javaSE学习笔记01 入门篇

    javaSE学习笔记01 入门篇 java语言概述 Java背景知识 java是 美国 sun 公司 在1995年推出的一门计算机高级编程语言. java早期称为Oak(橡树),后期改名为Java. ...

  7. NODEMCU学习笔记-01 esp8266 WIFI杀手 源码上传版

    NODEMCU学习笔记-01 esp8266WIFI杀手 动手前的准备 NODEMCU和ESP8266 ARDUINO IDE GITHUB CSDN 让我们开始吧 连接开发板并安装驱动 安装ardu ...

  8. MySQL技术内幕-InnoDB存储引擎第2版-学习笔记-01

    MySQL技术内幕-InnoDB存储引擎第2版-学习笔记-01 1. MySQL体系结构和存储引擎 1.1 定义数据库和实例 数据库database: 物理操作系统文件或其他形式文件类型的集合. 当使 ...

  9. UE4 Material 101学习笔记——01-07 介绍/PBR基础/UV扭曲/数据类型/翻页动画/材质混合/性能优化

    UE4 Material 101学习笔记--01-07 介绍/PBR基础/UV扭曲/数据类型/翻页动画/材质混合/性能优化 Lec 01 什么是着色器 What Is A Shader? 1.1 介绍 ...

  10. 【Mybatis】学习笔记01:连接数据库,实现增删改

    需要数据库SQL的请跳转到文末 哔哩哔哩 萌狼蓝天[转载资料][尚硅谷][MyBatis]2022版Mybatis配套MD文档[Mybatis]学习笔记01:连接数据库,实现增删改[Mybatis]学 ...

最新文章

  1. SSM框架——Spring+SpringMVC+Mybatis的搭建教程
  2. ant Table td 溢出隐藏(省略号)
  3. 【Kotlin】Kotlin 中使用 ButterKnife ( 仅用于适配 Kotlin 语言 | 不推荐新项目使用 )
  4. ios查看ipa是否函数特定字符_iOS 中基础字符判断函数收集(如判断大小写、数字等)...
  5. #1182 : 欧拉路·三(有向图的欧拉路)
  6. mysql innodb flush method_对innodb_flush_method的一点解释
  7. 使用memcmp函数判断两个函数的前n位字节数是否相等
  8. 【Markdown】新手快速入门基础教程
  9. Jsoup解析html某片段的问题
  10. 《Kali+Linux渗透测试的艺术》学习总结之----Kali Linux简介
  11. 历史文章之机器学习和深度学习
  12. Springboot+Vue实现物业管理系统
  13. 【Oracle19C】数据库基本知识
  14. wifi已连接不可上网服务器无响应,为什么手机连上wifi却上不了网
  15. 全球与中国家庭捕鱼船市场深度研究分析报告
  16. springboot的商品设计热销排行实现
  17. MapReduce优劣,理解MapReduce与Hadoop
  18. 强化学习(Reinforcement Learning)背景介绍
  19. adas记录仪app_路影行车记录仪app
  20. 80x86的寻址方式及Dosbox演示

热门文章

  1. @EnableAsync @Async 的详解
  2. 自动配置原理精讲||@Conditional ||怎么知道哪些自动配置类生效? 启用debug=true属性(在配置文件配置);
  3. 达梦数据库DM8飞腾版本、芯版本获取地址,最新达梦数据库各国产化版本获取方法,达梦数据库DM8使用手册、产品文档获取
  4. 使用Matlab画心形线
  5. 常用的图像增强处理办法
  6. bitand( ) 函数用法
  7. vector中erase函数
  8. 查看回滚事物sql_卧槽:这款 SQL自动检查神器,吊炸天的功能,真TMD多!!
  9. golang var 初始化时机_你应该知道的 Go 调度器知识:Go 核心原理 — 协程调度时机...
  10. 录取5秒钟的KNN取景效果gif(Opencv) Python实现