
celery worker guide abstract

Starting a worker:

e.g. celery -A proj worker -l info

celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h

Notes:

The hostname argument can expand the following variables:

%h: Hostname including domain name.

%n: Hostname only.

%d: Domain name only.

E.g. if the current hostname is george.example.com then these will expand to:

worker1.%h -> worker1.george.example.com

worker1.%n -> worker1.george

worker1.%d -> worker1.example.com

Stopping a worker:

If a worker gets stuck in an infinite loop and will not exit, you can use this:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

The easiest way to restart workers is with celery multi, but be sure to specify a pidfile and a logfile for each worker:

celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid

celery multi restart 1 --pidfile=/var/run/celery/%n.pid

In production, workers should be run as daemons via an init script or supervisord.

Signals:

The worker’s main process overrides the following signals:

TERM Warm shutdown, wait for tasks to complete.

QUIT Cold shutdown, terminate ASAP.

USR1 Dump traceback for all active threads.

USR2 Remote debug, see celery.contrib.rdb.
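The USR1 traceback dump can be approximated in plain Python. This is only an illustrative sketch (Celery's real handler is internal to the worker), showing the underlying mechanism via sys._current_frames():

```python
# Illustrative sketch only: roughly what the USR1 handler does, i.e. dump a
# stack trace for every live thread. Celery's real handler is internal to
# the worker process; this standalone version just demonstrates the idea.
import sys
import traceback

def dump_all_thread_stacks():
    """Return one formatted stack trace per live thread."""
    chunks = []
    for thread_id, frame in sys._current_frames().items():
        chunks.append('--- Thread %d ---\n' % thread_id)
        chunks.append(''.join(traceback.format_stack(frame)))
    return ''.join(chunks)
```

To actually react to the signal, such a function could be registered with signal.signal(signal.SIGUSR1, ...) in your own process.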

Additional notes:

Variables in file paths

The file path arguments for --logfile, --pidfile and --statedb can contain variables that the worker will expand:

Node name replacements

%h: Hostname including domain name.

%n: Hostname only.

%d: Domain name only.

%i: Prefork pool process index or 0 if MainProcess.

%I: Prefork pool process index with separator.

E.g. if the current hostname is george.example.com then these will expand to:

--logfile=%h.log -> george.example.com.log

--logfile=%n.log -> george.log

--logfile=%d.log -> example.com.log

About prefork:

Prefork pool process index

The prefork pool process index specifiers will expand into a different filename depending on the process that will eventually need to open the file.

This can be used to specify one log file per child process.

Note that the numbers will stay within the process limit even if processes exit or if autoscale/maxtasksperchild/time limits are used. I.e. the number is the process index not the process count or pid.

%i - Pool process index or 0 if MainProcess.

Where -n worker1@example.com -c2 -f %n-%i.log will result in three log files:

worker1-0.log (main process)

worker1-1.log (pool process 1)

worker1-2.log (pool process 2)

%I - Pool process index with separator.

Where -n worker1@example.com -c2 -f %n%I.log will result in three log files:

worker1.log (main process)

worker1-1.log (pool process 1)

worker1-2.log (pool process 2)

Concurrency:

Pools such as prefork or eventlet can be used, but some care seems to be needed with eventlet (for example, tasks that block without yielding will stall the event loop).

Remote control:

Various commands can be used to change the configuration and other aspects of running workers.

broadcast()

e.g.

Start the worker:

root@workgroup0:~/celeryapp/configtest# ls

logging  proj

root@workgroup0:~/celeryapp/configtest# celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h

In another terminal:

The directory layout is as follows:

root@workgroup0:~/celeryapp/configtest# ls

logging  proj

root@workgroup0:~/celeryapp/configtest# cd proj

root@workgroup0:~/celeryapp/configtest/proj# la

agent.py  agent.pyc  celery.py  celery.pyc  config.py  config.pyc  __init__.py  __init__.pyc

root@workgroup0:~/celeryapp/configtest/proj#

root@workgroup0:~/celeryapp/configtest/proj# cd ..

root@workgroup0:~/celeryapp/configtest# python

Python 2.7.6 (default, Mar 22 2014, 22:59:56)

[GCC 4.8.2] on linux2

Type "help", "copyright", "credits" or "license" for more information.

>>> from proj.celery import app

>>> app.control.broadcast('rate_limit', {'task_name': 'proj.agent.add', 'rate_limit': '200/m'}, reply=True)

[{u'celery@worker1.workgroup0.hzg.com': {u'ok': u'new rate limit set successfully'}}]

>>>

You can see that the worker's rate limit has been changed successfully.

Pay attention to the package/module hierarchy of the modules involved, otherwise errors will occur.

You can also target a specific worker:

>>> app.control.broadcast('rate_limit', {

...     'task_name': 'myapp.mytask',

...     'rate_limit': '200/m'}, reply=True,

...                             destination=['worker1@example.com'])

[{'worker1.example.com': 'New rate limit set successfully'}]

Revoking one task or several tasks:

Examples:

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',

...                    terminate=True)

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',

...                    terminate=True, signal='SIGKILL')

>>> app.control.revoke([

...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',

...    'f565793e-b041-4b2b-9ca4-dca22762a55d',

...    'd9d35e03-2997-42d0-a13e-64a66b88a618',

... ])

Persisting revoked tasks:

From the documentation:

Revoking tasks works by sending a broadcast message to all the workers, the workers then keep a list of revoked tasks in memory. When a worker starts up it will synchronize revoked tasks with other workers in the cluster.

The list of revoked tasks is in-memory so if all workers restart the list of revoked ids will also vanish. If you want to preserve this list between restarts you need to specify a file for these to be stored in by using the --statedb argument to celery worker:

e.g.

celery -A proj worker -l info --statedb=/var/run/celery/worker.state

celery multi start 2 -l info --statedb=/var/run/celery/%n.state

Setting task time limits:

From the documentation:

The time limit (--time-limit) is the maximum number of seconds a task may run before the process executing it is terminated and replaced by a new process. You can also enable a soft time limit (--soft-time-limit); this raises an exception the task can catch to clean up before the hard time limit kills it.

e.g.

Soft time limit:

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

This can also be handled via the CELERYD_TASK_TIME_LIMIT / CELERYD_TASK_SOFT_TIME_LIMIT settings.

Changing worker parameters at runtime:

Changing time limits at runtime

e.g.

>>> app.control.time_limit('tasks.crawl_the_web',
...                        soft=60, hard=120, reply=True)

[{'worker1.example.com': {'ok': 'time limits set successfully'}}]

Changing rate-limits at runtime

e.g.

>>> app.control.rate_limit('myapp.mytask', '200/m')

>>> app.control.rate_limit('myapp.mytask', '200/m',

...            destination=['celery@worker1.example.com'])

Tip: if CELERY_DISABLE_RATE_LIMITS is enabled, this change will have no effect.
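Rate-limit strings have the form count/unit, where the unit is s, m, or h; '200/m' means 200 tasks per minute. A toy parser (a hypothetical helper, not part of Celery) to show the arithmetic:

```python
# Toy parser for rate-limit strings such as '200/m' (hypothetical helper,
# not part of Celery). Returns the allowed number of tasks per second.
def rate_per_second(limit):
    count, _, unit = limit.partition('/')
    seconds_per_unit = {'s': 1, 'm': 60, 'h': 3600}
    return float(count) / seconds_per_unit[unit]
```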

Other parameters and features (note that some pool implementations do not support certain features):

Max tasks per child setting:

With this option you can configure the maximum number of tasks a worker can execute before it’s replaced by a new process.

The option can be set using the worker's --maxtasksperchild argument or using the CELERYD_MAX_TASKS_PER_CHILD setting.

Autoscaling:

The autoscaler component is used to dynamically resize the pool based on load:

The autoscaler adds more pool processes when there is work to do,

and starts removing processes when the workload is low.

It’s enabled by the --autoscale option, which needs two numbers: the maximum and minimum number of pool processes:

--autoscale=AUTOSCALE
    Enable autoscaling by providing max_concurrency,min_concurrency.
    Example: --autoscale=10,3 (always keep 3 processes,
    but grow to 10 if necessary).

tips:You can also define your own rules for the autoscaler by subclassing Autoscaler. Some ideas for metrics include load average or the amount of memory available. You can specify a custom autoscaler with the CELERYD_AUTOSCALER setting.
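Since the Autoscaler subclassing API differs between Celery versions, here is only a version-agnostic sketch of a scaling policy (a hypothetical function, not the real Autoscaler interface) driven by queue backlog:

```python
# Hypothetical scaling policy, not the real Autoscaler API: pick the next
# pool size from the current backlog, clamped to [min_procs, max_procs].
def target_pool_size(pending_tasks, current, min_procs, max_procs):
    if pending_tasks > current:           # backlog: grow by one
        return min(max_procs, current + 1)
    if pending_tasks == 0:                # idle: shrink by one
        return max(min_procs, current - 1)
    return current                        # steady state
```

A real subclass would also have to consult metrics such as load average or free memory, as the tip above suggests.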

Queues:

A worker instance can consume from any number of queues. By default it will consume from all queues defined in the CELERY_QUEUES setting (which if not specified defaults to the queue named celery).

e.g.

$ celery -A proj worker -l info -Q foo,bar,baz

If the queue name is defined in CELERY_QUEUES it will use that configuration, but if it’s not defined in the list of queues Celery will automatically generate a new queue for you (depending on the CELERY_CREATE_MISSING_QUEUES option).

You can also tell the worker to start and stop consuming from a queue at runtime using the remote control commands add_consumer and cancel_consumer.

The documentation also contains three subsections, Adding consumers, Cancelling consumers, and List of active queues, with more detailed examples.
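As a sketch, the foo/bar/baz queues from the example above could be declared in the configuration module like this (assuming the old-style setting names used throughout this article; the queue names are just the example's):

```python
# celeryconfig.py (sketch): declare the queues a worker may consume from,
# using the old-style setting names this article uses. Queues not listed
# here may still be auto-created via CELERY_CREATE_MISSING_QUEUES.
from kombu import Queue

CELERY_QUEUES = (
    Queue('foo'),
    Queue('bar'),
    Queue('baz'),
)
CELERY_DEFAULT_QUEUE = 'foo'
```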

Autoreloading (usable for testing; not recommended in production):

Starting celery worker with the --autoreload option will enable the worker to watch for file system changes to all imported task modules (and also any non-task modules added to the CELERY_IMPORTS setting or the -I|--include option).

Warning: This is an experimental feature intended for use in development only, using auto-reload in production is discouraged as the behavior of reloading a module in Python is undefined, and may cause hard to diagnose bugs and crashes.

Pool Restart Command

Requires the CELERYD_POOL_RESTARTS setting to be enabled.

The remote control command pool_restart sends restart requests to the worker's child processes. It is particularly useful for forcing the worker to import new modules, or for reloading already imported modules. This command does not interrupt executing tasks.

e.g.

>>> app.control.broadcast('pool_restart',

...                       arguments={'modules': ['foo', 'bar']})

>>> app.control.broadcast('pool_restart',

...                       arguments={'modules': ['foo'],

...                                  'reload': True})

If you don't specify any modules then all known task modules will be imported/reloaded:

>>> app.control.broadcast('pool_restart', arguments={'reload': True})

Inspecting workers:

app.control.inspect lets you inspect running workers. It uses remote control commands under the hood.

You can also use the celery command to inspect workers, and it supports the same commands as the app.control interface.

e.g.

# Inspect all nodes.

>>> i = app.control.inspect()

# Specify multiple nodes to inspect.

>>> i = app.control.inspect(['worker1.example.com',
...                          'worker2.example.com'])

# Specify a single node to inspect.

>>> i = app.control.inspect('worker1.example.com')

The following functions are available:

Dump of registered tasks

You can get a list of tasks registered in the worker using the registered()

Dump of currently executing tasks

You can get a list of active tasks using active()

Dump of scheduled (ETA) tasks

You can get a list of tasks waiting to be scheduled by using scheduled():

Dump of reserved tasks

Reserved tasks are tasks that have been received, but are still waiting to be executed.

You can get a list of these using reserved()
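All of these inspect methods return a mapping of node name to payload (or None when no worker replies). A small hypothetical helper, not part of Celery, to flatten replies across nodes:

```python
# Hypothetical helper (not part of Celery): inspect methods such as active()
# return {'celery@host1': [...], 'celery@host2': [...]}, or None when no
# worker replied; this flattens the per-node lists into one list of tasks.
def merge_replies(reply):
    if not reply:
        return []
    merged = []
    for tasks in reply.values():
        merged.extend(tasks)
    return merged
```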

Statistics

The remote control command inspect stats (or stats()) will give you a long list of useful (or not so useful) statistics about the worker:

e.g.

$ celery -A proj inspect stats

See the documentation for an explanation of its output.

Other features:

Remote shutdown

Ping

Enable/disable events

Writing your own remote control commands

(see the documentation)

Reposted from: https://my.oschina.net/hochikong/blog/415929
