Grab your notebook, it's time to take notes~

Table of contents

  • Scrapy-redis
    • connection.py
    • defaults.py
    • dupefilter.py
    • picklecompat.py
    • pipelines.py
    • queue.py
    • scheduler.py
    • spider.py
    • utils.py

Scrapy-redis

scrapy-redis provides two kinds of distribution: distributed crawling and distributed item processing.
They are implemented by the scheduler module and the pipelines module respectively.

connection.py

Instantiates the redis connection according to the settings. It is called by both the dupefilter and the scheduler, so every component that reads from or writes to redis goes through this module.

import six

from scrapy.utils.misc import load_object

from . import defaults


# Shortcut maps 'setting name' -> 'parameter name'.
# Mapping table for the redis connection settings.
SETTINGS_PARAMS_MAP = {
    'REDIS_URL': 'url',
    'REDIS_HOST': 'host',
    'REDIS_PORT': 'port',
    'REDIS_ENCODING': 'encoding',
}


def get_redis_from_settings(settings):
    """Returns a redis client instance from given Scrapy settings object.

    This function uses ``get_client`` to instantiate the client and uses
    ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You
    can override them using the ``REDIS_PARAMS`` setting.

    Parameters
    ----------
    settings : Settings
        A scrapy settings object. See the supported settings below.

    Returns
    -------
    server
        Redis client instance.

    Other Parameters
    ----------------
    REDIS_URL : str, optional
        Server connection URL.
    REDIS_HOST : str, optional
        Server host.
    REDIS_PORT : str, optional
        Server port.
    REDIS_ENCODING : str, optional
        Data encoding.
    REDIS_PARAMS : dict, optional
        Additional client parameters.

    """
    # Shallow copy so that updating params does not mutate the default REDIS_PARAMS.
    params = defaults.REDIS_PARAMS.copy()
    # Merge the REDIS_PARAMS from settings into params.
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    # Walk the mapping table and pick up the mapped settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        # Values from settings take priority.
        val = settings.get(source)
        # If the setting is absent, params is left unchanged.
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)


# Backwards compatible alias.
from_settings = get_redis_from_settings


def get_redis(**kwargs):
    """Returns a redis client instance.

    Parameters
    ----------
    redis_cls : class, optional
        Defaults to ``redis.StrictRedis``.
    url : str, optional
        If given, ``redis_cls.from_url`` is used to instantiate the class.
    **kwargs
        Extra parameters to be passed to the ``redis_cls`` class.

    Returns
    -------
    server
        Redis client instance.

    """
    # Without redis_cls, fall back to the default redis client class.
    redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
    url = kwargs.pop('url', None)  # does kwargs carry a URL?
    if url:
        # Connect via the URL.
        return redis_cls.from_url(url, **kwargs)
    else:
        # Connect with keyword parameters.
        return redis_cls(**kwargs)
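As a quick illustration of how this module is consumed, here is a minimal sketch (not part of scrapy-redis itself) that builds a client from a plain Scrapy Settings object; the URL and the REDIS_PARAMS value are illustrative, while the setting names are the ones recognised by the mapping table above.

# Minimal usage sketch; assumes a redis server reachable at the given URL.
from scrapy.settings import Settings
from scrapy_redis.connection import get_redis_from_settings

settings = Settings({
    'REDIS_URL': 'redis://localhost:6379/0',   # takes priority over host/port
    'REDIS_PARAMS': {'socket_timeout': 10},    # merged over defaults.REDIS_PARAMS
})

server = get_redis_from_settings(settings)
server.ping()  # raises if the connection parameters are wrong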

defaults.py

import redis


# For standalone use.
# Key used by the standalone dupefilter.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

# Key where serialized items are stored; %(spider)s is the spider name.
PIPELINE_KEY = '%(spider)s:items'

# Redis client class used for connections.
REDIS_CLS = redis.StrictRedis
# Character encoding.
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
# Default redis connection parameters.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

# Key of the queue that stores scheduled requests.
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
# Queue class, which decides the order in which requests enter and leave the queue.
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
# Key where request fingerprints are stored for deduplication.
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
# Class that computes and checks request fingerprints.
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

# Redis key holding the start URLs.
START_URLS_KEY = '%(name)s:start_urls'
# Whether the start URLs key is a set (False means a list).
START_URLS_AS_SET = False
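The %(spider)s / %(name)s placeholders in these keys are filled in with the spider name at runtime. A small sketch of the resulting redis keys for a hypothetical spider called books:

from scrapy_redis import defaults

# Hypothetical spider name, used only for illustration.
spider_name = 'books'

print(defaults.SCHEDULER_QUEUE_KEY % {'spider': spider_name})       # books:requests
print(defaults.SCHEDULER_DUPEFILTER_KEY % {'spider': spider_name})  # books:dupefilter
print(defaults.PIPELINE_KEY % {'spider': spider_name})              # books:items
print(defaults.START_URLS_KEY % {'name': spider_name})              # books:start_urls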

dupefilter.py

Performs request deduplication using redis's set data structure. Note, however, that the scheduler does not schedule requests out of the dupefilter key implemented here; scheduling uses the queue implemented in queue.py. When a request is not a duplicate, it is pushed into that queue and popped again when it is scheduled.

import logging
import time

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

from . import defaults
from .connection import get_redis_from_settings


logger = logging.getLogger(__name__)


# scrapy-redis implements deduplication with a redis set.
# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.

    This class can also be used with default Scrapy's scheduler.

    """

    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.

        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.

        """
        # server is the redis connection instance; through it we reach the redis keys.
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.

        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.

        Parameters
        ----------
        settings : scrapy.settings.Settings

        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.

        """
        # Get the redis connection instance.
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        # Key under which the fingerprints are stored.
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')  # defaults to False
        # Instantiate the class, passing the parameters on to __init__.
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.

        Parameters
        ----------
        crawler : scrapy.crawler.Crawler

        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.

        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        bool

        """
        fp = self.request_fingerprint(request)  # compute the fingerprint
        # This returns the number of values added, zero if already exists.
        # Add the fingerprint to the redis set:
        #   self.server is the redis connection instance,
        #   self.key is the key storing the fingerprints,
        #   fp is the fingerprint itself.
        added = self.server.sadd(self.key, fp)
        # added == 0 means the fingerprint already existed, i.e. the request was seen.
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    @classmethod
    def from_spider(cls, spider):
        settings = spider.settings
        server = get_redis_from_settings(settings)
        dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
        key = dupefilter_key % {'spider': spider.name}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        # Clear the stored fingerprints when the spider closes.
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.

        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider

        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False
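The core of request_seen() is a single SADD call: redis returns 1 when the fingerprint is new and 0 when it already exists. A sketch of that idea in isolation; the key and fingerprint value below are purely illustrative, and a local redis instance is assumed.

import redis

r = redis.StrictRedis()   # assumes a local redis instance
key = 'demo:dupefilter'   # throwaway key for the example

fp = 'affe49483e9f0b19331358f01f6e1657a772b528'   # some request fingerprint
print(r.sadd(key, fp))    # 1 -> first time seen
print(r.sadd(key, fp))    # 0 -> duplicate; request_seen() would return True
r.delete(key)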

picklecompat.py

This module implements loads and dumps, i.e. a serializer. Redis cannot store complex objects, so everything has to be serialized before it is stored. The serializer used here is Python's pickle module, wrapped so that it works on both Python 2 and Python 3. It is mainly used by the scheduler below to store request objects. As for why JSON is not used here, I'm not entirely sure (most likely because a request dict can contain binary values that JSON cannot encode directly); the item pipeline does default to JSON serialization.

"""A pickle wrapper module with protocol=-1 by default."""try:import cPickle as pickle  # PY2
except ImportError:import pickle # PY3用的包# 反序列化指 : 将字符串数据转化成json数据
def loads(s):return pickle.loads(s)# 序列化指: 将json数据转化成字符串
def dumps(obj):return pickle.dumps(obj, protocol=-1)
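A request dict survives a dumps/loads round trip unchanged, which is all the scheduler needs from this serializer. A minimal sketch (the dict contents are made up for illustration):

from scrapy_redis import picklecompat

obj = {'url': 'https://example.com', 'method': 'GET', 'body': b'', 'priority': 0}
raw = picklecompat.dumps(obj)        # bytes, safe to store in a redis list or sorted set
assert picklecompat.loads(raw) == obj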

pipelines.py

Implements the distributed item-processing side: items are stored in redis so they can be processed by separate workers. Note that, although this is an ordinary pipeline, its implementation differs from the pipelines analysed earlier; because it needs to read the settings, it uses the from_crawler() function.

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults


# Default serializer: encode items as JSON strings.
default_serialize = ScrapyJSONEncoder().encode


# Pipeline that serializes scraped items into a redis list.
class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue

    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.

    """

    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize):
        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """
        self.server = server
        self.key = key
        self.serialize = serialize_func

    # Build the constructor arguments (including the redis connection) from settings.
    @classmethod
    def from_settings(cls, settings):
        # Create the redis connection instance.
        params = {
            'server': connection.from_settings(settings),
        }
        # Prefer REDIS_ITEMS_KEY from settings if present.
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        # Prefer the serializer from settings if present.
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )
        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    # Called automatically for every scraped item.
    def process_item(self, item, spider):
        # Run the actual storage in a thread so the next item does not have to
        # wait for this one to finish being pushed to redis.
        return deferToThread(self._process_item, item, spider)

    # Does the actual storing.
    def _process_item(self, item, spider):
        key = self.item_key(item, spider)  # build the redis key for this item
        data = self.serialize(item)        # serialize the item to a string
        self.server.rpush(key, data)       # push it onto the redis list
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}
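To activate the pipeline it only needs to appear in ITEM_PIPELINES; the two optional settings from its docstring can override the key and the serializer. A sketch of the relevant settings.py lines, with illustrative values:

# settings.py (illustrative values)
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,
}
# Optional overrides:
REDIS_ITEMS_KEY = '%(spider)s:items'   # same as defaults.PIPELINE_KEY
REDIS_ITEMS_SERIALIZER = 'json.dumps'  # any importable callable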

queue.py

Like dupefilter.py, this module operates directly on redis, but here three kinds of queue are implemented: the FIFO SpiderQueue, the SpiderPriorityQueue, and the LIFO SpiderStack.

from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        spider : Spider
            Scrapy spider instance.
        key: str
            Redis key where to put and get messages.
        serializer : object
            Serializer object with ``loads`` and ``dumps`` methods.

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        # The serializer passed in must provide loads/dumps, otherwise raise.
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        # Convert the request into a dict, then serialize the dict to a string.
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        # Deserialize back into a dict, then rebuild the Request object
        # so it can be handed straight to the downloader.
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    # __len__, push and pop must be overridden by subclasses.
    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        # Delete the redis key backing this queue.
        self.server.delete(self.key)


# First in, first out.
class FifoQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        # Push onto the head of the list.
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        # timeout defaults to 0 (non-blocking).
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            # Pop from the tail of the list.
            data = self.server.rpop(self.key)
        if data:
            # Decode back into a Request for the downloader.
            return self._decode_request(data)


# Priority queue.
class PriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        # A sorted set is used to implement the priority queue.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """Pop a request

        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        # zrange returns the lowest-scored member (highest priority);
        # zremrangebyrank removes it within the same transaction.
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        # results holds the popped member, count is the number removed (1 or 0).
        results, count = pipe.execute()
        if results:
            # Decode the popped member back into a Request.
            return self._decode_request(results[0])


# Last in, first out.
class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)
        if data:
            return self._decode_request(data)


# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue
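Which of the three queues the scheduler uses is controlled by SCHEDULER_QUEUE_CLASS; defaults.py above points it at the priority queue. A sketch of the three possible values in settings.py:

# settings.py -- pick exactly one (illustrative)
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  # default: redis sorted set
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'    # breadth-first style order
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'    # depth-first style order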

scheduler.py

This component replaces Scrapy's built-in scheduler, and it is what makes distributed scheduling of the crawler possible. The data structures it relies on are the ones implemented in queue.py.

import importlib
import six

from scrapy.utils.misc import load_object

from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.

    """

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            # Keep the redis queue and dupefilter when the spider closes.
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            # Flush the queue on start; defaults to False.
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            # Seconds to block on pop when idle before giving up.
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        # Mapping from constructor arguments to setting names.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        # Override kwargs with any value defined in settings.
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        # The redis connection instance.
        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        self.df = load_object(self.dupefilter_cls).from_spider(spider)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    # Enqueue a request.
    def enqueue_request(self, request):
        # request_seen() returns True when the fingerprint already exists.
        # dont_filter defaults to False, so duplicate requests are dropped here.
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    # Dequeue the next request.
    def next_request(self):
        block_pop_timeout = self.idle_before_close
        # Pop one request from the redis queue.
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        # The request goes back to the engine, which hands it to the downloader.
        return request

    def has_pending_requests(self):
        return len(self) > 0
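In a project these pieces are wired together through settings: the scheduler, the dupefilter, and the redis connection. A minimal settings.py sketch; the setting names are the documented scrapy-redis ones, the URL is illustrative:

# settings.py
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
SCHEDULER_PERSIST = True                  # keep the queue and dupefilter between runs
REDIS_URL = 'redis://localhost:6379/0'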

spider.py

The spider reads the URLs to crawl from redis and crawls them. If the crawl yields more URLs, it keeps processing them until every request is finished, then goes back to redis for more URLs and repeats the cycle.

from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider

from . import connection, defaults
from .utils import bytes_to_str


class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None         # redis key holding the start URLs
    redis_batch_size = None  # how many URLs to fetch per batch
    redis_encoding = None    # character encoding

    # Redis client placeholder.
    server = None

    # Overrides Spider.start_requests.
    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
            )

        # Expand the key template with the spider name.
        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE',
                settings.getint('CONCURRENT_REQUESTS'),
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s)",
                         self.__dict__)

        # The redis connection instance.
        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        # REDIS_START_URLS_AS_SET decides whether the start URLs live in a set;
        # by default they are stored in a list.
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        # use_set False -> pop from a list with LPOP;
        # use_set True  -> pop from a set with SPOP.
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            # Fetch one start URL from redis.
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            # Build a Request from the raw data.
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    # Builds a Request instance from the data read from redis.
    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis.

        By default, ``data`` is an encoded URL. You can override this method to
        provide your own message decoding.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        # Decode the bytes read from redis into a str.
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from.
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from.
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: False)
        Use SET operations to retrieve messages from the redis queue. If False,
        the messages are retrieved using the LPOP command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        obj = super(RedisSpider, cls).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from.
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from.
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: True)
        Use SET operations to retrieve messages from the redis queue.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, cls).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj
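Putting the mixin to work only takes a spider that defines redis_key and a parse callback; once the scrapy-redis settings above are in place, crawling starts as soon as a URL is pushed to that key. A minimal sketch (spider name, key, and URL are illustrative):

from scrapy_redis.spiders import RedisSpider

class DemoSpider(RedisSpider):
    name = 'demo'
    redis_key = 'demo:start_urls'   # list the mixin pops start URLs from

    def parse(self, response):
        yield {'url': response.url, 'title': response.css('title::text').get()}

# Seed the crawl from any machine, e.g. with redis-cli:
#   redis-cli lpush demo:start_urls https://example.com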

utils.py

import six


def bytes_to_str(s, encoding='utf-8'):
    """Returns a str if a bytes object is given."""
    # Under Python 3, decode bytes into str; otherwise return the value as-is.
    if six.PY3 and isinstance(s, bytes):
        return s.decode(encoding)
    return s
