介绍

celery 定时器是一个调度器(scheduler);它会定时地开启(kicks off)任务,然后由集群中可用的工人(worker)来执行。

定时任务记录(entries)默认 从 beat_schedule 设置中获取,但自定义存储也可以使用,如把记录存储到SQL数据库中。

要确保同一时间一份时间表上只有一个调度器在运行,否则会因为重复发送任务而结束。使用集中途径意味着定时任务不用必须同步,并且服务无需用锁操控。

  • user:用户程序,用于告知celery去执行一个任务。
  • broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
  • worker:执行任务

celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

Celery version 4.0 runs onPython ❨2.7, 3.4, 3.5❩PyPy ❨5.4, 5.5❩This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.If you’re running an older version of Python, you need to be running an older version of Celery:Python 2.6: Celery series 3.1 or earlier.Python 2.5: Celery series 3.0 or earlier.Python 2.4 was Celery series 2.2 or earlier.Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

版本和要求

环境准备:

  • 安装rabbitMQ或Redis
    https://www.cnblogs.com/L5251/articles/9146825.html

     https://www.cnblogs.com/L5251/articles/9325586.html

  • 安装celery
         pip3 install celery

快速上手

import time
from celery import Celeryapp = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')@app.task
def xxxxxx(x, y):time.sleep(10)return x + y

s1.py

s2.py

from celery.result import AsyncResult
from s1 import appasync = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)if async.successful():result = async.get()print(result)# result.forget() # 将结果删除
elif async.failed():print('执行失败')
elif async.status == 'PENDING':print('任务等待中被执行')
elif async.status == 'RETRY':print('任务异常后正在重试')
elif async.status == 'STARTED':print('任务已经开始被执行')

s3.py

# 执行 s1.py 创建worker(终端执行命令):
celery worker -A s1 -l info
# PS:Windows系统上执行命令时出错解决方法
    pip3 install eventlet
# 后期运行修改为:celery worker -A s1 -l info -P eventlet
# 执行 s2.py ,创建一个任务并获取任务ID:
    python3 s2.py# 执行 s3.py ,检查任务状态并获取结果:python3 s3.py

多任务结构

pro_cel├── celery_tasks# celery相关文件夹│   ├── celery.py   # celery连接和配置相关文件│   └── tasks.py    #  所有任务函数├── check_result.py # 检查结果└── send_task.py    # 触发任务

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celerycelery = Celery('xxxxxx',broker='redis://192.168.0.111:6379',backend='redis://192.168.0.111:6379',include=['celery_tasks.tasks'])# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False

pro_cel/celery_tasks/celery

#!/usr/bin/env python
# -*- coding:utf-8 -*-import time
from .celery import celery@celery.task
def xxxxx(*args, **kwargs):time.sleep(5)return "任务结果"@celery.task
def hhhhhh(*args, **kwargs):time.sleep(5)return "任务结果"

pro_cel/celery_tasks/tasks.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-from celery.result import AsyncResult
from celery_tasks.celery import celeryasync = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)if async.successful():result = async.get()print(result)# result.forget() # 将结果删除
elif async.failed():print('执行失败')
elif async.status == 'PENDING':print('任务等待中被执行')
elif async.status == 'RETRY':print('任务异常后正在重试')
elif async.status == 'STARTED':print('任务已经开始被执行')

pro_cel/check_result.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks# 立即告知celery去执行xxxxxx任务,并传入两个参数
result = celery_tasks.tasks.xxxxx.delay(4, 4)print(result.id)

pro_cel/send_task.py

更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html

定时任务

1. 设定时间让celery执行一个任务

