网络上有各种celery的资料,但看着眼花缭乱,以下资料自己通过各种网站搜索,整理汇总的,只作为自己笔记使用,外人看着会有点乱。
另外,说是celery使用redis作为broker,会出现任务重复执行问题。我通过各种手段模拟,特意把“visibility_timeout”时间改的比ETA还要小,结果还是没出现任务重传问题。不知道是否因为我用celery 4.3,截止到写这篇文章为止,是最新版本的缘故。
言归正传,这篇内容为celery的基础代码内容。

基本配置:

broker指定消息队列保存的位置;
backend指定执行结果保存的位置,如果想获取每个任务的执行结果,还需要配置一下把任务结果存在哪(可选配置,貌似不配置,运行效率会快很多)。

以下的配置全部采用redis作为broker。另外,需要安装:pip3 install redis,用来连接redis数据库

添加redis服务器代码的三种办法:
创建tasks.py,


from celery import Celery# 增加配置,redis为例
# 第一种(推荐)
app = Celery('demo',# broker指定消息队列保存的位置,如果没有密码,可以这么指定:'redis://:127.0.0.1:6379/1',broker='redis://:123456@192.168.0.100/10',# backend指定执行结果保存的位置backend='redis://:123456@192.168.0.100/11')
# 第二种
app = Celery('demo')
app.conf.update(broker_url='redis://:127.0.0.1:6379/1',result_backend='redis://:127.0.0.1:6379/2',
)
# 第三种,导入.py模块,config中指定broker_url/result_backend
app = Celery('demo')
app.config_from_object('config')

基础的celery任务配置:
①.创建一个任务 tasks.py

# 如果运行worker时,对以下代码进行修改了,需要重启worker才生效.
from celery import Celeryapp = Celery('Bruce Lin',# broker指定消息队列保存的位置,如果没有密码,可以这么指定:'redis://:127.0.0.1:6379/1',broker='redis://:123456@192.168.0.100/10',# backend指定执行结果保存的位置backend='redis://:123456@192.168.0.100/11')app.conf.update(# task_serializer='json',# Ignore other content# accept_content=['json'],# result_serializer='json',# 指定时区,不指定默认为 ‘UTC’timezone='Asia/Shanghai',enable_utc=False,# 任务结果一小时后过期(对应redis的ttl),缺省一天.# result_expires=3600,#  Number of CPU cores.机器缺省几核,这个数字就是多少.建议不要配置# worker_concurrency=4,# 并发量 同一个worker 可以同时处理任务上限,缺省为4个.官方推荐,不建议修改.貌似配置了不起效果# worker_prefetch_multiplier=10,
)# 将函数注册到app的任务里
# Bruce Lin为这个任务的名称,可自定义取,但不能有重名。默认的名称为:文件名.任务函数,比如:tasks.add
# 如果这里的函数,与上面的代码不写在一个py文件里,那么worker要运行的py文件,是这个文件名.
# 如果运行worker时,对以下代码进行修改了,需要重启worker才生效.
@app.task(name="Bruce Lin")
def add(x,y):print("running...",x,y)return x+y

②.启动Celery Worker来开始监听并执行任务

# tasks 为 tasks.py的文件名,是执行任务函数的那个py文件,必须要在同级目录执行。
# -l debug可以查看更加详细的日志,默认为 warning。
# -n指定worker的名称,当存在多台worker时,避免名称冲突,可使用本机IP地址。
# -A,指的是app程序,也就是要执行任务函数的那个py文件。
$ celery  worker -A tasks -l info -n 192.168.0.1

③.调用任务(同级目录,进入python3)

# 调用任务,使用 函数.delay 调用任务.
>>> from tasks import add
>>> add.delay(4, 4)

delay 实际上是 apply_async 的别名, 还可以使用如下方法调用, 但是 apply_async 支持更多的参数:
T.delay(arg, kwarg=value)

Star arguments shortcut to .apply_async. (.delay(*args, **kwargs) calls .apply_async(args, kwargs)).
T.apply_async((arg,), {‘kwarg’: value})

T.apply_async(countdown=10)
executes in 10 seconds from now.

T.apply_async(eta=now + timedelta(seconds=10))
executes in 10 seconds from now, specified using eta

T.apply_async(countdown=60, expires=120)
executes in one minute from now, but expires after 2 minutes.

T.apply_async(expires=now + timedelta(days=2))
expires in 2 days, set using datetime.

看你的worker终端会显示收到 一个任务,此时你想看任务结果的话,需要在调用 任务时 赋值个变量
在程序中,如果要获取执行结果等信息,最好配置time.sleep(),否则可能会因程序过快执行,当任务还未执行完,导致获取结果不准.

>>> result = add.delay(4, 4)
>>> result.ready()               # 返回执行状态
>>> result.result                # 查看返回结果
>>> result.get(timeout=1)        # 与result一样,查看返回结果.但get()可以添加参数,timeout意味着等待这个时间获取返回结果,超时未获得结果会报错.
>>> result.get(propagate=False)  # 程序执行过程出错报异常
>>> result.traceback             # 获取异常信息

下面介绍启动worker的2中方式:
启动worker(必须要在tasks的同级目录,才能启动worker)
启动worker 方式一:
celery worker -A tasks -l info -n 192.168.0.1
注意:如果要开启多台worker,一定要加-n 并指明当前这台worker的名称,否则可能会出现多台worker名称一样,导致任务重复执行问题。
当有多台主机做worker时,可以用-n 加本机IP的方式来避免冲突
加-n的具体指令为:
celery worker -A tasks -l info -n bruce_worker-1,当多台worker上线时,会收到如下信息:
第一台node
[2019-07-15 23:31:31,865: INFO/MainProcess] mingle: searching for neighbors
[2019-07-15 23:31:32,885: INFO/MainProcess] mingle: sync with 1 nodes
[2019-07-15 23:31:32,886: INFO/MainProcess] mingle: sync complete
[2019-07-15 23:31:32,913: INFO/MainProcess] celery@bruce_worker-1 ready.
[2019-07-15 23:31:39,119: INFO/MainProcess] sync with celery@bruce_worker-2
[2019-07-15 23:31:50,708: INFO/MainProcess] sync with celery@bruce_worker-3

第二台node
[2019-07-15 23:31:39,106: INFO/MainProcess] mingle: searching for neighbors
[2019-07-15 23:31:40,130: INFO/MainProcess] mingle: sync with 2 nodes
[2019-07-15 23:31:40,131: INFO/MainProcess] mingle: sync complete
[2019-07-15 23:31:40,140: INFO/MainProcess] celery@bruce_worker-2 ready.
[2019-07-15 23:31:50,707: INFO/MainProcess] sync with celery@bruce_worker-3

第三台node
[2019-07-15 23:31:50,695: INFO/MainProcess] mingle: searching for neighbors
[2019-07-15 23:31:51,715: INFO/MainProcess] mingle: sync with 2 nodes
[2019-07-15 23:31:51,715: INFO/MainProcess] mingle: sync complete
[2019-07-15 23:31:51,741: INFO/MainProcess] celery@bruce_worker-3 ready.

不加-n启动多worker时,会收到如下报错:
[2019-07-15 23:08:21,252: WARNING/MainProcess] /usr/local/lib/python3.7/site-packages/celery/app/control.py:54: DuplicateNodenameWarning: Received multiple replies from node name: celery@localhost.localdomain.
Please make sure you give each node a unique nodename using
the celery worker -n option.
pluralize(len(dupes), ‘name’), ', '.join(sorted(dupes)),

启动worker 方式二 后台启动

# /home/bruce/celery 是后台启动保存日志文件路径,路径目录为:/home/bruce/,文件为celery.log
# 同时系统会在/home/bruce/目录下,生成celery.pid的进程文件,进程停止后,这个文件会自动删除
# 以及系统会在/home/bruce/目录下,生成四个分别为:
# celery-1.log、celery-2.log、celery-3.log、celery-4.log,这是因为开启了四个并发数量。
# 也可以指定相对路径,比如:celery multi start logging/celery -A tasks -l info,以celery为日志名,存放在logging目录下.
# 如果要修改默认的并发数,可以在启动参数后面添加:--concurrency=10(不建议操作)
celery multi start /home/bruce/celery -A tasks -l info -n bruce_worker-1
celery multi restart /home/bruce/celery
celery multi stop /home/bruce/celery

注意:后台启动,看不到下面这段话:
以下内容为其它celery代码启动界面,与上面代码无关。

/usr/local/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!Please specify a different user using the --uid option.User information: uid=0 euid=0 gid=0 egid=0uid=uid, euid=euid, gid=gid, egid=egid,-------------- celery@worker3 v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Linux-3.10.0-957.el7.x86_64-x86_64-with-centos-7.6.1810-Core 2019-07-17 01:08:55
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         test:0x7f84046d7400
- ** ---------- .> transport:   redis://:**@192.168.0.100:6379/10
- ** ---------- .> results:     redis://:**@192.168.0.100/11
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- -------------- [queues].> app_task1        exchange=app_task1(direct) key=app_task1.> app_task2        exchange=app_task2(direct) key=app_task2.> default          exchange=default(direct) key=default[tasks]. schedule_add. task1. task2

设置开机启动:

# 注意,一定要进入tasks.py任务所在目录,才能执行,以下例子中,tasks.py处于/var/ftp/pub/celery里
chmod +x /etc/rc.d/rc.localvi /etc/rc.d/rc.localcd /var/ftp/pub/celery
celery multi start logging/celery -A tasks -l info -n bruce_worker-1

定时器beat配置

第一种定时器配置(不推荐):
新建tasks.py

from celery import Celery
from celery.schedules import crontab# 这里配置忽略,具体看基础配置内容
app = Celery()@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):# Calls test('hello') every 10 seconds.sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')# Calls test('world') every 30 secondssender.add_periodic_task(30.0, test.s('world'), expires=10)# Executes every Monday morning at 7:30 a.m.sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s('Happy Mondays!'),)@app.task
def test(arg):print(arg)test.s('world'),
# test为任务的函数,s为固定参数,('world')为实参.

