文章目录

  • 定时任务库对比
  • 简介
  • 安装
  • 初试
  • 进阶
    • 项目结构
    • 配置文件
    • Celery实例化
    • 实时任务
    • 定时任务
    • 调用任务
    • 启动定时任务
  • 任务状态跟踪
  • 递归调用
  • Celery配置
  • 命令行参数
  • 分布式集群部署
  • 任务队列监控
  • 动态定时任务
  • 动态定时任务(指定时间)
  • 遇到的坑
  • 参考文献

定时任务库对比

推荐阅读 Python timing task - schedule vs. Celery vs. APScheduler

大小 优点 缺点 适用场景
Schedule 轻量级 易用无配置 不能动态添加任务或持久化任务 简单任务
Celery 重量级 ①任务队列
②分布式
①不能动态添加定时任务到系统中,如Flask(Django可以)
②设置起来较累赘
任务队列
APScheduler 相对重量级 ①灵活,可动态增删定时任务并持久化
②支持多种存储后端
③集成框架多,用户广
重量级,学习成本大 通用
Rocketry 轻量级 易用功能强 尚未成熟,文档不清晰 通用

Celery不适合动态添加定时任务,但本人认为可以通过数据库+递归调用自身实现

简介

Celery 是一款简单灵活可靠的分布式任务执行框架,支持大量任务的并发执行。

Celery 采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。

  • 提交任务给 Broker 队列
  • 如果是异步任务,Worker 会立即从队列中取出任务并执行,执行结果保存在 Backend 中
  • 如果是定时任务,任务由 Celery Beat 进程周期性地将任务发往 Broker 队列,Worker 实时监视消息队列获取队列中的任务执行

应用场景

  • 长时间任务的异步执行, 如上传大文件
  • 实时任务执行,支持集群部署,如支持高并发的机器学习推理
  • 定时任务执行,如定时发送邮件

安装

本文使用 Redis 作为 Broker 即消息队列

pip install celery
pip install redis

需要持久化任务的话,Broker 使用 RabbitMQ 并设置持久化队列。
官方建议生产环境首选 RabbitMQ ,突然停止或断电 Redis 可能会数据丢失。

注意!从 Celery 4.x 开始官方不再支持Windows。

初试

Celery 的开发主要有四个步骤:

  1. 实例化 Celery
  2. 定义任务
  3. 启动任务 Worker
  4. 调用任务

启动 Redis

redis-server

实例化 Celery 和 定义任务

tasks.py

import time
from celery import Celerycelery = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')  # 实例化 Celery@celery.task
def sendmail(mail):  # 定义任务。使用@task装饰器print('sending mail to %s...' % mail['to'])time.sleep(2.0)print('mail sent.')return True

启动任务 Worker

celery -A tasks worker --loglevel=info --pool=solo

调用任务

from tasks import sendmailresult = sendmail.delay(dict(to='celery@python.org'))
value = result.get()
print(value)  # 运算结果值
print(result.successful())  # 是否成功
# print(result.fail())  # 是否失败
print(result.ready())  # 是否执行完成
print(result.state)  # 状态 PENDING -> STARTED -> SUCCESS/FAILURE

结果

进阶

项目结构

.
└─projconfig.py       # 配置文件__init__.py     # Celery实例化tasks.py        # 实时任务period_task.py  # 定时任务

配置文件

config.py

BROKER_URL = 'redis://localhost:6379/0'  # Broker,中间件,进行消息传输,使用Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Backend,结果后端,使用Redis
CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间
CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区配置
CELERY_IMPORTS = (  # 导入的任务模块'proj.tasks','proj.period_task'
)

流行使用RabbitMQ作为Broker中间件,Redis作为结果后端。

Celery实例化

__init__.py

from celery import Celeryapp = Celery('proj')  # 创建Celery实例
app.config_from_object('proj.config')  # 从配置文件中读取配置

实时任务

tasks.py