import datetime
from celery_tasks.tasks import xxxxx
"""
from datetime import datetimev1 = datetime(2017, 4, 11, 3, 0, 0)
print(v1)v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10# 使用apply_async并设定时间
result = xxxxx.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)

2. 类似于contab的定时任务

"""
celery beat -A proj
celery worker -A proj -l info"""
from celery import Celery
from celery.schedules import crontabapp = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = Falseapp.conf.beat_schedule = {# 'add-every-10-seconds': {#     'task': 'proj.s1.add1',#     'schedule': 10.0,#     'args': (16, 16)# },'add-every-12-seconds': {'task': 'proj.s1.add1','schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),'args': (16, 16)},
}

注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。

Flask中应用Celery

pro_flask_celery/
├── app.py
├── celery_tasks├── celery.py└── tasks.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-from flask import Flask
from celery.result import AsyncResultfrom celery_tasks import tasks
from celery_tasks.celery import celeryapp = Flask(__name__)TASK_ID = None@app.route('/')
def index():global TASK_IDresult = tasks.xxxxx.delay()# result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))TASK_ID = result.idreturn "任务已经提交"@app.route('/result')
def result():global TASK_IDresult = AsyncResult(id=TASK_ID, app=celery)if result.ready():return result.get()return "xxxx"if __name__ == '__main__':app.run()

app.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontabcelery = Celery('xxxxxx',broker='redis://192.168.10.48:6379',backend='redis://192.168.10.48:6379',include=['celery_tasks.tasks'])# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False

celery_tasks/celery.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-import time
from .celery import celery@celery.task
def hello(*args, **kwargs):print('执行hello')return "hello"@celery.task
def xxxxx(*args, **kwargs):print('执行xxxxx')return "xxxxx"@celery.task
def hhhhhh(*args, **kwargs):time.sleep(5)return "任务结果"

celery_task/tasks.py

记录

为了定时调用任务,你必须添加记录到打点列表中:

from celery import Celery
from celery.schedules import crontabapp = Celery()@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):# 每10秒调用 test('hello') .sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')# 每30秒调用 test('world') sender.add_periodic_task(30.0, test.s('world'), expires=10)# 每周一上午7:30执行
    sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s('Happy Mondays!'),)@app.task
def test(arg):print(arg)

用on_after_configure处理器进行这些设置意味着当使用test.s()时我们不会在模块层面运行app 。

add_periodic_task() 函数在幕后会添加记录到beat_schedule设定,同样的设定可以用来手动设置定时任务:

例子: 每30秒运行 tasks.add .

app.conf.beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)},
}
app.conf.timezone = 'UTC'

一般会使用配置文件进行配置,如下 
celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)},
}

程序里使用:

app.config_from_object('celeryconfig')注意
如果你的参数元组里只有一个项目,只用一个逗号就可以了,不要圆括号。

时间表使用时间差意味着每30秒间隔会发送任务(第一个任务在celery定时器开启后30秒发送,然后上每次距一次运行后30秒发送一次)

可使用的属性

  • task

    要执行的任务名字

  • schedule

    执行的频率

    可以是整数秒数,时间差,或者一个周期( crontab)。你也可以自 定义你的时间表类型,通过扩展schedule接口。

  • args

    位置参数 (list 或 tuple).

  • kwargs

    键值参数 (dict).

  • options

    执行选项 (dict).

    这可以是任何被apply_async()支持的参数与—-exchange, routing_key, expires,等。

  • relative

    如果 relative 是 true ,时间表“由时钟时间”安排,意味着 频率近似到最近的秒,分钟,小时或天,这取决于时间差中的时间间隔。 
    默认relative是false,频率不会近似,会相对于celery的启动时间。

    Crontab 表达式语法非常灵活。

例子 含义
crontab() 每分钟执行
crontab(minute=0, hour=0) 每天午夜执行
crontab(minute=0, hour=’*/3’) 每三个小时执行: 午夜, 3am, 6am, 9am, 正午, 3pm, 6pm, 9pm.
crontab(minute=0,hour=’0,3,6,9,12,15,18,21’) 同上
crontab(minute=’*/15’) 每15分钟执行
crontab(day_of_week=’sunday’) 星期日每分钟
crontab(minute=’‘,hour=’‘, day_of_week=’sun’) 同上
crontab(minute=’*/10’,hour=’3,17,22’, day_of_week=’thu,fri’) 每10分钟执行,仅限于周六日3-4 am, 5-6 pm, and 10-11 pm
crontab(minute=0, hour=’/2,/3’) 偶数小时或者能被3整除的小时数执行
crontab(minute=0, hour=’*/5’) 被5整除的小时数,如3pm
crontab(minute=0, hour=’*/3,8-17’) 8am-5pm能被3整除的
crontab(0, 0, day_of_month=’2’) 每月第2天
crontab(0, 0,day_of_month=’2-30/3’) 每偶数天
crontab(0, 0,day_of_month=’1-7,15-21’) 每月1和3周
crontab(0, 0, day_of_month=’11’,month_of_year=’5’) 每年5月11日
crontab(0, 0,month_of_year=’*/3’) 每个季度第1月

开启调度

开启celery定时服务:

$ celery -A proj beat

可以把定时器嵌入到工人(worker)中,通过启用workers -B选项,如果你永远不会运行超过一个工人节点这就会很方便。但这不太常见,不推荐在生产环境这样使用:

$ celery -A proj worker -B

定时器需要在本地数据库文件(默认名为 celerybeat-schedule )存储任务上次运行时间,所以它需要在当前目录中写权限。或者你也可以给这个文件指定一个位置:

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

转载于:https://www.cnblogs.com/L5251/articles/9332304.html

python拓展7(Celery消息队列配置定时任务)相关推荐

  1. python队列线程池_实例详解:python高级编程之消息队列(Queue)与进程池(Pool)

    今天为大家带来的内容是:python高级编程之消息队列(Queue)与进程池(Pool),结合了实例的形式详细分析了Python消息队列与进程池的相关原理.使用技巧与操作注意事项!!! Queue消息 ...

  2. python openstack rabbitmq_OpenStack--Rabbitmq组件消息队列

    概念 队列 MQ 全称为Message Queue,消息队列( MQ ) 是一种应用程序的通信方法.应用程序通过读写入列队的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们. 消息传递指的是 ...

  3. python使用redis的消息队列_Redis实现简单消息队列

    任务异步化 打开浏览器,输入地址,按下回车,打开了页面.于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容. 我们每天都在浏览网页,发送大大 ...

  4. 阿里云消息队列python_41. Python Queue 多进程的消息队列 PIPE

    消息队列: 消息队列是在消息传输过程中保存消息的容器. 消息队列最经典的用法就是消费者和生产者之间通过消息管道来传递消息,消费者和生产生是不通的进程.生产者往管道中写消息,消费者从管道中读消息. 相当 ...

  5. Python使用redis的消息队列

    Redis 服务 1.安装 yum install redis 2. python安装支持模块 /opt/python2.7.13/bin/pip install redis 3. 和redis的简单 ...

  6. 基于Python语言使用RabbitMQ消息队列(一)

    介绍 RabbitMQ 是一个消息中间人(broker): 它接收并且发送消息. 你可以把它想象成一个邮局: 当你把想要寄出的信放到邮筒里时, 你可以确定邮递员会把信件送到收信人那里. 在这个比喻中, ...

  7. python 消息队列 get是从队首还是队尾取东西_从零开始Python对redis作为消息队列的使用...

    一.Redis 服务 1.安装 yum install redis 2. python安装支持模块 /opt/python2.7.13/bin/pip install redis 3. 和redis的 ...

  8. 基于Python语言使用RabbitMQ消息队列(三)

    发布/订阅 前面的教程中我们已经创建了一个工作队列.在一个工作队列背后的假设是每个任务恰好会传递给一个工人.在这一部分里我们会做一些完全不同的东西--我们会发送消息给多个消费者.这就是所谓的" ...

  9. python 消息队列 get是从队首还是队尾取东西_python分布式爬虫中消息队列知识点详解...

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的 ...

最新文章

  1. webbrowser1 脚本报错_c# winform程序 webBrowser 当前页面的脚本发生异常 找不到成员...
  2. 华为云发布全新DevOps实践,大幅提升交付效率
  3. 大学开设python课程吗_在大学为什么你一定要学会Python?
  4. 菜鸟也玩WebMatrix
  5. c# DataTable DataBinding 应用笔记
  6. 动态分区分配_关于硬盘的磁盘分区,干货分享!
  7. 【NLP基础】手把手带你fastText文本分类(附代码)
  8. [Unity3D]Unity3D圣骑士当游戏开发商遭遇Mecanim动画系统
  9. NFS网络文件系统配置
  10. UVA1218 完美的服务 Perfect Service
  11. 搭建高性能计算环境(九)、应用软件的安装之gaussian 09
  12. QQ空间迁移_【山特C3KS_连接ESXI虚拟机】
  13. SecureCRT 破解版v7.1.1.264中文汉化绿色版
  14. 嵌入式软件开发笔试面试知识点总结-操作系统部分
  15. 翻译: 2.7. 如何利用帮助文档 深入神经网络 pytorch
  16. 我的世界电脑正版服务器地址大全,《我的世界》服务器地址大全 各个服务器一览分享...
  17. 软件行业的税收优惠政策包含哪些
  18. HashMap、哈希表、哈希函数
  19. 短视频副业做什么比较靠谱,副业赚钱的路子有哪些
  20. Excel VBA初级系列培训--课时3

热门文章

  1. 安卓网络编程(Socket、WebView控件)
  2. 全国计算机等级考试题库二级C操作题100套(第97套)
  3. halcon轮廓擦除_halcon第十二讲,毛刺去除
  4. 帧同步_微信小游戏接入“熊孩子噩梦”健康系统 帧同步能力上线
  5. Http协议(3)—HTTP实体和编码
  6. gsettings命令使用简介
  7. 做事用人 用人做事_做事:构建我的第一个Web应用程序的经验教训
  8. python并行运算库_最佳并行绘图Python库简介:“ HiPlot”
  9. 3 年前端面经和他在创业公司的成长历程
  10. Android社会化分享详解