Celery的实践指南
celery原理:
celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信。
典型的场景为:
  1. 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker。
  2. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行。
实践中的典型场景:
  1. 简单的定时任务:

    1. 替换crontab的celery写法:

      1. from celery import Celery
        from celery.schedules import crontab

        app = Celery("tasks", backend="redis://localhost", broker="redis://localhost")

        app.conf.update(CELERYBEAT_SCHEDULE = {
            "add": {
                "task": "celery_demo.add",
                "schedule": crontab(minute="*"),
                "args": (16, 16)
            },
        })

        @app.task
        def add(x, y):
            return x + y

    2. 运行celery的worker,让他作为consumer运行,自动从broker上获得任务并执行。
      1. `celery -A celery_demo worker`
    3. 运行celery的client,让其根据schedule,自动生产出task msg,并发布到broker上。
      1. `celery -A celery_demo beat`
    4. 安装并运行flower,方便监控task的运行状态
      1. `celery flower -A celery_demo`
      2. 或者设置登录密码 `
        celery flower -A celery_demo --basic_auth=user1:password1,user2:password2
  2. 多同步任务-链式任务-
  3. 失败自动重试的task
    1. 失败重试方法: 将task代码函数参数增加self,同时绑定bind。
    2. demo代码:
      1. @app.task(bind=True, default_retry_delay=300, max_retries=5)
        def my_task_A(self):
            try:
                print("doing stuff here...")
            except SomeNetworkException as e:
                print("maybe do some clenup here....")
                self.retry(e)
    3. 自动重试后,是否将任务重新入queue后排队,还是等待指定的时间?可以通过self.retry()参数来指定。
  4. 派发到不同Queue队列的task
    1. 一个task自动映射到多个queue中的方法, 通过配置task和queue的routing_key命名模式。

      1. 比如:把queue的exchange和routing_key配置成通用模式:
      2. 再定义task的routing_key的名称:
    2. 可用的不同exchange策略:
      1. direct:直接根据定义routing_key
      2. topic:exchange会根据通配符来将一个消息推送到多个queue。
      3. fanout:将消息拆分,分别推送到不同queue,通常用于超大任务,耗时任务。
    3. 参考:http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
  5. 高级配置
    1. result是否保存
    2. 失败邮件通知:
    3. 关闭rate limit:
  6. auto_reload方法(*nix系统):
    1. celery通过监控源代码目录的改动,自动地进行reload
    2. 使用方法:1.依赖inotify(Linux) 2. kqueue(OS X / BSD)
    3. 安装依赖:
      $ pip install pyinotify
    4. (可选) 指定fsNotify的依赖:
      $ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
    5. 启动: celery -A appname worker --autoreload
  7. auto-scale方法:
    1. 启用auto-scale
    2. 临时增加worker进程数量(增加consumer):
      $ celery -A proj control add_consumer foo -d worker1.local
    3. 临时减少worker进程数量(减少consumer):
  8. 将scheduled task的配置从app.conf变成DB的方法:
    1. 需要在启动时指定custom schedule 类名,比如默认的是: celery.beat.PersistentScheduler 。

      1. celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
  9. 启动停止worker的方法:
    1. 启动 as daemon : http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing

      1. root用户可以使用celeryd
      2. 非特权用户:celery multi start worker1 -A appName  —autoreload  --pidfile="$HOME/run/celery/%n.pid"  --logfile="$HOME/log/celery/%n.log"
      3. 或者 celery worker —detach
    2. 停止
    3. ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
  10. 与Flask集成的方法
    1. 集成后flask将充当producer来创建并发送task给broker,在celery启动的独立worker进程将从broker中获得task并执行,同时将结果返回。
    2. flask中异步地获得task结果的方法:add.delay(x,y),有时需要对参数进行命名后传递 或者 add.apply_async(args=(x,y), countdown=30)
    3. flask获得
  11. 与flask集成后的启动问题
    1. 由于celery的默认routing_key是根据生产者在代码中的import级别来设定的,所以worker端在启动时应该注意其启动目录应该在项目顶级目录上,否者会出现KeyError。
  12. 性能提升: eventlet 和 greenlet
官方参考:http://docs.celeryproject.org/en/latest/userguide/index.html

转载于:https://www.cnblogs.com/ToDoToTry/p/5453149.html

Celery的实践指南相关推荐

  1. 《App架构师实践指南》:移动开发的进阶指南

    文章主要内容: 什么是 app 架构师 这本书主要内容 读完感受 什么是 App 架构师 成为"架构师"是许多程序员的梦想,当然也包括我,在工作的几年里,我见过很多架构师,他们在设 ...

  2. Python机器学习实践指南pdf (中文版带书签)、原书代码、数据集

    Python机器学习实践指南 目 录  第1章Python机器学习的生态系统 1  1.1 数据科学/机器学习的工作  流程 2  1.1.1 获取 2  1.1.2 检查和探索 2  1.1.3 清 ...

  3. 深度学习「CV」学习实践指南!

    ↑↑↑关注后"星标"Datawhale 每日干货 & 每月组队学习,不错过 Datawhale干货 作者:黄星源.樊亮.陈桦.斯国一 深度学习的发展不仅突破了许多视觉难题, ...

  4. json数据解析_VBA 实践指南 -- Split函数解析Json数据

    什么是JSON? JSON(JavaScript Object Notation) 是一种及其轻量级的数据交换格式,它是 ECMAScript (欧洲计算机协会制定的JavaScript规范)的一个子 ...

  5. 免费教材丨第55期:Python机器学习实践指南、Tensorflow 实战Google深度学习框架

    小编说  时间过的好快啊,小伙伴们是不是都快进入寒假啦?但是学习可不要落下哦!  本期教材  本期为大家发放的教材为:<Python机器学习实践指南>.<Tensorflow 实战G ...

  6. 《响应式Web设计:HTML5和CSS3实践指南》——2.9节基于位置伪类的交替行样式

    本节书摘来自华章社区<响应式Web设计:HTML5和CSS3实践指南>一书中的第2章,第2.9节基于位置伪类的交替行样式,作者(美) Benjamin LaGrone,更多章节内容可以访问 ...

  7. 《Core Data应用开发实践指南》一1.3 创建Grocery Dude项目

    本节书摘来自华章出版社<Core Data应用开发实践指南>一书中的第1章,第1.3节,作者 (美)Tim Roadley,更多章节内容可以访问云栖社区"华章计算机"公 ...

  8. 《术以载道——软件过程改进实践指南》—第1章1.1节对CMMI的基本认识

    本节书摘来自异步社区<术以载道--软件过程改进实践指南>一书中的第1章1.1节对CMMI的基本认识,作者任甲林,更多章节内容可以访问云栖社区"异步社区"公众号查看. 第 ...

  9. 操作系统CnetOS_7—systemd管理实践指南

    systemd管理实践指南 管理systemd CentOS 7 使用systemd替换了SysV.Systemd目的是要取代Unix时代以来一直在使用的init系统,兼容SysV和LSB的启动脚本, ...

最新文章

  1. POJ 1944 - Fiber Communications
  2. VS 2010 测试功能学习(八) - RnP与Coded UI测试(继续篇)
  3. iOS之界面传值(通知,属性,协议,NSUserDefaults,KVC)
  4. linux下安装davinci
  5. pg安装部署linux_简简单单基于docker部署微服务网关
  6. 深入浅出计算机组成原理04:存储和IO系统
  7. B站【云E办】在线办公系统 项目源码
  8. Nexus3 功能介绍
  9. 灰色系统理论概论(个人总结)
  10. 国潮席卷!这家高端酒店品牌推出以唐风宋韵为基础的“新国风”酒店
  11. linux下查看vnc端口_Linux的VNCServer的默认端口是多少?
  12. 避免当野monkey, 走野路子
  13. Pr 案例:制作快闪转场效果
  14. 练手之经典病毒熊猫烧香分析(上)
  15. Java 10正式发布,带来了这些新特性
  16. xmppFrameWork的使用
  17. aiku给你们最真心地学习建议--转
  18. 《20个月赚30个亿-陈士骏自传》读后感
  19. Photoshop制作木板雕刻看图识字
  20. Unity2D游戏使游戏角色移动的脚本

热门文章

  1. Java学习笔记_类和对象
  2. 『设计模式』JAVA I/O 与装饰者模式UML图
  3. [数组] 连续子数组的最大和 --- LeetCode53
  4. android 铃声位置
  5. 相机内外参矩阵和坐标变换
  6. Makefile学习资料及书籍推荐
  7. 存款全线下降,贷款大幅增加,你敢等吗?
  8. 针对C++和Delphi的LiveBindings一瞥
  9. 台湾印象之三:吃与喝
  10. “全裸”:看向京的雕塑艺术展