今天就来点比较有意思的东西

面前两篇充其量就是 Celery 的入门级,接下来就深入编写 Celery 框架,让 Celery 更 加健壮。

首先是定时任务,上文的编写的定时任务是在 config 文件里进行配置的,其实在Celery中提供了一系列的装饰器,比如前面说的 @app.task 等等,来看看如何使用装饰器来实现 app 实例中的定时任务

# 定时任务在文件中的写法
# -*- coding:utf-8 -*-
from Celery import app
from celery.schedules import crontab
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)# 通过beat组件周期性将任务发送给woker执行。在示例中 新建文件period_task.py 并添加任务到配置文件中
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒执行addsender.add_periodic_task(crontab(hour=16, minute=56, day_of_week=1),      #每周一下午四点五十六执行sayhaisayhi.s('wd'),name='say_hi')

也就是说通过方法调用的形式来执行定时任务

一般而言,这种格式需要在配置 Beat 中无法达到逻辑需求的时候使用,比较少见。

如果在 config 配置文件中编写了定时任务并希望 Celery Worker 服务进程的同时启动 Beat 模块,需要加入 -B 参数

celery -A haha worker -B -l info

关于路由指定的队列

一般是一个 task 对应一个队列 各个队列相互独立 可以执行不同的操作 比如taskA执行即时任务,taskB执行定时 延时任务等等 需要的是在 config 文件里声明

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_add",Exchange("for_add"),routing_key="for_add"),
Queue("for_max",Exchange("for_max"),routing_key="for_max")
)
# 路由
CELERY_ROUTES = {'tasks.add':{"queue":"for_add","routing_key":"for_add"},
'tasks.max':{"queue":"for_max","routing_key":"for_max"}
}# 路由指明队列和routing_key
# routing_key认证所在的队列 用redis名称要一致(对于redis来说)

指定 CELERY_QUEUES 和 CELERY_ROUTES 选择要执行的任务,这个玩意在分布式任务上还是很方便的,下面说。

自定义Task

如果在Celery中想自定义 Task 基类,那就需要继承 Task 模块

# coding=utf-8
from Celery import app
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from celery import Task
logger = get_task_logger(__name__)class demotask(Task): # 这是三种运行的状态def on_success(self, retval, task_id, args, kwargs):   # 任务成功执行logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))def on_failure(self, exc, task_id, args, kwargs, einfo):  #任务失败执行logger.info('task id:{} , arg:{} , failed ! error : {}' .format(task_id,args,exc))def on_retry(self, exc, task_id, args, kwargs, einfo):    #任务重试执行logger.info('task id:{} , arg:{} , retry !  info: {}'.format(task_id, args, exc))@app.task(base=demotask) # 需要继承这个基类
def func1(x,y):try:a=[]a[10] =1except Exception as e:logger.info(e)return x+y@app.task(base=demotask)
def func2(name):a=[]a[10] =1return 'hi {}'.format(name)@app.task(base=demotask)
def func3(a,b):return 'result = {} '.format(a+b)

这算是复写了 Task 模块,定义了几种状态,这个可以根据需求,只要在 task 中声明要继承的自定义类即可。

如果想要深入了解 Task 基类,可以查看 Celery 源码,这就不废话了。

PS:当使用多个装饰器装饰任务函数时,确保 task 装饰器最后应用(在python中,这意味它必须在第一个位置)

@app.task
@decorator2
@decorator1
def add(x, y):return x + y

另外在Celery中还提供了自带的 logger 日志模块,可以这么用

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)@app.task
def add(x, y):logger.info('Adding {0} + {1}'.format(x, y))return x + y

Celery的可视化监控

Celery Flower 是一款Celery官方推荐使用的监控工具

能够实时监控 celery 的 Worker Tasks Borker Result等服务

安装也是极其简单

pip install flower

运行服务

celery flower --broker=redis://localhost:6379 --address=127.0.0.1 --port=5555或者flower -A proj --broker=redis://localhost:6379

这样就可以访问 Flower Web 了,浏览器访问 http: //127.0.0.1:5555 查看运行的服务

这样一来,在 Celery部署爬虫(一)的百度百科的爬虫就可以重新编写了

鬼子口音:Celery部署爬虫(一)​zhuanlan.zhihu.com

Config.py文件