from proj import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y@app.task
def show(a):return a

启动任务Worker

celery worker -A proj -l info -c 4 -P solo

定时任务

period_task.py

from proj import app
from celery.schedules import crontab@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):"""按频率执行定时任务"""# 每5秒执行一次tostring('Hello')sender.add_periodic_task(5.0, tostring.s('Hello'), name='tostring')# 每周一07:30执行tostring('Happy Mondays!')sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),tostring.s('Happy Mondays!'),)# 每分钟执行一次sender.add_periodic_task(crontab(minute='*/1'),tostring.s('A minute'),name='A minute')@app.task
def tostring(s):return s

先忽略这部分

更细粒度定时设置查阅:

  1. Crontab schedules
  2. crontab 命令详解
  3. Periodic Tasks

定时任务配置也可以这样设置

from proj import app
from celery.schedules import crontab@app.task
def tostring(s):return sapp.conf.beat_schedule.update(hello={'task': tostring.name,'schedule': 5.0,  # 每5秒执行一次tostring('Hello')'args': ('Hello',)},happy_mondays={'task': tostring.name,'schedule': crontab(hour=7, minute=30, day_of_week=1),  # 每周一07:30执行tostring('Happy Mondays!')'args': ('Happy Mondays!',)},a_minute={'task': tostring.name,'schedule': crontab(minute='*/1'),  # 每分钟执行一次'args': ('A minute',)},
)

调用任务

常规任务

  • delay():直接调用任务,是 apply_async() 的封装
  • apply_async():通过发送异步消息调用任务,可指定倒计时 countdown ,执行时间 eta ,过期时间 expires 等参数
  • signature():创建签名,可传递任务签名给别的进程使用,或作为其他函数的参数
  • s():创建签名的快捷方式
from wedo.tasks import mulresult = mul.delay(1, 2)  # 直接调用
print(result.get())result = mul.apply_async((1, 2), countdown=2)  # 2s后执行
print(result.get())t1 = mul.signature((1, 2), countdown=2)  # 签名Signatures,可传递任务签名给别的进程使用,或作为其他函数的参数
result = t1.delay()
print(result.get())t1 = mul.s(1, 2).set(countdown=2)  # 创建签名的快捷方式
result = t1.delay()
print(result.get())

组合任务

  • group():组合,接受一个可并行调用的任务列表
  • chain():串联,将签名连接在一起,一个接一个调用(前一个签名的结果作为下一个签名的第一个参数)
  • chord():和弦,类似 group() 但包含回调,在所有任务执行完后再调用任务
  • map():将参数列表应用于该任务
  • starmap():将复合参数列表应用于该任务
  • chunks():将一个很长的参数列表分块成若干部分
from proj.tasks import add, mul, show
from celery import group, chain, chordresult = group(add.s(i, i) for i in range(5))()  # 组合
print(result.get())  # [0, 2, 4, 6, 8]result = chain(add.s(1, 2), add.s(3), mul.s(3))()  # 串联
print(result.get())  # ((1+2)+3)*3=18result = chord((add.s(i, i) for i in range(5)), show.s())()  # 和弦
print(result.get())  # [0, 2, 4, 6, 8]result = ~show.map(['Hello', 'World'])
print(result)  # ['Hello', 'World']result = ~add.starmap([(2, 2), (4, 4)])
print(result)  # [4, 8]res = add.chunks(zip(range(10), range(10)), 2)()
print(res.get())  # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]

详细查阅:

  1. 签名Signatures
  2. Celery函数

启动定时任务

启动定时任务Beat

celery beat -A proj.period_task -l info

结果

任务状态跟踪

项目结构

.
└─monitorconfig.py       # 配置文件__init__.py     # Celery实例化tasks.py        # 实时任务main.py         # 调用任务

config.py

BROKER_URL = 'redis://localhost:6379/0'  # Broker,中间件,进行消息传输,使用Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Backend,结果后端,使用Redis
CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间
CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区配置
CELERY_IMPORTS = (  # 导入的任务模块'monitor.tasks'
)

__init__.py

from celery import Celery, Task
from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)  # 日志app = Celery('monitor')  # 创建Celery实例
app.config_from_object('monitor.config')  # 从配置文件中读取配置class TaskMonitor(Task):def on_success(self, retval, task_id, args, kwargs):"""success时回调"""logger.info('task id:{} , arg:{} , successful !'.format(task_id, args))def on_retry(self, exc, task_id, args, kwargs, einfo):"""retry时回调"""logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))def on_failure(self, exc, task_id, args, kwargs, einfo):"""failure时回调"""logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))

