0. Celery介绍

Celery是一个功能完备即插即用的异步任务队列系统。它适用于异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。

简单,易于使用和维护,有丰富的文档。

高效,单个celery进程每分钟可以处理数百万个任务。

灵活,celery中几乎每个部分都可以自定义扩展。

总之一句话,快

一. 官方文档

中文文档http://docs.jinkan.org/docs/celery/getting-started/index.html

二. 流程图

三. 异步回调使用

1.安装

 -U是update的意思,有就进行更新,没有就安装,后面单独将celery运行起来就可以了

pip install redis==3.4.1 -i https://pypi.douban.com/simple/
pip install celery==4.4.2 -i https://pypi.douban.com/simple/

2. 使用目录介绍.

a. 使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。

b. app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。

c. 一般celery任务目录直接放在项目的根目录下即可,路径:

├── mycelery/├── config.py     # 配置文件├── __init__.py   ├── main.py       # 主程序└── sms/          # 一个目录可以放置多个任务,该目录下存放当前任务执行时需要的模块或依赖,也可以每个任务单独一个目录└── tasks.py  # 任务的文件,名称必须是这个!!!

3. 主要文件

1.main.py

# 主程序
import os
from celery import Celery# 创建celery实例对象
app = Celery("dbj")  # celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做dbj# 把celery和django进行组合,需要识别和加载django的配置文件
import osos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dbj.settings.dev')
# 如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上
import djangodjango.setup()# 通过app对象加载配置
app.config_from_object("mycelery.config")# 加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms",])

2. config.py

# 配置文件
# 1. 有redis有密码
# 任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@127.0.0.1:6379/14'
# 结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@127.0.0.1:6379/15'# 2. redis无密码写法
# 任务队列的链接地址(变量名必须叫这个)
# broker_url = 'redis://127.0.0.1:6379/14'
# 结果队列的链接地址(变量名必须叫这个)
# result_backend = 'redis://127.0.0.1:6379/15'

3. 创建一个任务文件sms/tasks.py,并创建任务,代码:

# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
from mycelery.main import app@app.task(name="send_sms")  # name表示设置任务的名称,如果不填写,则默认使用函数名(路径)做为任务名
def send_sms():print("发送短信!!!")@app.task  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
def send_sms2():print("发送短信任务2!!!")

3.1 创建任务案例

from mycelery.main import app
from lyapi.libs.tencent.sms import send_sms_single# 任务列表,在main里面会被自动发现,但不会执行,只有send_sms.delay(code, phone) 才执行
@app.task(name='send_sms')  # 给任务起的别名叫send_sms
def send_sms(phone_num, template_id, template_param_list):ret = send_sms_single(phone_num, template_id, template_param_list)# ret 还是成功信息/错误信息return ret

4. 控制台启动

# 启动Celery的命令
# 切换目录到mycelery根目录下启动
# celery -A mycelery.main worker --loglevel=info

5. 最后一步,views使用

import datetime
from auc_celery.sms.tasks import send_sms# celery -A auc_celery.main worker -l info -P eventlet
def text(request):"""生成id"""# 1.立即执行res = send_sms.delay(1, 2)"""# 2.定时任务(只执行一次,不是周期性定时任务)ctime = datetime.datetime.now()# 本地时间转换成utc时间utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())target_time = utc_ctime + datetime.timedelta(seconds=3)res = send_sms.apply_async(args=[6, 3], eta=target_time)"""return HttpResponse(res.id)def text2(request):"""根据id找任务结果及相关"""nid = request.GET.get('nid')# 导入包from celery.result import AsyncResult# 导入appfrom auc_celery.sms.tasks import app# 导入内置的函数,获取任务对象result_object = AsyncResult(id=nid, app=app)# 获取返回结果# data1 = result_object.get()  # 3# 获取状态码# status = result_object.status  # SUCCESS# # 取消任务# result_object.revoke()# print('success', result_object.successful())# print(data1)if result_object.successful():# print('success', result_object.successful())# 获取结果data = result_object.get()print("data", data)# 清空# result_object.forget()# 失败的逻辑处理elif result_object.failed():print('失败')else:print('未知')return HttpResponse('...')# 周期定时器
# celery -A auc_celery.main beat
# celery启动 -A--->文件名 worker--->启动工作者  -l info--->日志等级  -P eventlet--->解决在windows上报错的问题
# celery  -A auc_celery.main worker -l info -P eventlet
"""
小贴士: 如果需要celery保存上次任务运行的时间在数据文件中,通过 -s 指定一个名为celerybeat-schedule的文件即可:
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
"""

