昨日回顾

1 redis的列表操作lpushllenlinsertlsetlremlpoplrange  # 使用它,自定义增量迭代blpop   # 分布式自定义增量迭代
2 redis通用方法-delete-exist-expire-rename-type3 redis管道-不支持事务,但是通过管道模拟实现-批量的多个命令一次性执行-pipline,管道----》实现事务4 django中集成redis-通用做法-写一个连接池-通过模块导入,单例,-通过模块-类装饰器-类的绑定方法-__new__-元类-conn=redis.Redis(connection_pool=POOL)-第二种:-pip install django-redis-配置cacheCACHES = {"default": {"BACKEND": "django_redis.cache.RedisCache","LOCATION": "redis://127.0.0.1:6379","OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient","CONNECTION_POOL_KWARGS": {"max_connections": 100}# "PASSWORD": "123",}}}-直接使用cachecache.set(key,value是任意数据类型,10*60)-from django_redis import get_redis_connection-conn=get_redis_connection(alias="default")
5 celery介绍-分布式异步任务框架-可以做的事-异步任务:(同步任务)-延迟任务:一段时间后,执行任务(函数)-定时任务:每隔多长时间执行任务-架构-提交任务(user)-消息队列(使用第三方:redis,rabbitmq)-任务执行单元(work:工人),可以运行在不同的节点上-结果存储:redis6 python中执行定时任务:APScheduler ----》公司里使用比较多

今日内容

0 celery简介,架构

1 celery:芹菜(跟芹菜没有任何关系)
2 python中的一个分布式异步任务框架-执行异步任务---(对立:同步任务):解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等-执行延时任务(5分钟后干一件事):解决延迟任务-执行定时任务:每天,隔几分钟,干什么事:解决周期(周期)任务,比如每天数据统计 (如果只想执行定时任务,可以用APScheduler定时框架)
3 解释
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.4 celery特点(了解)1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)2)celery服务为为其他项目服务提供异步解决任务需求的5 Celery架构Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成#消息中间件/消息队列
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等# 任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。# 任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

安装

pip install celery

1 celery的基本使用

1 写一个py文件import celery# 消息中间件(redis)broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db# 结果存储(redis)backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db# 实例化得到对象,指定中间件和结果存储app=celery.Celery('test',broker=broker,backend=backend)@app.taskdef add(a,b):return a+b@app.taskdef mul(a,b):return a*b2 提交任务(在其它文件中)from t_celery import add, mulres=add.delay(100,4) # 生成一个消息队列中的idprint(res)  # id号3 启动worker# 非windows平台:celery worker -A t_celery -l info# windows需要装eventlet模块:celery worker -A t_celery -l info -P eventlet4 查看执行结果from t_celery import appfrom celery.result import AsyncResult# 关键字,变量不能定义为关键字id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00'if __name__ == '__main__':res = AsyncResult(id=id, app=app)if res.successful():result = res.get()print(result)elif res.failed():print('任务失败')elif res.status == 'PENDING':print('任务等待中被执行')elif res.status == 'RETRY':print('任务异常后正在重试')elif res.status == 'STARTED':print('任务已经开始被执行')

2 celery多任务结构

package_celery:     # 项目名celery_task     # celery包名__init__.py celery.py   # celery 的app,必须叫celeryorder_task.py # 任务user_task.py  # 任务result.py         # 结果查询submit_tast.py    # 提交任务# 运行worker(在package_celery目录下执行)celery worker -A celery_task -l info -P eventlet
# 提交任务from celery_task import order_task,user_task# 提交一个给用户发短信的任务res=user_task.send_sms.delay('18723454566')print(res)# 提交一个取消订单任务res=order_task.cancel_order.delay()print(res)# 真实应用场景-秒杀系统-不能秒超,使用锁(mysql悲观锁,乐观锁),redis锁-提高并发量---》把同步做成异步---》使用celery-前端点击秒杀按钮,向后端发送秒杀请求---》同步操作-同步操作-请求来到后端,判断数量是否够,如果够,要生成订单(mysql),订单状态是待支付状                   态                       -请求返回,告诉前端,秒杀成功-异步操作-请求来到后端,提交一个celery任务---》celery任务异步的执行判断数量是否够,如                      果够,要生成订单(mysql)-秒杀是否成功的结果还没有,直接返回了(返回任务id)-前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完                     成(带着任务id查)-如果是未执行状态,或者执行中---》返回给前端,前端不处理,定时任务继续执行-又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功

3 高级使用之延时任务

