celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用
0. Celery介绍
Celery是一个功能完备即插即用的异步任务队列系统。它适用于异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。
简单,易于使用和维护,有丰富的文档。
高效,单个celery进程每分钟可以处理数百万个任务。
灵活,celery中几乎每个部分都可以自定义扩展。
总之一句话,快
一. 官方文档
中文文档http://docs.jinkan.org/docs/celery/getting-started/index.html
二. 流程图
三. 异步回调使用
1.安装
-U是update的意思,有就进行更新,没有就安装,后面单独将celery运行起来就可以了
pip install redis==3.4.1 -i https://pypi.douban.com/simple/
pip install celery==4.4.2 -i https://pypi.douban.com/simple/
2. 使用目录介绍.
a. 使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。
b. app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。
c. 一般celery任务目录直接放在项目的根目录下即可,路径:
├── mycelery/├── config.py # 配置文件├── __init__.py ├── main.py # 主程序└── sms/ # 一个目录可以放置多个任务,该目录下存放当前任务执行时需要的模块或依赖,也可以每个任务单独一个目录└── tasks.py # 任务的文件,名称必须是这个!!!
3. 主要文件
1.main.py
# 主程序
import os
from celery import Celery# 创建celery实例对象
app = Celery("dbj") # celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做dbj# 把celery和django进行组合,需要识别和加载django的配置文件
import osos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dbj.settings.dev')
# 如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上
import djangodjango.setup()# 通过app对象加载配置
app.config_from_object("mycelery.config")# 加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms",])
2. config.py
# 配置文件
# 1. 有redis有密码
# 任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@127.0.0.1:6379/14'
# 结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@127.0.0.1:6379/15'# 2. redis无密码写法
# 任务队列的链接地址(变量名必须叫这个)
# broker_url = 'redis://127.0.0.1:6379/14'
# 结果队列的链接地址(变量名必须叫这个)
# result_backend = 'redis://127.0.0.1:6379/15'
3. 创建一个任务文件sms/tasks.py,并创建任务,代码:
# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
from mycelery.main import app@app.task(name="send_sms") # name表示设置任务的名称,如果不填写,则默认使用函数名(路径)做为任务名
def send_sms():print("发送短信!!!")@app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
def send_sms2():print("发送短信任务2!!!")
3.1 创建任务案例
from mycelery.main import app
from lyapi.libs.tencent.sms import send_sms_single# 任务列表,在main里面会被自动发现,但不会执行,只有send_sms.delay(code, phone) 才执行
@app.task(name='send_sms') # 给任务起的别名叫send_sms
def send_sms(phone_num, template_id, template_param_list):ret = send_sms_single(phone_num, template_id, template_param_list)# ret 还是成功信息/错误信息return ret
4. 控制台启动
# 启动Celery的命令
# 切换目录到mycelery根目录下启动
# celery -A mycelery.main worker --loglevel=info
5. 最后一步,views使用
import datetime
from auc_celery.sms.tasks import send_sms# celery -A auc_celery.main worker -l info -P eventlet
def text(request):"""生成id"""# 1.立即执行res = send_sms.delay(1, 2)"""# 2.定时任务(只执行一次,不是周期性定时任务)ctime = datetime.datetime.now()# 本地时间转换成utc时间utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())target_time = utc_ctime + datetime.timedelta(seconds=3)res = send_sms.apply_async(args=[6, 3], eta=target_time)"""return HttpResponse(res.id)def text2(request):"""根据id找任务结果及相关"""nid = request.GET.get('nid')# 导入包from celery.result import AsyncResult# 导入appfrom auc_celery.sms.tasks import app# 导入内置的函数,获取任务对象result_object = AsyncResult(id=nid, app=app)# 获取返回结果# data1 = result_object.get() # 3# 获取状态码# status = result_object.status # SUCCESS# # 取消任务# result_object.revoke()# print('success', result_object.successful())# print(data1)if result_object.successful():# print('success', result_object.successful())# 获取结果data = result_object.get()print("data", data)# 清空# result_object.forget()# 失败的逻辑处理elif result_object.failed():print('失败')else:print('未知')return HttpResponse('...')# 周期定时器
# celery -A auc_celery.main beat
# celery启动 -A--->文件名 worker--->启动工作者 -l info--->日志等级 -P eventlet--->解决在windows上报错的问题
# celery -A auc_celery.main worker -l info -P eventlet
"""
小贴士: 如果需要celery保存上次任务运行的时间在数据文件中,通过 -s 指定一个名为celerybeat-schedule的文件即可:
celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
"""
四. 如果报以下错误
Celery ValueError: not enough values to unpack (expected 3, got 0)
报错原因:
win10上运行celery4.x以上的版本就会出现这个问题
解决办法:
1.安装一个eventlet模块
pip3 install eventlet -i https://pypi.douban.com/simple/
2. 然后启动celery的时候加一个参数
celery -A <mymodule> worker -l info -P eventlet
启动案例:
celery -A mycelery.main worker -l info -P eventlet
输入到文件
celery -A cat_celery.main worker -l info -P eventlet --logfile=D:/pythonProject/cat/logs/celery.log
ps:<mymodule>指代的是,你的app所在的文件
详细描述错误原因
错误详细介绍https://blog.csdn.net/qq_52385631/article/details/122786364?spm=1001.2014.3001.5501
五. 定时器使用(配置同上,)
定时器官方文档http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
a. 在clery文件夹里面创建一个包,order
b. 在config.py配置中生成定时器
# 配置文件
# 1. 有redis有密码
# 任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@127.0.0.1:6379/14'
# 结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@127.0.0.1:6379/15'# 2. redis无密码写法
# 任务队列的链接地址(变量名必须叫这个)
# broker_url = 'redis://127.0.0.1:6379/14'
# 结果队列的链接地址(变量名必须叫这个)
# result_backend = 'redis://127.0.0.1:6379/15'# 定时器的配置文件
# 导入定时器
from celery.schedules import crontab
from .main import app# 定时任务的调度列表,用于注册定时任务
app.conf.beat_schedule = {# Executes every Monday morning at 7:30 a.m.# check_order_outtime -- 随意起的名字 定时任务的名字,叫什么都都行'check_order_outtime': {# 本次调度的任务'task': 'check_order', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递},
}
c. 定时器执行的任务 tasks.py(文件一定要叫这个名字)
from mycelery.main import app
from order.models import Order
import datetime@app.task(name="check_order")
def check_order():# 进行订单时间的校验# 支付时间 < 当前时间 + 时间段 ,把订单状态进行修改# 我们配置的1分钟执行1次return 'ok'
定时器执行的任务案例
from mycelery.main import app
from order.models import Order
import datetime@app.task(name="check_order")
def check_order():# 进行订单时间的校验# 支付时间 < 当前时间 + 时间段 ,把订单状态进行修改timer = datetime.datetime.now() + datetime.timedelta(days=1)Order.objects.filter(pay_time__lte=timer, order_status=0).update(order_status=3)return 'ok'
d. main.py 配置自动发现
app.autodiscover_tasks(["mycelery.sms", 'mycelery.order'])
完整版 main.py
# 主程序
import os
from celery import Celery# 创建celery实例对象
app = Celery("xxxx") # celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做xxx# 把celery和django进行组合,需要识别和加载django的配置文件
import osos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lyapi.settings.dev')
# 如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上
import djangodjango.setup()# 通过app对象加载配置
app.config_from_object("mycelery.config")# 加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms", 'mycelery.order'])
六. 启动流程
# 启动Celery的命令
# 切换目录到mycelery根目录下启动
# 会报错
# celery -A mycelery.main worker --loglevel=info# 报错的类型
# Celery ValueError: not enough values to unpack (expected 3, got 0)# 解决办法
# pip install eventlet
# celery -A <mymodule> worker -l info -P eventlet# 定时器启动
# celery -A mycelery.main beat
# 启动celery
# celery -A mycelery.main worker -l info -P eventlet
七:多个定时器的配置
7.1 task
import datetime
import uuid
from auction_celery.main import appfrom api import models@app.task(name="check_auction")
def check_auction():"""拍卖场未开始--->预售"""# 当前时间,拍卖场开始时间,拍卖场结束时间now = datetime.datetime.now()# 拍卖场开始时间<当前时间---status修改成拍卖中# 1. 查询到状态为未开拍且开拍时间<当前时间 的拍卖场query_auction = models.Auction.objects.filter(status=1, preview_start_time__lte=now,preview_end_time__gte=now)# 专场的商品状态修改for item in query_auction:item.status = 2item.save()# 专场的status修改query_auction.update(status=2)return 'ok0'@app.task(name="check_auction_pay")
def check_auction_pay():"""拍卖场预售--->开始"""# 当前时间,拍卖场开始时间,拍卖场结束时间now = datetime.datetime.now()# 拍卖场开始时间<当前时间---status修改成拍卖中# 1. 查询到状态为未开拍且开拍时间<当前时间 的拍卖场query_auction = models.Auction.objects.filter(status=2, auction_start_time__lte=now,auction_end_time__gte=now)# 专场的商品状态修改for item in query_auction:item.status = 3item.save()# 专场的status修改query_auction.update(status=3)return 'ok00'@app.task(name="check_auction_end")
def check_auction_end():"""拍卖场开始--->结束"""# 当前时间,拍卖场开始时间,拍卖场结束时间now = datetime.datetime.now()# 拍卖场开始时间<当前时间---status修改成拍卖中# 1. 查询到状态为未开拍且开拍时间<当前时间 的拍卖场query_auction = models.Auction.objects.filter(status=3, auction_end_time__lte=now, is_operation=False)# 专场的商品状态修改total_price = 0for item in query_auction:# item--- 拍卖场# 查询到所有结束的商品auction_item_query = item.auction_item.all().filter(status=3)for auction_item in auction_item_query:# auction_item 拍品# 根据价格判断是成交/流拍if auction_item.bid_record.all():max_price = auction_item.bid_record.all().filter(status=1).order_by('-id').first().pricetotal_price += max_price# 当前商品的价格添加auction_item.deal_price = max_priceauction_item.save()# 为出价最高者,创建订单userinfo = auction_item.bid_record.all().order_by('-id').first().usermodels.Order.objects.create(uid=str(uuid.uuid4()),userinfo=userinfo,auction=item,auction_item=auction_item,price=max_price,)# 把当前商品状态修改成成交item.auction_item.filter(status=3).update(status=4)else:item.auction_item.filter(status=3).update(status=5)# 当前专卖场的总价格,item.total_price = total_price# 把当前专场的状态修改成结束query_auction.update(status=4, is_operation=True)return 'ok'@app.task(name="check_auction_item_order")
def check_auction_item_order():"""生成的订单支付--状态修改"""# 当前时间,拍卖场开始时间,拍卖场结束时间now = datetime.datetime.now()# 当前时间 时间间隔 创建时间 create_time# 创建时间 < 当前时间 - 时间间隔# 时间间隔 + 创建时间 < 当前时间timer = now - datetime.timedelta(days=1)query_order = models.Order.objects.filter(status=1, create_time__lte=timer).update(status=3)return 'ok'# 支付时间 < 当前时间 + 时间段
7.2 config配置
# 配置文件
# 1. 有redis有密码
# 任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://:foobared@106.14.42.253:8889/14'
# 结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://:foobared@106.14.42.253:8889/15'# 定时器的配置文件
# 导入定时器
from celery.schedules import crontab
from .main import app# 定时任务的调度列表,用于注册定时任务
app.conf.beat_schedule = {# Executes every Monday morning at 7:30 a.m.# check_order_outtime -- 随意起的名字 定时任务的名字,叫什么都都行'check_auction_out_time': {# 本次调度的任务'task': 'check_auction', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递},'check_auction_pay': {# 本次调度的任务'task': 'check_auction_pay', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递},'check_auction_end': {# 本次调度的任务'task': 'check_auction_end', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递},'check_auction_item_order': {# 本次调度的任务'task': 'check_auction_item_order', # 这里的任务名称必须先到main.py中注册,如果写了别名,直接写别名就可以了# 定时任务的调度周期# 'schedule': crontab(minute=0, hour=0), # 每周凌晨00:00'schedule': crontab(), # 每分钟,没写秒数,那么就是每分钟的0秒开始# 'args': (16, 16), # 注意:任务就是一个函数,所以如果有参数则需要传递},
}
# # 配置时间,如果django,settings里面的时间,没有改,下面两行可以不写,因为django默认utc时间
# CELERY_TIMEZONE = "Asia/Shanghai"
# CELERY_ENABLE_UTC = Falseapp.conf.timezone = "Asia/Shanghai"
8. 时间celery配置
8.1 配置1(推荐)
# 配置时间,如果django,settings里面的时间,没有改,下面两行可以不写,因为django默认utc时间
app.conf.timezone = "Asia/Shanghai"
8.2 配置2(可能报错)
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = False
9. 定时器的时间crontab()
1.介绍
其中,crontab()实例化的时候没设置任何参数,都是使用默认值。crontab一共有7个参数,常用有5个参数分别为:
minute:分钟,范围0-59;
hour:小时,范围0-23;
day_of_week:星期几,范围0-6。以星期天为开始,即0为星期天。这个星期几还可以使用英文缩写表示,例如“sun”表示星期天;
day_of_month:每月第几号,范围1-31;
month_of_year:月份,范围1-12。
a、默认参数
这些参数可以设置表达式,表达稍微复杂的设置。默认值都是"*"星号,代表任意时刻。即crontab()相当与:
含义是每天、每小时、每分钟执行一次任务。这说法太反人类语言习惯,简单说就是每1分钟执行一次任务。
crontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*')
b、具体某个值
上面提到这些参数的取值范围。我们可以直接设置某个值。例如:
即每小时的15分时刻执行一次任务。直接指定某个时刻。
crontab(minute=15)
以此类推可以设置每天0点0分时刻执行任务的设置如下:
crontab(minute=0, hour=0)
当然,也可以设置多个值。例如0分和30分执行一次任务:
这里使用字符串,用逗号隔开数值。这里的逗号是表示多个表达式or逻辑关系。
crontab(minute='0,30')
c、设置范围
设置范围也是设置多个值,例如指定9点到12点每个小时的每分钟执行任务。
crontab(minute='*', hour='9-12')
这里*号是默认值,可以省略如下:
crontab(hour='9-12')
上面提到逗号是or逻辑关系。拓展一下,指定9点到12点和20点中每分钟执行任务:
crontab(hour='9-12,20')
crontab的表达式越来越复杂了。celery还提供了一个类得到表达式解析结果,代码如下:
from celery.task.schedules import crontab_parserr = crontab_parser(23, 0).parse('9-12,20')print(r)
其中,crontab_parse是一个解析类。第1个参数是范围的最大值;第2个参数是范围的最小值。通过parse输入表达式,可得到表达式的解析结果:
set([9, 10, 11, 12, 20])
d、设置间隔步长
假如我要设置1、3、5、7、9、11月份每天每分钟执行任务,按照上面的做法可以设置如下:
crontab(day_of_month='1,3,5,7,9,11')
观察数据可以发现,都是间隔2的步长。需要设置的数字比较少,若数字比较多显得很麻烦。例如我想每间隔2分钟就执行一次任务,要写30个数字想想就觉得很麻烦。crontab表达式还提供了间隔的处理,例如:
crontab(minute='*/2')crontab(minute='0-59/2') #效果等同上面
这个/号不是除以的意思。相当与range的第3个参数,例如:
range(0, 59+1, 2)
差不多crontab表达式就这些,多举几个例子:
#每2个小时中每分钟执行1次任务
crontab(hour='*/2')#每3个小时的0分时刻执行1次任务
#即[0,3,6,9,12,15,18,21]点0分
crontab(minute=0, hour='*/3')#每3个小时或8点到12点的0分时刻执行1次任务
#即[0,3,6,9,12,15,18,21]+[8,9,10,11,12]点0分
crontab(minute=0, hour='*/3,8-12')#每个季度的第1个月中,每天每分钟执行1次任务
#月份范围是1-12,每3个月为[1,4,7,10]
crontab(month_of_year='*/3')#每月偶数天数的0点0分时刻执行1次任务
crontab(minute=0, hour=0, day_of_month='2-31/2')#每年5月11号的0点0分时刻执行1次任务
crontab(0, 0, day_of_month='11', month_of_year='5')
celery(分布式任务队列)介绍+在django中异步回调使用+定时任务的使用相关推荐
- python 分布式队列_〖Python〗-- Celery分布式任务队列
[Celery分布式任务队列] 一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步 ...
- Celery 分布式任务队列快速入门
Celery 分布式任务队列快速入门 本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置cel ...
- python Celery 分布式任务队列快速入门
本节内容 Celery介绍和基本使用 在项目中如何使用celery 启用多个workers Celery 定时任务 与django结合 通过django配置celery periodic task 一 ...
- Python 第三方库之 Celery 分布式任务队列
一.Celery介绍和使用: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, ...
- Celery分布式任务队列的认识和基本操作
一.简单认识 Celery是由Python开发.简单.灵活.可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务.Celery侧重于实时操作,但对调度支持也很好 ...
- Python定时任务库Celery——分布式任务队列
文章目录 定时任务库对比 简介 安装 初试 进阶 项目结构 配置文件 Celery实例化 实时任务 定时任务 调用任务 启动定时任务 任务状态跟踪 递归调用 Celery配置 命令行参数 分布式集群部 ...
- Celery分布式任务队列学习记录
安装与环境配置 环境:Ubuntu18.04 安装celery root@ubuntu:/home/toohoo/learncelery# pip install celery==3.1.25 Col ...
- twisted应用中异步回调的方式及线程的应用
前言: 学习了golang的goruntine后,再回过头来看twisted网络编程库中的异步应用,没事琢磨下,以前搞过一个twisted做负载分发,性能差的要命,这几天再搞搞,看看能不能做少许提升. ...
- python 任务队列 huey_使用python的分布式任务队列huey实现任务的异步化
前言: 一个轻型的任务队列,功能和相关的broker没有celery强大,重在轻型,而且代码读起来也比较的简单. 这次算是原文的翻译了.... 一开始看到这个东西的时候,想看看有没有中文的资料,能立 ...
- 分布式任务队列 Celery — 应用基础
目录 文章目录 目录 前文列表 前言 Celery 的周期(定时)任务 Celery 的同步调用 Celery 结果储存 Celery 的监控 Celery 的调试 前文列表 分布式任务队列 Cele ...
最新文章
- 16 Java面试之 HTML
- 聂聪:数据科学让我为城市规划注入创新价值 | 优秀毕业生专访
- centos7 docker 安装 otter 注意事项
- php和python和java-python和java,php,c,c#,c++的对比
- 蚌埠粮食经济技师学院计算机,安徽粮食经济技师学院2020年有哪些专业
- Spring Boot自定义缓存注解
- 已知线性表最多可能有20个元素,存储每个元素需要8字节,存储每个指针需要4字节。当元素个数为( )时使用单链表比使用数组存储此线性表更加节约空间。
- 一种用css实现图片在父框中等比缩放并垂直居中的办法
- tf.group()用于组合多个操作
- Broken Keyboard (a.k.a. Beiju Text) UVA - 11988 (链表)
- Anchor 对象学习
- 浅议磁盘分区——从MBR到GPT
- oracle _db_block_write_batch,Oracle体系结构----实例的进程结构
- 一文彻底看懂成交量的本质
- vendor php,使用php composer时, 如何优雅修改vendor中第三方代码
- java资源文件路径_Java 中获取资源(文件)的路径问题总结
- radio男女选择取值
- cv::imread读不出图片的解决办法
- 随机变量的特征函数及应用
- HBase最佳实践-用好你的操作系统
热门文章
- 带通滤波器参数详细推导
- 2.5万字讲解DDD领域驱动设计,从理论到实践掌握DDD分层架构设计,赶紧收藏起来吧
- ROS实验笔记之——JCV-450无人机初入门
- E/art﹕Failed sending reply to debugger: Broken pipe的解决方法
- 物联网、大数据和云计算的基本关系和应用
- android街机模拟器,手机街机模拟器哪个好_安卓街机模拟器大全-66街机网
- sqli-labs(54-65)终章
- 学术论文写作之引言(Introduction)怎么写
- 拳皇97c语言编码,拳皇97金手指代码
- 乔巴机器人 番外篇_超神学院之暮光之眼