tasks.py

from monitor import app, TaskMonitor@app.task(base=TaskMonitor)
def success():return 1@app.task(bind=True, base=TaskMonitor)
def retry(self):try:raise Exceptionexcept Exception as exc:self.retry(exc=exc)@app.task(base=TaskMonitor)
def failure():raise Exception

main.py

from monitor.tasks import success, retry, failureresult = success.delay()
print(result.successful())  # 是否成功
print(result.ready())  # 是否执行完成
print(result.state)  # 状态 PENDING -> STARTED -> SUCCESS/FAILURE
print()result = retry.delay()
print(result.successful())
print(result.ready())
print(result.state)
print()result = failure.delay()
print(result.successful())
print(result.ready())
print(result.state)

效果

递归调用

tasks.py

from celery import Celeryapp = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1',imports=['tasks']
)
app.conf.timezone = 'Asia/Shanghai'
i = 0@app.task
def show(a):global ii = i + 1s = '{} {}'.format(a, i)print(s)if i >= 20:i = 0returnshow.apply_async(args=(a,), countdown=i)  # 递归

调用任务

from tasks import showshow.apply_async(args=('Hi',))

结果

Celery配置

可以直接通过代码配置而不用 Celery.config_from_object()

app = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1',# imports=['tasks']
)
app.conf.imports = ['tasks']

详细查阅:Configuration and defaults

命令行参数

参数 含义 全称
-A 指定模块
-l 日志level –loglevel
-c 进程数 –concurrency
-Q 指定队列 –queue
-B 周期性任务 –beat
-P 池的实现 –pool

详细查阅:Command Line Interface

分布式集群部署

部署过程和单机启动一样,使用相同项目代码和启动命令。

实现原理是共享 Broker 队列。

任务队列监控

flower 是一款 Celery 的监控工具

安装

pip install flower

启动

flower -A wedo --port=5555

Celery 版本大于4.4.7可能会报错

  • ImportError: cannot import name ‘Command’ from ‘celery.bin.base’ - Stack Overflow

动态定时任务

Celery不适合动态添加定时任务,但本人认为可以通过数据库+递归调用自身实现

最好用 APSchedule 实现,因为动态定时任务需要用到长时间的 countdown 或 eta,若这样的定时任务过多,会大量占用内存,导致重启和执行非延迟任务会很耗时。

并且使用Redis作为Broker并且异步任务执行时间延迟超过1小时,Celery会重复发布任务,导致任务重复执行

更好的解决办法是每天定时跑!!即实现from celery.schedules import crontab

celery.conf.beat_schedule.update

tasks.py

