文章目录

  • celery简单使用
  • Celery执行异步任务
  • 多任务结构
  • Celery执行定时任务
  • 类似于contab的定时任务
  • Django中使用Celery

celery简单使用

安装celery

pip install celery消息中间件:RabbitMQ/Redisapp=Celery('任务名',backend='xxx',broker='xxx')

Celery执行异步任务

基本使用

创建项目celerytest

  • 创建py文件:celery_app_task.py
import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密码
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):return x+y
  • 创建py文件:add_task.py,添加任务
from celery_app_task import add
result = add.delay(4,5)
print(result.id)
  • 创建py文件:run.py,执行任务,或者使用命令执行:celery worker -A celery_app_task -l info

注:windows下:celery worker -A celery_app_task -l info -P eventlet

from celery_app_task import cel
if __name__ == '__main__':cel.worker_main()# cel.worker_main(argv=['--loglevel=info')
  • 创建py文件:result.py,查看任务执行结果
from celery.result import AsyncResult
from celery_app_task import celasync = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)if async.successful():result = async.get()print(result)# result.forget() # 将结果删除
elif async.failed():print('执行失败')
elif async.status == 'PENDING':print('任务等待中被执行')
elif async.status == 'RETRY':print('任务异常后正在重试')
elif async.status == 'STARTED':print('任务已经开始被执行')

执行 add_task.py,添加任务,并获取任务ID

执行 run.py ,或者执行命令:celery worker -A celery_app_task -l info

执行 result.py,检查任务状态并获取结果

多任务结构

pro_cel├── celery_task# celery相关文件夹│   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字│   └── tasks1.py    #  所有任务函数│   └── tasks2.py    #  所有任务函数├── check_result.py # 检查结果└── send_task.py    # 触发任务

celery.py

from celery import Celerycel = Celery('celery_demo',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2',# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类include=['celery_task.tasks1','celery_task.tasks2'])# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

tasks1.py

import time
from celery_task.celery import cel@cel.task
def test_celery(res):time.sleep(5)return "test_celery任务结果:%s"%res

tasks2.py

import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):time.sleep(5)return "test_celery2任务结果:%s"%res

check_result.py

from celery.result import AsyncResult
from celery_task.celery import celasync = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)if async.successful():result = async.get()print(result)# result.forget() # 将结果删除,执行完成,结果不会自动删除# async.revoke(terminate=True)  # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async.failed():print('执行失败')
elif async.status == 'PENDING':print('任务等待中被执行')
elif async.status == 'RETRY':print('任务异常后正在重试')
elif async.status == 'STARTED':print('任务已经开始被执行')

send_task.py

from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2# 立即告知celery去执行test_celery任务,并传入一个参数
result = test_celery.delay('第一个的执行')
print(result.id)
result = test_celery2.delay('第二个的执行')
print(result.id)

添加任务(执行send_task.py),开启work:celery worker -A celery_task -l info -P eventlet,检查任务执行结果(执行check_result.py)

Celery执行定时任务

设定时间让celery执行一个任务

add_task.py

from celery_app_task import add
from datetime import datetime# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay# 使用apply_async并设定时间
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

类似于contab的定时任务

多任务结构中celery.py修改如下

from datetime import timedelta
from celery import Celery
from celery.schedules import crontabcel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=['celery_task.tasks1','celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = Falsecel.conf.beat_schedule = {# 名字随意命名'add-every-10-seconds': {# 执行tasks1下的test_celery函数'task': 'celery_task.tasks1.test_celery',# 每隔2秒执行一次# 'schedule': 1.0,# 'schedule': crontab(minute="*/1"),'schedule': timedelta(seconds=2),# 传递参数'args': ('test',)},# 'add-every-12-seconds': {#     'task': 'celery_task.tasks1.test_celery',#     每年4月11号,8点42分执行#     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),#     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),#     'args': (16, 16)# },
}

启动一个beat:celery beat -A celery_task -l info

启动work执行:celery worker -A celery_task -l info -P eventlet

Django中使用Celery

  • 在项目目录下创建celeryconfig.py
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=('app01.tasks',
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30
  • 在app01目录下创建tasks.py
from celery import task
@task
def add(a,b):with open('a.text', 'a', encoding='utf-8') as f:f.write('a')print(a+b)
  • 视图函数views.py
from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):# result=add.delay(2,3)ctime = datetime.now()# 默认用utc时间utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())from datetime import timedeltatime_delay = timedelta(seconds=5)task_time = utc_ctime + time_delayresult = add.apply_async(args=[4, 3], eta=task_time)print(result.id)return HttpResponse('ok')
  • settings.py