# celery执行延时任务
## 第一种方式:2021年1月7日17点3分12秒发送短信
# from datetime import datetime
# # # eta:延迟多长时间执行,eta需要传时间对象,并且是utc时间
# v1 = datetime(2021, 1, 7, 17, 3, 12)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# res=user_task.send_sms.apply_async(args=['18977654332',],eta=v2)## 第二种方式:隔几秒后执行
from datetime import datetime
from datetime import timedelta
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
print(task_time)
res=user_task.send_sms.apply_async(args=['18977654332',],eta=task_time)

4 高级使用之定时任务

# 在celery.py中配置# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontabapp.conf.beat_schedule = {'send-msg':{'task': 'celery_task.user_task.send_sms',# 'schedule': timedelta(hours=24*10),# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点'schedule': crontab(hour=8, day_of_month=1),  # 每月一号早八点'args': ('18964352112',),}
}
# 启动beat,负责每隔3s提交一个任务
$ celery beat -A celery_task -l info
# 启动worker
$ celery worker -A celery_task -l info -P eventlet

5 django中使用celery

1 celery是独立的,跟框架没有关系
2 django-celery第三方模块,兼容性不好,咱们不采用,咱们使用通用方式
3 目录celery_task__init__.pycelery.pyhome_task.pyorder_task.pyuser_task.pyluffyapi

5.1 home_task.py

from celery_task.celery import app@app.task
def update_banner():from django.core.cache import cachefrom django.conf import settingsfrom home import modelsfrom home import serializerbanners=models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]ser = serializer.BannerModelSerializer(instance=banners,many=True)banner_data=ser.data# 拿不到request对象,所以头像的连接base_url要自己组装for banner in banner_data:banner['img'] = 'http://127.0.0.1:8000%s' % banner['img']cache.set('banner_data',banner_data)return True

5.2 celery.py

import celeryimport os
# 执行django配置文件,环境变量加入
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个dbbackend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个dbapp=celery.Celery('test',broker=broker,backend=backend,include=['celery_task.order_task','celery_task.user_task','celery_task.home_task'])# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontabapp.conf.beat_schedule = {# 'send-msg':{#     'task': 'celery_task.user_task.send_sms',#     # 'schedule': timedelta(hours=24*10),#     # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点#     'schedule': crontab(hour=8, day_of_month=1),  # 每月一号早八点#     'args': ('18964352112',),# }'update-banner':{'task': 'celery_task.home_task.update_banner','schedule': timedelta(seconds=10),'args': (),}
}

views.py

from celery_task import user_task
from celery_task.celery import app
from celery.result import AsyncResultdef test_celery(request):res_id = request.GET.get('id')if res_id:res = AsyncResult(id=res_id, app=app)if res.successful():result = res.get()print(result)return HttpResponse('执行完成了,结果是:%s' % result)res = user_task.send_sms.delay('18276345221')return HttpResponse('任务号是:%s' % str(res))

6 首页轮播图定时更新

1 把首页轮播图接口改成,先去缓存中取,缓存中没有,再去数据库查
2 首页轮播图加入了缓存
3 以后,如果你的接口,请求慢,第一反应就是先使用缓存4 使用celery定时更新缓存
class BannerView(GenericViewSet, ListModelMixin):queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]serializer_class = serializer.BannerModelSerializer# 改成,先从缓存中取,缓存中如果有,直接返回,没有,再去数据库查def list(self, request, *args, **kwargs):# 如果缓存中有值,直接取出来返回,速度很快banner_data = cache.get('banner_data')if banner_data:print('走了缓存')return Response(data=banner_data)# 如果缓存中没有,再走数据,查出来,放到缓存中res = super().list(request, *args, **kwargs)# 把首页轮播图数据放到缓存中cache.set('banner_data', res.data)print('没走缓存')return res

定时任务:更新缓存

Celery_task/home_task

import celery
import os# 执行django配置文件,环境变量加入
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "lufeiapi.settings.dev")broker = 'redis://127.0.0.1:6379/1'  # 1 表示使用redis 1 这个dbbackend = 'redis://127.0.0.1:6379/2'  # 2 表示使用redis 2 这个dbapp = celery.Celery(broker=broker, backend=backend,include=['celery_task.home_task',
])# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontabapp.conf.beat_schedule = {# 'send-msg':{#     'task': 'celery_task.user_task.send_sms',#     # 'schedule': timedelta(hours=24*10),#     # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点#     'schedule': crontab(hour=8, day_of_month=1),  # 每月一号早八点#     'args': ('18964352112',),# }'update-banner':{'task': 'celery_task.home_task.update_banner','schedule': timedelta(seconds=10),'args': (),}
}