# config.py
from __future__ import absolute_import
# broker
import datetime
BROKER_URL = 'redis://127.0.0.1:6379/0'
# backen
CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017'
# 导入任务,如tasks.py
CELERY_IMPORTS = ('task', )
# 列化任务载荷的默认的序列化方式
CELERY_TASK_SERIALIZER = 'json'
# 结果序列化方式
CELERY_RESULT_SERIALIZER = 'json'CELERY_ACCEPT_CONTENT = ['json']CELERY_TIMEZONE='Asia/Shanghai'    # 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'
# CELERY_ENABLE_UTC = True

Celery.py文件

# Celery.py
# coding=utf-8
from __future__ import absolute_import
from celery import Celeryapp = Celery("STQ")
# 加载配置模块
app.config_from_object('config')if __name__ == '__main__':app.start()

Task.py 文件

# coding=utf-8
from __future__ import absolute_import
import re
import requests
from lxml import etree
import urllib.request
from Celery import appheaders ={"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}def get_html(url):r = urllib.request.Request(url,headers=headers)res = urllib.request.urlopen(r)return res.read().decode("utf-8")@app.task
def parse(html):ret = re.compile(r'<title>(.*?)</title>')title = re.findall(ret,html)return title

Run_task.py 文件

from __future__ import absolute_import
from tasks import get_html,parsewith open("url.txt","r") as f:for data in f.readlines():url = data.strip('n')html = get_html(url)result = parse.delay(html)

黑窗口键入

celery -A Celery worker -l info -P gevent -c 10

然后发布任务让它从消息队列中消费

python run_task.py

这样一来。所有的参数配置都可以在 config 文件中进行配置了

关于路由 队列的使用

CELERY_QUEUES 设置一个指定 routing_key 的队列

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_add",Exchange("for_add"),routing_key="for_add"),
)

CELERY_ROUTES 设置路由

通过 routing_key 来关联上面的 CELERY_QUEUES,所以这里的 routing_key 需要和上面参数的一致。

# 路由
CELERY_ROUTES = {'tasks.add':{"queue":"for_add","routing_key":"for_add"},
}

tasks.add 表示在 tasks.py 的函数文件的 add 方法属于哪个路由,而这个路由关联着哪个队列。也就是说声明哪个任务放入哪个队列

如果开启了这个路由,那么此任务就会被执行,反之则不会。

那么如果没有指定队列的方法要怎么执行呢?

celery -A Celery worker -l info -n add -Q for_add -P gevent -c 10

可以在命令中去指定某个任务对应某个队列,所以,发布出去的任务会通过路由关联到指定队列,不同 worker 会从不同的队列来消费任务。

而且每个队列都是相互独立的,这样一来,每个任务之间就不会相互影响了,即时任务,定时任务就可以有明确的分工。

分布式集群

接下来就该说说 Celery 分布式,基本的架构就是通过中间件来共享消息队列。

事实上分布式架构的核心无非就是机器间的通信,而一般的分布式爬虫架构都会体现在共享数据库队列,Redis 和 RabbitMQ 就是典型的消息队列。

比如通过redis来配置共享中间件

 redis://ip:6379/0

比如现在有 Master, Slave1, Slave2三台主机,使用Master编写主配置文件,celery主文件,任务和调度文件。

然后拷贝两份分别放到两台Slave里去,在黑窗口中分别键入

celery -A Celery worker -l info

可以看到,在终端中服务器之间已经相互连通

然后发布任务

python run_haha.py

本来监听就监听着 redis队列 的机器就会进行任务消费

需要知道的一点就是,运行队列里的任务的时候,每个机器真正调用的是自身任务文件里面的任务函数,只要该函数存在于队列之中。

然后就和一般的分布式没啥两样了。

好了,今天就这样了

但是,我在想,是不是应该来点刺激的

欢迎转载,但要声明出处,不然我顺着网线过去就是一拳。

个人技术博客​www.gzky.live

