celery实现任务统一收集、分发执行
项目架构如下:一个服务,一部分task运行在server1上,一部分task运行在server2上,所有的任务都可以通过网页向tornado(部署在server1上)发起、tornado接到网页请求调用相应的task handler、task handler向celery broker相应的queue发任务消息、最后server1上的worker和server2上的worker各自去相应的队列中获取任务消息并执行任务。server1是上海集群的10.121.72.94,server2是济阳集群的10.153.104.76,celery broker是redis数据库:redis://10.121.76.204:17016/1。
#-*-coding=utf-8-*-
from __future__ import absolute_import
from celery import Celery
from kombu import Queue
app = Celery("proj",
broker = "redis://10.121.76.204:17016/1",
include = ['proj.hotplay_task']
)
app.conf.update(
CELERY_DEFAULT_QUEUE = 'hotplay_sh_default_queue',
#CELERY_QUEUES = (Queue('hotplay_jy_queue'),), #该队列是给server2用的,并不需要在这里申明
)
from __future__ import absolute_import
import sys
import os
import hashlib
import time
import subprocess
from proj.celery import app
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append(os.path.join(os.path.dirname(__file__), "./"))
HOTPLAY_CATCHUP_DIR = '/home/uaa/prog/hotplay_v2/online_task/catch_up'
@app.task(bind=True)
def do_init_catchup(self, user_name, album_id, album_name, channel_name):
print 'start to init catch up of user %s album %s:%s in channel %s'%(user_name, album_id, album_name, channel_name)
job_args = 'source %s/init_catch_up.sh %s %s %s %s > ./logs/%s_%s.log'%(HOTPLAY_CATCHUP_DIR, user_name, album_id, album_name, channel_name, album_id, user_name)
print 'job_args:', job_args
P = subprocess.Popen(job_args,shell=True)
rt_code = P.wait()
if rt_code == 0:
print 'job success...'
else:
print 'job error:%d'%(rt_code)
# print 'job error:%d, will retry in 5 min'%(rt_code)
# raise self.retry(countdown=300)
@app.task(bind=True)
def do_catchup(self, hotplay_id, start_dt, end_dt):
print 'start to catch up of %s:%s-%s'%(hotplay_id, start_dt, end_dt)
job_args = 'source %s/catch_up_all_run.sh %s %s %s > ./logs/%s.log 2>&1'%(HOTPLAY_CATCHUP_DIR, hotplay_id, start_dt, end_dt, hotplay_id)
print 'job_args:', job_args
P = subprocess.Popen(job_args,shell=True)
rt_code = P.wait()
if rt_code == 0:
print 'job success...'
else:
print 'job error:%d'%(rt_code)
# print 'job error:%d, will retry in 5 min'%(rt_code)
# raise self.retry(countdown=300)
nohup celery -A proj worker -n hotplay_default_worker -c 3 -Q hotplay_sh_default_queue -l info &
#-*-coding=utf-8-*-
from __future__ import absolute_import
import sys
import os
import tornado.web
import tornado.ioloop
import tornado.httpserver
from celery.execute import send_task
from proj.hotplay_task import do_init_catchup, do_catchup
reload(sys)
sys.setdefaultencoding('utf-8')
TORNADO_SERVER_PORT=10501
class InitCatchupHandler(tornado.web.RequestHandler):
def get(self, path):
user_name = self.get_argument("user_name", None)
album_id = self.get_argument("album_id",None)
album_name = self.get_argument("album_name",None)
channel_name = self.get_argument("channel_name", None)
print "request user_name+album_id+album_name+channel_name:%s+%s_%s+%s"%(user_name, album_id, album_name, channel_name)
if album_id == '0':
self.write('test tornado server init catch up handler. sucess. just return\n')
return
try:
self.write("0")
do_init_catchup.delay(user_name, album_id, album_name, channel_name)
except:
self.write("-1")
class DoCatchupHandler(tornado.web.RequestHandler):
def get(self, path):
hotplay_id = self.get_argument("hotplay_id",None)
start_dt = self.get_argument("start_dt",None)
end_dt = self.get_argument("end_dt",None)
print "request hotplay_id+start_dt+end_dt:%s+%s+%s"%(hotplay_id, start_dt, end_dt)
if hotplay_id == '0':
self.write('test tornado server catch up handler. sucess. just return\n')
return
try:
self.write("0")
do_catchup.delay(hotplay_id, start_dt, end_dt)
except:
self.write("-1")
class DoCatchupJYHandler(tornado.web.RequestHandler):
def get(self, path):
hotplay_id = self.get_argument("hotplay_id",None)
start_dt = self.get_argument("start_dt",None)
end_dt = self.get_argument("end_dt",None)
print "request jy hotplay_id+start_dt+end_dt:%s+%s+%s"%(hotplay_id, start_dt, end_dt)
#if hotplay_id == '0':
# self.write('test tornado server catch up handler. sucess. just return\n')
# return
send_task('tasks.test1', args=[hotplay_id, start_dt, end_dt], queue='hotplay_jy_queue') #tasks.test1是server2上celery任务函数的file_name.func_name
#file_name是任务函数所在文件相对于celery worker的路径
#try:
# self.write("0")
# do_catchup.delay(hotplay_id, start_dt, end_dt)
#except:
# self.write("-1")
application = tornado.web.Application(
[
(r"/init_catchup/(.*)", InitCatchupHandler),
(r"/do_catchup/(.*)", DoCatchupHandler),
(r"/do_catchup_jy/(.*)", DoCatchupJYHandler),
],
template_path = "template", static_path="static"
)
if __name__ == '__main__':
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(TORNADO_SERVER_PORT)
tornado.ioloop.IOLoop.instance().start()
#-*-coding=utf-8-*-
from __future__ import absolute_import
from celery import Celery
from kombu import Queue
app = Celery("test",
broker = "redis://10.121.76.204:17016/1"
# include = ['test.tasks']
)
app.conf.update(
CELERY_DEFAULT_QUEUE = 'hotplay_sh_default_queue', #可省略,但不能和server1的配置不一样
CELERY_QUEUES = (Queue('hotplay_jy_queue'),),
)
@app.task()
def test1(hotplay_id, start_dt, end_dt): #注意,名字要和tornado_server中send_task()函数用的func_name名字一样
print 'hotplay_id is %s, stat from %s to %s'%(hotplay_id, start_dt, end_dt)
celery -A tasks worker -n hotplay_jy_worker -c 2 -Q hotplay_jy_queue -l info
from celery.executeimport send_task
from proj.hotplay_taskimport do_init_catchup, do_catchup
celery实现任务统一收集、分发执行相关推荐
- K8S使用filebeat统一收集应用日志
今年3月份在公司的内部k8s培训会上,开发同事对应用整合进pod提出了几个问题,主要围绕在java应用的日志统一收集.集中存放和java jvm内存监控数据收集相关的点上,本文将介绍使用filebea ...
- Celery Redis未授权访问命令执行利用
首发补天社区:https://forum.butian.net/share/224←走过路过帮点一下~ 前言 Celery 是一个简单.灵活且可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类 ...
- Oracle收集cbo统计信息,Oracle CBO 统计信息的收集与执行计划的选择
--概要 主要总结一下Oracle是如何收集统计信息的是如何选择的,有一些好的Ref可以看看 --基本概念 首先要明确系统的自动收集机制 如果insert update delete truncate ...
- 网络主机防泄密安全保护方案
网络主机防泄密安全保护方案 2008-3-22 请预先参考<可信移动存储管理白皮书> --- 附录 一.主机移动存储介质安全管理 移动存储介质管理系统分别从主机层次和传递介质层次对文件的读 ...
- 启动celery后执行任务报错:django.core.exceptions.ImproperlyConfigured
工作的环境版本如下: [Django version]: 2.1 [celery version]:4.4.0rc2 [python version]: 3.7 [Redis version]:3.2 ...
- celery异步执行任务在Django中的应用实例
1. 创建django项目celery_demo, 创建应用demo: django-admin startproject celery_demo python manage.py startapp ...
- celery源码分析:multi命令分析
celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery简介 celery是一款异步任务框架,基于AMQP协议的任务调度框架.使用的场景 ...
- Kubernetes-基于EFK进行统一的日志管理
1.统一日志管理的整体方案 通过应用和系统日志可以了解Kubernetes集群内所发生的事情,对于调试问题和监视集群活动来说日志非常有用.对于大部分的应用来说,都会具有某种日志机制.因此,大多数容器引 ...
- 借鉴开源框架自研日志收集系统
踏浪无痕 岂安科技高级架构师 十余年数据研发经验,擅长数据处理领域工作,如爬虫.搜索引擎.大数据应用高并发等.担任过架构师,研发经理等岗位.曾主导开发过大型爬虫,搜索引擎及大数据广告DMP系统目前负责 ...
最新文章
- 伪数组(ArrayLike)
- LeetCode--046--全排列(java)
- 初步学习Spring Aop使用之注解方式
- kali工具中文手册_Kali Linux 2019.4发布了!解决Kali Linux 2019.4中文乱码问题
- 从源码出发深入理解 Android Service
- TCP的2MSL问题
- 如何同时让多台服务器安装系统,如何同时安装多台服务器?
- sort 、sorted、range、join方法 数字的正序、倒叙、翻转
- 10个受欢迎的英文名
- 模块之序列化模块json
- 基于地理距离的省际空间权重矩阵
- 友盟统计 H5 vue 隐藏友盟图标
- 海外问卷调查,招募合伙人
- jvm的内存分布,参数配置 和 GC处理机制
- 短视频怎么获得高流量?简单小技巧,让你的短视频被更多人看到
- Android 打开新浪微博特定页
- Eclipse ST-Link设置方法
- vue实际运用五:不需要响应式的数据的处理
- tools1.0.6
- Micro:bit 入门介绍
热门文章
- gin context和官方context_[系列文章] Gin框架 - 安装和路由配置
- 那些很厉害的人,是如何度过职场迷茫的?
- python入门-廖雪峰 Python教程
- 在 Visual Studio .NET 中使用 SQL Server 2000 创建数据库应用程序(1)
- 英特尔中国祝贺高亭宇夺冠:至强CPU提供更精准训练支持
- Meta率先发布虚拟世界Horizon Worlds
- 分房变卖房?董明珠承诺的3700套房即将交付,或将按成本价卖给员工
- 手机价格要上涨?小米和Realme确认芯片短缺:高通交付延长至30周以上
- 华为Mate40 Pro最新渲染图曝光:双孔瀑布屏有戏 再度惊艳
- 富豪被劫,二十年一遇