首先解释下目标的概念:celery任务消息会由各种途径(比如手动通过python shell触发、通过tornado触发等)发往统一的一个celery broker,然后任务消息会由不同server上的worker去获取并执行。具体点说就是,借助celery消息路由机制,celery broker中开不同的消息队列来接收相应类型的任务消息,然后不同server上开启worker来处理目标消息队列里面的任务消息,即任务统一收集、分发到不同server上执行。
测试

项目架构如下:一个服务,一部分task运行在server1上,一部分task运行在server2上,所有的任务都可以通过网页向tornado(部署在server1上)发起、tornado接到网页请求调用相应的task handler、task handler向celery broker相应的queue发任务消息、最后server1上的worker和server2上的worker各自去相应的队列中获取任务消息并执行任务。server1是上海集群的10.121.72.94,server2是济阳集群的10.153.104.76,celery broker是redis数据库:redis://10.121.76.204:17016/1。

    首先来看一下server1上的代码结构,
| start_worker.sh
| proj
    |__init__.py      (空文件)
    |celery.py
    |hotplay_task.py
| hotplay_tornado_server.py
上面的代码包含了响应网页请求的tornado server构建代码、server1上的celery服务。
    先来看server1上的celery调度器,
celery.py
#-*-coding=utf-8-*-
from __future__ import absolute_import
from celery import Celery
from kombu import Queue
 
app = Celery("proj",
        broker = "redis://10.121.76.204:17016/1",
             include = ['proj.hotplay_task']
             )
 
app.conf.update(
        CELERY_DEFAULT_QUEUE = 'hotplay_sh_default_queue',
        #CELERY_QUEUES = (Queue('hotplay_jy_queue'),), #该队列是给server2用的,并不需要在这里申明
    )

hotplay_task.py
from __future__ import absolute_import
 
import sys
import os
import hashlib
import time
import subprocess
 
 
from proj.celery import app
 
 
reload(sys)
sys.setdefaultencoding('utf-8')
 
sys.path.append(os.path.join(os.path.dirname(__file__), "./"))
 
HOTPLAY_CATCHUP_DIR = '/home/uaa/prog/hotplay_v2/online_task/catch_up'
 
@app.task(bind=True)
def do_init_catchup(self, user_name, album_id, album_name, channel_name):
    print 'start to init catch up of user %s album %s:%s in channel %s'%(user_name, album_id, album_name, channel_name)
    job_args = 'source %s/init_catch_up.sh %s %s %s %s > ./logs/%s_%s.log'%(HOTPLAY_CATCHUP_DIR, user_name, album_id, album_name, channel_name, album_id, user_name)
    print 'job_args:', job_args
    P = subprocess.Popen(job_args,shell=True)  
    rt_code = P.wait()
    if rt_code == 0:
        print 'job success...'
    else:
        print 'job error:%d'%(rt_code)
    #    print 'job error:%d, will retry in 5 min'%(rt_code)
    #    raise self.retry(countdown=300)
 
@app.task(bind=True)
def do_catchup(self, hotplay_id, start_dt, end_dt):
    print 'start to catch up of %s:%s-%s'%(hotplay_id, start_dt, end_dt)
    job_args = 'source %s/catch_up_all_run.sh %s %s %s > ./logs/%s.log 2>&1'%(HOTPLAY_CATCHUP_DIR, hotplay_id, start_dt, end_dt, hotplay_id)
    print 'job_args:', job_args
    P = subprocess.Popen(job_args,shell=True)  
    rt_code = P.wait()
    if rt_code == 0:
        print 'job success...'
    else:
        print 'job error:%d'%(rt_code)
    #    print 'job error:%d, will retry in 5 min'%(rt_code)
    #    raise self.retry(countdown=300)

start_worker.sh
nohup celery -A proj worker -n hotplay_default_worker -c 3 -Q hotplay_sh_default_queue -l info &
上面的代码定义了一个celery实例,该实例有两个队列,注册了两个celery task function,最后启动一个worker来处理默认队列hotplay_sh_default_queue(celery.py中重命名过的默认队列)中的任务消息。
    tornado server是所有celery任务的发起者,server1和server2上celery task都由tornado server相应的handler发起。
hotplay_tornado_server.py
#-*-coding=utf-8-*-
from __future__ import absolute_import
import sys
import os
import tornado.web
import tornado.ioloop
import tornado.httpserver
 
from celery.execute import send_task
from proj.hotplay_task import do_init_catchup, do_catchup
 