#INSTALLED_APPS = [
#    'djcelery',
#    'app01'
#]from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

参考链接

Celery的简单使用相关推荐

  1. celery定时任务简单使用

    celery介绍 Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行.我们通常使用它来实现异步任务( async task )和定时任务( cr ...

  2. Celery框架简单实例

    Python 中可以使用Celery框架 Celery框架是提供异步任务处理的框架,有两种用法,一种:应用程式发布任务消息,后台Worker监听执行,好处在于不影响应用程序继续执行.第二种,设置定时执 ...

  3. django admin celery beat简单的定时任务管理平台

    一直都在想写一个这样平台,前端比较low,所以就使用了Django 自带的后端写了一个定时任务管理平台 具体结构如下 djangotask/ ├── app01 │?? ├── admin.py │? ...

  4. [转载] Flask+Celery+Redis简单操作

    参考链接: 使用Flask,Redis和Celery的异步任务 使用celery可以使得任务异步执行,在处理复杂或耗时长的任务时经常需要,废话不多说,直接进入正题. 结构树  安装各种包 pip in ...

  5. 在python下比celery更加简单的异步任务队列RQ

    前言: 关于python rq有几点没有描述清楚,修正后的文章在这. http://xiaorui.cc/2014/11/09/%E5%9F%BA%E4%BA%8Eredis%E5%8F%88%E6% ...

  6. Celery - 一个懂得 异步任务 , 定时任务 , 周期任务 的芹菜

    1.什么是Celery? Celery 是芹菜 Celery 是基于Python实现的模块, 用于执行异步定时周期任务的 其结构的组成是由     1.用户任务 app     2.管道 broker ...

  7. python 任务调度 celery_斑马斑马-09-白云之上-python任务调度之celery

    一.celery简介 1:celery是什么 Celery是一个python开发的异步分布式任务调度模块. 2:celery是使用场景 异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短 ...

  8. celery学习笔记:celery安装,并运行第一个应用

    1.celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给 ...

  9. 任务队列:celery快速入门及django中celery的用法

    文章目录 一.celey的简介 1.1 celery的工作机制 1.2 安装celery(5.2版本) 二.celery快速入门 2.1 选择broker 2.2 celery的简单使用 2.2.1 ...

最新文章

  1. 局域网交换(交换机三大原理.基本配置)
  2. 009_html标准属性
  3. 【学术相关】CVPR2021最新接收论文合集!22个方向100+篇论文汇总|持续更新
  4. 应用市场高速下载以及网页端调起APP页面研究与实现
  5. Oracle就业课第六课之游标和触发器
  6. SOA ESB 微服务 浅析
  7. 同一字段降序个升序_5个打印小技巧,表格打印没烦恼
  8. Spring Boot——2分钟构建spring web mvc REST风格HelloWorld
  9. 对软件研发项目管理的深入探讨
  10. 散列函数之双重散列算法解决冲突问题
  11. JS 中的== 与 ===
  12. Windows下的 Redis 安装教程
  13. 21天学通C语言-学习笔记(1)
  14. 安微六安二中2021高考成绩查询,喜报!六安一中、六安二中、毛坦厂中学......高考“成绩单”出炉...
  15. 阿里云盘登录空白问题解决
  16. 解决bmp图片文件宽度不是4的倍数
  17. 本地文件共享到云服务器,本地和云服务器文件共享
  18. 刨根问底:linux中bash shell中SIGHUP和SIGTERM信号的处理
  19. 身份证工具-IdcardUtil案例
  20. debug 最全教程

热门文章

  1. 数据中心或许会成为未来5G最强大的技术支撑
  2. 数据中心外包面临法律考验
  3. python列表、集合、字典、元祖用途_Python-函数作用域和集合列表字典元祖
  4. Python语言学习:利用sorted对字典按照value进行递减排序,输出列表,并给定排名索引,组成新字典输出
  5. Interview:算法岗位面试—11.05下午上海某银行信息(总行,四大行之一)技术岗笔试记录
  6. CUMCM:05B  DVD在线租赁
  7. Py之chatterbot:python包之Chatterbot包/wxpy包简介、安装、使用方法之详细攻略
  8. OS_CORE.C(4)
  9. [嵌入式]Bootloader的作用
  10. Mosquitto配置----日志设置