celery 停止任务_celery异步任务框架
目录
- Celery
- 一、官方
- 二、Celery异步任务框架Celery架构图消息中间件任务执行单元任务结果存储
- 三、使用场景
- 四、Celery的安装配置
- 五、两种celery任务结构:提倡用包管理,结构更清晰
- 七、Celery执行异步任务包架构封装
- 八、基本使用celery.py 基本配置tasks.py 添加任务add_task.py 添加立即、延迟任务get_result.py 获取结果
- 九、高级使用celery.py 定时任务配置(循环的)tasks.pyget_result.py
- 十、django中使用(更新轮播图案例)redis的配置接口缓存views.py启动服务celery.pytasks.py
Celery
一、官方
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
二、Celery异步任务框架
"""1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)2)celery服务为为其他项目服务提供异步解决任务需求的注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求人是一个独立运行的服务 | 医院也是一个独立运行的服务正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求"""
Celery架构图
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
三、使用场景
异步执行:解决耗时任务
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务
四、Celery的安装配置
pip install celery
消息中间件:RabbitMQ/Redis
app=Celery('任务名', broker='xxx', backend='xxx')
五、两种celery任务结构:提倡用包管理,结构更清晰
# 如果 Celery对象:Celery(...) 是放在一个模块下的# 1)终端切换到该模块所在文件夹位置:scripts# 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info# 注:模块名随意# 如果 Celery对象:Celery(...) 是放在一个包下的# 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中# 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info# 注:包名随意
放在根目录下就行:
七、Celery执行异步任务
包架构封装
project ├── celery_task # celery包 │ ├── __init__.py # 包文件 │ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py │ └── tasks.py # 所有任务函数 ├── add_task.py # 添加任务 └── get_result.py # 获取结果
八、基本使用
celery.py 基本配置
# 1)创建app + 任务# 2)启动celery(app)服务:# 非windows# 命令:celery worker -A celery_task -l info# windows:# pip3 install eventlet# celery worker -A celery_task -l info -P eventlet# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本from celery import Celery# 无密码broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2'# 有密码:broker = 'redis://:123@127.0.0.1:6379/1'backend = 'redis://:123@127.0.0.1:6379/2'app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])'''broker : 任务仓库backend : 任务结果仓库include :任务(函数)所在文件'''
tasks.py 添加任务
from .celery import app@app.taskdef add(n1,n2): res = n1+n2 print('n1+n2 = %s' % res) return res@app.taskdef low(n1,n2): res = n1-n2 print('n1-n2 = %s' % res) return res
add_task.py 添加立即、延迟任务
from celery_task import tasks# delay :添加立即任务# apply_async : 添加延迟任务# eta : 执行的utc时间# 添加立即执行任务t1 = tasks.add.delay(10, 20)t2 = tasks.low.delay(100, 50)print(t1.id)# 添加延迟任务from celery_package.tasks import jumpfrom datetime import datetime,timedelta# 秒def eta_second(second): ctime = datetime.now() # 当前时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # 当前UTC时间 time_delay = timedelta(seconds=second) # 秒 return utc_ctime + time_delay # 当前时间+往后延迟的秒# 天def eta_days(days): ctime = datetime.now() # 当前时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # 当前UTC时间 time_delay = timedelta(days=days) # 天 return utc_ctime + time_delay # 当前时间+往后延迟的天jump.apply_async(args=(20,5), eta=eta_second(10)) # 10秒后执行jump.apply_async(args=(20,5), eta=eta_days(1)) # 1天后执行
get_result.py 获取结果
from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任务失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
九、高级使用
celery.py 定时任务配置(循环的)
特点:
添加任务的终端关闭之后,停止添加
celery服务端关闭后,把关闭之后未执行的任务都执行一遍,然后继续接收任务
# 1)创建app + 任务# 2)启动celery(app)服务:# 注):-A 表示相对路径,所以一定先进入celery_task所在包 -l 表示打印到日志 info 级别# 非windows# 命令:celery worker -A celery_task -l info# windows:# pip3 install eventlet# celery worker -A celery_task -l info -P eventlet# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务# 命令:celery beat -A celery_task -l info# 4)获取结果from celery import Celery# 无密码broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2'# 有密码:broker = 'redis://:123@127.0.0.1:6379/1'backend = 'redis://:123@127.0.0.1:6379/2'app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])# 时区app.conf.timezone = 'Asia/Shanghai'# 是否使用UTCapp.conf.enable_utc = False# 自动任务的定时配置from datetime import timedeltafrom celery.schedules import crontabapp.conf.beat_schedule = { # 定时任务名字 'fall_task': { 'task': 'celery_task.tasks.fall', 'args':(30,20), 'schedule': timedelta(seconds=3), # 3秒后执行 # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 }}'''fall_task:任务名自定义task:任务来源args:任务参数schedule:定时时间''''schedule': crontab(hour=8, day_of_week=1), # 每周一早八点'''minute : 分钟hour :小时day_of_week :礼拜day_of_month:月month_of_year:年'''
tasks.py
from .celery import app@app.taskdef fall(n1,n2): res = n1/n2 print('n1 /n2 = %s' % res) return res
get_result.py
from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任务失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
十、django中使用(更新轮播图案例)
最终达到的效果:根据定时任务来更新redis中的缓存。用户获取资源都是从redis缓存中获取。避免了数据库的压力
redis的配置
dev.py
# 缓存redis数据库配置CACHES = { "default": { "BACKEND": "django_redis.cache.RedisCache", "LOCATION": "redis://127.0.0.1:6379/10", "OPTIONS": { "CLIENT_CLASS": "django_redis.client.DefaultClient", "CONNECTION_POOL_KWARGS": {"max_connections": 100}, # 同时的并发量 "DECODE_RESPONSES": True, "PASSWORD": "123", } }}
接口缓存
"""1)什么是接口的后台缓存前台访问后台接口,后台会优先从缓存(内存)中查找接口数据如果有数据,直接对前台响应缓存数据如果没有数据,与(mysql)数据库交互,得到数据,对前台响应,同时将数据进行缓存,以备下次使用了解:前台缓存 - 前台在请求到接口数据后,在前台建立缓存,再发送同样请求时,发现前台缓存有数据,就不再对后台做请求了2)什么的接口会进行接口缓存i)接口会被大量访问:比如主页中的接口,几乎所有人都会访问,而且会重复访问ii)在一定时间内数据不会变化(或数据不变化)的接口iii)接口数据的时效性不是特别强(数据库数据发生变化了,不是立即同步给前台,验后时间同步给前台也没事)注:理论上所有接口都可以建立缓存,只要数据库与缓存数据同步及时3)如何实现接口缓存:主页轮播图接口"""
views.py
from rest_framework.viewsets import ModelViewSetfrom rest_framework import mixinsfrom . import models, serializersfrom django.conf import settingsfrom rest_framework.response import Responsefrom django.core.cache import cacheclass BannerViewSet(ModelViewSet, mixins.ListModelMixin): queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT] serializer_class = serializers.BannerSerializer # 有缓存走缓存,没有缓存走数据库,然后同步给缓存。接口自己实现 def list(self, request, *args, **kwargs): banner_list = cache.get('banner_list') if not banner_list: print('走了数据库') response = self.list(request, *args, **kwargs) banner_list = response.data cache.set('banner_list', banner_list, 86400) # 存进缓存中,缓存配置了redis数据库 return Response(banner_list)
启动服务
'''1):先切换到celery_task所在的同级目录(一般为根目录下)2):开一个终端(启动服务):celery worker -A celery_task -l info -P eventlet3):再开一个终端(添加任务):celery beat -A celery_task -l info'''# 注):-A 表示相对路径,所以一定先进入celery_task所在包 -l 表示打印到日志 info 级别
celery.py
"""celery框架django项目工作流程1)加载django配置环境2)创建Celery框架对象app,配置broker和backend,得到的app就是worker3)给worker对应的app添加可处理的任务函数,用include配置给worker的app4)完成提供的任务的定时配置app.conf.beat_schedule5)启动celery服务,运行worker,执行任务6)启动beat服务,运行beat,添加任务重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下"""# 一、加载django配置环境import osos.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")# 二、加载celery配置环境from celery import Celerybroker = 'redis://:123@127.0.0.1:6379/1'backend = 'redis://:123@127.0.0.1:6379/2'# workerapp = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 外面的包名和文件名,一般都是固定# 时区app.conf.timezone = 'Asia/Shanghai'# 是否使用UTCapp.conf.enable_utc = False# 任务的定时配置from datetime import timedeltafrom celery.schedules import crontabapp.conf.beat_schedule = { # 定时任务名字 'update_banner_cache': { 'task': 'celery_task.tasks.update_banner_list', 'args': (), 'schedule': timedelta(seconds=10), # 3秒一次 # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 # 'schedule': crontab(minute=0, day_of_week=1), # 每周一早八点 }}'''minute : 分钟hour :小时day_of_week :礼拜day_of_month:月month_of_year:年''''''fall_task:任务名自定义task:任务来源args:任务参数schedule:定时时间(秒)'''
tasks.py
from .celery import appfrom django.core.cache import cachefrom home import models, serializersfrom django.conf import settings@app.taskdef update_banner_list(): queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT] banner_list = serializers.BannerSerializer(queryset, many=True).data # 拿不到request对象,所以头像的连接base_url要自己组装 for banner in banner_list: banner['image'] = 'http://127.0.0.1:8000%s' % banner['image'] cache.set('banner_list', banner_list, 86400) return True
选择了IT,必定终身学习
作者:Jeff
出处:http://dwz.date/aNfM
celery 停止任务_celery异步任务框架相关推荐
- celery的中文_celery异步任务框架
目录 Celery 一.官方 二.Celery异步任务框架Celery架构图消息中间件任务执行单元任务结果存储 三.使用场景 四.Celery的安装配置 五.两种celery任务结构:提倡用包管理,结 ...
- celery 停止任务_celery 停止执行中 task
原因 因为最近项目需求中需要提供对异步执行任务终止的功能,所以在寻找停止celery task任务的方法.这种需求以前没有碰到过,所以,只能求助于百度和google,但是找遍了资料,都没找到相关的能停 ...
- 框架 go_go异步任务框架machinery,嗖嗖的[视频]
hi,这次介绍一个 go 的异步任务框架 machinery. 和 Python 生态里的 celery 类似.视频里演示了它的简单使用.当我们有一些cpu密集计算任务,延迟任务等, 可以使用异步任务 ...
- 异步爬虫框架与协程浅析
异步爬虫框架与协程浅析 经典原文使用协成完成异步爬虫原文链接 根据分享原文链接. Python基于协程的实现,其实是利用了Python生成器的特性完成的,Python生成器的原理其实涉及到用户态绿色线 ...
- Sequelize 4.43.0 发布,基于 Nodejs 的异步 ORM 框架
Sequelize 4.43.0 发布了,Sequelize 是一款基于 Nodejs 的异步 ORM 框架,它同时支持 PostgreSQL.MySQL.SQLite 和 MSSQL 多种数据库,很 ...
- android 学习随笔十二(网络:使用异步HttpClient框架)
使用异步HttpClient框架发送get.post请求 在https://github.com/ 搜索 asyn-http https://github.com/search?utf8=✓& ...
- AsyncQueryHandler 异步查询框架
AsyncQueryHandler简介: 异步的查询操作帮助类,可以处理增删改(ContentProvider提供的数据) 使用场景: 在一般的应用中可以使用ContentProvider去操作数据库 ...
- [android] 异步http框架与实现原理
介绍github上的异步http框架android-async-http loopj开发 获取AsyncHttpClient对象,通过new 调用AsyncHttpClient对象的get(url,r ...
- Disruptor是一个高性能的异步处理框架
为什么80%的码农都做不了架构师?>>> Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件- ...
最新文章
- Maven 的 Scope 区别,你知道吗?
- Table control中列隐藏实现方法
- LeetCode算法入门- Roman to Integer Integer to Roman -day8
- 047 一维数据的格式化和处理
- Linux线程的同步,linux线程同步
- OpenCore引导配置说明第十二版-基于OpenCore-0.6.5正式版
- 王守臣 | 文字不灭:“这边有个要饭的”
- 【hiho】2018ICPC北京赛区网络赛B Tomb Raider(暴力dfs)
- 【毕设教程】python区块链实现 - proof of work工作量证明共识算法
- tspl 重置打印机命令_命令行添加删除打印机
- 安卓APP源码和设计报告——小说阅读器
- AWS中国 Kubernetes 搭建指南
- cs224u GloVe词向量方法
- bmi计算 python_python tkinter bmi计算
- vue 移动端拨打电话
- 万向节锁--简单解释
- 大数据笔记(学习归纳)
- c语言 pow和sqrt注意
- 十三、Jmeter生成html报告
- 一个文科生的工程师之路