怎么能不恨呢,在我发现自己是恶鬼的时候,在我最绝望最虚弱的时候,这个世上最该跟我在一起的人却用刀把我的心刺穿了

Celery 是 Distributed Task Queue,分布式任务队列。分布式决定了可以有多个 worker 的存在,队列表示其是异步操作。它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

安装

Celery4.x 开始不再支持Windows平台了。下面装的是3.1.25。

安装使用命令

pip3 install celery==3.1.25

查看是否安装成功,使用命令即可:

celery --version

如果在win10下想要使用celery4.x的话,可以这么做:

pip3 install eventlet

运行worker的时候加上一个参数:

celery -A xxxx worker -l info -P eventlet

然后安装redis(个人比较喜欢redis)

首先下载安装redis windowns服务端

解压后,在其目录下执行命令

redis-server.exe

即可启动redis数据库

然后安装python连接操作redis的库

pip3 install redis==2.10.6

注意版本号

核心主件

celery通过五大模块实现

Task

就是任务,有异步任务和定时任务

Broker

中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。

Worker

执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。

Beat

定时任务调度器,根据配置定时将任务发送给Broler。

Backend

用于存储任务的执行结果。

组成关系

各个角色间的关系看下面这张图理解一下:

初次使用

首先编写一个文件 命名为task1.py

from celery import Celery

app = Celery('tasks',broker='redis://192.168.1.102:6379/0')

# redis://192.168.1.102:6379/0 是redis数据库地址,无需账号密码验证,也是ssrf在获取内网系统权限的方式之一

@app.task

def add(x,y):

print('传递 {} + {} = {}'.format(x,y,x+y))

return x+y

然后启动redis数据库

接下来再task1文件夹执行命令

celery -A task1 worker --loglevel=info

就会看到消息队列都启动

到现在所有的队列都启动,可以向这个队列添加任务等待处理

方法是再task1目录下打开cmd窗口,进入python3交互界面

python3

from task1 import add

add.delay(6,12)

add.delay(6,6)

上面只是一个发送任务的调用,结果是拿不到的。上面也没有接收返回值,这次把返回值保存到起来

修改task1内容

app = Celery('tasks',broker='redis://192.168.1.102:6379/0',backend='redis://192.168.1.102:6379/0')

然后要重启Worker,IDLE也要重启

然后这样就能获取结果了

t = add.delay(1, 1)

t.get()

# 还可以设置超时时间 t.get(timeout=5)

# 如果出错,获取错误结果,不触发异常

# 使用命令t.get(propagate=False)

# t.traceback (打印异常详细结果)

# 还可以获取任务状态

# t.ready() 返回True 或者False

在项目中使用Celery

可以把celery配置成一个应用,假设应用名字是CeleryPro,目录格式如下:

CeleryPro

├─__init.py

├─celery.py

├─tasks.py

这里的连接文件命名必须为celery.py,其他名字随意

celery文件

这个文件名必须是celery.py:

from __future__ import absolute_import, unicode_literals

from celery import Celery

app = Celery('CeleryPro',

broker='redis://192.168.1.102:6379',

backend='redis://192.168.1.102:6379',

include=['CeleryPro.tasks'])

app.conf.update(

result_expires=3600,

)

if __name__ == '__main__':

app.start()

tasks文件

这个文件开始两行就多了一个点,这里要导入上面的celery.py文件。后面只要写各种任务加上装饰器就可以了:

from __future__ import absolute_import, unicode_literals

from .celery import app

import time

@app.task

def add(x, y):

print("计算2个值的和: %s %s" % (x, y))

return x+y

@app.task

def upper(v):

for i in range(10):

time.sleep(1)

print(i)

return v.upper()

启动worker

启动的时候,-A 参数后面用应用名称 CeleryPro 。你还需要cd到你CeleryPro的父级目录上启动,否则找不到:

启动的姿势

这里注意用的都是CeleryPro:

celery -A CeleryPro worker -loglevel=info # 前台启动不推荐

celery -A CeleryPro worker -l info # 前台启动简写

celery multi start w1 -A CeleryPro -l info # 推荐用后台启动

定时任务

主要修改 celery.py文件

from __future__ import absolute_import, unicode_literals

from celery import Celery

from celery.schedules import crontab

from datetime import timedelta

app = Celery('CeleryPro',

broker='redis://192.168.1.102',

backend='redis://192.168.1.102',

include=['CeleryPro.tasks'])

app.conf.CELERYBEAT_SCHEDULE = {

'add every 10 seconds': {

'task': 'CeleryPro.tasks.add',

'schedule': timedelta(seconds=10),

# 可以用timedelta对象

# 'schedule': 10, # 也支持直接用数字表示秒数

'args': (1, 2)

},

'upper every 2 minutes': {

'task': 'CeleryPro.tasks.upper',

'schedule': crontab(minute='*/2'),

'args': ('abc', ),

},

}

# app.conf.CELERY_TIMEZONE = 'UTC'

app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'

# Optional configuration, see the application user guide.

app.conf.update(

CELERY_TASK_RESULT_EXPIRES=3600,

)

if __name__ == '__main__':

app.start()

启动使用命令

celery -A CeleryPro beat -l info

celery -A CeleryPro worker -l info

参数解析:

-l info 与--loglevel=info的作用是一样的。

--beat 周期性的运行。即设置 心跳。

新例子# tasks.py

# coding:utf-8

from celery import Celery,platforms

app = Celery('tasks')

app.config_from_object('config')

platforms.C_FORCE_ROOT = True

@app.task

def add(x,y):

return x + y

和另一个文件

# config.py

# coding:utf-8

from __future__ import absolute_import

from celery.schedules import crontab

from datetime import timedelta

BROKER_URL = 'redis://127.0.0.1:6379/0'