第二种定时器配置(推荐):
新建tasks.py

from celery import Celery
from celery.schedules import crontab# 这里配置忽略,具体看基础配置内容
app = Celery()app.conf.beat_schedule = {# Executes every Monday morning at 7:30 a.m.'add-every-monday-morning': {'task': 'schedule_add','schedule': crontab(hour=7, minute=30, day_of_week=1),'args': (16, 16),},
}# 下面的代码可以不和app.conf.beat_schedule配置在一个文件或者一台机器上,
# 但必须要保证任务名'schedule_add'的名字与app.conf.beat_schedule里的task对应值的名字匹配。
@app.task(name='schedule_add')
def add(x, y):print x + yreturn x + y

字段解释:
task:需要执行的任务名称.如@app.task(name=‘schedule_add’),则’task’: ‘schedule_add’,
schedule:任务执行时间设定,使用crontab可以实现复杂的定时任务。如果要实现秒级定时任务,使用:‘schedule’: 10.0,
args:一个元组或者列表,位置参数
kwargs:一个字典,关键字参数
options:一个字典,一些额外选项,apply_async()方法可用的参数,exchange, routing_key, expires等
relative:默认false

crontab:
一个表示时间间隔的对象,语法与linux的crontab类似。
minute和hour可以设置为*/15,/2,分别表示每隔15分钟和每隔2小时。
day_of_week用可以0-6的数字表示,也可以文字表示mon-fri。
/2并不是每2天,而是每半天。
官网一些具体例子:
crontab() 每分钟执行
crontab(minute=0, hour=0) 每天凌晨执行
crontab(minute=0, hour=’/3’) 每三个小时执行: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0,hour=‘0,3,6,9,12,15,18,21’) 同上
crontab(minute=’
/15’) 每十五分钟执行
crontab(day_of_week=‘sunday’) 星期天每分钟执行
crontab(minute=’’,hour=’’, day_of_week=‘sun’) 同上
crontab(minute=’/10’,hour=‘3,17,22’, day_of_week=‘thu,fri’) 每十分钟执行, 但是只在星期四、五的 3-4 am, 5-6 pm, and 10-11 pm
crontab(minute=0, hour=’/2,/3’) 每两个小时及每三个小时执行,意思是: 除了下面时间的每个小时: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour=’
/5’) 每五个小时执行。这意味着将在 3pm 而不是 5pm 执行 (因为 3pm 等于 24 小时制的 15, 能被 5 整除)
crontab(minute=0, hour=’/3,8-17’) 每三个小时, 以及 (8am-5pm) 之间的小时执行
crontab(0, 0, day_of_month=‘2’) 每个月的第二天执行
crontab(0, 0, day_of_month=‘2-30/3’) 每个月的偶数天执行
crontab(0, 0, day_of_month=‘1-7,15-21’) 每个月的第一个和第三个星期执行
crontab(0, 0, day_of_month=‘11’,month_of_year=‘5’) 每年五月份的第十一天执行
crontab(0, 0, month_of_year=’
/3’) 每个季度的第一个月执行

