queue.py

该文件实现了几个容器类,可以看这些容器和redis交互频繁,同时使用了我们上边picklecompat中定义的序列化器。这个文件实现的几个容器大体相同,只不过一个是队列,一个是栈,一个是优先级队列,这三个容器到时候会被scheduler对象实例化,来实现request的调度。比如我们使用SpiderQueue最为调度队列的类型,到时候request的调度方法就是先进先出,而实用SpiderStack就是先进后出了。

从SpiderQueue的实现看出来,他的push函数就和其他容器的一样,只不过push进去的request请求先被scrapy的接口request_to_dict变成了一个dict对象(因为request对象实在是比较复杂,有方法有属性不好串行化),之后使用picklecompat中的serializer串行化为字符串,然后使用一个特定的key存入redis中(该key在同一种spider中是相同的)。而调用pop时,其实就是从redis用那个特定的key去读其值(一个list),从list中读取最早进去的那个,于是就先进先出了。 这些容器类都会作为scheduler调度request的容器,scheduler在每个主机上都会实例化一个,并且和spider一一对应,所以分布式运行时会有一个spider的多个实例和一个scheduler的多个实例存在于不同的主机上,但是,因为scheduler都是用相同的容器,而这些容器都连接同一个redis服务器,又都使用spider名加queue来作为key读写数据,所以不同主机上的不同爬虫实例公用一个request调度池,实现了分布式爬虫之间的统一调度。

from scrapy.utils.reqser import request_to_dict, request_from_dictfrom . import picklecompatclass Base(object):"""Per-spider queue/stack base class"""def __init__(self, server, spider, key, serializer=None):"""Initialize per-spider redis queue.Parameters:server -- redis connectionspider -- spider instancekey -- key for this queue (e.g. "%(spider)s:queue")"""if serializer is None:# Backward compatibility.# TODO: deprecate pickle.serializer = picklecompatif not hasattr(serializer, 'loads'):raise TypeError("serializer does not implement 'loads' function: %r"% serializer)if not hasattr(serializer, 'dumps'):raise TypeError("serializer '%s' does not implement 'dumps' function: %r"% serializer)self.server = serverself.spider = spiderself.key = key % {'spider': spider.name}self.serializer = serializerdef _encode_request(self, request):"""Encode a request object"""obj = request_to_dict(request, self.spider)return self.serializer.dumps(obj)def _decode_request(self, encoded_request):"""Decode an request previously encoded"""obj = self.serializer.loads(encoded_request)return request_from_dict(obj, self.spider)def __len__(self):"""Return the length of the queue"""raise NotImplementedErrordef push(self, request):"""Push a request"""raise NotImplementedErrordef pop(self, timeout=0):"""Pop a request"""raise NotImplementedErrordef clear(self):"""Clear queue/stack"""self.server.delete(self.key)class SpiderQueue(Base):"""Per-spider FIFO queue"""def __len__(self):"""Return the length of the queue"""return self.server.llen(self.key)def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.brpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.rpop(self.key)if data:return self._decode_request(data)class SpiderPriorityQueue(Base):"""Per-spider priority queue abstraction using redis' sorted set"""def __len__(self):"""Return the length of the queue"""return self.server.zcard(self.key)def push(self, request):"""Push a request"""data = self._encode_request(request)score = -request.priority# We don't use zadd method as the order of arguments change depending on# whether the class is Redis or StrictRedis, and the option of using# kwargs only accepts strings, not bytes.self.server.execute_command('ZADD', self.key, score, data)def pop(self, timeout=0):"""Pop a requesttimeout not support in this queue class"""# use atomic range/remove using multi/execpipe = self.server.pipeline()pipe.multi()pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)results, count = pipe.execute()if results:return self._decode_request(results[0])class SpiderStack(Base):"""Per-spider stack"""def __len__(self):"""Return the length of the stack"""return self.server.llen(self.key)def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.blpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.lpop(self.key)if data:return self._decode_request(data)__all__ = ['SpiderQueue', 'SpiderPriorityQueue', 'SpiderStack']