reload(sys)
sys.setdefaultencoding('utf-8')
 
 
TORNADO_SERVER_PORT=10501
 
class InitCatchupHandler(tornado.web.RequestHandler):
    def get(self, path):
        user_name = self.get_argument("user_name", None)
        album_id = self.get_argument("album_id",None)
        album_name = self.get_argument("album_name",None)
        channel_name = self.get_argument("channel_name", None)
        print "request user_name+album_id+album_name+channel_name:%s+%s_%s+%s"%(user_name, album_id, album_name, channel_name)
        if album_id == '0':
            self.write('test tornado server init catch up handler. sucess. just return\n')
            return
        
        try:
            self.write("0")
            do_init_catchup.delay(user_name, album_id, album_name, channel_name)
        except:
            self.write("-1")
 
class DoCatchupHandler(tornado.web.RequestHandler):
    def get(self, path):
        hotplay_id = self.get_argument("hotplay_id",None)
        start_dt = self.get_argument("start_dt",None)
        end_dt = self.get_argument("end_dt",None)
        print "request hotplay_id+start_dt+end_dt:%s+%s+%s"%(hotplay_id, start_dt, end_dt)
        if hotplay_id == '0':
            self.write('test tornado server catch up handler. sucess. just return\n')
            return
        
        try:
            self.write("0")
            do_catchup.delay(hotplay_id, start_dt, end_dt)
        except:
            self.write("-1")
 
class DoCatchupJYHandler(tornado.web.RequestHandler):
    def get(self, path):
        hotplay_id = self.get_argument("hotplay_id",None)
        start_dt = self.get_argument("start_dt",None)
        end_dt = self.get_argument("end_dt",None)
        print "request jy hotplay_id+start_dt+end_dt:%s+%s+%s"%(hotplay_id, start_dt, end_dt)
        #if hotplay_id == '0':
        #    self.write('test tornado server catch up handler. sucess. just return\n')
        #    return
        send_task('tasks.test1', args=[hotplay_id, start_dt, end_dt], queue='hotplay_jy_queue') #tasks.test1是server2上celery任务函数的file_name.func_name
#file_name是任务函数所在文件相对于celery worker的路径
        #try:
        #    self.write("0")
        #    do_catchup.delay(hotplay_id, start_dt, end_dt)
        #except:
        #    self.write("-1")
 
application = tornado.web.Application(
            [
            (r"/init_catchup/(.*)", InitCatchupHandler),
            (r"/do_catchup/(.*)", DoCatchupHandler),
            (r"/do_catchup_jy/(.*)", DoCatchupJYHandler),
            ], 
            template_path = "template", static_path="static"
            )
            
if __name__ == '__main__':
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(TORNADO_SERVER_PORT)
    tornado.ioloop.IOLoop.instance().start()

代码中定义了3个handler,前两个负责在接收到相应的网页请求后,发起server1上定义的两个task function任务消息,消息发往celery broker的默认队列hotplay_sh_default_queue(使用task_name.delay函数发出的请求会加入到默认队列,使用task_name.apply_async或send_task函数则可以指定目标队列),最后由server1上的worker执行。网页请求的格式类似——http://10.121.72.94:10501/do_catchup_jy/?hotplay_id=pxftest&start_dt=2015-08-12&end_dt=2015-08-14。第3个handler发起一个名为tasks.test1的任务消息,发往celery broker的另一个队列hotplay_jy_queue,tasks.test1任务并没有在server1上的celery调度器中实现(也叫注册),而是放在了server2上,相应的,处理队列hotplay_jy_queue的worker也在server2上运行。这里,由于tasks.test1task function没有注册在server1上,所以使用send_task函数来发送任务消息;这是因为task_name.delay、task_name.apply_async函数发送任务请求需要先import task_name相应的python function,而send_task函数发送任务消息其实就相当于往celery broker发送一个字符串类似的任务请求、不需要调用事先写好的task function,然后该字符串类似的任务消息由worker获取、worker根据任务消息去寻找实际的task function来执行。这种机制也是celery实现任务统一收集、分发执行的基础。
    
    来看server2上的celery调度器,
|tasks.py (注意,要和tornado server中send_task()函数用的file_name一样)
|start_server.sh
由于只是功能测试,写得比较简单,
tasks.py
#-*-coding=utf-8-*-
from __future__ import absolute_import
from celery import Celery
from kombu import Queue
 
app = Celery("test",
        broker = "redis://10.121.76.204:17016/1"
    #         include = ['test.tasks']
             )
 
