1、释义:

  Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery。

举几个实例场景中可用的例子:

  • 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
  • 你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

Celery 本身并不提供消息服务,在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis

2、Celery的优点:

  • 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件都可以被扩展及自定制

3、Celery基本工作流程图

4、示例

这里我们使用redis
连接url的格式为:
redis://:password@hostname:port/db_number
例如:
BROKER_URL = 'redis://localhost:6379/0'

安装celery和redis

  • pip install celery
  • pip install redis

使用celery包含三个方面:

  • 定义任务函数
  • 运行celery服务
  • 客户应用程序的调用

先创建一个脚本 tasks.py

from celery import Celery        #导入了celery

broker = 'redis://172.16.94.85:6379/1'backend = 'redis://172.16.94.85:6379/2'app = Celery('tasks', broker=broker, backend=backend) #创建了celery实例app,实例化的过程中指定任务名tasks(和文件名一致),传入了broker和backend

@app.task #装饰器def add(x, y): #创建任务函数addprint("running...", x, y)return x + y

在当前命令行终端运行(启动worker,worker名要和脚本名一致):

celery -A tasks worker --loglevel=info

此时会看见一对输出,包括注册的任务

新建 test.py并执行:

#!/usr/bin/env python# -*- coding:utf-8 -*-# @Time   : 2018/5/26 8:17# @Author : JWQ# @File   : demo1.py

from tasks import add #导入tasks模块

re = add.delay(10, 20)print(re.result) #获取结果print(re.ready) #是否处理print(re.get(timeout=1)) #获取结果print(re.status) #是否处理

执行test.py后在celery行能看到相关的操作日志:

[2018-05-25 11:31:28,373: WARNING/ForkPoolWorker-1] ('running...', 4, 4)
[2018-05-25 11:31:28,394: INFO/ForkPoolWorker-1] Task tasks.add[30b145f9-14f7-4cd8-aa5e-7b6105c52325] succeeded in 0.0216804221272s: 8
[2018-05-25 11:31:58,991: INFO/MainProcess] Received task: tasks.add[7f8207cb-d561-4567-8ae7-7c035af02762]  
[2018-05-25 11:31:58,995: WARNING/ForkPoolWorker-1] ('running...', 4, 4)
[2018-05-25 11:31:58,998: INFO/ForkPoolWorker-1] Task tasks.add[7f8207cb-d561-4567-8ae7-7c035af02762] succeeded in 0.00274921953678s: 8

打开 backend的redis,也可以看见celery执行的信息。

在python环境中调用的add函数,实际上是在应用程序中调用这个方法。需要注意,如果把返回值赋值给一个变量,那么原来的应用程序也会被阻塞,需要等待异步任务返回的结果。因此,实际使用中,不需要把结果赋值。

5、Celery模块调用

既然celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,celery可以支持多台不通的计算机执行不同的任务或者相同的任务。

如果要说celery的分布式应用的话,我觉得就要提到celery的消息路由机制,就要提一下AMQP协议。具体的可以查看AMQP的文档。简单地说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的Message Queue中去,如图:

6、多worker,多队列

先写脚本task.py

[root@localhost celery]# cat tasks.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery

app = Celery()
app.config_from_object("celeryconfig")

@app.task
def taskA(x,y):
return x + y

@app.task
def taskB(x,y,z):
return x + y + z

上面的代码中,首先定义了一个Celery的对象,然后通过celeryconfig.py对celery对象进行设置。之后又分别定义了三个task,分别是taskA, taskB和add。
接下来写celeryconfig.py,需要注意代码的缩进格式:
[root@localhost celery]# cat celeryconfig.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-

from kombu import Exchange,Queue

BROKER_URL = "redis://192.168.48.131:6379/1"
CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)

CELERY_ROUTES = {
'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
}

配置文件一般单独写在一个文件中

启动一个worker来指定taskA
celery -A tasks worker -l info -n workerA.%h -Q for_task_A
celery -A tasks worker -l info -n workerB.%h -Q for_task_B
脚本测试:
from tasks import *
re1 = taskA.delay(100, 200)
print(re1.result)
re2 = taskB.delay(1, 2, 3)
print(re2.result)
re3 = add.delay(1, 2, 3)
print(re3.status)     #PENDING
我们看到add的状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。
celery -A tasks worker -l info -n worker.%h -Q celery
print(re3.status)    #SUCCESS
7、Celery与定时任务
在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。
下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:
CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
    'taskA_schedule' : {
        'task':'tasks.taskA',
        'schedule':20,
        'args':(5,6)
    },
    'taskB_scheduler' : {
        'task':"tasks.taskB",
        "schedule":200,
        "args":(10,20,30)
    },
    'add_schedule': {
        "task":"tasks.add",
        "schedule":10,
        "args":(1,2)
    }