四. 如果报以下错误

Celery ValueError: not enough values to unpack (expected 3, got 0)

报错原因:

win10上运行celery4.x以上的版本就会出现这个问题

解决办法:

1.安装一个eventlet模块

pip3 install eventlet -i https://pypi.douban.com/simple/

2. 然后启动celery的时候加一个参数

celery -A <mymodule> worker -l info -P eventlet

启动案例:

celery -A mycelery.main worker -l info -P eventlet

输入到文件

celery -A cat_celery.main worker -l info -P eventlet --logfile=D:/pythonProject/cat/logs/celery.log

ps:<mymodule>指代的是,你的app所在的文件

详细描述错误原因

错误详细介绍https://blog.csdn.net/qq_52385631/article/details/122786364?spm=1001.2014.3001.5501

五. 定时器使用(配置同上,)

定时器官方文档http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

a. 在clery文件夹里面创建一个包,order

 b. 在config.py配置中生成定时器

# 配置文件
# 1. 有redis有密码
# 任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@127.0.0.1:6379/14'
# 结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@127.0.0.1:6379/15'# 2. redis无密码写法
# 任务队列的链接地址(变量名必须叫这个)
# broker_url = 'redis://127.0.0.1:6379/14'
# 结果队列的链接地址(变量名必须叫这个)
# result_backend = 'redis://127.0.0.1:6379/15'# 定时器的配置文件
# 导入定时器
from celery.schedules import crontab
from .main import app# 定时任务的调度列表,用于注册定时任务
app.conf.beat_schedule = {# Executes every Monday morning at 7:30 a.m.# check_order_outtime -- 随意起的名字 定时任务的名字,叫什么都都行'check_order_outtime': {# 本次调度的任务'task': 'check_order',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递},
}

c. 定时器执行的任务 tasks.py(文件一定要叫这个名字)

from mycelery.main import app
from order.models import Order
import datetime@app.task(name="check_order")
def check_order():# 进行订单时间的校验#  支付时间 < 当前时间 + 时间段 ,把订单状态进行修改# 我们配置的1分钟执行1次return 'ok'

定时器执行的任务案例

from mycelery.main import app
from order.models import Order
import datetime@app.task(name="check_order")
def check_order():# 进行订单时间的校验#  支付时间 < 当前时间 + 时间段 ,把订单状态进行修改timer = datetime.datetime.now() + datetime.timedelta(days=1)Order.objects.filter(pay_time__lte=timer, order_status=0).update(order_status=3)return 'ok'

d. main.py 配置自动发现

app.autodiscover_tasks(["mycelery.sms", 'mycelery.order'])

完整版 main.py

# 主程序
import os
from celery import Celery# 创建celery实例对象
app = Celery("xxxx")  # celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做xxx# 把celery和django进行组合,需要识别和加载django的配置文件
import osos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lyapi.settings.dev')
# 如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上
import djangodjango.setup()# 通过app对象加载配置
app.config_from_object("mycelery.config")# 加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms", 'mycelery.order'])

六. 启动流程

# 启动Celery的命令
# 切换目录到mycelery根目录下启动
# 会报错
# celery -A mycelery.main worker --loglevel=info# 报错的类型
# Celery ValueError: not enough values to unpack (expected 3, got 0)# 解决办法
# pip install eventlet
# celery -A <mymodule> worker -l info -P eventlet# 定时器启动
# celery -A mycelery.main beat
# 启动celery
# celery -A mycelery.main worker -l info -P eventlet

七:多个定时器的配置

7.1 task