启动定时器:
celery beat -A tasks -l info -f logging/schedule_tasks.log
-f logging/schedule_tasks.log:定时任务日志输出路径

后台运行,加参数"–detach":
celery beat -A tasks -l info -f logging/schedule_tasks.log --detach

设置开机启动:

# 注意,一定要进入tasks.py任务所在目录,才能执行,以下例子中,schedule_test.py处于/var/ftp/pub/celery里
chmod +x /etc/rc.d/rc.localvi /etc/rc.d/rc.localcd /var/ftp/pub/celery
# 同时开机启动worker,执行任务。
celery multi start logging/celery -A tasks -l info -n bruce_worker-1
celery beat -A tasks -l info -f logging/schedule_tasks.log  --detach

关闭进程:
ps aux | grep celery
找到进程后,再使用kill pid杀掉。

redis对应操作

当celery worker中断后,未执行的任务会被保存在redis的库里,
如果broker配置的消息保存队列在redis 10库里,命令为:broker=‘redis://:123456@192.168.0.100/10’
进入redis 10库,
使用:keys * #可以看到"celery" key,未执行的任务全部放在这里.
使用:type celery #可以看到celery是list数据类型.
使用:llen celery #可以看到未执行的任务数量有多少.
查看某个未执行的任务:lindex celery 1

