celery 可视化_Celery部署爬虫(三)
今天就来点比较有意思的东西
面前两篇充其量就是 Celery 的入门级,接下来就深入编写 Celery 框架,让 Celery 更 加健壮。
首先是定时任务,上文的编写的定时任务是在 config 文件里进行配置的,其实在Celery中提供了一系列的装饰器,比如前面说的 @app.task 等等,来看看如何使用装饰器来实现 app 实例中的定时任务
# 定时任务在文件中的写法
# -*- coding:utf-8 -*-
from Celery import app
from celery.schedules import crontab
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)# 通过beat组件周期性将任务发送给woker执行。在示例中 新建文件period_task.py 并添加任务到配置文件中
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒执行addsender.add_periodic_task(crontab(hour=16, minute=56, day_of_week=1), #每周一下午四点五十六执行sayhaisayhi.s('wd'),name='say_hi')
也就是说通过方法调用的形式来执行定时任务
一般而言,这种格式需要在配置 Beat 中无法达到逻辑需求的时候使用,比较少见。
如果在 config 配置文件中编写了定时任务并希望 Celery Worker 服务进程的同时启动 Beat 模块,需要加入 -B 参数
celery -A haha worker -B -l info
关于路由指定的队列
一般是一个 task 对应一个队列 各个队列相互独立 可以执行不同的操作 比如taskA执行即时任务,taskB执行定时 延时任务等等 需要的是在 config 文件里声明
CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_add",Exchange("for_add"),routing_key="for_add"),
Queue("for_max",Exchange("for_max"),routing_key="for_max")
)
# 路由
CELERY_ROUTES = {'tasks.add':{"queue":"for_add","routing_key":"for_add"},
'tasks.max':{"queue":"for_max","routing_key":"for_max"}
}# 路由指明队列和routing_key
# routing_key认证所在的队列 用redis名称要一致(对于redis来说)
指定 CELERY_QUEUES 和 CELERY_ROUTES 选择要执行的任务,这个玩意在分布式任务上还是很方便的,下面说。
自定义Task
如果在Celery中想自定义 Task 基类,那就需要继承 Task 模块
# coding=utf-8
from Celery import app
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from celery import Task
logger = get_task_logger(__name__)class demotask(Task): # 这是三种运行的状态def on_success(self, retval, task_id, args, kwargs): # 任务成功执行logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))def on_failure(self, exc, task_id, args, kwargs, einfo): #任务失败执行logger.info('task id:{} , arg:{} , failed ! error : {}' .format(task_id,args,exc))def on_retry(self, exc, task_id, args, kwargs, einfo): #任务重试执行logger.info('task id:{} , arg:{} , retry ! info: {}'.format(task_id, args, exc))@app.task(base=demotask) # 需要继承这个基类
def func1(x,y):try:a=[]a[10] =1except Exception as e:logger.info(e)return x+y@app.task(base=demotask)
def func2(name):a=[]a[10] =1return 'hi {}'.format(name)@app.task(base=demotask)
def func3(a,b):return 'result = {} '.format(a+b)
这算是复写了 Task 模块,定义了几种状态,这个可以根据需求,只要在 task 中声明要继承的自定义类即可。
如果想要深入了解 Task 基类,可以查看 Celery 源码,这就不废话了。
PS:当使用多个装饰器装饰任务函数时,确保 task 装饰器最后应用(在python中,这意味它必须在第一个位置)
@app.task
@decorator2
@decorator1
def add(x, y):return x + y
另外在Celery中还提供了自带的 logger 日志模块,可以这么用
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)@app.task
def add(x, y):logger.info('Adding {0} + {1}'.format(x, y))return x + y
Celery的可视化监控
Celery Flower 是一款Celery官方推荐使用的监控工具
能够实时监控 celery 的 Worker Tasks Borker Result等服务
安装也是极其简单
pip install flower
运行服务
celery flower --broker=redis://localhost:6379 --address=127.0.0.1 --port=5555或者flower -A proj --broker=redis://localhost:6379
这样就可以访问 Flower Web 了,浏览器访问 http: //127.0.0.1:5555 查看运行的服务
这样一来,在 Celery部署爬虫(一)的百度百科的爬虫就可以重新编写了
鬼子口音:Celery部署爬虫(一)zhuanlan.zhihu.com
Config.py文件
# config.py
from __future__ import absolute_import
# broker
import datetime
BROKER_URL = 'redis://127.0.0.1:6379/0'
# backen
CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017'
# 导入任务,如tasks.py
CELERY_IMPORTS = ('task', )
# 列化任务载荷的默认的序列化方式
CELERY_TASK_SERIALIZER = 'json'
# 结果序列化方式
CELERY_RESULT_SERIALIZER = 'json'CELERY_ACCEPT_CONTENT = ['json']CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'
# CELERY_ENABLE_UTC = True
Celery.py文件
# Celery.py
# coding=utf-8
from __future__ import absolute_import
from celery import Celeryapp = Celery("STQ")
# 加载配置模块
app.config_from_object('config')if __name__ == '__main__':app.start()
Task.py 文件
# coding=utf-8
from __future__ import absolute_import
import re
import requests
from lxml import etree
import urllib.request
from Celery import appheaders ={"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}def get_html(url):r = urllib.request.Request(url,headers=headers)res = urllib.request.urlopen(r)return res.read().decode("utf-8")@app.task
def parse(html):ret = re.compile(r'<title>(.*?)</title>')title = re.findall(ret,html)return title
Run_task.py 文件
from __future__ import absolute_import
from tasks import get_html,parsewith open("url.txt","r") as f:for data in f.readlines():url = data.strip('n')html = get_html(url)result = parse.delay(html)
黑窗口键入
celery -A Celery worker -l info -P gevent -c 10
然后发布任务让它从消息队列中消费
python run_task.py
这样一来。所有的参数配置都可以在 config 文件中进行配置了
关于路由 队列的使用
CELERY_QUEUES 设置一个指定 routing_key 的队列
CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_add",Exchange("for_add"),routing_key="for_add"),
)
CELERY_ROUTES 设置路由
通过 routing_key 来关联上面的 CELERY_QUEUES,所以这里的 routing_key 需要和上面参数的一致。
# 路由
CELERY_ROUTES = {'tasks.add':{"queue":"for_add","routing_key":"for_add"},
}
tasks.add 表示在 tasks.py 的函数文件的 add 方法属于哪个路由,而这个路由关联着哪个队列。也就是说声明哪个任务放入哪个队列
如果开启了这个路由,那么此任务就会被执行,反之则不会。
那么如果没有指定队列的方法要怎么执行呢?
celery -A Celery worker -l info -n add -Q for_add -P gevent -c 10
可以在命令中去指定某个任务对应某个队列,所以,发布出去的任务会通过路由关联到指定队列,不同 worker 会从不同的队列来消费任务。
而且每个队列都是相互独立的,这样一来,每个任务之间就不会相互影响了,即时任务,定时任务就可以有明确的分工。
分布式集群
接下来就该说说 Celery 分布式,基本的架构就是通过中间件来共享消息队列。
事实上分布式架构的核心无非就是机器间的通信,而一般的分布式爬虫架构都会体现在共享数据库队列,Redis 和 RabbitMQ 就是典型的消息队列。
比如通过redis来配置共享中间件
redis://ip:6379/0
比如现在有 Master, Slave1, Slave2三台主机,使用Master编写主配置文件,celery主文件,任务和调度文件。
然后拷贝两份分别放到两台Slave里去,在黑窗口中分别键入
celery -A Celery worker -l info
可以看到,在终端中服务器之间已经相互连通
然后发布任务
python run_haha.py
本来监听就监听着 redis队列 的机器就会进行任务消费
需要知道的一点就是,运行队列里的任务的时候,每个机器真正调用的是自身任务文件里面的任务函数,只要该函数存在于队列之中。
然后就和一般的分布式没啥两样了。
好了,今天就这样了
但是,我在想,是不是应该来点刺激的
欢迎转载,但要声明出处,不然我顺着网线过去就是一拳。
个人技术博客www.gzky.live
celery 可视化_Celery部署爬虫(三)相关推荐
- 单机 docker 部署fastfds_云服务器使用docker可视化一键部署Wrodpress个人博客,操作简单,适合小白...
原文链接在我的博客: 教你云服务器使用docker可视化一键部署Wrodpress个人博客,操作简单,适合小白 - Kyellow's blogkyellow.gitee.io 前段时间领取了一台云 ...
- 用python爬取基金网信息数据,保存到表格,并做成四种简单可视化。(爬虫之路,永无止境!)
用python爬取基金网信息数据,保存到表格,并做成四种简单可视化.(爬虫之路,永无止境!) 上次 2021-07-07写的用python爬取腾讯招聘网岗位信息保存到表格,并做成简单可视化. 有的人留 ...
- Kubernetes部署(三):CA证书制作
相关内容: Kubernetes部署(一):架构及功能说明 Kubernetes部署(二):系统环境初始化 Kubernetes部署(三):CA证书制作 Kubernetes部署(四):ETCD集群部 ...
- scrapyd部署爬虫项目到LINUX服务器
1,首先把scrapy爬虫项目上传到服务器 我的服务器架设在公司内网里.所以在这里使用WinSCP作为上传工具. 2,修改项目的配置文件scrapy.cfg 给爬虫项目指定分组,具体方法是在deplo ...
- Lync Server 2010的部署系列(三) lync批量导入用户联系人
Lync Server 2010的部署系列(三) lync批量导入用户联系人 一.批量导入原理介绍 二.导入联系人操作指南 一.批量导入原理介绍 (介绍摘自http://ucworld.blog.51 ...
- Zabbix 3.0 部署监控 [三]
Zabbix 3.0 部署监控 [三] zabbix 时间:2016年9月22日 笔者QQ:381493251 Abcdocker交流群:454666672 如果遇到什么问题可以进群询问,我们 ...
- Exchange Server 2016 独立部署/共存部署 (三)—— 安装Exchange程序
Exchange Server 2016 独立部署/共存部署 (三)-- 安装Exchange程序 https://blog.51cto.com/horse87/1748999 经过了上面两篇文章的准 ...
- 一步一步教你使用云服务器部署爬虫
一步一步教你使用云服务器部署爬虫 1. 注册阿里云,可以免费试用一个月的云服务器.每天十点0元抢购 2. 点击进入阿里云领取页面 3. 点击管理可以修改密码 4.用显示的公网ip远程登录服务器,默认r ...
- JAVA爬虫三种方法
文章目录 前言 一.JDK 二.HttpClient 三.Jsoup 总结 前言 记录JAVA爬虫三种方式 一.JDK 使用JDK自带的URLConnection实现网络爬虫. public void ...
- python爬虫——三步爬得电影天堂电影下载链接,30多行代码即可搞定:
python爬虫--三步爬得电影天堂电影下载链接,30多行代码即可搞定: 本次我们选择的爬虫对象是:https://www.dy2018.com/index.html 具体的三个步骤:1.定位到202 ...
最新文章
- 5-3 最长连续递增子序列 (20分)
- 《算法竞赛进阶指南》打卡-基本算法-AcWing 95. 费解的开关:位运算、枚举、递推
- JZOJ 3815. 【NOIP2014模拟9.7】克卜勒
- 励志!送女儿去厦大读研后,爸爸回家就考了厦大的博士,现在是女儿的“学弟”...
- 利用python求解度中心性
- 敏捷开发FAQ[转]
- python request 库传送formdata_Python Requests库 form-data 上传文件操作
- 程序员必读的30本书籍
- 常用BUG管理工具系统
- Busboy 上传文件到指定目录,并重命名,node.js
- 那时的回忆~九州幻想!
- 计算机总线拓扑和环形拓扑,网络拓扑结构总线型环形星型,各自的优缺点是什么..._网络编辑_帮考网...
- 解决ImportError: cannot import name ‘soft_unicode‘ from ‘markupsafe‘
- 前端提高篇(十一)JS进阶8函数参数及arguments
- 【2022新版】Java 终极学习路线(文末高清大图)-共计9大模块/6大框架/13个中间件
- flutter学习笔记--传递信息
- 怎么做网线,网线水晶头接法和线序(图文详解)
- 云平台 造就智能家居
- 大数据产业链包括哪几个环节,具体包含哪些内容
- yolov5-5.0训练模型+瑞芯微rv1126上实现模型部署
热门文章
- ZooKeeper 初体验
- idea整个项目乱码解决办法
- centos7 centos6.5部KVM使用NAT联网并为虚拟机配置firewalld iptables防火墙端口转发...
- 一些CFD名词缩写的含义(持续更新中)
- AngularJs-指令1
- Mac OS X中Apache开启ssl
- U-BOOT 编译过程
- python语言数字类型字节_Python中 各种数字类型的判别(numerica, digital, decimal)
- 腾讯云 python接口_python调用腾讯云短信接口
- 频繁刷新页面websocket会报错_代码优化:Node+WebSocket+Vue聊天室