CELERYBEAT_SCHEDULE = {

'add-every-2-seconds': {

'task': 'tasks.add',

'schedule': timedelta(seconds=2),

'args': (16, 10),

},

}

CELERY_TIMEZONE = 'Asia/Shanghai'

然后打开三个cmd窗口,依次输入:

celery -A tasks beat -l info

celery -A tasks worker -l info

celery -A tasks flower

然后访问本地5555端口即可~

查看异步任务情况

Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现

下实现的方式如下:

安装flower:

pip3 install flower

启动flower(默认会启动一个webserver,端口为5555):

在另一个Terminal中:

celery -A task1 flower

这里的task1是上面创建的py文件

进入

http://localhost:5555

即可查看。

资料文档

python消息队列celery高可用_分布式消息队列-Celery相关推荐

  1. rocketmq 如何保证高可用_如何保证消息队列是高可用的

    为什么写这篇文章? 博主有两位朋友分别是小A和小B: 小A,工作于传统软件行业(某社保局的软件外包公司),每天工作内容就是和产品聊聊需求,改改业务逻辑.再不然就是和运营聊聊天,写几个SQL,生成下报表 ...

  2. Spring Cloud(十一)高可用的分布式配置中心 Spring Cloud Bus 消息总线集成(RabbitMQ)

    上一篇文章,留了一个悬念,Config Client 实现配置的实时更新,我们可以使用 /refresh 接口触发,如果所有客户端的配置的更改,都需要手动触发客户端 /refresh ,当服务越来越多 ...

  3. 消息队列面试 - 如何保证消息队列的高可用?

    面试题 如何保证消息队列的高可用? 面试官心理分析 如果有人问到你 MQ 的知识,高可用是必问的.上一讲提到,MQ 会导致系统可用性降低.所以只要你用了 MQ,接下来问的一些要点肯定就是围绕着 MQ ...

  4. 字节跳动面试官这样问消息队列:高可用、不重复消费、可靠传输、顺序消费、消息堆积,我整理了下

    写在前面 又到了年底跳槽高峰季,很多小伙伴出去面试时,不少面试官都会问到消息队列的问题,不少小伙伴回答的不是很完美,有些小伙伴是心里知道答案,嘴上却没有很好的表达出来,究其根本原因,还是对相关的知识点 ...

  5. 为什么多个线程不可能同时抢到一把锁_分布式为什么一定要有高可用的分布式锁?看完就知道了...

    分布式锁定义 分布式锁在分布式环境下,锁定全局唯一公共资源,表现为: 请求串行化 互斥性 第一步是上锁的资源目标,是锁定全局唯一公共资源,只有是全局唯一的资源才存在多个线程或服务竞争的情况. 互斥性表 ...

  6. 数据存储,消息队列的高可用保障

    1 介绍 在之前的章节中,我们介绍了消息的发送 和 消息通信 的原理.但是这边有一个比较核心的关键点,那就是如果已经把消息传递给Broker.在Broker在被消费之前,如何保证消息的稳定性,避免消息 ...

  7. 如何构建一套高可用的 APP 消息推送平台

    转载自  如何构建一套高可用的 APP 消息推送平台 消息推送作为移动 APP 运营中的一项关键技术,已经被越来越广泛的运用.本文追溯了推送技术的发展历史,剖析了其核心原理,并对推送服务的关键技术进行 ...

  8. Spring-Cloud-Config消息总线和高可用

    2019独角兽企业重金招聘Python工程师标准>>> 系列文章 Spring-Cloud-Config快速开始 Spring-Cloud-Config消息总线和高可用 前言 上文中 ...

  9. 分布式为什么一定要有高可用的分布式锁?一线大厂必看!

    " 现在面试都会聊到分布式系统,其中不免谈到分布式锁这块的知识,今天就来聊聊如何设计高可用的分布式锁. 作者:陈于喆,来自:51CTO技术栈 分布式锁定义 分布式锁在分布式环境下,锁定全局唯 ...

最新文章

  1. 适用于任何数据可视化需求的国外10个最佳JavaScript图表库
  2. null NULL is_null 竟然不一样
  3. codeforces 483B Friends and Presents 解题报告
  4. 网站被黑了被挂马篡改后,如何解决网站被挂马?
  5. python3 解析 base64 数据
  6. BI开发之——ETL注意细节
  7. 整理的C++面试题,大厂面试总遇到!
  8. 205.同构字符串 (力扣leetcode) 博主可答疑该问题
  9. 【转】C#通过WMI设置NTFS目录共享和目录安全
  10. stm32 boot设置
  11. 制作纯净系统U盘教程(详细版)
  12. 深度 | 国产数据库到底行不行?金仓数据库审计性能实测
  13. CSP小中大python
  14. Delphi东京版FireDAC连接MSSQL2000
  15. Database of Fog
  16. Eclipse中各种查找快捷键
  17. 对人工智能芯片的一些看法
  18. 服务器紧急维修,Hypixel服务器紧急维护
  19. html pdf支持css%写法吗,flying-saucer-pdf终于完美解决了(中文问题,换行问题,分页,页眉页脚,水印),html+css控制pdf样式...
  20. 有哪些回收手机打款快的平台

热门文章

  1. python列表对象相同_Python中的学习列表对象,List
  2. python基础题面试_python常见面试题
  3. SSD论文阅读(Wei Liu——【ECCV2016】SSD Single Shot MultiBox Detector)
  4. 各大航空公司将加大海南春运期间运力投入
  5. docker和docker-compose 端口映射
  6. JAVA NIO编程入门(二)
  7. DevExress笔记
  8. 业务安全通用解决方案——WAF数据风控
  9. css细节(实习第1天)
  10. Asp.net中服务端控件事件是如何触发的(笔记)