完整的操作命令如下:
192.168.0.100:6379[10]> keys *

  1. “celery”
  2. “_kombu.binding.celery”
    192.168.0.100:6379[10]> type celery
    list
    192.168.0.100:6379[10]> llen celery
    (integer) 5
    192.168.0.100:6379[10]> lindex celery 0
    “{“body”: “W1sxMCwgMTIwXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d”, “content-encoding”: “utf-8”, “content-type”: “application/json”, “headers”: {“lang”: “py”, “task”: “task.add”, “id”: “a740de65-114a-46ec-a542-9a7ca23f29b3”, “shadow”: null, “eta”: null, “expires”: null, “group”: null, “retries”: 0, “timelimit”: [null, null], “root_id”: “a740de65-114a-46ec-a542-9a7ca23f29b3”, “parent_id”: null, “argsrepr”: “(10, 120)”, “kwargsrepr”: “{}”, “origin”: “gen8040@bruce-x260”}, “properties”: {“correlation_id”: “a740de65-114a-46ec-a542-9a7ca23f29b3”, “reply_to”: “d3c374f8-6672-3537-910e-c232ad78df79”, “delivery_mode”: 2, “delivery_info”: {“exchange”: “”, “routing_key”: “celery”}, “priority”: 0, “body_encoding”: “base64”, “delivery_tag”: “3fd7cb83-43ce-4727-8a5b-92e6bcb394a8”}}”
    192.168.0.100:6379[10]>

当启动worker后(celery -A tasks worker -l info),这些任务会自动运行.

查看数据库11的大小(有多少个key)
192.168.0.100:6379[11]> dbsize
(integer) 24

清空所有数据库
192.168.0.100:6379[11]> flushall
OK
192.168.0.100:6379[11]>

