源码分析参考:Queue
queue.py
该文件实现了几个容器类,可以看这些容器和redis交互频繁,同时使用了我们上边picklecompat中定义的序列化器。这个文件实现的几个容器大体相同,只不过一个是队列,一个是栈,一个是优先级队列,这三个容器到时候会被scheduler对象实例化,来实现request的调度。比如我们使用SpiderQueue最为调度队列的类型,到时候request的调度方法就是先进先出,而实用SpiderStack就是先进后出了。
从SpiderQueue的实现看出来,他的push函数就和其他容器的一样,只不过push进去的request请求先被scrapy的接口request_to_dict变成了一个dict对象(因为request对象实在是比较复杂,有方法有属性不好串行化),之后使用picklecompat中的serializer串行化为字符串,然后使用一个特定的key存入redis中(该key在同一种spider中是相同的)。而调用pop时,其实就是从redis用那个特定的key去读其值(一个list),从list中读取最早进去的那个,于是就先进先出了。 这些容器类都会作为scheduler调度request的容器,scheduler在每个主机上都会实例化一个,并且和spider一一对应,所以分布式运行时会有一个spider的多个实例和一个scheduler的多个实例存在于不同的主机上,但是,因为scheduler都是用相同的容器,而这些容器都连接同一个redis服务器,又都使用spider名加queue来作为key读写数据,所以不同主机上的不同爬虫实例公用一个request调度池,实现了分布式爬虫之间的统一调度。
from scrapy.utils.reqser import request_to_dict, request_from_dictfrom . import picklecompatclass Base(object):"""Per-spider queue/stack base class"""def __init__(self, server, spider, key, serializer=None):"""Initialize per-spider redis queue.Parameters:server -- redis connectionspider -- spider instancekey -- key for this queue (e.g. "%(spider)s:queue")"""if serializer is None:# Backward compatibility.# TODO: deprecate pickle.serializer = picklecompatif not hasattr(serializer, 'loads'):raise TypeError("serializer does not implement 'loads' function: %r"% serializer)if not hasattr(serializer, 'dumps'):raise TypeError("serializer '%s' does not implement 'dumps' function: %r"% serializer)self.server = serverself.spider = spiderself.key = key % {'spider': spider.name}self.serializer = serializerdef _encode_request(self, request):"""Encode a request object"""obj = request_to_dict(request, self.spider)return self.serializer.dumps(obj)def _decode_request(self, encoded_request):"""Decode an request previously encoded"""obj = self.serializer.loads(encoded_request)return request_from_dict(obj, self.spider)def __len__(self):"""Return the length of the queue"""raise NotImplementedErrordef push(self, request):"""Push a request"""raise NotImplementedErrordef pop(self, timeout=0):"""Pop a request"""raise NotImplementedErrordef clear(self):"""Clear queue/stack"""self.server.delete(self.key)class SpiderQueue(Base):"""Per-spider FIFO queue"""def __len__(self):"""Return the length of the queue"""return self.server.llen(self.key)def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.brpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.rpop(self.key)if data:return self._decode_request(data)class SpiderPriorityQueue(Base):"""Per-spider priority queue abstraction using redis' sorted set"""def __len__(self):"""Return the length of the queue"""return self.server.zcard(self.key)def push(self, request):"""Push a request"""data = self._encode_request(request)score = -request.priority# We don't use zadd method as the order of arguments change depending on# whether the class is Redis or StrictRedis, and the option of using# kwargs only accepts strings, not bytes.self.server.execute_command('ZADD', self.key, score, data)def pop(self, timeout=0):"""Pop a requesttimeout not support in this queue class"""# use atomic range/remove using multi/execpipe = self.server.pipeline()pipe.multi()pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)results, count = pipe.execute()if results:return self._decode_request(results[0])class SpiderStack(Base):"""Per-spider stack"""def __len__(self):"""Return the length of the stack"""return self.server.llen(self.key)def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.blpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.lpop(self.key)if data:return self._decode_request(data)__all__ = ['SpiderQueue', 'SpiderPriorityQueue', 'SpiderStack']
源码分析参考:Queue相关推荐
- 源码分析参考:Spider
spider.py 设计的这个spider从redis中读取要爬的url,然后执行爬取,若爬取过程中返回更多的url,那么继续进行直至所有的request完成.之后继续从redis中读取url,循环这 ...
- Java 容器源码分析之Queue
简介 Queue是一种很常见的数据结构类型,在java里面Queue是一个接口,它只是定义了一个基本的Queue应该有哪些功能规约.实际上有多个Queue的实现,有的是采用线性表实现,有的基于链表实现 ...
- 源码分析参考:Scheduler
scheduler.py 此扩展是对scrapy中自带的scheduler的替代(在settings的SCHEDULER变量中指出),正是利用此扩展实现crawler的分布式调度.其利用的数据结构来自 ...
- 源码分析参考:Pipelines
pipelines.py 这是是用来实现分布式处理的作用.它将Item存储在redis中以实现分布式处理.由于在这里需要读取配置,所以就用到了from_crawler()函数. from scrapy ...
- 源码分析参考:Dupefilter
dupefilter.py 负责执行requst的去重,实现的很有技巧性,使用的Redis的设定数据结构.但是注意调度并不使用其中用于在这个模块中实现的dupefilter键做请求的调度,而是使用qu ...
- libuv 源码分析 —— 1.queue
定义指针数组类型 typedef void *QUEUE[2]; 使用 QUEUE q; // 相当于 void *q[2] 定义基本操作 #define QUEUE_NEXT(q) (*(QUEUE ...
- 源码分析参考:Connection
官方站点:https://github.com/rolando/scrapy-redis scrapy-redis的官方文档写的比较简洁,没有提及其运行原理,所以如果想全面的理解分布式爬虫的运行原理, ...
- PPP协议工作流程,结合ppp-2.4.9 源码分析
ppp-2.4.9 源码分析 文章目录 ppp-2.4.9 源码分析 PPP协议工作流程 ppp-2.4.9 源码分析 全局变量和结构体说明 第一阶段 初始化 第二阶段 开始链接 第三阶段 建立PPP ...
- SpringMVC异常处理机制详解[附带源码分析]
SpringMVC异常处理机制详解[附带源码分析] 参考文章: (1)SpringMVC异常处理机制详解[附带源码分析] (2)https://www.cnblogs.com/fangjian0423 ...
最新文章
- 如何阅读微控制器数据手册:探索硬件 ?
- LSM 自适应信号处理代码
- C++/Cli中事件对象处理函数的添加与删除
- VTK:可视化之VectorField
- 新一代球王!日本推AI篮球机器人,命中率接近100%!
- java hibernate的使用_《Hibernate快速开始 – 4 – 使用JAVA持久层 API (JPA)教程》
- MS-Sqlserver2008建立维护计划执行备份任务
- 基于Bootstrap的Asp.net Mvc 分页的实现(转)
- vcglib中面自相交的检测算法
- java用信号量写理发师_课内资源 - 基于Java实现的生产者与消费者问题、读者写者问题、哲学家进餐问题、理发师睡觉问题、医生看病问题...
- 工作流软件是未来web的支柱
- 很好听,可没机会跟你分享
- 基于Javaweb的图书馆管理系统设计与实现(开题报告+论文).doc
- 新手如果写一个软件,应该是怎么一个流程?
- ionic3 添加蒙版,弹出悬浮框
- DataBinding找不到符号,import xxx.xxx.ActivityxxxBindingImpl
- CPP-week thirteen
- rust怎么存水_rust怎么装水 | 手游网游页游攻略大全
- java 计算日出日落时间
- 利用Python进行数据分析(四):数据加载、存储与文件格式
热门文章
- 实践练习四:迁移 MySQL 数据到 OceanBase 集群
- 【翻译】ANDROID KTX – 使用Kotlin进行Android开发
- 荣耀发布了全球首款 4800 万像素手机,并推出 YOYO 智能音箱...
- LINUX负载均衡LVS-NAT搭建
- 高老师架构设计思考短句集(1)
- 转载:Android Display架构分析--侧重高通平台
- asp.net Cache缓存定时更新数据
- 2005年2月24日(星期四) 中午,晴+煙 - Central Incubator。
- Oracle结果集缓存(Result Cache)--服务器、客户端、函数缓存
- IntelliJ IDEA下载插件超时--解决方法