170-路飞11-分布式异步框架Celery的使用相关推荐

  1. python分布式任务调度_Python使用Celery分布式异步队列/任务调度(基于Redis) - pytorch中文网...

    今天使用爬虫有些耗时较长,需要使用任务调度,Celery是Python开发的分布式任务调度模块,Celery本身不含消息服务,它使用第三方消息服务来传递任务,目前,Celery支持的消息服务有Rabb ...

  2. Python 并行分布式框架 Celery

    Celery 官网:http://www.celeryproject.org Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index. ...

  3. 媲美celery的分布式调度框架funboost

    最近项目中缺乏一款分布式框架,在github上闲逛时,找到了一款分布式调度框架.按照国惯例先上链接:https://github.com/ydf0509/funboost.该框架是由国人ydf开发的, ...

  4. [源码解析] 深度学习分布式训练框架 horovod (11) --- on spark --- GLOO 方案

    [源码解析] 深度学习分布式训练框架 horovod (11) - on spark - GLOO 方案 文章目录 [源码解析] 深度学习分布式训练框架 horovod (11) --- on spa ...

  5. 11月15日云栖精选夜读:分布式服务框架Dubbo疯狂更新!阿里开源要搞大事情?

    最近,开源社区发生了一件大事--使用最广的开源服务框架之一Dubbo低调重启维护,并且3个月连续发布了3个维护版本.这3个维护版本不仅解决了社区关心的一系列问题和需求,还让整个社区的活跃度得到了大幅提 ...

  6. 分布式服务框架 Zookeeper — 管理分布式环境中的数据

    FROM: http://www.superwu.cn/2014/11/26/1461 本节本来是要介绍ZooKeeper的实现原理,但是ZooKeeper的原理比较复杂,它涉及到了paxos算法.Z ...

  7. 分布式事物框架--EasyTransaction的入门介绍

    分布式事物框架--EasyTransaction的入门介绍 柔性事务,分布式事务,TCC,SAGA,可靠消息,最大努力交付消息,事务消息,补偿,全局事务,soft transaction, distr ...

  8. 分布式服务框架XXL-RPC

    <分布式服务框架XXL-RPC>      一.简介 1.1 概述 XXL-RPC 是一个分布式服务框架,提供稳定高性能的RPC远程服务调用功能.拥有"高性能.分布式.注册中心. ...

  9. Hasor-RSF —— 分布式服务框架

    一个高可用.高性能.轻量级的分布式服务框架.支持容灾.负载均衡.集群.一个典型的应用场景是,将同一个服务部署在多个Server上提供 request.response 消息通知.使用RSF可以点对点调 ...

最新文章

  1. PyTorch中的MIT ADE20K数据集的语义分割
  2. debug内exe文件复制到桌面无法打开_Qt打包生成exe步骤和无法定位程序输入点_gxx_personality_v0于动态链接库...
  3. java 加密服务器_Javascript端加密java服务端解密
  4. ------webkitformboundary
  5. 面试总结之人工智能AI(Artificial Intelligence)/ 机器学习(Machine Learning)
  6. 高等数学——二重积分的计算方法
  7. Kubernetes详解
  8. windows防火墙概述
  9. 微信小程序网络请求api中HTML格式问题
  10. 【评测】MP DNA提取试剂盒使用报告
  11. linux 服务器下查看防火墙
  12. 老旧小区安防难题成遗留“沉疴” 如何破解?
  13. [数据结构]第十一章-图论
  14. 开放式基金全景点评:净值全面攀升 关注老基金(ZZ)
  15. ​力扣解法汇总606-根据二叉树创建字符串
  16. 万象物语怎么在电脑上玩 万象物语电脑版玩法教程
  17. Shell脚本中cp使用*号提示No such file
  18. 手机查看html效果,使用IE浏览器查看页面手机端效果的方法
  19. 微信小程序实现pdf、word等格式文件上传的方法
  20. 过年回家啦用python写一个宝石消消乐的游戏哄小朋友

热门文章

  1. M2DGR数据集在一些SLAM框架上的配置与运行:ORB-SLAM系列、VINS-Mono、LOAM系列、FAST-LIO系列、hdl_graph_slam
  2. 图片加载库Coil详解
  3. 解决OleDbDataReader重新获取记录时,使用GetString()方法出错的问题
  4. Java 使用 commons-fileupload 实现文件上传工具类
  5. c#里的bindingsource和bindingnavigator
  6. 深度学习 AI 美颜系列:AI 发型管家(美颜相机发型管家算法解析)
  7. StopWatch统计耗时
  8. twitter_如何阻止所有Twitter烦人的电子邮件
  9. 全球及中国香水行业品牌消费调研与营销策略模式研究报告2022年
  10. 针织厂计算机社会实践报告,大学生寒假针织厂社会实践报告