import datetime
import uuid
from auction_celery.main import appfrom api import models@app.task(name="check_auction")
def check_auction():"""拍卖场未开始--->预售"""# 当前时间,拍卖场开始时间,拍卖场结束时间now = datetime.datetime.now()# 拍卖场开始时间<当前时间---status修改成拍卖中# 1. 查询到状态为未开拍且开拍时间<当前时间 的拍卖场query_auction = models.Auction.objects.filter(status=1, preview_start_time__lte=now,preview_end_time__gte=now)# 专场的商品状态修改for item in query_auction:item.status = 2item.save()# 专场的status修改query_auction.update(status=2)return 'ok0'@app.task(name="check_auction_pay")
def check_auction_pay():"""拍卖场预售--->开始"""# 当前时间,拍卖场开始时间,拍卖场结束时间now = datetime.datetime.now()# 拍卖场开始时间<当前时间---status修改成拍卖中# 1. 查询到状态为未开拍且开拍时间<当前时间 的拍卖场query_auction = models.Auction.objects.filter(status=2, auction_start_time__lte=now,auction_end_time__gte=now)# 专场的商品状态修改for item in query_auction:item.status = 3item.save()# 专场的status修改query_auction.update(status=3)return 'ok00'@app.task(name="check_auction_end")
def check_auction_end():"""拍卖场开始--->结束"""# 当前时间,拍卖场开始时间,拍卖场结束时间now = datetime.datetime.now()# 拍卖场开始时间<当前时间---status修改成拍卖中# 1. 查询到状态为未开拍且开拍时间<当前时间 的拍卖场query_auction = models.Auction.objects.filter(status=3, auction_end_time__lte=now, is_operation=False)# 专场的商品状态修改total_price = 0for item in query_auction:# item--- 拍卖场# 查询到所有结束的商品auction_item_query = item.auction_item.all().filter(status=3)for auction_item in auction_item_query:# auction_item 拍品# 根据价格判断是成交/流拍if auction_item.bid_record.all():max_price = auction_item.bid_record.all().filter(status=1).order_by('-id').first().pricetotal_price += max_price# 当前商品的价格添加auction_item.deal_price = max_priceauction_item.save()# 为出价最高者,创建订单userinfo = auction_item.bid_record.all().order_by('-id').first().usermodels.Order.objects.create(uid=str(uuid.uuid4()),userinfo=userinfo,auction=item,auction_item=auction_item,price=max_price,)# 把当前商品状态修改成成交item.auction_item.filter(status=3).update(status=4)else:item.auction_item.filter(status=3).update(status=5)# 当前专卖场的总价格,item.total_price = total_price# 把当前专场的状态修改成结束query_auction.update(status=4, is_operation=True)return 'ok'@app.task(name="check_auction_item_order")
def check_auction_item_order():"""生成的订单支付--状态修改"""# 当前时间,拍卖场开始时间,拍卖场结束时间now = datetime.datetime.now()# 当前时间 时间间隔 创建时间 create_time# 创建时间  < 当前时间 - 时间间隔# 时间间隔 + 创建时间 < 当前时间timer = now - datetime.timedelta(days=1)query_order = models.Order.objects.filter(status=1, create_time__lte=timer).update(status=3)return 'ok'#  支付时间 < 当前时间 + 时间段

7.2 config配置

# 配置文件
# 1. 有redis有密码
# 任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@106.14.42.253:8889/14'
# 结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@106.14.42.253:8889/15'# 定时器的配置文件
# 导入定时器
from celery.schedules import crontab
from .main import app# 定时任务的调度列表,用于注册定时任务
app.conf.beat_schedule = {# Executes every Monday morning at 7:30 a.m.# check_order_outtime -- 随意起的名字 定时任务的名字,叫什么都都行'check_auction_out_time': {# 本次调度的任务'task': 'check_auction',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递},'check_auction_pay': {# 本次调度的任务'task': 'check_auction_pay',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递},'check_auction_end': {# 本次调度的任务'task': 'check_auction_end',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递},'check_auction_item_order': {# 本次调度的任务'task': 'check_auction_item_order',  # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0),   # 每周凌晨00:00'schedule': crontab(),  # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16),  # 注意:任务就是一个函数,所以如果有参数则需要传递},
}
# # 配置时间,如果django,settings里面的时间,没有改,下面两行可以不写,因为django默认utc时间
# CELERY_TIMEZONE = "Asia/Shanghai"
# CELERY_ENABLE_UTC = Falseapp.conf.timezone = "Asia/Shanghai"

