#sora#celery worker guide abstract
Starting a worker:
e.g. celery -A proj worker -l info
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
Notes:
The hostname argument can expand the following variables:
%h: Hostname including domain name.
%n: Hostname only.
%d: Domain name only.
E.g. if the current hostname is george.example.com then these will expand to:
worker1.%h -> worker1.george.example.com
worker1.%n -> worker1.george
worker1.%d -> worker1.example.com
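The expansion rules above can be sketched in plain Python (a hypothetical helper for illustration, not Celery's actual implementation):

```python
def expand_node_name(template, hostname):
    """Hypothetical helper mirroring the %h/%n/%d expansion described
    above (not Celery's actual code)."""
    name, _, domain = hostname.partition('.')
    return (template
            .replace('%h', hostname)   # hostname including domain
            .replace('%n', name)       # hostname only
            .replace('%d', domain))    # domain only

print(expand_node_name('worker1.%h', 'george.example.com'))
# -> worker1.george.example.com
```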
Stopping a worker:
If a worker is stuck in a loop and cannot be shut down normally, you can kill it with:
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
To restart workers, the simplest way is to use celery multi, but be sure to give each worker its own pidfile and logfile:
celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
celery multi restart 1 --pidfile=/var/run/celery/%n.pid
In production, run the worker as a daemon using an init script or supervisord.
Signals:
The worker’s main process overrides the following signals:
TERM Warm shutdown, wait for tasks to complete.
QUIT Cold shutdown, terminate ASAP.
USR1 Dump traceback for all active threads.
USR2 Remote debug, see celery.contrib.rdb.
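A minimal POSIX sketch of sending one of these signals, using a sleeping child process as a stand-in for the worker's main process (a real worker traps TERM and finishes its active tasks before exiting):

```python
import signal
import subprocess

# Stand-in for a worker main process: a child that just sleeps.
proc = subprocess.Popen(["sleep", "30"])

# TERM requests a warm shutdown; a plain `sleep` simply exits,
# while a real worker would wait for tasks to complete first.
proc.send_signal(signal.SIGTERM)
proc.wait()
print(proc.returncode)  # negative signal number on POSIX
```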
Additional notes:
Variables in file paths
The file path arguments for --logfile, --pidfile and --statedb can contain variables that the worker will expand:
Node name replacements
%h: Hostname including domain name.
%n: Hostname only.
%d: Domain name only.
%i: Prefork pool process index or 0 if MainProcess.
%I: Prefork pool process index with separator.
E.g. if the current hostname is george.example.com then these will expand to:
--logfile=%h.log -> george.example.com.log
--logfile=%n.log -> george.log
--logfile=%d.log -> example.com.log
About prefork:
Prefork pool process index
The prefork pool process index specifiers will expand into a different filename depending on the process that will eventually need to open the file.
This can be used to specify one log file per child process.
Note that the numbers will stay within the process limit even if processes exit or if autoscale/maxtasksperchild/time limits are used. I.e. the number is the process index not the process count or pid.
%i - Pool process index or 0 if MainProcess.
Where -n worker1@example.com -c2 -f %n-%i.log will result in three log files:
worker1-0.log (main process)
worker1-1.log (pool process 1)
worker1-2.log (pool process 2)
%I - Pool process index with separator.
Where -n worker1@example.com -c2 -f %n%I.log will result in three log files:
worker1.log (main process)
worker1-1.log (pool process 1)
worker1-2.log (pool process 2)
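The difference between %i and %I above can be sketched with a toy expansion function (an illustration only, not Celery's actual code):

```python
def pool_logfile(template, node, index):
    """Toy expansion of %n, %i and %I for per-process log files
    (illustrative; not Celery's real implementation)."""
    sep = '' if index == 0 else '-%d' % index
    return (template
            .replace('%n', node)
            .replace('%i', str(index))  # bare index, 0 for MainProcess
            .replace('%I', sep))        # index with separator, empty for MainProcess

# -n worker1@example.com -c2 -f %n%I.log gives:
print([pool_logfile('%n%I.log', 'worker1', i) for i in range(3)])
# -> ['worker1.log', 'worker1-1.log', 'worker1-2.log']
```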
Concurrency:
You can use pool implementations such as prefork or eventlet, but note that eventlet comes with some caveats.
Remote control:
Various commands let you change the configuration of running workers on the fly, such as:
broadcast()
e.g.
Start the worker:
root@workgroup0:~/celeryapp/configtest# ls
logging proj
root@workgroup0:~/celeryapp/configtest# celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
In another terminal:
The directory layout is as follows:
root@workgroup0:~/celeryapp/configtest# ls
logging proj
root@workgroup0:~/celeryapp/configtest# cd proj
root@workgroup0:~/celeryapp/configtest/proj# la
agent.py agent.pyc celery.py celery.pyc config.py config.pyc __init__.py __init__.pyc
root@workgroup0:~/celeryapp/configtest/proj#
root@workgroup0:~/celeryapp/configtest/proj# cd ..
root@workgroup0:~/celeryapp/configtest# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from proj.celery import app
>>> app.control.broadcast('rate_limit', {'task_name': 'proj.agent.add', 'rate_limit': '200/m'}, reply=True)
[{u'celery@worker1.workgroup0.hzg.com': {u'ok': u'new rate limit set successfully'}}]
>>>
You can see that the worker's rate limit was updated successfully.
Pay attention to the module hierarchy here, otherwise you will run into errors.
You can also target a specific worker:
>>> app.control.broadcast('rate_limit', {
... 'task_name': 'myapp.mytask',
... 'rate_limit': '200/m'}, reply=True,
... destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]
Revoking one task or several tasks at once:
Examples:
>>> result.revoke()
>>> AsyncResult(id).revoke()
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
... terminate=True)
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
... terminate=True, signal='SIGKILL')
>>> app.control.revoke([
... '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
... 'f565793e-b041-4b2b-9ca4-dca22762a55d',
... 'd9d35e03-2997-42d0-a13e-64a66b88a618',
... ])
Persisting revoked tasks:
From the documentation:
Revoking tasks works by sending a broadcast message to all the workers, the workers then keep a list of revoked tasks in memory. When a worker starts up it will synchronize revoked tasks with other workers in the cluster.
The list of revoked tasks is in-memory so if all workers restart the list of revoked ids will also vanish. If you want to preserve this list between restarts you need to specify a file for these to be stored in by using the --statedb argument to celery worker:
e.g.
celery -A proj worker -l info --statedb=/var/run/celery/worker.state
celery multi start 2 -l info --statedb=/var/run/celery/%n.state
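A toy illustration of what --statedb buys you: revoked ids kept only in memory vanish on restart, while ids persisted to a file survive (Celery uses its own state-file format, not pickle as here):

```python
import os
import pickle
import tempfile

# In-memory set of revoked task ids (vanishes if the worker restarts).
revoked = {'d9078da5-9915-40a0-bfa1-392c7bde42ed'}

# Persist it to a state file, as --statedb does conceptually.
path = os.path.join(tempfile.mkdtemp(), 'worker.state')
with open(path, 'wb') as f:
    pickle.dump(revoked, f)

# Simulated restart: reload the revoked set from disk.
with open(path, 'rb') as f:
    restored = pickle.load(f)
print(restored == revoked)  # True
```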
Setting task time limits:
From the documentation:
The time limit (--time-limit) is the maximum number of seconds a task may run before the process executing it is terminated and replaced by a new process. You can also enable a soft time limit (--soft-time-limit), this raises an exception the task can catch to clean up before the hard time limit kills it.
e.g.
Soft time limit example:
from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()
You can also set these limits with the CELERYD_TASK_TIME_LIMIT / CELERYD_TASK_SOFT_TIME_LIMIT settings.
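The mechanism behind the soft limit can be sketched with a plain SIGALRM handler (illustrative only; Celery's prefork pool implements this for you, and SoftLimitExceeded below is a stand-in for celery.exceptions.SoftTimeLimitExceeded):

```python
import signal

class SoftLimitExceeded(Exception):
    """Stand-in for celery.exceptions.SoftTimeLimitExceeded."""

def _on_alarm(signum, frame):
    raise SoftLimitExceeded()

# An alarm fires after the soft limit and raises an exception
# that the task body can catch in order to clean up.
signal.signal(signal.SIGALRM, _on_alarm)
signal.alarm(1)  # soft limit of 1 second
cleaned_up = False
try:
    while True:   # long-running work that overruns the limit
        pass
except SoftLimitExceeded:
    cleaned_up = True  # i.e. clean_up_in_a_hurry()
finally:
    signal.alarm(0)  # cancel any pending alarm
print(cleaned_up)
```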
Changing worker settings at runtime:
Changing time limits at runtime
e.g.
>>> app.control.time_limit('tasks.crawl_the_web',
soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]
Changing rate-limits at runtime
e.g.
>>> app.control.rate_limit('myapp.mytask', '200/m')
>>> app.control.rate_limit('myapp.mytask', '200/m',
... destination=['celery@worker1.example.com'])
Note: if CELERY_DISABLE_RATE_LIMITS is enabled, these changes will have no effect.
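The '200/m'-style rate-limit strings used above can be read as tasks per time unit; a hypothetical parser (for illustration, Celery has its own internal one):

```python
def rate_per_second(limit):
    """Hypothetical parser for '<n>/<unit>' rate-limit strings such as
    '200/m' (illustrative; not Celery's actual parser)."""
    if not limit:
        return 0.0  # empty string means no rate limit
    n, _, unit = str(limit).partition('/')
    seconds = {'s': 1, 'm': 60, 'h': 3600}[unit or 's']
    return float(n) / seconds

print(rate_per_second('200/m'))  # tasks allowed per second
```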
Other options and features (note that some pool implementations do not support certain features):
Max tasks per child setting:
With this option you can configure the maximum number of tasks a worker can execute before it’s replaced by a new process.
The option can be set using the workers --maxtasksperchild argument or using the CELERYD_MAX_TASKS_PER_CHILD setting.
Autoscaling:
The autoscaler component is used to dynamically resize the pool based on load:
The autoscaler adds more pool processes when there is work to do,
and starts removing processes when the workload is low.
It’s enabled by the --autoscale option, which needs two numbers: the maximum and minimum number of pool processes:
--autoscale=AUTOSCALE
Enable autoscaling by providing
max_concurrency,min_concurrency. Example:
--autoscale=10,3 (always keep 3 processes, but grow to
10 if necessary).
tips:You can also define your own rules for the autoscaler by subclassing Autoscaler. Some ideas for metrics include load average or the amount of memory available. You can specify a custom autoscaler with the CELERYD_AUTOSCALER setting.
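The sizing decision for --autoscale=max,min can be modeled with a toy function (an illustration, not Celery's actual Autoscaler logic):

```python
def autoscale_target(pending_tasks, min_procs, max_procs):
    """Toy model of the autoscaler's sizing decision for
    --autoscale=max,min (not Celery's real Autoscaler)."""
    # Grow toward the pending workload, clamped to [min, max].
    return max(min_procs, min(max_procs, pending_tasks))

# --autoscale=10,3: never fewer than 3 processes, never more than 10.
print([autoscale_target(n, 3, 10) for n in (0, 5, 50)])
# -> [3, 5, 10]
```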
Queues:
A worker instance can consume from any number of queues. By default it will consume from all queues defined in the CELERY_QUEUES setting (which if not specified defaults to the queue named celery).
e.g.
$ celery -A proj worker -l info -Q foo,bar,baz
If the queue name is defined in CELERY_QUEUES it will use that configuration, but if it’s not defined in the list of queues Celery will automatically generate a new queue for you (depending on the CELERY_CREATE_MISSING_QUEUES option).
You can also tell the worker to start and stop consuming from a queue at runtime using the remote control commands add_consumer and cancel_consumer.
The documentation also has three more subsections, Adding consumers, Cancelling consumers, and List of active queues, with more detailed examples.
Autoreloading (fine during development, not recommended in production):
Starting celery worker with the --autoreload option will enable the worker to watch for file system changes to all imported task modules (and also any non-task modules added to the CELERY_IMPORTS setting or the -I|--include option).
Warning: This is an experimental feature intended for use in development only, using auto-reload in production is discouraged as the behavior of reloading a module in Python is undefined, and may cause hard to diagnose bugs and crashes.
Pool Restart Command
Requires the CELERYD_POOL_RESTARTS setting to be enabled.
The remote control command pool_restart sends restart requests to the workers child processes. It is particularly useful for forcing the worker to import new modules, or for reloading already imported modules. This command does not interrupt executing tasks.
e.g.
>>> app.control.broadcast('pool_restart',
... arguments={'modules': ['foo', 'bar']})
>>> app.control.broadcast('pool_restart',
... arguments={'modules': ['foo'],
... 'reload': True})
If you don’t specify any modules then all known tasks modules will be imported/reloaded:
>>> app.control.broadcast('pool_restart', arguments={'reload': True})
Inspecting workers:
app.control.inspect lets you inspect running workers. It uses remote control commands under the hood.
You can also use the celery command to inspect workers, and it supports the same commands as the app.control interface.
e.g.
# Inspect all nodes.
>>> i = app.control.inspect()
# Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
'worker2.example.com'])
# Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')
It provides the following functionality:
Dump of registered tasks
You can get a list of tasks registered in the worker using registered().
Dump of currently executing tasks
You can get a list of active tasks using active()
Dump of scheduled (ETA) tasks
You can get a list of tasks waiting to be scheduled by using scheduled():
Dump of reserved tasks
Reserved tasks are tasks that have been received but are still waiting to be executed.
You can get a list of these using reserved()
Statistics
The remote control command inspect stats (or stats()) will give you a long list of useful (or not so useful) statistics about the worker:
e.g.
$ celery -A proj inspect stats
See the documentation for details on its output.
Other features:
Remote shutdown
Ping
Enable/disable events
Writing your own remote control commands
(see the documentation)
Reposted from: https://my.oschina.net/hochikong/blog/415929