python消息队列celery高可用_分布式消息队列-Celery
怎么能不恨呢,在我发现自己是恶鬼的时候,在我最绝望最虚弱的时候,这个世上最该跟我在一起的人却用刀把我的心刺穿了
Celery 是 Distributed Task Queue,分布式任务队列。分布式决定了可以有多个 worker 的存在,队列表示其是异步操作。它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
安装
Celery4.x 开始不再支持Windows平台了。下面装的是3.1.25。
安装使用命令
pip3 install celery==3.1.25
查看是否安装成功,使用命令即可:
celery --version
如果在win10下想要使用celery4.x的话,可以这么做:
pip3 install eventlet
运行worker的时候加上一个参数:
celery -A xxxx worker -l info -P eventlet
然后安装redis(个人比较喜欢redis)
首先下载安装redis windowns服务端
解压后,在其目录下执行命令
redis-server.exe
即可启动redis数据库
然后安装python连接操作redis的库
pip3 install redis==2.10.6
注意版本号
核心主件
celery通过五大模块实现
Task
就是任务,有异步任务和定时任务
Broker
中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
Worker
执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
Beat
定时任务调度器,根据配置定时将任务发送给Broler。
Backend
用于存储任务的执行结果。
组成关系
各个角色间的关系看下面这张图理解一下:
初次使用
首先编写一个文件 命名为task1.py
from celery import Celery
app = Celery('tasks',broker='redis://192.168.1.102:6379/0')
# redis://192.168.1.102:6379/0 是redis数据库地址,无需账号密码验证,也是ssrf在获取内网系统权限的方式之一
@app.task
def add(x,y):
print('传递 {} + {} = {}'.format(x,y,x+y))
return x+y
然后启动redis数据库
接下来再task1文件夹执行命令
celery -A task1 worker --loglevel=info
就会看到消息队列都启动
到现在所有的队列都启动,可以向这个队列添加任务等待处理
方法是再task1目录下打开cmd窗口,进入python3交互界面
python3
from task1 import add
add.delay(6,12)
add.delay(6,6)
上面只是一个发送任务的调用,结果是拿不到的。上面也没有接收返回值,这次把返回值保存到起来
修改task1内容
app = Celery('tasks',broker='redis://192.168.1.102:6379/0',backend='redis://192.168.1.102:6379/0')
然后要重启Worker,IDLE也要重启
然后这样就能获取结果了
t = add.delay(1, 1)
t.get()
# 还可以设置超时时间 t.get(timeout=5)
# 如果出错,获取错误结果,不触发异常
# 使用命令t.get(propagate=False)
# t.traceback (打印异常详细结果)
# 还可以获取任务状态
# t.ready() 返回True 或者False
在项目中使用Celery
可以把celery配置成一个应用,假设应用名字是CeleryPro,目录格式如下:
CeleryPro
├─__init.py
├─celery.py
├─tasks.py
这里的连接文件命名必须为celery.py,其他名字随意
celery文件
这个文件名必须是celery.py:
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('CeleryPro',
broker='redis://192.168.1.102:6379',
backend='redis://192.168.1.102:6379',
include=['CeleryPro.tasks'])
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
tasks文件
这个文件开始两行就多了一个点,这里要导入上面的celery.py文件。后面只要写各种任务加上装饰器就可以了:
from __future__ import absolute_import, unicode_literals
from .celery import app
import time
@app.task
def add(x, y):
print("计算2个值的和: %s %s" % (x, y))
return x+y
@app.task
def upper(v):
for i in range(10):
time.sleep(1)
print(i)
return v.upper()
启动worker
启动的时候,-A 参数后面用应用名称 CeleryPro 。你还需要cd到你CeleryPro的父级目录上启动,否则找不到:
启动的姿势
这里注意用的都是CeleryPro:
celery -A CeleryPro worker -loglevel=info # 前台启动不推荐
celery -A CeleryPro worker -l info # 前台启动简写
celery multi start w1 -A CeleryPro -l info # 推荐用后台启动
定时任务
主要修改 celery.py文件
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
app = Celery('CeleryPro',
broker='redis://192.168.1.102',
backend='redis://192.168.1.102',
include=['CeleryPro.tasks'])
app.conf.CELERYBEAT_SCHEDULE = {
'add every 10 seconds': {
'task': 'CeleryPro.tasks.add',
'schedule': timedelta(seconds=10),
# 可以用timedelta对象
# 'schedule': 10, # 也支持直接用数字表示秒数
'args': (1, 2)
},
'upper every 2 minutes': {
'task': 'CeleryPro.tasks.upper',
'schedule': crontab(minute='*/2'),
'args': ('abc', ),
},
}
# app.conf.CELERY_TIMEZONE = 'UTC'
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
app.start()
启动使用命令
celery -A CeleryPro beat -l info
celery -A CeleryPro worker -l info
参数解析:
-l info 与--loglevel=info的作用是一样的。
--beat 周期性的运行。即设置 心跳。
新例子# tasks.py
# coding:utf-8
from celery import Celery,platforms
app = Celery('tasks')
app.config_from_object('config')
platforms.C_FORCE_ROOT = True
@app.task
def add(x,y):
return x + y
和另一个文件
# config.py
# coding:utf-8
from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERYBEAT_SCHEDULE = {
'add-every-2-seconds': {
'task': 'tasks.add',
'schedule': timedelta(seconds=2),
'args': (16, 10),
},
}
CELERY_TIMEZONE = 'Asia/Shanghai'
然后打开三个cmd窗口,依次输入:
celery -A tasks beat -l info
celery -A tasks worker -l info
celery -A tasks flower
然后访问本地5555端口即可~
查看异步任务情况
Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现
下实现的方式如下:
安装flower:
pip3 install flower
启动flower(默认会启动一个webserver,端口为5555):
在另一个Terminal中:
celery -A task1 flower
这里的task1是上面创建的py文件
进入
http://localhost:5555
即可查看。
资料文档
python消息队列celery高可用_分布式消息队列-Celery相关推荐
- rocketmq 如何保证高可用_如何保证消息队列是高可用的
为什么写这篇文章? 博主有两位朋友分别是小A和小B: 小A,工作于传统软件行业(某社保局的软件外包公司),每天工作内容就是和产品聊聊需求,改改业务逻辑.再不然就是和运营聊聊天,写几个SQL,生成下报表 ...
- Spring Cloud(十一)高可用的分布式配置中心 Spring Cloud Bus 消息总线集成(RabbitMQ)
上一篇文章,留了一个悬念,Config Client 实现配置的实时更新,我们可以使用 /refresh 接口触发,如果所有客户端的配置的更改,都需要手动触发客户端 /refresh ,当服务越来越多 ...
- 消息队列面试 - 如何保证消息队列的高可用?
面试题 如何保证消息队列的高可用? 面试官心理分析 如果有人问到你 MQ 的知识,高可用是必问的.上一讲提到,MQ 会导致系统可用性降低.所以只要你用了 MQ,接下来问的一些要点肯定就是围绕着 MQ ...
- 字节跳动面试官这样问消息队列:高可用、不重复消费、可靠传输、顺序消费、消息堆积,我整理了下
写在前面 又到了年底跳槽高峰季,很多小伙伴出去面试时,不少面试官都会问到消息队列的问题,不少小伙伴回答的不是很完美,有些小伙伴是心里知道答案,嘴上却没有很好的表达出来,究其根本原因,还是对相关的知识点 ...
- 为什么多个线程不可能同时抢到一把锁_分布式为什么一定要有高可用的分布式锁?看完就知道了...
分布式锁定义 分布式锁在分布式环境下,锁定全局唯一公共资源,表现为: 请求串行化 互斥性 第一步是上锁的资源目标,是锁定全局唯一公共资源,只有是全局唯一的资源才存在多个线程或服务竞争的情况. 互斥性表 ...
- 数据存储,消息队列的高可用保障
1 介绍 在之前的章节中,我们介绍了消息的发送 和 消息通信 的原理.但是这边有一个比较核心的关键点,那就是如果已经把消息传递给Broker.在Broker在被消费之前,如何保证消息的稳定性,避免消息 ...
- 如何构建一套高可用的 APP 消息推送平台
转载自 如何构建一套高可用的 APP 消息推送平台 消息推送作为移动 APP 运营中的一项关键技术,已经被越来越广泛的运用.本文追溯了推送技术的发展历史,剖析了其核心原理,并对推送服务的关键技术进行 ...
- Spring-Cloud-Config消息总线和高可用
2019独角兽企业重金招聘Python工程师标准>>> 系列文章 Spring-Cloud-Config快速开始 Spring-Cloud-Config消息总线和高可用 前言 上文中 ...
- 分布式为什么一定要有高可用的分布式锁?一线大厂必看!
" 现在面试都会聊到分布式系统,其中不免谈到分布式锁这块的知识,今天就来聊聊如何设计高可用的分布式锁. 作者:陈于喆,来自:51CTO技术栈 分布式锁定义 分布式锁在分布式环境下,锁定全局唯 ...
最新文章
- 适用于任何数据可视化需求的国外10个最佳JavaScript图表库
- null NULL is_null 竟然不一样
- codeforces 483B Friends and Presents 解题报告
- 网站被黑了被挂马篡改后,如何解决网站被挂马?
- python3 解析 base64 数据
- BI开发之——ETL注意细节
- 整理的C++面试题,大厂面试总遇到!
- 205.同构字符串 (力扣leetcode) 博主可答疑该问题
- 【转】C#通过WMI设置NTFS目录共享和目录安全
- stm32 boot设置
- 制作纯净系统U盘教程(详细版)
- 深度 | 国产数据库到底行不行?金仓数据库审计性能实测
- CSP小中大python
- Delphi东京版FireDAC连接MSSQL2000
- Database of Fog
- Eclipse中各种查找快捷键
- 对人工智能芯片的一些看法
- 服务器紧急维修,Hypixel服务器紧急维护
- html pdf支持css%写法吗,flying-saucer-pdf终于完美解决了(中文问题,换行问题,分页,页眉页脚,水印),html+css控制pdf样式...
- 有哪些回收手机打款快的平台