8. 时间celery配置

8.1 配置1(推荐)

# 配置时间,如果django,settings里面的时间,没有改,下面两行可以不写,因为django默认utc时间
app.conf.timezone = "Asia/Shanghai"

8.2 配置2(可能报错)

CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = False

9. 定时器的时间crontab()

1.介绍

其中,crontab()实例化的时候没设置任何参数,都是使用默认值。crontab一共有7个参数,常用有5个参数分别为:

minute:分钟,范围0-59;

hour:小时,范围0-23;

day_of_week:星期几,范围0-6。以星期天为开始,即0为星期天。这个星期几还可以使用英文缩写表示,例如“sun”表示星期天;

day_of_month:每月第几号,范围1-31;

month_of_year:月份,范围1-12。

a、默认参数

这些参数可以设置表达式,表达稍微复杂的设置。默认值都是"*"星号,代表任意时刻。即crontab()相当与:

含义是每天、每小时、每分钟执行一次任务。这说法太反人类语言习惯,简单说就是每1分钟执行一次任务。

crontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*')

b、具体某个值

上面提到这些参数的取值范围。我们可以直接设置某个值。例如:

即每小时的15分时刻执行一次任务。直接指定某个时刻。

crontab(minute=15)

以此类推可以设置每天0点0分时刻执行任务的设置如下:

crontab(minute=0, hour=0)

当然,也可以设置多个值。例如0分和30分执行一次任务:

这里使用字符串,用逗号隔开数值。这里的逗号是表示多个表达式or逻辑关系。

crontab(minute='0,30')

c、设置范围

设置范围也是设置多个值,例如指定9点到12点每个小时的每分钟执行任务。

 crontab(minute='*', hour='9-12')

这里*号是默认值,可以省略如下:

 crontab(hour='9-12')

上面提到逗号是or逻辑关系。拓展一下,指定9点到12点和20点中每分钟执行任务:

 crontab(hour='9-12,20')

crontab的表达式越来越复杂了。celery还提供了一个类得到表达式解析结果,代码如下:

 from celery.task.schedules import crontab_parserr = crontab_parser(23, 0).parse('9-12,20')print(r)

其中,crontab_parse是一个解析类。第1个参数是范围的最大值;第2个参数是范围的最小值。通过parse输入表达式,可得到表达式的解析结果:

 set([9, 10, 11, 12, 20])

d、设置间隔步长

假如我要设置1、3、5、7、9、11月份每天每分钟执行任务,按照上面的做法可以设置如下:

crontab(day_of_month='1,3,5,7,9,11')

观察数据可以发现,都是间隔2的步长。需要设置的数字比较少,若数字比较多显得很麻烦。例如我想每间隔2分钟就执行一次任务,要写30个数字想想就觉得很麻烦。crontab表达式还提供了间隔的处理,例如:

 crontab(minute='*/2')crontab(minute='0-59/2') #效果等同上面

这个/号不是除以的意思。相当与range的第3个参数,例如:

 range(0, 59+1, 2)

差不多crontab表达式就这些,多举几个例子:

#每2个小时中每分钟执行1次任务
crontab(hour='*/2')#每3个小时的0分时刻执行1次任务
#即[0,3,6,9,12,15,18,21]点0分
crontab(minute=0, hour='*/3')#每3个小时或8点到12点的0分时刻执行1次任务
#即[0,3,6,9,12,15,18,21]+[8,9,10,11,12]点0分
crontab(minute=0, hour='*/3,8-12')#每个季度的第1个月中,每天每分钟执行1次任务
#月份范围是1-12,每3个月为[1,4,7,10]
crontab(month_of_year='*/3')#每月偶数天数的0点0分时刻执行1次任务
crontab(minute=0, hour=0, day_of_month='2-31/2')#每年5月11号的0点0分时刻执行1次任务
crontab(0, 0, day_of_month='11', month_of_year='5')

celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用相关推荐

  1. python 分布式队列_〖Python〗-- Celery分布式任务队列

    [Celery分布式任务队列] 一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步 ...

  2. Celery 分布式任务队列快速入门

    Celery 分布式任务队列快速入门 本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置cel ...

  3. python Celery 分布式任务队列快速入门

    本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置celery periodic task 一 ...

  4. Python 第三方库之 Celery 分布式任务队列

    一.Celery介绍和使用: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, ...

  5. Celery分布式任务队列的认识和基本操作

    一.简单认识 Celery是由Python开发.简单.灵活.可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务.Celery侧重于实时操作,但对调度支持也很好 ...

  6. Python定时任务库Celery——分布式任务队列

    文章目录 定时任务库对比 简介 安装 初试 进阶 项目结构 配置文件 Celery实例化 实时任务 定时任务 调用任务 启动定时任务 任务状态跟踪 递归调用 Celery配置 命令行参数 分布式集群部 ...

  7. Celery分布式任务队列学习记录

    安装与环境配置 环境:Ubuntu18.04 安装celery root@ubuntu:/home/toohoo/learncelery# pip install celery==3.1.25 Col ...

  8. twisted应用中异步回调的方式及线程的应用

    前言: 学习了golang的goruntine后,再回过头来看twisted网络编程库中的异步应用,没事琢磨下,以前搞过一个twisted做负载分发,性能差的要命,这几天再搞搞,看看能不能做少许提升. ...

  9. python 任务队列 huey_使用python的分布式任务队列huey实现任务的异步化

    前言: 一个轻型的任务队列,功能和相关的broker没有celery强大,重在轻型,而且代码读起来也比较的简单. 这次算是原文的翻译了....  一开始看到这个东西的时候,想看看有没有中文的资料,能立 ...

  10. 分布式任务队列 Celery — 应用基础

    目录 文章目录 目录 前文列表 前言 Celery 的周期(定时)任务 Celery 的同步调用 Celery 结果储存 Celery 的监控 Celery 的调试 前文列表 分布式任务队列 Cele ...

最新文章

  1. 16 Java面试之 HTML
  2. 聂聪:数据科学让我为城市规划注入创新价值 | 优秀毕业生专访
  3. centos7 docker 安装 otter 注意事项
  4. php和python和java-python和java,php,c,c#,c++的对比
  5. 蚌埠粮食经济技师学院计算机,安徽粮食经济技师学院2020年有哪些专业
  6. Spring Boot自定义缓存注解
  7. 已知线性表最多可能有20个元素,存储每个元素需要8字节,存储每个指针需要4字节。当元素个数为( )时使用单链表比使用数组存储此线性表更加节约空间。
  8. 一种用css实现图片在父框中等比缩放并垂直居中的办法
  9. tf.group()用于组合多个操作
  10. Broken Keyboard (a.k.a. Beiju Text) UVA - 11988 (链表)
  11. Anchor 对象学习
  12. 浅议磁盘分区——从MBR到GPT
  13. oracle _db_block_write_batch,Oracle体系结构----实例的进程结构
  14. 一文彻底看懂成交量的本质
  15. vendor php,使用php composer时, 如何优雅修改vendor中第三方代码
  16. java资源文件路径_Java 中获取资源(文件)的路径问题总结
  17. radio男女选择取值
  18. cv::imread读不出图片的解决办法
  19. 随机变量的特征函数及应用
  20. HBase最佳实践-用好你的操作系统

热门文章

  1. 带通滤波器参数详细推导
  2. 2.5万字讲解DDD领域驱动设计,从理论到实践掌握DDD分层架构设计,赶紧收藏起来吧
  3. ROS实验笔记之——JCV-450无人机初入门
  4. E/art﹕Failed sending reply to debugger: Broken pipe的解决方法
  5. 物联网、大数据和云计算的基本关系和应用
  6. android街机模拟器,手机街机模拟器哪个好_安卓街机模拟器大全-66街机网
  7. sqli-labs(54-65)终章
  8. 学术论文写作之引言(Introduction)怎么写
  9. 拳皇97c语言编码,拳皇97金手指代码
  10. 乔巴机器人 番外篇_超神学院之暮光之眼