import sqlite3
from celery import Celery
from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)  # 日志app = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1',imports=['tasks'],
)
app.conf.timezone = 'Asia/Shanghai'conn = sqlite3.connect('database.db', check_same_thread=False)
c = conn.cursor()
sql = '''
CREATE TABLE IF NOT EXISTS `tasks`
(`id` INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,`name` TEXT,`countdown` INTEGER
);
'''
c.execute(sql)  # 创建数据库def create(name='job', countdown=5):"""创建定时任务"""sql = 'INSERT INTO `tasks` (`name`, `countdown`) VALUES (?, ?)'c.execute(sql, (name, countdown))conn.commit()return c.lastrowiddef read(id=None, verbose=False):"""查询定时任务"""sql = 'SELECT * FROM `tasks` 'if id:sql = 'SELECT * FROM `tasks` WHERE `id`={}'.format(id)all_rows = c.execute(sql).fetchall()if verbose:print(all_rows)return all_rowsdef update(id, countdown):"""修改定时任务"""sql = 'UPDATE `tasks` SET `countdown`=? WHERE `id`=?'c.execute(sql, (countdown, id))conn.commit()def delete(id, verbose=False):"""删除定时任务"""sql = 'DELETE FROM `tasks` WHERE `id`=?'affected_rows = c.execute(sql, (id,)).rowcountif verbose:print('已删除{}行数据'.format(affected_rows))conn.commit()@app.task
def job(id):# 读取定时任务数据id = read(id)if id:id, name, countdown = id[0]else:logger.info('stop')return# 需要进行的任务logger.warning('id={}'.format(id))logger.warning('name={}'.format(name))logger.warning('countdown={}'.format(countdown))# 递归调用job.apply_async(args=(id,), countdown=countdown)

main.py

from tasks import *print('创建定时任务')
id = create(name='job', countdown=5)
job(id)  # 立即运行
# job.apply_async((id,), countdown=5)  # 5s后运行print('查询定时任务:', read())input('回车修改定时任务')
update(id, countdown=1)input('回车删除定时任务')
delete(id, verbose=True)

启动

celery -A tasks worker --loglevel=info --pool=solo

效果

动态定时任务(指定时间)

参数 eta 受时区影响,本人感觉较麻烦,可以直接使用 countdown

from datetime import datetime, timedeltadef next_weekday(weekday, d=datetime.now()):""" 获取下周几日期:param weekday: weekday取值1-7:param d: 原日期,默认当前时间:return: datetime.datetime"""delta = weekday - d.isoweekday()if delta == 0:delta = 7return d + timedelta(delta)d1 = datetime.now()
d2 = next_weekday(1, d1)  # 下周一
delta = d2 - d1
countdown = delta.total_seconds()
print(countdown)  # 604800.0

Redis默认不过期

长时间的countdown可能会过期,考虑 Redis 设为不过期或使用 RabbitMQ

redis-cli
KEYS *
TTL _kombu.binding.celery

遇到的坑

1. 报错 ValueError: not enough values to unpack (expected 3, got 0)

启动 Celery 添加参数 --pool=solo

2. 报错 Cannot connect to redis://localhost:6379/0: Error 11002 connecting to localhost:6379. Lookup timed out

参考这篇文章

3. 重启Celery后任务丢失

task_reject_on_worker_lost = True
task_acks_late = True

or

celery.conf.CELERY_REJECT_ON_WORKER_LOST = True
celery.conf.CELERY_ACKS_LATE = True

4. 使用Redis作为Broker并且异步任务执行时间延迟超过1小时,Celery会重复发布任务,导致任务重复执行

将Celery升级到4.5以上并增大visibility_timeout的时间

默认visibility_timeout为3600秒即一小时

BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
或者
app.conf.broker_transport_options = {'visibility_timeout': 43200}

还可以通过 MySQL 的唯一索引特性实现锁:在业务中添加锁,能插入就执行

参考文献

  1. Celery Documentation
  2. Celery 中文手册
  3. Celery GitHub
  4. 任务调度利器:Celery
  5. Celery 从入门到进阶
  6. Flower Documentation
  7. Flower GitHub
  8. Celery 常见问题
  9. Python动态处理定时任务的生态与深坑
  10. Celery 报错 Windows 平台:ValueError: not enough values to unpack (expected 3, got 0)
  11. How to dynamically add / remove periodic tasks to Celery (celerybeat)
  12. Common Issues Using Celery
  13. 解决Celery进程重启后,正在进行中的任务丢失或者标记为失败
  14. Celery重复执行同一个任务
  15. celery异步任务重复执行的解决办法
  16. Visibility timeout
  17. 使用Celery踩过的坑
  18. Celery重复执行一个Task的解决方案

