目录

  • Celery
  • 一、官方
  • 二、Celery异步任务框架Celery架构图消息中间件任务执行单元任务结果存储
  • 三、使用场景
  • 四、Celery的安装配置
  • 五、两种celery任务结构:提倡用包管理,结构更清晰
  • 七、Celery执行异步任务包架构封装
  • 八、基本使用celery.py 基本配置tasks.py 添加任务add_task.py 添加立即、延迟任务get_result.py 获取结果
  • 九、高级使用celery.py 定时任务配置(循环的)tasks.pyget_result.py
  • 十、django中使用(更新轮播图案例)redis的配置接口缓存views.py启动服务celery.pytasks.py

Celery

一、官方

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

二、Celery异步任务框架

"""1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)2)celery服务为为其他项目服务提供异步解决任务需求的注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求人是一个独立运行的服务 | 医院也是一个独立运行的服务正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求"""

Celery架构图

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

三、使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

四、Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名', broker='xxx', backend='xxx')

五、两种celery任务结构:提倡用包管理,结构更清晰

# 如果 Celery对象:Celery(...) 是放在一个模块下的# 1)终端切换到该模块所在文件夹位置:scripts# 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info# 注:模块名随意# 如果 Celery对象:Celery(...) 是放在一个包下的# 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中# 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet# 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info# 注:包名随意

放在根目录下就行:

七、Celery执行异步任务

包架构封装

project    ├── celery_task  # celery包    │   ├── __init__.py # 包文件    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py    │   └── tasks.py    # 所有任务函数    ├── add_task.py  # 添加任务    └── get_result.py   # 获取结果

八、基本使用

celery.py 基本配置

# 1)创建app + 任务# 2)启动celery(app)服务:# 非windows# 命令:celery worker -A celery_task -l info# windows:# pip3 install eventlet# celery worker -A celery_task -l info -P eventlet# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本from celery import Celery# 无密码broker = 'redis://127.0.0.1:6379/1'    backend = 'redis://127.0.0.1:6379/2'# 有密码:broker = 'redis://:123@127.0.0.1:6379/1'backend = 'redis://:123@127.0.0.1:6379/2'app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])'''broker : 任务仓库backend : 任务结果仓库include :任务(函数)所在文件'''

tasks.py 添加任务

from .celery import app@app.taskdef add(n1,n2):    res = n1+n2    print('n1+n2 = %s' % res)    return res@app.taskdef low(n1,n2):    res = n1-n2    print('n1-n2 = %s' % res)    return res

add_task.py 添加立即、延迟任务

from celery_task import tasks# delay  :添加立即任务# apply_async : 添加延迟任务# eta : 执行的utc时间# 添加立即执行任务t1 = tasks.add.delay(10, 20)t2 = tasks.low.delay(100, 50)print(t1.id)# 添加延迟任务from celery_package.tasks import jumpfrom datetime import datetime,timedelta# 秒def eta_second(second):    ctime = datetime.now()  # 当前时间    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())  # 当前UTC时间    time_delay = timedelta(seconds=second)  # 秒    return utc_ctime + time_delay  # 当前时间+往后延迟的秒# 天def eta_days(days):    ctime = datetime.now()  # 当前时间    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())  # 当前UTC时间    time_delay = timedelta(days=days)  # 天    return utc_ctime + time_delay  # 当前时间+往后延迟的天jump.apply_async(args=(20,5), eta=eta_second(10))  # 10秒后执行jump.apply_async(args=(20,5), eta=eta_days(1))  # 1天后执行

get_result.py 获取结果

from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'if __name__ == '__main__':    async = AsyncResult(id=id, app=app)    if async.successful():        result = async.get()        print(result)    elif async.failed():        print('任务失败')    elif async.status == 'PENDING':        print('任务等待中被执行')    elif async.status == 'RETRY':        print('任务异常后正在重试')    elif async.status == 'STARTED':        print('任务已经开始被执行')

九、高级使用

celery.py 定时任务配置(循环的)

特点:

添加任务的终端关闭之后,停止添加

celery服务端关闭后,把关闭之后未执行的任务都执行一遍,然后继续接收任务

# 1)创建app + 任务# 2)启动celery(app)服务:# 注):-A  表示相对路径,所以一定先进入celery_task所在包   -l 表示打印到日志 info 级别# 非windows# 命令:celery worker -A celery_task -l info# windows:# pip3 install eventlet# celery worker -A celery_task -l info -P eventlet# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务# 命令:celery beat -A celery_task -l info# 4)获取结果from celery import Celery# 无密码broker = 'redis://127.0.0.1:6379/1'    backend = 'redis://127.0.0.1:6379/2'# 有密码:broker = 'redis://:123@127.0.0.1:6379/1'backend = 'redis://:123@127.0.0.1:6379/2'app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])# 时区app.conf.timezone = 'Asia/Shanghai'# 是否使用UTCapp.conf.enable_utc = False# 自动任务的定时配置from datetime import timedeltafrom celery.schedules import crontabapp.conf.beat_schedule = {    # 定时任务名字    'fall_task': {        'task': 'celery_task.tasks.fall',        'args':(30,20),        'schedule': timedelta(seconds=3),  # 3秒后执行        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点    }}'''fall_task:任务名自定义task:任务来源args:任务参数schedule:定时时间''''schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点'''minute : 分钟hour :小时day_of_week :礼拜day_of_month:月month_of_year:年'''

