scrapy-redis: making redis store more than just urls: https://my.oschina.net/u/4382439/blog/3712637

Scrapy-redis and Scrapyd usage explained in detail: https://zhuanlan.zhihu.com/p/44564597

Scrapy-redis GitHub repository: https://github.com/rmax/scrapy-redis

Controlling the url queue type in scrapy-redis (zset, list): https://blog.csdn.net/ryuhfxz/article/details/85782467

First, look at the scrapy-redis source ( spiders.py ):

from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider

from . import connection, defaults
from .utils import bytes_to_str


class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None
    redis_batch_size = None
    redis_encoding = None

    # Redis client placeholder.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE',
                settings.getint('CONCURRENT_REQUESTS'),
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                         self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                print('task is none')
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis.

        By default, ``data`` is an encoded URL. You can override this method to
        provide your own message decoding.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: False)
        Use SET operations to retrieve messages from the redis queue. If False,
        the messages are retrieve using the LPOP command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: True)
        Use SET operations to retrieve messages from the redis queue.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj
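With the stock implementation above, every message in the redis key is just an encoded URL, so seeding the queue is a one-liner. A minimal sketch, assuming a spider named my_spider, a local redis instance, and the default list-based settings (the key name follows the "<spider.name>:start_urls" default):

import redis

# Connect to the redis instance the spider reads from (assumed: localhost:6379, db 0).
r = redis.Redis(host='localhost', port=6379, db=0)

# By default, RedisSpider LPOPs plain URL strings from "<spider.name>:start_urls".
r.lpush('my_spider:start_urls', 'http://www.example.com/page1')

The two methods below build on this by storing a JSON task in redis instead of a bare URL.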

Method 1: override the make_request_from_data method

Reading the source carefully, you will notice that make_request_from_data(self, data) is the method that turns a message popped from redis into a request instance, and that by default the message is just a URL. So override it: pass a JSON string straight through to self.make_requests_from_url, and parse the string there to extract (or generate) the request URL. The code is as follows:

# -*- coding: utf-8 -*-
import json

from scrapy.http import Request
from scrapy_redis.utils import bytes_to_str
from scrapy_redis.spiders import RedisSpider


class MySpider(RedisSpider):
    name = 'my_spider'
    redis_key = f'start_urls:{name}'

    def make_request_from_data(self, data):
        """
        :param data: bytes, message from redis
        :return: Request
        """
        company = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(company)

    def make_requests_from_url(self, company):
        # The task pushed to redis is a JSON string, so decode it
        # with json.loads (safer than eval on untrusted data).
        data = json.loads(company)
        url = data["url"]
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 "
                          "(KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36",
            "Accept": "*/*"
        }
        return Request(url, self.parse, meta={"data": data}, dont_filter=True, headers=headers)

    def parse(self, response):
        pass


if __name__ == '__main__':
    from scrapy import cmdline
    cmdline.execute('scrapy crawl my_spider'.split())
    pass

So all that is needed is to override the make_request_from_data method and parse data inside it.
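For the spider above to work, each message pushed to redis must be a JSON string with at least a "url" field. A minimal seeding sketch (the key start_urls:my_spider matches the MySpider example above; the url value and the extra company field are placeholders, and the whole dict ends up in request.meta["data"]):

import json
import redis

r = redis.Redis(host='localhost', port=6379)  # assumed local redis

# Push a JSON task instead of a bare URL; MySpider decodes it in make_requests_from_url.
task = {"url": "http://www.example.com/detail/1", "company": "example-co"}
r.lpush('start_urls:my_spider', json.dumps(task, ensure_ascii=False))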

Method 2: override the next_requests method

# module-level imports needed by this override: import json, redis, scrapy

def next_requests(self):
    redis_ip = self.settings.get('REDIS_HOST')
    redis_port = self.settings.get('REDIS_PORT')
    redis_pw = self.settings.get('REDIS_PARAMS').get('password')
    redis_pool = redis.ConnectionPool(host=redis_ip, port=redis_port, password=redis_pw)
    self.redis_conn = redis.Redis(connection_pool=redis_pool)
    found = 0
    while found < self.redis_batch_size:
        data_raw = self.redis_conn.spop(self.redis_key)  # pop one message from redis
        if not data_raw:
            break
        data = json.loads(data_raw)  # the content stored in redis is JSON, so decode it
        if "source_url" not in data:
            break
        req = scrapy.Request(url=data['source_url'], meta=data['meta'])  # build the request
        if req:
            yield req
            found += 1
        else:
            self.logger.debug("Request not made from data: %s", data)
    if found:
        self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
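Note that this override reads its connection parameters from the project settings instead of reusing the self.server client that scrapy-redis already created. A minimal settings.py sketch with the three keys it reads (host, port, and password values are placeholders):

# settings.py (fragment): keys read by the next_requests override above
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_PARAMS = {'password': 'your-redis-password'}

REDIS_PARAMS must contain a 'password' entry here, because the override calls .get('password') on it without a fallback.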

Sample code (kept somewhat redundant for demonstration purposes; rewrite or delete parts to suit your own situation):

# -*- coding: utf-8 -*-
# @Author  :
# @File    : simple_spider_base_class.py
# @Software: PyCharm
# @description : XXX

import json
import datetime
from abc import ABC
from scrapy_redis import defaults
from scrapy_redis.spiders import RedisSpider
from scrapy.utils.project import get_project_settings
from redis.exceptions import WatchError  # used by __custom_get_task below
from bson import ObjectId  # ships with pymongo; used by next_requests_old below


class SpiderBaseClass(RedisSpider, ABC):

    def __init__(self):
        super(SpiderBaseClass, self).__init__()
        self.temp = None
        self.__server_pipeline = None
        self.task_string = None

    def __del__(self):
        pass

    def __get_server_pipeline(self):
        if not self.__server_pipeline:
            self.__server_pipeline = self.server.pipeline()
        return self.__server_pipeline

    def __get_custom_redis_key(self):
        custom_redis_key = self.redis_key.split(':')[1]
        return custom_redis_key

    def return_url(self, task_string=None):
        """Meant to be overridden in subclasses; the return value is a url.

        :param task_string:
        :return: url
        """
        self.temp = None
        return 'http://www.examplewebsite.com'

    def redis_sort_set_pop(self, r_k=None):
        """Pop a request.

        timeout is not supported by this queue class.
        """
        t_k = r_k if r_k else self.redis_key
        # use atomic range/remove using multi/exec
        pipe = self.__get_server_pipeline()
        pipe.multi()
        pipe.zrange(t_k, 0, 0).zremrangebyrank(t_k, 0, 0)
        results, count = pipe.execute()
        return results, count

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = None  # stays None when the queue is empty
            try:
                data = fetch_one(self.redis_key)
            except BaseException as e:
                # spop/lpop fail when the key actually holds a sorted set;
                # fall back to popping the zset in that case.
                results, count = self.redis_sort_set_pop()
                if len(results):
                    task_string = results[0].decode('utf-8')  # bytes -> str
                    self.task_string = task_string
                    msg = 'get task : {0}'.format(task_string)
                    print(msg)
                    self.logger.info(msg)
                    task_dict = json.loads(task_string)
                    if task_dict.get('spider_url', None):
                        data = task_dict['spider_url']
                        print('get url : {0}'.format(data))
                        self.logger.info('get url : {0}'.format(data))
                    else:
                        data = self.return_url(task_string=task_string)
            if not data:
                # Queue empty.
                print('task is none')
                break

            # add a time marker
            # dict_data = collections.OrderedDict(dict_data)
            # dict_data['add_time'] = str(datetime.datetime.now())
            # task_string = json.dumps(dict_data, ensure_ascii=False)
            # try:
            #     self.server.zadd('mirror:{0}'.format(self.name), {task_string: row_score})
            # except BaseException as e:
            #     pass

            req = self.make_request_from_data(data)
            req.meta['task_string'] = self.task_string
            req.meta['start_time'] = str(datetime.datetime.now().replace(microsecond=0))
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def __custom_get_task(self):
        row_str = None
        row_score = None
        pipe = self.__get_server_pipeline()
        loop = True
        while loop:
            try:
                # ascending order: queue a read of the lowest-scored member
                pipe.zrange(self.redis_key, 0, 0, withscores=True)
                # pipe.zrange(self.custom_redis_key, 0, 0, withscores=True)
                # descending order:
                # pipe.zrevrange(self.redis_key, 0, 0, withscores=True)
                row_value_score = pipe.execute()[0][0]  # <class 'tuple'>: (b'99', 99.0)
                row_str, row_score = row_value_score
                pipe.watch(self.redis_key)
                pipe.multi()
                results = pipe.zrem(self.redis_key, row_str.decode('utf-8')).execute()
                # results = pipe.zrem(self.custom_redis_key, row_str.decode('utf-8')).execute()
                pipe.unwatch()
                if results[0]:
                    # results is a list; the first element is 1 on success, 0 on failure.
                    # Removed successfully: no other thread/process interfered, exit the loop.
                    loop = False
                    break
                else:
                    # Removal failed: another process already claimed and removed this
                    # task, so go fetch and remove the next one.
                    continue
            except WatchError as watch_error:
                # Release the watch and retry on the next iteration.
                pipe.unwatch()
                continue
            except IndexError as i_e:
                # An empty zset makes the [0] indexing above raise IndexError:
                # no task left, so end the loop with row_str = None.
                loop = False
                row_str = None
            except BaseException as e:
                row_str = None
                break
        return row_str, row_score

    def next_requests_old(self):
        """Returns a request to be scheduled or none.

        :return:
        """
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        # fetch_one = self.server.spop if use_set else self.server.lpop
        all_settings = get_project_settings()
        flag = all_settings.getbool('REDIS_START_URLS_AS_SET_TAG')
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            if flag:
                row_str, row_score = self.__custom_get_task()
            else:
                fetch_one = self.server.spop if use_set else self.server.lpop
            json_data = fetch_one(self.redis_key) if not flag else row_str
            source = None
            if json_data:
                task_string = json_data.decode('utf-8')  # bytes -> str
                self.task_string = task_string
                self.logger.info('get task : {0}'.format(task_string))
                dict_data = json.loads(task_string)
                source = dict_data.get('source', None)
                task_id = dict_data.get('id', None)
                # LOG.INFO('get task is {0}'.format(self.task_string))
                if dict_data.get('spider_url', None):
                    data = dict_data['spider_url']
                    print('get url : {0}'.format(data))
                    self.logger.info('get url : {0}'.format(data))
                else:
                    data = self.return_url(task_string=task_string)
            else:
                data = None
                # LOG.INFO('get task is None')
                print('get task is None')
                self.logger.info('get task is None')
            if not data:
                # Queue empty.
                break
            if source:
                try:
                    filter_conditions = {'_id': ObjectId(task_id)}
                except BaseException as e:
                    filter_conditions = {'_id': task_id}
                item = {"$set": {"status": 'running'}}
                db_template_name = all_settings.get('DB_TEMPLATE_NAME', None)
                if db_template_name:
                    # db_config and self.wb are project-specific database helpers
                    # (a config dict and a MongoDB wrapper); they are not shown here.
                    db_name = db_config.get(db_template_name).get('database', None)
                    tb_name = db_config.get(db_template_name).get('echo_tb', None)
                    is_success = self.wb.update_one(db_name=db_name, tb_name=tb_name,
                                                    filter_conditions=filter_conditions, item=item)
                    if 0 == is_success:
                        self.logger.info('update status : running')
                    else:
                        self.logger.info('update status fail')

            # add a time marker
            # dict_data = collections.OrderedDict(dict_data)
            # dict_data['add_time'] = str(datetime.datetime.now())
            task_string = json.dumps(dict_data, ensure_ascii=False)
            # try:
            #     self.server.zadd('mirror:{0}'.format(self.name), {task_string: row_score})
            # except BaseException as e:
            #     pass

            req = self.make_request_from_data(data)
            req.meta['task_string'] = task_string
            req.meta['start_time'] = str(datetime.datetime.now().replace(microsecond=0))
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)


if __name__ == '__main__':
    pass
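When the REDIS_START_URLS_AS_SET_TAG setting is enabled, __custom_get_task pops the lowest-scored member of a sorted set, so tasks must be seeded with ZADD rather than LPUSH or SADD. A minimal sketch (the key name my_spider:start_urls and the payload fields id/spider_url/source follow the conventions of the class above; the score is assumed to act as a priority, with lower running first):

import json
import redis

r = redis.Redis(host='127.0.0.1', port=6379)  # assumed local redis

# __custom_get_task reads index 0 of the zset in ascending order,
# so a LOWER score means HIGHER priority.
task = {"id": "task-0001", "spider_url": "http://www.example.com/detail/1", "source": None}
r.zadd('my_spider:start_urls', {json.dumps(task, ensure_ascii=False): 100})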