注意格式,否则会有问题
Celery启动定时任务:
celery –A tasks beat

Celery启动定时任务:

这样taskA每20秒执行一次taskA.delay(5, 6)
taskB每200秒执行一次taskB.delay(10, 20, 30)
Celery每10秒执行一次add.delay(1, 2)

转载于:https://www.cnblogs.com/Jweiqing/p/9096427.html

cerely异步分布式相关推荐

  1. python—Celery异步分布式

    python-Celery异步分布式 Celery  是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向brok ...

  2. python 消息队列、异步分布式

    一.消息队列 消息队列:是在消息的传输过程中保存消息的容器. 消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不同的进程.生产者往管道中写消息,消费者从管道中读消息. ...

  3. python消息队列celery_python—Celery异步分布式

    一.Celery异步分布式 Celery 是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息 ...

  4. 谷粒P193线程池异步分布式session单点登录

    p193 异步 异步复习 多线程几种方式 1.继承Thread 2.实现runable接口3.实现callable接口 4.线程池 public class ThreadTest {public st ...

  5. 分布式一致性与共识算法

    区块链技术是近几年逐渐变得非常热门的技术,以比特币为首的密码货币其实已经被无数人所知晓,但是却很少有人会去研究它们的底层技术,也就是作为一个分布式网络比特币等加密货币是如何工作的. 无论是 Bitco ...

  6. 分布式事务Seata的AT模式下两阶段提交原理

    文章目录 第一阶段 1. 扫描@GlobalTransactional注解,获取全局事务XID 2. TC生成全局事务XID,记录入库 3. 执行业务逻辑,提交本地事务,记录branch_table. ...

  7. 分布式事务解决方案,Seata的基本配置和使用

    文章目录 1. 分布式事务介绍 ①:本地事务 ②:分布式事务 ③:常见的分布式事务解决方案 3. 2PC与3PC ①:2PC与3PC的区别 3. Seata介绍 ①:Seata的三种角色 ②:Seat ...

  8. 纯干货-17 分布式深度学习原理、算法详细介绍

    介绍 无监督的特征学习和深度学习已经证明,通过海量的数据来训练大型的模型可以大大提高模型的性能.但是,考虑需要训练的深度网络模型有数百万甚至数十亿个参数需要训练,这其实是一个非常复杂的问题.我们可以很 ...

  9. 七个实用的分布式开源框架

    分布式系统是由一组通过网络进行通信.为了完成共同的任务而协调工作的计算机节点组成的系统,广泛应用在互联网企业项目生产研发中.近几年 "大数据" 概念的兴起,让复杂而宽泛的分布式系统 ...

最新文章

  1. 电子计算机原理讲义,计算机原理讲义
  2. Java从基础进阶到高手
  3. 中南大学在线考试答案计算机基础,中南大学《计算机基础》在线考试题库(267题)(有答案).doc...
  4. 因特网 以太网 互联网的含义及区别
  5. 实现查看订单详情功能
  6. BaseAudioContext
  7. Java监听器Listener使用说明
  8. 栈溢出脚本_污点分析挖掘漏洞演示——如何在8小时内从零发现cve20120158(word溢出漏洞)...
  9. 搭建邮件服务器2003,用Windows Server 2003来搭建简易的邮件服务器
  10. MCSA / Windows Server 2016 用MAP工具进行IT资产评估I和虚拟化部署准备
  11. DeFi 的纵向扩张:利率协议将带来去中心化金融世界新变革
  12. Springboot顺利达驾校预约管理系统毕业设计-附源码191748
  13. java localdate获取自然周
  14. 偷偷赚钱的副业(真实有效)
  15. Web---HTML标签总结
  16. 日本美女机器人 婀娜多姿会回眸一笑(图)
  17. 档案数字化中的OCR应用
  18. DECLARE_HANDLE()
  19. Android P 和 Android Q的适配
  20. 蒸汽机器人布里茨天赋_lolS7蒸汽机器人 布里茨怎么出装_lolS7机器人天赋出装推荐_玩游戏网...

热门文章

  1. SAP成都研究院姚瑶:软件质量保证工作的变迁
  2. arduino读取matlab串口,Matlab Arduino实时串行通信,采样0.004 s
  3. 数据库怎么看是什么编码_离婚了怎么发朋友圈?看你喜欢什么类型
  4. c语言回溯算法骑士周游,191-骑士周游回溯算法代码实现(1)
  5. php万年历上个月下个月,php 万年历
  6. mysql里的max怎么用_MySQL中的max()函数使用教程
  7. Mysql常见的面试总结
  8. wifi上行下行速度测试_增大AP天线增益,可以同时改善上下行 —— WIFI基础知识(3)...
  9. uml图中的各种箭头_一次搞懂建模语言UML
  10. python字符串转化为long_Java带有运算符的字符串转换为Long型