源码分析参考:Queue相关推荐

  1. 源码分析参考:Spider

    spider.py 设计的这个spider从redis中读取要爬的url,然后执行爬取,若爬取过程中返回更多的url,那么继续进行直至所有的request完成.之后继续从redis中读取url,循环这 ...

  2. Java 容器源码分析之Queue

    简介 Queue是一种很常见的数据结构类型,在java里面Queue是一个接口,它只是定义了一个基本的Queue应该有哪些功能规约.实际上有多个Queue的实现,有的是采用线性表实现,有的基于链表实现 ...

  3. 源码分析参考:Scheduler

    scheduler.py 此扩展是对scrapy中自带的scheduler的替代(在settings的SCHEDULER变量中指出),正是利用此扩展实现crawler的分布式调度.其利用的数据结构来自 ...

  4. 源码分析参考:Pipelines

    pipelines.py 这是是用来实现分布式处理的作用.它将Item存储在redis中以实现分布式处理.由于在这里需要读取配置,所以就用到了from_crawler()函数. from scrapy ...

  5. 源码分析参考:Dupefilter

    dupefilter.py 负责执行requst的去重,实现的很有技巧性,使用的Redis的设定数据结构.但是注意调度并不使用其中用于在这个模块中实现的dupefilter键做请求的调度,而是使用qu ...

  6. libuv 源码分析 —— 1.queue

    定义指针数组类型 typedef void *QUEUE[2]; 使用 QUEUE q; // 相当于 void *q[2] 定义基本操作 #define QUEUE_NEXT(q) (*(QUEUE ...

  7. 源码分析参考:Connection

    官方站点:https://github.com/rolando/scrapy-redis scrapy-redis的官方文档写的比较简洁,没有提及其运行原理,所以如果想全面的理解分布式爬虫的运行原理, ...

  8. PPP协议工作流程,结合ppp-2.4.9 源码分析

    ppp-2.4.9 源码分析 文章目录 ppp-2.4.9 源码分析 PPP协议工作流程 ppp-2.4.9 源码分析 全局变量和结构体说明 第一阶段 初始化 第二阶段 开始链接 第三阶段 建立PPP ...

  9. SpringMVC异常处理机制详解[附带源码分析]

    SpringMVC异常处理机制详解[附带源码分析] 参考文章: (1)SpringMVC异常处理机制详解[附带源码分析] (2)https://www.cnblogs.com/fangjian0423 ...

最新文章

  1. 如何阅读微控制器数据手册:探索硬件 ?
  2. LSM 自适应信号处理代码
  3. C++/Cli中事件对象处理函数的添加与删除
  4. VTK:可视化之VectorField
  5. 新一代球王!日本推AI篮球机器人,命中率接近100%!
  6. java hibernate的使用_《Hibernate快速开始 – 4 – 使用JAVA持久层 API (JPA)教程》
  7. MS-Sqlserver2008建立维护计划执行备份任务
  8. 基于Bootstrap的Asp.net Mvc 分页的实现(转)
  9. vcglib中面自相交的检测算法
  10. java用信号量写理发师_课内资源 - 基于Java实现的生产者与消费者问题、读者写者问题、哲学家进餐问题、理发师睡觉问题、医生看病问题...
  11. 工作流软件是未来web的支柱
  12. 很好听,可没机会跟你分享
  13. 基于Javaweb的图书馆管理系统设计与实现(开题报告+论文).doc
  14. 新手如果写一个软件,应该是怎么一个流程?
  15. ionic3 添加蒙版,弹出悬浮框
  16. DataBinding找不到符号,import xxx.xxx.ActivityxxxBindingImpl
  17. CPP-week thirteen
  18. rust怎么存水_rust怎么装水 | 手游网游页游攻略大全
  19. java 计算日出日落时间
  20. 利用Python进行数据分析(四):数据加载、存储与文件格式

热门文章

  1. 实践练习四:迁移 MySQL 数据到 OceanBase 集群
  2. 【翻译】ANDROID KTX – 使用Kotlin进行Android开发
  3. 荣耀发布了全球首款 4800 万像素手机,并推出 YOYO 智能音箱...
  4. LINUX负载均衡LVS-NAT搭建
  5. 高老师架构设计思考短句集(1)
  6. 转载:Android Display架构分析--侧重高通平台
  7. asp.net Cache缓存定时更新数据
  8. 2005年2月24日(星期四) 中午,晴+煙 - Central Incubator。
  9. Oracle结果集缓存(Result Cache)--服务器、客户端、函数缓存
  10. IntelliJ IDEA下载插件超时--解决方法