celery基础知识相关推荐

  1. python爬虫入门基础知识_【PYTHON】【爬虫】关于python爬虫的一些基础知识

    基础知识 HTTP协议 我们浏览网页的浏览器和手机应用客户端与服务器通信几乎都是基于HTTP协议,而爬虫可以看作是一个另类的客户端,它把自己伪装成浏览器或者手机应用客户端,按照自己的逻辑贪婪的向服务器 ...

  2. python爬虫需要哪些基础知识-【PYTHON】【爬虫】关于python爬虫的一些基础知识

    基础知识 HTTP协议 我们浏览网页的浏览器和手机应用客户端与服务器通信几乎都是基于HTTP协议,而爬虫可以看作是一个另类的客户端,它把自己伪装成浏览器或者手机应用客户端,按照自己的逻辑贪婪的向服务器 ...

  3. 嵌入式Linux的OTA更新,基础知识和实现

    嵌入式Linux的OTA更新,基础知识和实现 OTA updates for Embedded Linux, Fundamentals and implementation 更新的需要 一旦嵌入式Li ...

  4. 计算机基础知识第十讲,计算机文化基础(第十讲)学习笔记

    计算机文化基础(第十讲)学习笔记 采样和量化PictureElement Pixel(像素)(链接: 采样的实质就是要用多少点(这个点我们叫像素)来描述一张图像,比如,一幅420x570的图像,就表示 ...

  5. 嵌入式linux编程,嵌入式Linux学习笔记 - 嵌入式Linux基础知识和开发环境的构建_Linux编程_Linux公社-Linux系统门户网站...

    注:所有内容基于友善之臂Mini2440开发板 一.嵌入式Linux开发环境的构建 嵌入式开发一般分为三个步骤: 1.编译bootloader,烧到开发板 2.编译嵌入式Linux内核,烧到开发板 3 ...

  6. 《计算机网络应用基础》模拟试卷(六),《计算机与网络应用基础知识1》模拟试卷...

    <计算机与网络应用基础知识1>模拟试卷 (4页) 本资源提供全文预览,点击全文预览即可全文预览,如果喜欢文档就下载吧,查找使用更方便哦! 9.9 积分 <计算机与网络应用基础知识1& ...

  7. python向量计算库教程_NumPy库入门教程:基础知识总结

    原标题:NumPy库入门教程:基础知识总结 视学算法 | 作者 知乎专栏 | 来源 numpy可以说是 Python运用于人工智能和科学计算的一个重要基础,近段时间恰好学习了numpy,pandas, ...

  8. python常用变量名_python基础知识整理

    Python Python开发 Python语言 python基础知识整理 序言:本文简单介绍python基础知识的一些重要知识点,用于总结复习,每个知识点的具体用法会在后面的博客中一一补充程序: 一 ...

  9. 计算机基础知识掌握欠缺,《计算机基础知识》实验教学改革探讨.pdf

    <计算机基础知识>实验教学改革探讨.pdf Science& TechnologyVision 科 技 视 界 科技 探·索·争鸣 计<算机基础知识>实验教学改革探讨 ...

最新文章

  1. 【技术贴】火狐的悬停激活标签扩展插件下载。Tab Focus
  2. 1012: [JSOI2008]最大数maxnumber 线段树
  3. 移动互联环境下的流程管理
  4. rust(20)-字符
  5. storm-kafka源码分析
  6. 全志A33-BootLoader的两个阶段:boot0和second boot
  7. 最全前端开发面试问题及答案整理
  8. [Android Pro] listView和GridView的item设置的高度和宽度不起作用
  9. spring 随笔(一) bean Dependency Injection
  10. 窗体应用程序:四则运算
  11. AUTOSAR Network Wakeup(Can) Configuration
  12. 华为服务器故障灯不开机_华为手机开不了机指示灯亮,怎么办
  13. JAVA、PHP统一社会信用代码、身份证号算法解析验证
  14. 阿里云大数据组件的基本介绍
  15. JS自写带描述标签云
  16. tidyverse笔记——tidyr包
  17. 【WAF剖析】——文件上传之安全狗bypass
  18. vue分类筛选方法,filer
  19. SAR ADC 介绍 核芯CL1606/CL1689/CL1680 替代AD7607/AD7689/AD1980
  20. 结对编程项目-最长英语单词链

热门文章

  1. 服务器状态监控php源码,服务器状态监控_监控Linux服务器网站状态的SHELL脚本
  2. 国外问卷调查做题工具
  3. 无法割舍的乡情--去外公家
  4. 微信公众号主体注销了,如何办理账号迁移?
  5. 安川机器人怎样与变位器编程_基于CAM Function的安川机器人弧焊焊接离线编程与离线仿真...
  6. 在国内愚人节可以开的10个玩笑
  7. ubuntu修改ssh端口_在Ubuntu上更改SSH欢迎横幅
  8. 程序员常用工具网站汇总(一)
  9. 杯具”箴言成网络流行语
  10. Android繁星眨眼动画效果