Python定时任务库Celery——分布式任务队列相关推荐

  1. python 分布式队列_〖Python〗-- Celery分布式任务队列

    [Celery分布式任务队列] 一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步 ...

  2. Celery 分布式任务队列快速入门

    Celery 分布式任务队列快速入门 本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置cel ...

  3. Celery分布式任务队列的认识和基本操作

    一.简单认识 Celery是由Python开发.简单.灵活.可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务.Celery侧重于实时操作,但对调度支持也很好 ...

  4. python定时任务contrib_django+celery配置(定时任务+循环任务)

    下面介绍一下django+celery的配置做定时任务 1.首先介绍一下环境和版本 python==2.7 django == 1.8.1 celery == 3.1.23 django-celery ...

  5. Python 第三方库之 Celery 分布式任务队列

    一.Celery介绍和使用: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, ...

  6. python Celery 分布式任务队列快速入门

    本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置celery periodic task 一 ...

  7. Celery分布式任务队列学习记录

    安装与环境配置 环境:Ubuntu18.04 安装celery root@ubuntu:/home/toohoo/learncelery# pip install celery==3.1.25 Col ...

  8. Python定时任务库schedule的使用

    参考 https://schedule.readthedocs.io/en/stable/ 安装 pip install schedule 基本使用 基本的时间调度,调用语句已经说明地比较清晰: im ...

  9. 分布式任务队列 Celery — 深入 Task

    目录 文章目录 目录 前文列表 前言 Task 的实例化 任务的名字 任务的绑定 任务的重试 任务的请求上下文 任务的继承 前文列表 分布式任务队列 Celery 分布式任务队列 Celery -- ...

最新文章

  1. GitHub标星1.5w+,从此我只用这款全能高速下载工具
  2. 定义一个有参宏SWAP(t,x,y),用以交换t类型的两个参数的值
  3. leetcode算法题--学生分数的最小差值
  4. C# 获取FormData数据
  5. Android开发之自定义ImageView圆角图片的方法
  6. React Native的键盘遮挡问题(input/webview里)
  7. 大到创业,小到做一份副业
  8. mysql单张表数据量极限_极限数据量范围的安全测试
  9. mysql分组取每组前几条记录_[转] mysql分组取每组前几条记要(排名)
  10. AC9560网卡linux驱动安装
  11. OPNET开发教程合集
  12. python爬虫百度网盘_python爬取百度云网盘资源
  13. PS动作怎么做爆炸火焰效果特效
  14. 推荐一款低代码报表开发工具,操作类似Excel
  15. 图片如何转PDF?这两种方法很好用
  16. 怎样看待Android的发展前景?以及Android开发的职业规划
  17. STVD 编译提示 #error clnk :1 missing output file 的问题
  18. 网络技能大赛-2018年国赛真题[2018年全国职业技能大赛高职组计算机网络应用赛项真题-I卷]路由交换部分答案详解
  19. 感冒药盒上请看清这6个字,一定要注意! “美”:支气管炎患者慎用
  20. 单片机 利用C语言产生正弦波DA数据

热门文章

  1. Numba witch makes Python code fast
  2. 被客户抓住了尾巴,28理论与长尾理论
  3. 《决战大数据》读书笔记
  4. 手机终端高级测试工程师经验总结
  5. 算法笔记-快速排序(C版本与Python版本)
  6. 动态再结晶CA法模拟基础之元胞机的邻居类型及边界条件
  7. python 正则表达式(Regular Expression)基础学习笔记
  8. 计算机减法函数word,谁说Excel才能运算?Word计算功能同样强大,公式函数都不在话下-excel减法函数...
  9. 超实用的Java面试宝典
  10. codeforces1474D. Cleaning