【Python】轻量级分布式任务调度系统-RQ
Redis Queue 一款轻量级的P分布式异步任务队列,基于Redis作为broker,将任务存到redis里面,然后在后台执行指定的Job。就目前而言有三套成熟的工具celery,huey ,rq 。按照功能和使用复杂度来排序的话也是 celery>huey>rq. 因为rq 简单,容易上手,所以自己做的系统也会使用RQ作为分布式任务调度系统。
二 安装
因为RQ 依赖于Redis 故需要安装版本>= 2.6.0.具体安装方法请参考《Redis初探》。*nix 系统环境下安装RQ:
- pip install rq
无需其他配置即可以使用RQ。
三 原理
RQ 主要由三部分构成 Job ,Queues,Worker 构成。job也就是开发定义的函数用来实现具体的功能。调用RQ 把job 放入队列Queues,Worker 负责从redis里面获取任务并执行,根据具体情况返回函数的结果。
3.1 关于job
一个任务(job)就是一个Python对象,具体表现为在一个工作(后台)进程中异步调用一个函数。任何Python函数都可以异步调用,简单的将函数与参数追加到队列中,这叫做入队(enqueueing)。
3.2 关于Queue
将任务加入到队列之前需要初始化一个连接到指定Redis的Queue
- q=Queue(connection=redis_conn)
- from rq_test import hello
- result = q.enqueue(hello,'yangyi')
queue有如下属性:
timeout :指定任务最长执行时间,超过该值则被认为job丢失,对于备份任务 需要设置一个比较长的时间 比如24h。
result_ttl :存储任务返回值的有效时间,超过该值则失效。
ttl :specifies the maximum queued time of the job before it'll be cancelled
depends_on :specifies another job (or job id) that must complete before this job will be queued
job_id : allows you to manually specify this job's job_id
at_front :will place the job at the front of the queue, instead of the back
kwargs and args : lets you bypass the auto-pop of these arguments, ie: specify a timeout argument for the underlying job function.
需要关注的是 depends_on ,通过该属性可以做级联任务A-->B ,只有当A 执行成功之后才能执行B .
通过指定队列的名字,我们可以把任务加到一个指定的队列中:
- q = Queue("low", connection = redis_conn)
- q.enqueue(hello, "杨一")
对于例子中的Queue("low"),具体使用的时候可以替换"low"为任意的复合业务逻辑名字,这样就可以根据业务的需要灵活地归类的任务了。一般会根据优先级给队列命名(如:high, medium, low).
如果想要给enqueue传递参数的情况,可以使用enqueue_call方法。在要传递超时参数的情况下:
- q = Queue("low", connection = redis_conn)
- q.enqueue_call(func=hello, args= ("杨一",),timeout = 30)
3.3 关于worker
Workers将会从给定的队列中不停的循环读取任务,当所有任务都处理完毕就等待新的work到来。每一个worker在同一时间只处理一个任务。在worker中,是没有并发的。如果你需要并发处理任务,那就需要启动多个worker。
目前的worker实际上是fork一个子进程来执行具体的任务,也就是说rq不适合windows系统。而且RQ的work是单进程的,如果想要并发执行队列中的任务提高执行效率需要使用threading针对每个任务进行fork线程。
worker的生命周期有以下几个阶段组成:
1 启动,载入Python环境
2 注册,worker注册到系统上,让系统知晓它的存在。
3 开始监听。从给定的redis队列中取出一个任务。如果所有的队列都是空的且是以突发模式运行的,立即退出。否则,等待新的任务入队。
4 分配一个子进程。分配的这个子进程在故障安全的上下文中运行实际的任务(调用队列中的任务函数)
5 处理任务。处理实际的任务。
6 循环。重复执行步骤3。
四 如何使用
简单的开发一个deamon 函数,用于后端异步调用,注意任意函数都可以加入队列,必须能够在入队的时候 被程序访问到。
- #!/usr/bin/env python
- #-*- coding:utf-8 -*-
- def hello(name):
- print "hello ,%s"%name
- ip='192.168.0.1'
- num=1024
- return name,ip,num
- def workat(name):
- print "hello %s ,you r workat youzan.com "%(name)
4.1 构建队列,将任务对象添加到队列里面
- >>> from redis import Redis,ConnectionPool
- >>> from rq import Queue
- >>> pool = ConnectionPool(db=0, host='127.0.0.1', port=6379,
- ... password='yangyi')
- >>> redis_conn = Redis(connection_pool=pool)
- >>> q=Queue(connection=redis_conn)
- >>> from rq_test import hello
- >>>
- >>> result = q.enqueue(hello,'yangyi')
- >>> result = q.enqueue(hello,'youzan.com')
先实例化一个Queue类q,然后通过enqueue方法发布任务。第一个参数是执行的函数名,后面是函数执行所需的参数,可以是args也可以是kwargs,案例中是一个字符串。
然后会返回一个Job类的实例,后面会具体介绍Job类的实例具体的api。
4.2启动worker ,从日志上可以看到执行了utils.hello('yangyi') utils.hello('youzan.com') 。当然这个只是简单的调用介绍,生产环境还要写的更加健壮,针对函数执行的结果进行相应的业务逻辑处理。
- root@rac2:~# >python woker.py
- 23:44:48 RQ worker u'rq:worker:rac2.3354' started, version 0.6.0
- 23:44:48 Cleaning registries for queue: default
- 23:44:48
- 23:44:48 *** Listening on default...
- 23:44:48 default: utils.hello('yangyi') (63879f7c-b453-4405-a262-b9a6b6568b68)
- hello ,yangyi
- 23:44:48 default: Job OK (63879f7c-b453-4405-a262-b9a6b6568b68)
- 23:44:48 Result is kept for 500 seconds
- 23:44:48
- 23:44:48 *** Listening on default...
- 23:45:12 default: utils.hello('youzan.com') (e4e9ed62-c476-45f2-b66a-4b641979e731)
- hello ,youzan.com
- 23:45:12 default: Job OK (e4e9ed62-c476-45f2-b66a-4b641979e731)
- 23:45:12 Result is kept for 500 seconds
需要说明的是其实 worker的启动顺序应该在job放入队列之前,一直监听rq里面是否有具体的任务,当然如果worker晚于job 加入队列启动,job的状态会显示为 queued 状态。
4.3 查看作业执行的情况
当任务加入队列,queue.enqueue()方法返回一个job实例。其定义位于rq.job文件中,可以去查看一下它的API,主要用到的API有:
- >>> from rq import job
- >>> job = q.enqueue(hello,'youzan.com')
- >>> job.get_id() ##获取任务的id ,如果没有指定 ,系统会自动分配一个随机的字符串。
- u'17ad0b3a-195e-49d5-8d31-02837ccf5fa6'
- >>> job = q.enqueue(hello,'youzan.com')
- >>> print job.get_status() ##获取任务的处理状态
- finished
- >>> step1=q.enqueue(workat,) ##故意不传递参数,让函数执行失败,则获取的状态值是 failed
>>> print step1.get_status()
failed - >>> print job.result # 当任务没有执行的时候返回None,否则返回非空值,如果 函数 hello() 有return 的值,会赋值给result
- None
- 当我们把worker 监听进程停止,然后重新发布任务,查看此时任务的在队列的状态,会显示为 queued
- >>> job = q.enqueue(hello,'youzan')
- >>> print job.get_status()
- queued
- >>> print job.to_dict() #把job实例转化成一个字典,我们主要关注状态。
- {u'origin': u'default', u'status': u'queued', u'description': u"rq_test.hello('youzan')", u'created_at': '2016-09-06T08:00:40Z', u'enqueued_at': '2016-09-06T08:00:40Z', u'timeout': 180, u'data': '\x80\x02(X\r\x00\x00\x00rq_test.helloq\x01NU\x06youzanq\x02\x85q\x03}q\x04tq\x05.'}
- >>> job.cancel() # 取消作业,尽管作业已经被执行,也可以取消
- >>> print job.to_dict()
- {u'origin': u'default', u'status': u'queued', u'description': u"rq_test.hello('youzan')", u'created_at': '2016-09-06T08:00:40Z', u'enqueued_at': '2016-09-06T08:00:40Z', u'timeout': 180, u'data': '\x80\x02(X\r\x00\x00\x00rq_test.helloq\x01NU\x06youzanq\x02\x85q\x03}q\x04tq\x05.'}
- >>> print job.get_status()
- queued
- >>>
- >>> job.delete() # 从redis队列中删除该作业
- >>> print job.get_status()
- None
- >>> print job.to_dict()
- {u'origin': u'default', u'description': u"rq_test.hello('youzan')", u'created_at': '2016-09-06T08:00:40Z', u'enqueued_at': '2016-09-06T08:00:40Z', u'timeout': 180, u'data': '\x80\x02(X\r\x00\x00\x00rq_test.helloq\x01NU\x06youzanq\x02\x85q\x03}q\x04tq\x05.'}
五 参考文章
[1] 官方文档
[2] 翻译 - Python RQ Job
[3] 翻译 - Python RQ Workers
[4] 云峰就她了 这位博主写了很多rq相关的实践经验,值得参考。
【Python】轻量级分布式任务调度系统-RQ相关推荐
- 一款你不得不了解的轻量级分布式任务调度系统
CronMan 分布式任务调度/定时任务系统 github地址:CronMan, 欢迎star 欢迎朋友们站内私信交流~ 简介 CronMan是一款轻量级的分布式任务调度系统.随着微服务化架构的逐步演 ...
- 这些优秀的国产分布式任务调度系统,你用过几个?
2019独角兽企业重金招聘Python工程师标准>>> 分布式调度在互联网企业中占据着十分重要的作用,尤其是电子商务领域,由于存在数据量大.高并发的特点,对数据处理的要求较高,既要保 ...
- 探寻繁杂定时任务的解决方案:分布式任务调度系统
导语:本文我们从架构和技术实现上来为大家讲解腾讯云分布式任务调度系统TCT(Tencent Cloud Task)如何实现任务调度的精准实时.稳定高效,以及任务的切分和编排.(编辑:中间件小Q妹) 0 ...
- 轻量级分布式任务调度平台 XXL-JOB
From:https://www.cnblogs.com/xuxueli/p/5021979.html github 地址 及 中文文档地址:https://github.com/xuxueli/xx ...
- hadoop 依赖式job_每天一学:一个轻量级分布式任务调度框架 XXL-JOB
概述 XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速.学习简单.轻量级.易扩展.现已开放源代码并接入多家公司线上产品线,开箱即用. 官方地址中文版:http://www.xux ...
- 分布式任务调度系统-定时任务的解决方案
导语:在前面我们讲过了阿里云分布式任务调度平台,今天我们从架构和技术实现上来为大家讲解腾讯云分布式任务调度系统TCT(Tencent Cloud Task)如何实现任务调度的精准实时.稳定高效,以及任 ...
- 开源分布式任务调度系统就选它!
分布式任务调度这个话题是每个后端开发和大数据开发都会接触的话题.因为应用场景的广泛,所以有很多开源项目专注于解决这类问题,比如我们熟知的xxl-job. 那么今天要给大家推荐的则是另一个更为强大的开源 ...
- 分布式任务调度系统V1
分布式任务调度系统V1目标 初步目标实现,实现任务的下发分配,分布式任务执行,支持任务分片(在代码上支持),任务执行记录. 任务调度系统构思 基于C/S架构实现,基于长连接来管理实现,当前版本的逻辑架 ...
- 赫拉(hera)分布式任务调度系统之项目启动(二)
文章目录 赫拉 创建表 打包部署 测试 TIPS 加入群聊 赫拉 大数据平台,随着业务发展,每天承载着成千上万的ETL任务调度,这些任务集中在hive,shell脚本调度.怎么样让大量的ETL任务准确 ...
最新文章
- ML-1 逻辑回归和梯度下降
- python的读取纯文本文件的几种模式
- Oracle中类似于isql或osql的命令行工具
- 一个蚂蚁前端程序员,曾经的辛酸面试历程
- 需求获取安排计划书_6分钟教你写一份融资计划书
- 外卖小哥等餐被打,“成年人的崩溃,只在一瞬间”
- 网络连接的netstat命令
- flutter 刷脸_GitHub - hqwlkj/wechat_face_payment: 微信刷脸支付、刷脸认证、扫码支持等 Flutter 插件....
- Sentinel服务熔断只配置fallback_客户自定义限流处理_削峰填谷_流量控制_速率控制_服务熔断_服务降级---微服务升级_SpringCloud Alibaba工作笔记0050
- 决定创业失败的除了团队外
- 收藏十二:ExtJs
- 3.MNIST数据集分类
- crontab 问号_轻松搞定crontab和quartz表达式
- python程序员真实收入曝光_行!看到抖音上Python程序员晒得工资条,我沉默了.........
- 工业相机及镜头的选型
- 2023年,哪些行业最具发展潜力?
- mybatis和spring框架的整合
- 画论76 布颜图《画学心法问答》
- 苹果发展到计算机,从天堂到地狱 十分钟看懂AIO兴衰存亡
- 电子学会、CPA和NCT考试有什么区别,哪个好?
热门文章
- Python 什么时候会被取代?
- 关于征集2020重大科学问题和工程技术难题的通知
- Nature最新研究:超9000学者每5天发1篇论文,这些高产作家背后的科研圈
- 英特尔反驳质疑:芯片供应充足、10nm量产没问题
- 程序员的十大谎言,你中了几个? | 每日趣闻
- 为什么 M1 和 Mac 是绝配?
- 一个web.Config或app.Config自定义段configSections的示例
- vs2017 编码约定——.editorconfig文件
- JavaScript原型-进阶者指南
- java中native的用法[转]