app.conf.update(
        CELERY_DEFAULT_QUEUE = 'hotplay_sh_default_queue', #可省略,但不能和server1的配置不一样
        CELERY_QUEUES = (Queue('hotplay_jy_queue'),),
    )
 
@app.task()
def test1(hotplay_id, start_dt, end_dt): #注意,名字要和tornado_server中send_task()函数用的func_name名字一样
    print 'hotplay_id is %s, stat from %s to %s'%(hotplay_id, start_dt, end_dt)

start_server.sh
celery -A tasks worker -n hotplay_jy_worker -c 2 -Q hotplay_jy_queue -l info
server2上调度器主要就是开了一个worker来取tornado server发往hotplay_jy_queue队列的任务并执行,当然,任务在哪里执行、相应的任务函数就应该放在哪里。此外,server2和server1上的celery实例app的消息队列配置应该保持一致,因为它们是对同一个celery broker的配置。
总结
    最后总结下上面项目架构的实现:所有的celery任务都由tornado server发起,统一由celery broker收集、不过分别由celery broker的hotplay_sh_default_queue和hotplay_jy_queue两个消息队列接收,最后分别由server1和server2上的worker去执行。
    在上面的项目架构中,tornado server是和server1上的celery调度器放在一起的,这是有必要的,因为send_task函数发送任务消息的时候,至少应该要知道celery broker等信息,而这些信息在server1的celery调度器上有(请注意hotplay_tornado_server.py中from proj.hotplay_taskimport do_init_catchup, do_catchup语句,该语句不仅import两个任务函数,还获取了celery实例app的信息,从而获得了celery broker等配置信息)。在这之后,如果有其他任务要集成进来,直接在hotplay_tornado_server.py中增加相应的handler(调用send_task函数向目标队列发送相应的任务消息,目标队列不需要在server1上申明)、并在其他server上写好相应的celery调度器(申明消息队列、实现celery task function、开启worker)即可。这时,tornado server负责所有任务(不止是本文提到的3个任务)的触发(通过网页触发比较方便)、然后使用send_task函数往某一个固定的celery broker发送任务消息、不同种类的任务消息发到celery broker上特定的消息队列,每种任务的执行由任务部署的服务器上的celery调度器(就和server2上的调度器)完成,由各个服务器上的celery调度器的worker会到自己目标队列中取任务消息来执行。这样做的好处是:一个broker搞定所有任务,不过有多少种不同的任务、broker上就会有多少个消息队列。
后续
    上文总结中提到tornado server需要和server1上的celery调度器放在一起,以获取celery broker的信息,经过尝试,tornado server是可以完全独立出来的。
    在tornado server的py文件中添加以下代码:
from celery import Celery
app = Celery(broker = "redis://10.121.76.204:17016/1",)
接着,改send_task('tasks.test1', args=[hotplay_id, start_dt, end_dt],queue='hotplay_jy_queue')
app.send_task('tasks.test1', args=[hotplay_id, start_dt, end_dt], queue='hotplay_jy_queue')
然后,就可以去掉下面两行了:
from celery.executeimport send_task
from proj.hotplay_taskimport do_init_catchup, do_catchup
这样子,tornado server就可以完全独立出来运行,而不必再和任何任务绑在一起以获得celery broker的信息,因为celery broker的信息直接写在tornado server的代码里了。当然,hotplay_tornado_server.py代码经过上面的修改、完全独立出来后,do_init_catchup.delay(user_name, album_id, album_name, channel_name)和do_catchup.delay(hotplay_id, start_dt, end_dt)需要用send_task函数改写,
app.send_task('proj.hotplay_task.do_init_catchup', args=[user_name, album_id, album_name, channel_name])    #send to default queue: hotplay_default_sh_queue
app.send_task('proj.hotplay_task.do_catchup', args=[hotplay_id, start_dt, end_dt])
    最后说明一下,tornado server完全独立出来的好处:如果不完全独立出来,那么和tornado server放在一起的celery调度器需要修改的话,则celery worker和tornado server也需要重启(tornado server代码调用了celery调度器的任务函数以及broker信息,所以要重启),tornado server至少和一个celery调度器存在耦合;完全独立后,解除了tornado server代码和celery调度器之间的耦合,这时tornado server中使用send_task函数发送任务消息、无需经过实际实现的celery任务函数,所以任何celery调度器的改动(只要别改任务函数名和任务函数的参数)都无需重启tornado server、而只要重启celery worker即可,也就是说任务的提交和任务的执行完全分离开来了。