celery 可视化_Celery部署爬虫(三)相关推荐

  1. 单机 docker 部署fastfds_云服务器使用docker可视化一键部署Wrodpress个人博客,操作简单,适合小白...

    原文链接在我的博客: 教你云服务器使用docker可视化一键部署Wrodpress个人博客,操作简单,适合小白 - Kyellow's blog​kyellow.gitee.io 前段时间领取了一台云 ...

  2. 用python爬取基金网信息数据,保存到表格,并做成四种简单可视化。(爬虫之路,永无止境!)

    用python爬取基金网信息数据,保存到表格,并做成四种简单可视化.(爬虫之路,永无止境!) 上次 2021-07-07写的用python爬取腾讯招聘网岗位信息保存到表格,并做成简单可视化. 有的人留 ...

  3. Kubernetes部署(三):CA证书制作

    相关内容: Kubernetes部署(一):架构及功能说明 Kubernetes部署(二):系统环境初始化 Kubernetes部署(三):CA证书制作 Kubernetes部署(四):ETCD集群部 ...

  4. scrapyd部署爬虫项目到LINUX服务器

    1,首先把scrapy爬虫项目上传到服务器 我的服务器架设在公司内网里.所以在这里使用WinSCP作为上传工具. 2,修改项目的配置文件scrapy.cfg 给爬虫项目指定分组,具体方法是在deplo ...

  5. Lync Server 2010的部署系列(三) lync批量导入用户联系人

    Lync Server 2010的部署系列(三) lync批量导入用户联系人 一.批量导入原理介绍 二.导入联系人操作指南 一.批量导入原理介绍 (介绍摘自http://ucworld.blog.51 ...

  6. Zabbix 3.0 部署监控 [三]

    Zabbix 3.0 部署监控 [三] zabbix  时间:2016年9月22日  笔者QQ:381493251  Abcdocker交流群:454666672  如果遇到什么问题可以进群询问,我们 ...

  7. Exchange Server 2016 独立部署/共存部署 (三)—— 安装Exchange程序

    Exchange Server 2016 独立部署/共存部署 (三)-- 安装Exchange程序 https://blog.51cto.com/horse87/1748999 经过了上面两篇文章的准 ...

  8. 一步一步教你使用云服务器部署爬虫

    一步一步教你使用云服务器部署爬虫 1. 注册阿里云,可以免费试用一个月的云服务器.每天十点0元抢购 2. 点击进入阿里云领取页面 3. 点击管理可以修改密码 4.用显示的公网ip远程登录服务器,默认r ...

  9. JAVA爬虫三种方法

    文章目录 前言 一.JDK 二.HttpClient 三.Jsoup 总结 前言 记录JAVA爬虫三种方式 一.JDK 使用JDK自带的URLConnection实现网络爬虫. public void ...

  10. python爬虫——三步爬得电影天堂电影下载链接,30多行代码即可搞定:

    python爬虫--三步爬得电影天堂电影下载链接,30多行代码即可搞定: 本次我们选择的爬虫对象是:https://www.dy2018.com/index.html 具体的三个步骤:1.定位到202 ...

最新文章

  1. 5-3 最长连续递增子序列 (20分)
  2. 《算法竞赛进阶指南》打卡-基本算法-AcWing 95. 费解的开关:位运算、枚举、递推
  3. JZOJ 3815. 【NOIP2014模拟9.7】克卜勒
  4. 励志!送女儿去厦大读研后,爸爸回家就考了厦大的博士,现在是女儿的“学弟”...
  5. 利用python求解度中心性
  6. 敏捷开发FAQ[转]
  7. python request 库传送formdata_Python Requests库 form-data 上传文件操作
  8. 程序员必读的30本书籍
  9. 常用BUG管理工具系统
  10. Busboy 上传文件到指定目录,并重命名,node.js
  11. 那时的回忆~九州幻想!
  12. 计算机总线拓扑和环形拓扑,网络拓扑结构总线型环形星型,各自的优缺点是什么..._网络编辑_帮考网...
  13. 解决ImportError: cannot import name ‘soft_unicode‘ from ‘markupsafe‘
  14. 前端提高篇(十一)JS进阶8函数参数及arguments
  15. 【2022新版】Java 终极学习路线(文末高清大图)-共计9大模块/6大框架/13个中间件
  16. flutter学习笔记--传递信息
  17. 怎么做网线,网线水晶头接法和线序(图文详解)
  18. 云平台 造就智能家居
  19. 大数据产业链包括哪几个环节,具体包含哪些内容
  20. yolov5-5.0训练模型+瑞芯微rv1126上实现模型部署

热门文章

  1. ZooKeeper 初体验
  2. idea整个项目乱码解决办法
  3. centos7 centos6.5部KVM使用NAT联网并为虚拟机配置firewalld iptables防火墙端口转发...
  4. 一些CFD名词缩写的含义(持续更新中)
  5. AngularJs-指令1
  6. Mac OS X中Apache开启ssl
  7. U-BOOT 编译过程
  8. python语言数字类型字节_Python中 各种数字类型的判别(numerica, digital, decimal)
  9. 腾讯云 python接口_python调用腾讯云短信接口
  10. 频繁刷新页面websocket会报错_代码优化:Node+WebSocket+Vue聊天室