tasks.py

from .celery import app@app.taskdef fall(n1,n2):    res = n1/n2    print('n1 /n2 = %s' % res)    return res

get_result.py

from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'if __name__ == '__main__':    async = AsyncResult(id=id, app=app)    if async.successful():        result = async.get()        print(result)    elif async.failed():        print('任务失败')    elif async.status == 'PENDING':        print('任务等待中被执行')    elif async.status == 'RETRY':        print('任务异常后正在重试')    elif async.status == 'STARTED':        print('任务已经开始被执行')

十、django中使用(更新轮播图案例)

最终达到的效果:根据定时任务来更新redis中的缓存。用户获取资源都是从redis缓存中获取。避免了数据库的压力

redis的配置

dev.py

# 缓存redis数据库配置CACHES = {    "default": {        "BACKEND": "django_redis.cache.RedisCache",        "LOCATION": "redis://127.0.0.1:6379/10",        "OPTIONS": {            "CLIENT_CLASS": "django_redis.client.DefaultClient",            "CONNECTION_POOL_KWARGS": {"max_connections": 100}, # 同时的并发量            "DECODE_RESPONSES": True,            "PASSWORD": "123",        }    }}

接口缓存

"""1)什么是接口的后台缓存前台访问后台接口,后台会优先从缓存(内存)中查找接口数据如果有数据,直接对前台响应缓存数据如果没有数据,与(mysql)数据库交互,得到数据,对前台响应,同时将数据进行缓存,以备下次使用了解:前台缓存 - 前台在请求到接口数据后,在前台建立缓存,再发送同样请求时,发现前台缓存有数据,就不再对后台做请求了2)什么的接口会进行接口缓存i)接口会被大量访问:比如主页中的接口,几乎所有人都会访问,而且会重复访问ii)在一定时间内数据不会变化(或数据不变化)的接口iii)接口数据的时效性不是特别强(数据库数据发生变化了,不是立即同步给前台,验后时间同步给前台也没事)注:理论上所有接口都可以建立缓存,只要数据库与缓存数据同步及时3)如何实现接口缓存:主页轮播图接口"""

views.py

from rest_framework.viewsets import ModelViewSetfrom rest_framework import mixinsfrom . import models, serializersfrom django.conf import settingsfrom rest_framework.response import Responsefrom django.core.cache import cacheclass BannerViewSet(ModelViewSet, mixins.ListModelMixin):    queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]    serializer_class = serializers.BannerSerializer    # 有缓存走缓存,没有缓存走数据库,然后同步给缓存。接口自己实现    def list(self, request, *args, **kwargs):        banner_list = cache.get('banner_list')        if not banner_list:            print('走了数据库')            response = self.list(request, *args, **kwargs)            banner_list = response.data            cache.set('banner_list', banner_list, 86400)  # 存进缓存中,缓存配置了redis数据库        return Response(banner_list)

启动服务

'''1):先切换到celery_task所在的同级目录(一般为根目录下)2):开一个终端(启动服务):celery worker -A celery_task -l info -P eventlet3):再开一个终端(添加任务):celery beat -A celery_task -l info'''# 注):-A  表示相对路径,所以一定先进入celery_task所在包   -l 表示打印到日志 info 级别

celery.py

"""celery框架django项目工作流程1)加载django配置环境2)创建Celery框架对象app,配置broker和backend,得到的app就是worker3)给worker对应的app添加可处理的任务函数,用include配置给worker的app4)完成提供的任务的定时配置app.conf.beat_schedule5)启动celery服务,运行worker,执行任务6)启动beat服务,运行beat,添加任务重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下"""# 一、加载django配置环境import osos.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")# 二、加载celery配置环境from celery import Celerybroker = 'redis://:123@127.0.0.1:6379/1'backend = 'redis://:123@127.0.0.1:6379/2'# workerapp = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])  # 外面的包名和文件名,一般都是固定# 时区app.conf.timezone = 'Asia/Shanghai'# 是否使用UTCapp.conf.enable_utc = False# 任务的定时配置from datetime import timedeltafrom celery.schedules import crontabapp.conf.beat_schedule = {    # 定时任务名字    'update_banner_cache': {        'task': 'celery_task.tasks.update_banner_list',        'args': (),        'schedule': timedelta(seconds=10),  # 3秒一次        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点        # 'schedule': crontab(minute=0, day_of_week=1),  # 每周一早八点    }}'''minute : 分钟hour :小时day_of_week :礼拜day_of_month:月month_of_year:年''''''fall_task:任务名自定义task:任务来源args:任务参数schedule:定时时间(秒)'''