参考:
http://www.avilpage.com/2014/11/scaling-celery-sending-tasks-to-remote.html
https://groups.google.com/forum/#!topic/celery-users/E37wUyOcd3I
http://programming.nullanswer.com/question/29340011
http://www.imankulov.name/posts/celery-for-internal-api.html

celery实现任务统一收集、分发执行相关推荐

  1. K8S使用filebeat统一收集应用日志

    今年3月份在公司的内部k8s培训会上,开发同事对应用整合进pod提出了几个问题,主要围绕在java应用的日志统一收集.集中存放和java jvm内存监控数据收集相关的点上,本文将介绍使用filebea ...

  2. Celery Redis未授权访问命令执行利用

    首发补天社区:https://forum.butian.net/share/224←走过路过帮点一下~ 前言 Celery 是一个简单.灵活且可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类 ...

  3. Oracle收集cbo统计信息,Oracle CBO 统计信息的收集与执行计划的选择

    --概要 主要总结一下Oracle是如何收集统计信息的是如何选择的,有一些好的Ref可以看看 --基本概念 首先要明确系统的自动收集机制 如果insert update delete truncate ...

  4. 网络主机防泄密安全保护方案

    网络主机防泄密安全保护方案 2008-3-22 请预先参考<可信移动存储管理白皮书> --- 附录 一.主机移动存储介质安全管理 移动存储介质管理系统分别从主机层次和传递介质层次对文件的读 ...

  5. 启动celery后执行任务报错:django.core.exceptions.ImproperlyConfigured

    工作的环境版本如下: [Django version]: 2.1 [celery version]:4.4.0rc2 [python version]: 3.7 [Redis version]:3.2 ...

  6. celery异步执行任务在Django中的应用实例

    1. 创建django项目celery_demo, 创建应用demo: django-admin startproject celery_demo python manage.py startapp ...

  7. celery源码分析:multi命令分析

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery简介 celery是一款异步任务框架,基于AMQP协议的任务调度框架.使用的场景 ...

  8. Kubernetes-基于EFK进行统一的日志管理

    1.统一日志管理的整体方案 通过应用和系统日志可以了解Kubernetes集群内所发生的事情,对于调试问题和监视集群活动来说日志非常有用.对于大部分的应用来说,都会具有某种日志机制.因此,大多数容器引 ...

  9. 借鉴开源框架自研日志收集系统

    踏浪无痕 岂安科技高级架构师 十余年数据研发经验,擅长数据处理领域工作,如爬虫.搜索引擎.大数据应用高并发等.担任过架构师,研发经理等岗位.曾主导开发过大型爬虫,搜索引擎及大数据广告DMP系统目前负责 ...

最新文章

  1. 伪数组(ArrayLike)
  2. LeetCode--046--全排列(java)
  3. 初步学习Spring Aop使用之注解方式
  4. kali工具中文手册_Kali Linux 2019.4发布了!解决Kali Linux 2019.4中文乱码问题
  5. 从源码出发深入理解 Android Service
  6. TCP的2MSL问题
  7. 如何同时让多台服务器安装系统,如何同时安装多台服务器?
  8. sort 、sorted、range、join方法 数字的正序、倒叙、翻转
  9. 10个受欢迎的英文名
  10. 模块之序列化模块json
  11. 基于地理距离的省际空间权重矩阵
  12. 友盟统计 H5 vue 隐藏友盟图标
  13. 海外问卷调查,招募合伙人
  14. jvm的内存分布,参数配置 和 GC处理机制
  15. 短视频怎么获得高流量?简单小技巧,让你的短视频被更多人看到
  16. Android 打开新浪微博特定页
  17. Eclipse ST-Link设置方法
  18. vue实际运用五:不需要响应式的数据的处理
  19. tools1.0.6
  20. Micro:bit 入门介绍

热门文章

  1. gin context和官方context_[系列文章] Gin框架 - 安装和路由配置
  2. 那些很厉害的人,是如何度过职场迷茫的?
  3. python入门-廖雪峰 Python教程
  4. 在 Visual Studio .NET 中使用 SQL Server 2000 创建数据库应用程序(1)
  5. 英特尔中国祝贺高亭宇夺冠:至强CPU提供更精准训练支持
  6. Meta率先发布虚拟世界Horizon Worlds
  7. 分房变卖房?董明珠承诺的3700套房即将交付,或将按成本价卖给员工
  8. 手机价格要上涨?小米和Realme确认芯片短缺:高通交付延长至30周以上
  9. 华为Mate40 Pro最新渲染图曝光:双孔瀑布屏有戏 再度惊艳
  10. 富豪被劫,二十年一遇