Python定时任务库Celery——分布式任务队列
文章目录
- 定时任务库对比
- 简介
- 安装
- 初试
- 进阶
- 项目结构
- 配置文件
- Celery实例化
- 实时任务
- 定时任务
- 调用任务
- 启动定时任务
- 任务状态跟踪
- 递归调用
- Celery配置
- 命令行参数
- 分布式集群部署
- 任务队列监控
- 动态定时任务
- 动态定时任务(指定时间)
- 遇到的坑
- 参考文献
定时任务库对比
推荐阅读 Python timing task - schedule vs. Celery vs. APScheduler
库 | 大小 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|
Schedule | 轻量级 | 易用无配置 | 不能动态添加任务或持久化任务 | 简单任务 |
Celery | 重量级 |
①任务队列 ②分布式 |
①不能动态添加定时任务到系统中,如Flask(Django可以) ②设置起来较累赘 |
任务队列 |
APScheduler | 相对重量级 |
①灵活,可动态增删定时任务并持久化 ②支持多种存储后端 ③集成框架多,用户广 |
重量级,学习成本大 | 通用 |
Rocketry | 轻量级 | 易用功能强 | 尚未成熟,文档不清晰 | 通用 |
Celery不适合动态添加定时任务,但本人认为可以通过数据库+递归调用自身实现
简介
Celery
是一款简单灵活可靠的分布式任务执行框架,支持大量任务的并发执行。
Celery
采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。
- 提交任务给 Broker 队列
- 如果是异步任务,Worker 会立即从队列中取出任务并执行,执行结果保存在 Backend 中
- 如果是定时任务,任务由 Celery Beat 进程周期性地将任务发往 Broker 队列,Worker 实时监视消息队列获取队列中的任务执行
应用场景
- 长时间任务的异步执行, 如上传大文件
- 实时任务执行,支持集群部署,如支持高并发的机器学习推理
- 定时任务执行,如定时发送邮件
安装
本文使用 Redis 作为 Broker 即消息队列
pip install celery
pip install redis
需要持久化任务的话,Broker 使用 RabbitMQ 并设置持久化队列。
官方建议生产环境首选 RabbitMQ ,突然停止或断电 Redis 可能会数据丢失。
注意!从 Celery 4.x 开始官方不再支持Windows。
初试
Celery 的开发主要有四个步骤:
- 实例化 Celery
- 定义任务
- 启动任务 Worker
- 调用任务
启动 Redis
redis-server
实例化 Celery 和 定义任务
tasks.py
import time
from celery import Celerycelery = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1') # 实例化 Celery@celery.task
def sendmail(mail): # 定义任务。使用@task装饰器print('sending mail to %s...' % mail['to'])time.sleep(2.0)print('mail sent.')return True
启动任务 Worker
celery -A tasks worker --loglevel=info --pool=solo
调用任务
from tasks import sendmailresult = sendmail.delay(dict(to='celery@python.org'))
value = result.get()
print(value) # 运算结果值
print(result.successful()) # 是否成功
# print(result.fail()) # 是否失败
print(result.ready()) # 是否执行完成
print(result.state) # 状态 PENDING -> STARTED -> SUCCESS/FAILURE
结果
进阶
项目结构
.
└─projconfig.py # 配置文件__init__.py # Celery实例化tasks.py # 实时任务period_task.py # 定时任务
配置文件
config.py
BROKER_URL = 'redis://localhost:6379/0' # Broker,中间件,进行消息传输,使用Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Backend,结果后端,使用Redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区配置
CELERY_IMPORTS = ( # 导入的任务模块'proj.tasks','proj.period_task'
)
流行使用RabbitMQ作为Broker中间件,Redis作为结果后端。
Celery实例化
__init__.py
from celery import Celeryapp = Celery('proj') # 创建Celery实例
app.config_from_object('proj.config') # 从配置文件中读取配置
实时任务
tasks.py
from proj import app@app.task
def add(x, y):return x + y@app.task
def mul(x, y):return x * y@app.task
def show(a):return a
启动任务Worker
celery worker -A proj -l info -c 4 -P solo
定时任务
period_task.py
from proj import app
from celery.schedules import crontab@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):"""按频率执行定时任务"""# 每5秒执行一次tostring('Hello')sender.add_periodic_task(5.0, tostring.s('Hello'), name='tostring')# 每周一07:30执行tostring('Happy Mondays!')sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),tostring.s('Happy Mondays!'),)# 每分钟执行一次sender.add_periodic_task(crontab(minute='*/1'),tostring.s('A minute'),name='A minute')@app.task
def tostring(s):return s
先忽略这部分
更细粒度定时设置查阅:
- Crontab schedules
- crontab 命令详解
- Periodic Tasks
定时任务配置也可以这样设置
from proj import app
from celery.schedules import crontab@app.task
def tostring(s):return sapp.conf.beat_schedule.update(hello={'task': tostring.name,'schedule': 5.0, # 每5秒执行一次tostring('Hello')'args': ('Hello',)},happy_mondays={'task': tostring.name,'schedule': crontab(hour=7, minute=30, day_of_week=1), # 每周一07:30执行tostring('Happy Mondays!')'args': ('Happy Mondays!',)},a_minute={'task': tostring.name,'schedule': crontab(minute='*/1'), # 每分钟执行一次'args': ('A minute',)},
)
调用任务
常规任务
delay()
:直接调用任务,是apply_async()
的封装apply_async()
:通过发送异步消息调用任务,可指定倒计时 countdown ,执行时间 eta ,过期时间 expires 等参数signature()
:创建签名,可传递任务签名给别的进程使用,或作为其他函数的参数s()
:创建签名的快捷方式
from wedo.tasks import mulresult = mul.delay(1, 2) # 直接调用
print(result.get())result = mul.apply_async((1, 2), countdown=2) # 2s后执行
print(result.get())t1 = mul.signature((1, 2), countdown=2) # 签名Signatures,可传递任务签名给别的进程使用,或作为其他函数的参数
result = t1.delay()
print(result.get())t1 = mul.s(1, 2).set(countdown=2) # 创建签名的快捷方式
result = t1.delay()
print(result.get())
组合任务
group()
:组合,接受一个可并行调用的任务列表chain()
:串联,将签名连接在一起,一个接一个调用(前一个签名的结果作为下一个签名的第一个参数)chord()
:和弦,类似group()
但包含回调,在所有任务执行完后再调用任务map()
:将参数列表应用于该任务starmap()
:将复合参数列表应用于该任务chunks()
:将一个很长的参数列表分块成若干部分
from proj.tasks import add, mul, show
from celery import group, chain, chordresult = group(add.s(i, i) for i in range(5))() # 组合
print(result.get()) # [0, 2, 4, 6, 8]result = chain(add.s(1, 2), add.s(3), mul.s(3))() # 串联
print(result.get()) # ((1+2)+3)*3=18result = chord((add.s(i, i) for i in range(5)), show.s())() # 和弦
print(result.get()) # [0, 2, 4, 6, 8]result = ~show.map(['Hello', 'World'])
print(result) # ['Hello', 'World']result = ~add.starmap([(2, 2), (4, 4)])
print(result) # [4, 8]res = add.chunks(zip(range(10), range(10)), 2)()
print(res.get()) # [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]
详细查阅:
- 签名Signatures
- Celery函数
启动定时任务
启动定时任务Beat
celery beat -A proj.period_task -l info
结果
任务状态跟踪
项目结构
.
└─monitorconfig.py # 配置文件__init__.py # Celery实例化tasks.py # 实时任务main.py # 调用任务
config.py
BROKER_URL = 'redis://localhost:6379/0' # Broker,中间件,进行消息传输,使用Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Backend,结果后端,使用Redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区配置
CELERY_IMPORTS = ( # 导入的任务模块'monitor.tasks'
)
__init__.py
from celery import Celery, Task
from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__) # 日志app = Celery('monitor') # 创建Celery实例
app.config_from_object('monitor.config') # 从配置文件中读取配置class TaskMonitor(Task):def on_success(self, retval, task_id, args, kwargs):"""success时回调"""logger.info('task id:{} , arg:{} , successful !'.format(task_id, args))def on_retry(self, exc, task_id, args, kwargs, einfo):"""retry时回调"""logger.info('task id:{} , arg:{} , retry ! einfo: {}'.format(task_id, args, exc))def on_failure(self, exc, task_id, args, kwargs, einfo):"""failure时回调"""logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))
tasks.py
from monitor import app, TaskMonitor@app.task(base=TaskMonitor)
def success():return 1@app.task(bind=True, base=TaskMonitor)
def retry(self):try:raise Exceptionexcept Exception as exc:self.retry(exc=exc)@app.task(base=TaskMonitor)
def failure():raise Exception
main.py
from monitor.tasks import success, retry, failureresult = success.delay()
print(result.successful()) # 是否成功
print(result.ready()) # 是否执行完成
print(result.state) # 状态 PENDING -> STARTED -> SUCCESS/FAILURE
print()result = retry.delay()
print(result.successful())
print(result.ready())
print(result.state)
print()result = failure.delay()
print(result.successful())
print(result.ready())
print(result.state)
效果
递归调用
tasks.py
from celery import Celeryapp = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1',imports=['tasks']
)
app.conf.timezone = 'Asia/Shanghai'
i = 0@app.task
def show(a):global ii = i + 1s = '{} {}'.format(a, i)print(s)if i >= 20:i = 0returnshow.apply_async(args=(a,), countdown=i) # 递归
调用任务
from tasks import showshow.apply_async(args=('Hi',))
结果
Celery配置
可以直接通过代码配置而不用 Celery.config_from_object()
app = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1',# imports=['tasks']
)
app.conf.imports = ['tasks']
详细查阅:Configuration and defaults
命令行参数
参数 | 含义 | 全称 |
---|---|---|
-A | 指定模块 | |
-l | 日志level | –loglevel |
-c | 进程数 | –concurrency |
-Q | 指定队列 | –queue |
-B | 周期性任务 | –beat |
-P | 池的实现 | –pool |
详细查阅:Command Line Interface
分布式集群部署
部署过程和单机启动一样,使用相同项目代码和启动命令。
实现原理是共享 Broker 队列。
任务队列监控
flower
是一款 Celery 的监控工具
安装
pip install flower
启动
flower -A wedo --port=5555
Celery 版本大于4.4.7可能会报错
- ImportError: cannot import name ‘Command’ from ‘celery.bin.base’ - Stack Overflow
动态定时任务
Celery不适合动态添加定时任务,但本人认为可以通过数据库+递归调用自身实现
最好用 APSchedule 实现,因为动态定时任务需要用到长时间的 countdown 或 eta,若这样的定时任务过多,会大量占用内存,导致重启和执行非延迟任务会很耗时。
并且使用Redis作为Broker并且异步任务执行时间延迟超过1小时,Celery会重复发布任务,导致任务重复执行
更好的解决办法是每天定时跑!!即实现from celery.schedules import crontab
celery.conf.beat_schedule.update
tasks.py
import sqlite3
from celery import Celery
from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__) # 日志app = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1',imports=['tasks'],
)
app.conf.timezone = 'Asia/Shanghai'conn = sqlite3.connect('database.db', check_same_thread=False)
c = conn.cursor()
sql = '''
CREATE TABLE IF NOT EXISTS `tasks`
(`id` INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,`name` TEXT,`countdown` INTEGER
);
'''
c.execute(sql) # 创建数据库def create(name='job', countdown=5):"""创建定时任务"""sql = 'INSERT INTO `tasks` (`name`, `countdown`) VALUES (?, ?)'c.execute(sql, (name, countdown))conn.commit()return c.lastrowiddef read(id=None, verbose=False):"""查询定时任务"""sql = 'SELECT * FROM `tasks` 'if id:sql = 'SELECT * FROM `tasks` WHERE `id`={}'.format(id)all_rows = c.execute(sql).fetchall()if verbose:print(all_rows)return all_rowsdef update(id, countdown):"""修改定时任务"""sql = 'UPDATE `tasks` SET `countdown`=? WHERE `id`=?'c.execute(sql, (countdown, id))conn.commit()def delete(id, verbose=False):"""删除定时任务"""sql = 'DELETE FROM `tasks` WHERE `id`=?'affected_rows = c.execute(sql, (id,)).rowcountif verbose:print('已删除{}行数据'.format(affected_rows))conn.commit()@app.task
def job(id):# 读取定时任务数据id = read(id)if id:id, name, countdown = id[0]else:logger.info('stop')return# 需要进行的任务logger.warning('id={}'.format(id))logger.warning('name={}'.format(name))logger.warning('countdown={}'.format(countdown))# 递归调用job.apply_async(args=(id,), countdown=countdown)
main.py
from tasks import *print('创建定时任务')
id = create(name='job', countdown=5)
job(id) # 立即运行
# job.apply_async((id,), countdown=5) # 5s后运行print('查询定时任务:', read())input('回车修改定时任务')
update(id, countdown=1)input('回车删除定时任务')
delete(id, verbose=True)
启动
celery -A tasks worker --loglevel=info --pool=solo
效果
动态定时任务(指定时间)
参数 eta
受时区影响,本人感觉较麻烦,可以直接使用 countdown
from datetime import datetime, timedeltadef next_weekday(weekday, d=datetime.now()):""" 获取下周几日期:param weekday: weekday取值1-7:param d: 原日期,默认当前时间:return: datetime.datetime"""delta = weekday - d.isoweekday()if delta == 0:delta = 7return d + timedelta(delta)d1 = datetime.now()
d2 = next_weekday(1, d1) # 下周一
delta = d2 - d1
countdown = delta.total_seconds()
print(countdown) # 604800.0
Redis默认不过期
长时间的countdown可能会过期,考虑 Redis 设为不过期或使用 RabbitMQ
redis-cli
KEYS *
TTL _kombu.binding.celery
遇到的坑
1. 报错 ValueError: not enough values to unpack (expected 3, got 0)
启动 Celery 添加参数 --pool=solo
2. 报错 Cannot connect to redis://localhost:6379/0: Error 11002 connecting to localhost:6379. Lookup timed out
参考这篇文章
3. 重启Celery后任务丢失
task_reject_on_worker_lost = True
task_acks_late = True
or
celery.conf.CELERY_REJECT_ON_WORKER_LOST = True
celery.conf.CELERY_ACKS_LATE = True
4. 使用Redis作为Broker并且异步任务执行时间延迟超过1小时,Celery会重复发布任务,导致任务重复执行
将Celery升级到4.5以上并增大visibility_timeout的时间
默认visibility_timeout为3600秒即一小时
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
或者
app.conf.broker_transport_options = {'visibility_timeout': 43200}
还可以通过 MySQL 的唯一索引特性实现锁:在业务中添加锁,能插入就执行
参考文献
- Celery Documentation
- Celery 中文手册
- Celery GitHub
- 任务调度利器:Celery
- Celery 从入门到进阶
- Flower Documentation
- Flower GitHub
- Celery 常见问题
- Python动态处理定时任务的生态与深坑
- Celery 报错 Windows 平台:ValueError: not enough values to unpack (expected 3, got 0)
- How to dynamically add / remove periodic tasks to Celery (celerybeat)
- Common Issues Using Celery
- 解决Celery进程重启后,正在进行中的任务丢失或者标记为失败
- Celery重复执行同一个任务
- celery异步任务重复执行的解决办法
- Visibility timeout
- 使用Celery踩过的坑
- Celery重复执行一个Task的解决方案
Python定时任务库Celery——分布式任务队列相关推荐
- python 分布式队列_〖Python〗-- Celery分布式任务队列
[Celery分布式任务队列] 一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步 ...
- Celery 分布式任务队列快速入门
Celery 分布式任务队列快速入门 本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置cel ...
- Celery分布式任务队列的认识和基本操作
一.简单认识 Celery是由Python开发.简单.灵活.可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务.Celery侧重于实时操作,但对调度支持也很好 ...
- python定时任务contrib_django+celery配置(定时任务+循环任务)
下面介绍一下django+celery的配置做定时任务 1.首先介绍一下环境和版本 python==2.7 django == 1.8.1 celery == 3.1.23 django-celery ...
- Python 第三方库之 Celery 分布式任务队列
一.Celery介绍和使用: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, ...
- python Celery 分布式任务队列快速入门
本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置celery periodic task 一 ...
- Celery分布式任务队列学习记录
安装与环境配置 环境:Ubuntu18.04 安装celery root@ubuntu:/home/toohoo/learncelery# pip install celery==3.1.25 Col ...
- Python定时任务库schedule的使用
参考 https://schedule.readthedocs.io/en/stable/ 安装 pip install schedule 基本使用 基本的时间调度,调用语句已经说明地比较清晰: im ...
- 分布式任务队列 Celery — 深入 Task
目录 文章目录 目录 前文列表 前言 Task 的实例化 任务的名字 任务的绑定 任务的重试 任务的请求上下文 任务的继承 前文列表 分布式任务队列 Celery 分布式任务队列 Celery -- ...
最新文章
- GitHub标星1.5w+,从此我只用这款全能高速下载工具
- 定义一个有参宏SWAP(t,x,y),用以交换t类型的两个参数的值
- leetcode算法题--学生分数的最小差值
- C# 获取FormData数据
- Android开发之自定义ImageView圆角图片的方法
- React Native的键盘遮挡问题(input/webview里)
- 大到创业,小到做一份副业
- mysql单张表数据量极限_极限数据量范围的安全测试
- mysql分组取每组前几条记录_[转] mysql分组取每组前几条记要(排名)
- AC9560网卡linux驱动安装
- OPNET开发教程合集
- python爬虫百度网盘_python爬取百度云网盘资源
- PS动作怎么做爆炸火焰效果特效
- 推荐一款低代码报表开发工具,操作类似Excel
- 图片如何转PDF?这两种方法很好用
- 怎样看待Android的发展前景?以及Android开发的职业规划
- STVD 编译提示 #error clnk :1 missing output file 的问题
- 网络技能大赛-2018年国赛真题[2018年全国职业技能大赛高职组计算机网络应用赛项真题-I卷]路由交换部分答案详解
- 感冒药盒上请看清这6个字,一定要注意! “美”:支气管炎患者慎用
- 单片机 利用C语言产生正弦波DA数据
热门文章
- Numba witch makes Python code fast
- 被客户抓住了尾巴,28理论与长尾理论
- 《决战大数据》读书笔记
- 手机终端高级测试工程师经验总结
- 算法笔记-快速排序(C版本与Python版本)
- 动态再结晶CA法模拟基础之元胞机的邻居类型及边界条件
- python 正则表达式(Regular Expression)基础学习笔记
- 计算机减法函数word,谁说Excel才能运算?Word计算功能同样强大,公式函数都不在话下-excel减法函数...
- 超实用的Java面试宝典
- codeforces1474D. Cleaning