tasks.py

from .celery import appfrom django.core.cache import cachefrom home import models, serializersfrom django.conf import settings@app.taskdef update_banner_list():    queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]    banner_list = serializers.BannerSerializer(queryset, many=True).data    # 拿不到request对象,所以头像的连接base_url要自己组装    for banner in banner_list:        banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']    cache.set('banner_list', banner_list, 86400)    return True

选择了IT,必定终身学习

作者:Jeff

出处:http://dwz.date/aNfM

celery 停止任务_celery异步任务框架相关推荐

  1. celery的中文_celery异步任务框架

    目录 Celery 一.官方 二.Celery异步任务框架Celery架构图消息中间件任务执行单元任务结果存储 三.使用场景 四.Celery的安装配置 五.两种celery任务结构:提倡用包管理,结 ...

  2. celery 停止任务_celery 停止执行中 task

    原因 因为最近项目需求中需要提供对异步执行任务终止的功能,所以在寻找停止celery task任务的方法.这种需求以前没有碰到过,所以,只能求助于百度和google,但是找遍了资料,都没找到相关的能停 ...

  3. 框架 go_go异步任务框架machinery,嗖嗖的[视频]

    hi,这次介绍一个 go 的异步任务框架 machinery. 和 Python 生态里的 celery 类似.视频里演示了它的简单使用.当我们有一些cpu密集计算任务,延迟任务等, 可以使用异步任务 ...

  4. 异步爬虫框架与协程浅析

    异步爬虫框架与协程浅析 经典原文使用协成完成异步爬虫原文链接 根据分享原文链接. Python基于协程的实现,其实是利用了Python生成器的特性完成的,Python生成器的原理其实涉及到用户态绿色线 ...

  5. Sequelize 4.43.0 发布,基于 Nodejs 的异步 ORM 框架

    Sequelize 4.43.0 发布了,Sequelize 是一款基于 Nodejs 的异步 ORM 框架,它同时支持 PostgreSQL.MySQL.SQLite 和 MSSQL 多种数据库,很 ...

  6. android 学习随笔十二(网络:使用异步HttpClient框架)

    使用异步HttpClient框架发送get.post请求 在https://github.com/  搜索 asyn-http https://github.com/search?utf8=✓& ...

  7. AsyncQueryHandler 异步查询框架

    AsyncQueryHandler简介: 异步的查询操作帮助类,可以处理增删改(ContentProvider提供的数据) 使用场景: 在一般的应用中可以使用ContentProvider去操作数据库 ...

  8. [android] 异步http框架与实现原理

    介绍github上的异步http框架android-async-http loopj开发 获取AsyncHttpClient对象,通过new 调用AsyncHttpClient对象的get(url,r ...

  9. Disruptor是一个高性能的异步处理框架

    为什么80%的码农都做不了架构师?>>>    Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件- ...

最新文章

  1. Maven 的 Scope 区别,你知道吗?
  2. Table control中列隐藏实现方法
  3. LeetCode算法入门- Roman to Integer Integer to Roman -day8
  4. 047 一维数据的格式化和处理
  5. Linux线程的同步,linux线程同步
  6. OpenCore引导配置说明第十二版-基于OpenCore-0.6.5正式版
  7. 王守臣 | 文字不灭:“这边有个要饭的”
  8. 【hiho】2018ICPC北京赛区网络赛B Tomb Raider(暴力dfs)
  9. 【毕设教程】python区块链实现 - proof of work工作量证明共识算法
  10. tspl 重置打印机命令_命令行添加删除打印机
  11. 安卓APP源码和设计报告——小说阅读器
  12. AWS中国 Kubernetes 搭建指南
  13. cs224u GloVe词向量方法
  14. bmi计算 python_python tkinter bmi计算
  15. vue 移动端拨打电话
  16. 万向节锁--简单解释
  17. 大数据笔记(学习归纳)
  18. c语言 pow和sqrt注意
  19. 十三、Jmeter生成html报告
  20. 一个文科生的工程师之路

热门文章

  1. 牛客华为机试第5题python
  2. tensorflow,神经网络创建源码
  3. python文件流习题解析
  4. [译] Lenses:可组合函数式编程的 Getter 和 Setter(第十九部分)
  5. 利用T-SQL语句快速清理ZBLOG程序的SQL SERVER2012数据库内容
  6. 甲骨文推出低成本高速公共与混合云方案,矛头直指AWS
  7. 【计算几何】【分类讨论】Gym - 101173C - Convex Contour
  8. XOOM MZ606 刷机
  9. 数据研发岗位需要技能
  10. Scala安装时的坑