使用BloomFilter优化scrapy-redis去重

1. 背景

  • 做爬虫的都知道,scrapy是一个非常好用的爬虫框架,但是scrapy吃内存非常的厉害。其中有个很关键的点就在于去重。
  • “去重”需要考虑三个问题:去重的速度和去重的数据量大小,以及持久化存储来保证爬虫能够续爬。
    • 去重的速度:为了保证较高的去重速度,一般是将去重放到内存中来做的。例如python内置的set( ),redis的set数据结构。但是当数据量变得非常大,达到千万级亿级时,因为内存有限,就需要用“位”来去重了, 因此BloomFilter应运而生,将去重工作由字符串直接转到bit位上,大大降低了内存占有率。
    • 去重的数据量大小:当数据量较大时,我们可以使用不同的加密算法,压缩算法(例如md5,hash)等,将长字符串压缩成16/32 长度的短字符串。然后再使用set等方式来去重。
    • 持久化存储:scrapy默认是开启去重的,而且提供了续爬设计,在爬虫终止时,会记录一个状态文件记录爬取过的request和状态。scrapy-redis的去重工作交给了redis,去重队列放到了redis中,而redis可以提供持久化存储。Bloomfilter是将去重对象映射到几个内存“位”,通过几个位的 0/1值来判断一个对象是否已经存在。Bloomfilter运行在一台机器的内存上,并不方便持久化,爬虫一旦终止,数据就丢失了。
  • 如上所述,对于scrapy-redis分布式爬虫来说,使用Bloomfilter来优化,必然会遇到两个问题:
    • 第一,要想办法让Bloomfilter能持久化存储下来。
    • 第二,对于scrapy-redis分布式爬虫来说,爬虫分布在好几台不同的机器上。而Bloomfilter是基于内存的,如何让各个不同的爬虫机器能够共享到同一个Bloomfilter,来达到统一去重?
  • 综上所述,将Bloomfilter挂载到redis上,持久化存储以及让各爬虫共享去重队列,这两个问题就都解决了。

2. 环境

  • 系统:win7
  • scrapy-redis
  • redis 3.0.5
  • python 3.6.1

3. Bloom Filter基本概念以及原理

  • 详情请参考文章:http://blog.csdn.net/jiaomeng/article/details/1495500
  • 简单来说,Bloom Filter是:

    • Bloom Filter 是一种空间效率很高的随机数据结构,利用位数组表示一个集合,并能判断一个元素是否属于这个集合。
    • Bloom Filter的这种高效是有一定代价的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。当然,如果这个元素属于这个集合,是一定不会被误判为不存在这个集合。
    • Bloom Filter不适合那些“0错误”的应用场合。
  • 为了能理解Bloom Filter的原理,必须熟悉以下几个基本元素的概念:

3.1. Bloom Filter位数组

  • Bloom Filter是用位数组表示集合的。初始状态时,Bloom Filter是一个包含m位的位数组 { 1, …, m },每一位都置为0。表现形式可以是一段空白的内存,长字符串,任意一种占用内存空间的数据结构……

3.2. 待去重元素

  • 对爬虫来说,也就是request队列,我们记为 S = { R1, R2, …, Rn } 这样一个n个元素的集合。

3.3. k个相互独立的哈希函数

  • Bloom Filter使用k个相互独立的哈希函数,我们记为 H = { H1( ), H2( ), …, Hk( ) }。利用这些hash函数,对集合S = { R1, R2, …, Rn } 中的每个元素进行处理,映射到Bloom Filter开辟内存{ 1, …, m }的某一位上。这样,对于R1来说,映射的结果就是{ H1( R1 ), H2( R1 ), …, Hk( R1 ) }

  • 需要注意的是,如果一个位置多次被置为1,那么只有第一次会起作用,后面几次将没有任何效果。
    从这一点就可以看出为什么会有误判,有可能会把不属于这个集合的元素误认为属于这个集合,就是因为这个元素被映射后的集合上那些位上,可能已经被置成1了。

3.4. 错误率

  • Bloomfilter算法会有漏失概率,即不存在的字符串有一定概率被误判为已经存在。这个概率的大小与seeds的数量、申请的内存大小、去重对象的数量有关。下面有一张表,m表示内存大小(多少个位),n表示去重对象的数量,k表示seed的个数。例如我代码中申请了256M,即1<<31(m=2^31,约21.5亿),seed设置了7个。看k=7那一列,当漏失率为8.56e-05时,m/n值为23。所以n = 21.5/23 = 0.93(亿),表示漏失概率为8.56e-05时,256M内存可满足0.93亿条字符串的去重。同理当漏失率为0.000112时,256M内存可满足0.98亿条字符串的去重。

4. redis的setbit功能

4.1. 官方说明

# SETBIT key offset value :设置或清除该位在存储在键的字符串值偏移对 key 所储存的字符串值,设置或清除指定偏移量上的位(bit)。位的设置或清除取决于 value 参数,可以是 0 也可以是 1 。当 key 不存在时,自动生成一个新的字符串值。字符串会进行伸展(grown)以确保它可以将 value 保存在指定的偏移量上。当字符串值进行伸展时,空白位置以 0 填充。offset 参数必须大于或等于 0 ,小于 2^32 (bit 映射被限制在 512 MB 之内)。

4.2. 使用案例

在redis中,字符串都是以二级制的形式进行存储的。

第一步:设置一个 key-value ,字符串 testStr = ‘ab’

我们知道 ‘a’ 的ASCII码是 97, 转换为二进制是:01100001
‘b’的的ASCII码是 98,转换为二进制是:01100010。
‘ab’转换成二进制就是:0110000101100010

第二步:设置偏移
offset代表偏移,从0开始从左往右计数的,也就是从高位往低位 。
比如我们想将 011000010110001 0 (ab)置成 011000010110001 1(ac) ,也就是将第15位由0置成1,此时b变成了c

setbit完之后,会有有一个(integer) 0或者(integer)1的返回值,这个是在进行setbit 之前,该offset位的比特值。 
这就是redis 中 “SETBIT” 的基本用法。

redis还有一个与此相关的功能:bitcount,就是统计字符串的二级制编码中有多少个’1’。 所以这里
bitcount testStr 得到的结果就是 7

5. 详细部署

  • 结合上面Bloom Filter和redis的setbit功能,我们就知道如何将Bloom Filter挂载在redis上了。没错,就是一个大的字符串!
  • 下面是在scrapy-redis分布式爬虫中挂入Bloom Filter的详细步骤:

5.1. 编写Bloom Filter算法。

# 文件:Bloomfilter.py# encoding=utf-8import redis
from hashlib import md5# 根据 开辟内存大小 和 种子,生成不同的hash函数
# 也就是构造上述提到的:Bloom Filter使用k个相互独立的哈希函数,我们记为 **H = { H1( ),  H2( ),  ...,  Hk( ) }**
class SimpleHash(object):def __init__(self, bitSize, seed):self.bitSize = bitSizeself.seed = seeddef hash(self, value):ret = 0for i in range(len(value)):# print(f"value[i] = {value[i]},  ord(value[i]) = {ord(value[i])}")ret += self.seed * ret + ord(value[i])# 控制hashValue的值在这个内存空间的范围hashValue = (self.bitSize - 1) & ret# print(f"value = {value}, hashValue = {hashValue}")return hashValue# 在redis中初始化一个大字符串,也可以认为是在redis中开辟了一块内存空间
# 需要指定数据库名, 比如这儿用到的就是db2
# 指定使用数据块个数,也就是开辟几个这样的大字符串。
# 当数据达到非常大时,512M肯定是不够用的,可能每个位都被置为1了,所以需要开辟多个大字符串
# 大字符串名name = (key + int)
class BloomFilter(object):def __init__(self, host='localhost', port=6379, db=2, blockNum=1, key='bloomfilter'):""":param host: the host of Redis:param port: the port of Redis:param db: witch db in Redis:param blockNum: one blockNum for about 90,000,000; if you have more strings for filtering, increase it.:param key: the key's name in Redis"""self.server = redis.Redis(host=host, port=port, db=db)# 2^31 = 256M# 这是一个限制值,最大为256M,因为在redis中,字符串值可以进行伸展,伸展时,空白位置以0填充。self.bit_size = 1 << 31  # Redis的String类型最大容量为512M,现使用256Mself.seeds = [5, 7, 11, 13, 31, 37, 61]self.key = keyself.blockNum = blockNumself.hashfunc = []for seed in self.seeds:# 根据seed 构造出 k=7 个独立的hash函数self.hashfunc.append(SimpleHash(self.bit_size, seed))# 判断元素是否在集合中def isContains(self, str_input):if not str_input:return Falsem5 = md5()m5.update(str_input.encode('utf-8'))# 先取目标字符串的md5值str_input = m5.hexdigest()ret = Truename = self.key + str(int(str_input[0:2], 16) % self.blockNum)for f in self.hashfunc:loc = f.hash(str_input)ret = ret & self.server.getbit(name, loc)return ret# 将str_input映射的结果,写入到大字符串中,也就是置上相关的标志位def insert(self, str_input):m5 = md5()m5.update(str_input.encode('utf-8'))str_input = m5.hexdigest()name = self.key + str(int(str_input[0:2], 16) % self.blockNum)for f in self.hashfunc:loc = f.hash(str_input)# print(f"name = {name}, loc = {loc}")self.server.setbit(name, loc, 1)if __name__ == '__main__':# 第一次运行时会显示 not exists, 之后再运行会显示 existsbf = BloomFilter()if bf.isContains('http://www.sina.com.cn/'):   # 判断字符串是否存在print('url exists!')else:print('url not exists!')bf.insert('http://www.sina.com.cn/')

5.2. 修改scrapy-redis的去重算法。

5.2.1. 分析源码调度过程。

# 调度过程:1. 第一步,调度文件: scheduler.py
open() ——> self.df = load_object(self.dupefilter_cls) ——> dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS ——> SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter' # 加入调度队列
def enqueue_request(self, request):if not request.dont_filter and self.df.request_seen(request):self.df.log(request, self.spider)return Falseif self.stats:self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)self.queue.push(request)return True
可见是使用dupefilter文件中的RFPDupeFilter类的 request_seen 方法来进行去重的。2. 第二步,去重文件:dupefilter.py
def request_seen(self, request):"""Returns True if request was already seen.Parameters----------request : scrapy.http.RequestReturns-------bool"""fp = self.request_fingerprint(request)# This returns the number of values added, zero if already exists.added = self.server.sadd(self.key, fp)return added == 0
可以知道scrapy_redis是利用set数据结构来去重的,去重的对象是request的fingerprint。def request_fingerprint(request, include_headers=None):"""Return the request fingerprint.The request fingerprint is a hash that uniquely identifies the resource therequest points to. For example, take the following two urls:http://www.example.com/query?id=111&cat=222http://www.example.com/query?cat=222&id=111Even though those are two different URLs both point to the same resourceand are equivalent (ie. they should return the same response).Another example are cookies used to store session ids. Suppose thefollowing page is only accesible to authenticated users:http://www.example.com/members/offers.htmlLot of sites use a cookie to store the session id, which adds a randomcomponent to the HTTP Request and thus should be ignored when calculatingthe fingerprint.For this reason, request headers are ignored by default when calculatingthe fingeprint. If you want to include specific headers use theinclude_headers argument, which is a list of Request headers to include."""if include_headers:include_headers = tuple(to_bytes(h.lower())for h in sorted(include_headers))cache = _fingerprint_cache.setdefault(request, {})if include_headers not in cache:fp = hashlib.sha1()fp.update(to_bytes(request.method))fp.update(to_bytes(canonicalize_url(request.url)))fp.update(request.body or b'')if include_headers:for hdr in include_headers:if hdr in request.headers:fp.update(hdr)for v in request.headers.getlist(hdr):fp.update(v)cache[include_headers] = fp.hexdigest()return cache[include_headers]从request_fingerprint可以看出,fingerprint到底是什么,,其实就是用hashlib.sha1()对request对象的某些字段信息进行压缩,用调试也可以看到,其实fp就是request对象加密压缩后的一个字符串(40个字符,0~f)
  • 总结,从上面的调度过程我们就可以看出,修改点就在于dupefilter.request_seen()函数。

5.2.2. 修改源码。

  • 原始文件 dupefilter.py
# 原始文件 .\Lib\site-packages\scrapy_redis\dupefilter.pyimport logging
import timefrom scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprintfrom . import defaults
from .connection import get_redis_from_settingslogger = logging.getLogger(__name__)# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):"""Redis-based request duplicates filter.This class can also be used with default Scrapy's scheduler."""logger = loggerdef __init__(self, server, key, debug=False):"""Initialize the duplicates filter.Parameters----------server : redis.StrictRedisThe redis server instance.key : strRedis key Where to store fingerprints.debug : bool, optionalWhether to log filtered requests."""self.server = serverself.key = keyself.debug = debugself.logdupes = True@classmethoddef from_settings(cls, settings):"""Returns an instance from given settings.This uses by default the key ``dupefilter:<timestamp>``. When using the``scrapy_redis.scheduler.Scheduler`` class, this method is not used asit needs to pass the spider name in the key.Parameters----------settings : scrapy.settings.SettingsReturns-------RFPDupeFilterA RFPDupeFilter instance."""server = get_redis_from_settings(settings)# XXX: This creates one-time key. needed to support to use this# class as standalone dupefilter with scrapy's default scheduler# if scrapy passes spider on open() method this wouldn't be needed# TODO: Use SCRAPY_JOB env as default and fallback to timestamp.key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}debug = settings.getbool('DUPEFILTER_DEBUG')return cls(server, key=key, debug=debug)@classmethoddef from_crawler(cls, crawler):"""Returns instance from crawler.Parameters----------crawler : scrapy.crawler.CrawlerReturns-------RFPDupeFilterInstance of RFPDupeFilter."""return cls.from_settings(crawler.settings)def request_seen(self, request):"""Returns True if request was already seen.Parameters----------request : scrapy.http.RequestReturns-------bool"""fp = self.request_fingerprint(request)# This returns the number of values added, zero if already exists.added = self.server.sadd(self.key, fp)return added == 0def request_fingerprint(self, request):"""Returns a fingerprint for a given request.Parameters----------request : scrapy.http.RequestReturns-------str"""return request_fingerprint(request)def close(self, reason=''):"""Delete data on close. Called by Scrapy's scheduler.Parameters----------reason : str, optional"""self.clear()def clear(self):"""Clears fingerprints data."""self.server.delete(self.key)def log(self, request, spider):"""Logs given request.Parameters----------request : scrapy.http.Requestspider : scrapy.spiders.Spider"""if self.debug:msg = "Filtered duplicate request: %(request)s"self.logger.debug(msg, {'request': request}, extra={'spider': spider})elif self.logdupes:msg = ("Filtered duplicate request %(request)s"" - no more duplicates will be shown"" (see DUPEFILTER_DEBUG to show all duplicates)")self.logger.debug(msg, {'request': request}, extra={'spider': spider})self.logdupes = False
  • 修改后的文件: dupefilter.py
# 修改后的文件 .\Lib\site-packages\scrapy_redis\dupefilter.pyimport logging
import timefrom scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprintfrom . import defaults
from .connection import get_redis_from_settingsisUseBloomfilter = False
try:from .Bloomfilter import BloomFilter
except Exception as e:print(f"there is no BloomFilter, used the default redis set to dupefilter.")
else:isUseBloomfilter = Truelogger = logging.getLogger(__name__)# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):"""Redis-based request duplicates filter.This class can also be used with default Scrapy's scheduler."""logger = loggerdef __init__(self, server, key, debug=False):"""Initialize the duplicates filter.Parameters----------server : redis.StrictRedisThe redis server instance.key : strRedis key Where to store fingerprints.debug : bool, optionalWhether to log filtered requests."""self.server = serverself.key = keyself.debug = debugself.logdupes = True# 使用 Bloonfilter 来对url去重if isUseBloomfilter == True:self.bf = BloomFilter()@classmethoddef from_settings(cls, settings):"""Returns an instance from given settings.This uses by default the key ``dupefilter:<timestamp>``. When using the``scrapy_redis.scheduler.Scheduler`` class, this method is not used asit needs to pass the spider name in the key.Parameters----------settings : scrapy.settings.SettingsReturns-------RFPDupeFilterA RFPDupeFilter instance."""server = get_redis_from_settings(settings)# XXX: This creates one-time key. needed to support to use this# class as standalone dupefilter with scrapy's default scheduler# if scrapy passes spider on open() method this wouldn't be needed# TODO: Use SCRAPY_JOB env as default and fallback to timestamp.key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}debug = settings.getbool('DUPEFILTER_DEBUG')return cls(server, key=key, debug=debug)@classmethoddef from_crawler(cls, crawler):"""Returns instance from crawler.Parameters----------crawler : scrapy.crawler.CrawlerReturns-------RFPDupeFilterInstance of RFPDupeFilter."""return cls.from_settings(crawler.settings)def request_seen(self, request):"""Returns True if request was already seen.Parameters----------request : scrapy.http.RequestReturns-------bool"""if isUseBloomfilter == True:# 使用 Bloomfilter 来对url去重fp = self.request_fingerprint(request)if self.bf.isContains(fp):  # 如果已经存在return Trueelse:self.bf.insert(fp)return Falseelse:# 使用redis默认的set进行去重fp = self.request_fingerprint(request)# This returns the number of values added, zero if already exists.added = self.server.sadd(self.key, fp)return added == 0def request_fingerprint(self, request):"""Returns a fingerprint for a given request.Parameters----------request : scrapy.http.RequestReturns-------str"""return request_fingerprint(request)def close(self, reason=''):"""Delete data on close. Called by Scrapy's scheduler.Parameters----------reason : str, optional"""self.clear()def clear(self):"""Clears fingerprints data."""self.server.delete(self.key)def log(self, request, spider):"""Logs given request.Parameters----------request : scrapy.http.Requestspider : scrapy.spiders.Spider"""if self.debug:msg = "Filtered duplicate request: %(request)s"self.logger.debug(msg, {'request': request}, extra={'spider': spider})elif self.logdupes:msg = ("Filtered duplicate request %(request)s"" - no more duplicates will be shown"" (see DUPEFILTER_DEBUG to show all duplicates)")self.logger.debug(msg, {'request': request}, extra={'spider': spider})self.logdupes = False

5.2.3. 文件结构

5.3. 结果展示。

  • 使用前:

  • 使用后:

5.4. 特别说明。

5.4.1. 关于错误率

  • 对于爬虫来说,需要根据数据量,来配置不同的参数信息,如果参数配置不准确,错误率将会非常之高,导致request丢失(上面的hash算法映射的位数太多,容易导致大的误判率),非常的不划算……

5.4.2. 关于移除Bloom Filter

  • 不仅需要移除BloomFilter.py,而且要将下面的缓存文件一并移除。

附录:参考文章

  1. scrapy_redis去重优化(已有7亿条数据),附Demo福利:http://blog.csdn.net/bone_ace/article/details/53099042
  2. Bloom Filter概念和原理:http://blog.csdn.net/jiaomeng/article/details/1495500
  3. 基于Redis的Bloomfilter去重(附Python代码):http://blog.csdn.net/Bone_ACE/article/details/53107018#insertcode

使用BloomFilter优化scrapy-redis去重相关推荐

  1. Scrapy 爬虫去重效率优化之 Bloom Filter的算法的对接

    From:https://cloud.tencent.com/developer/article/1084962 Python分布式爬虫打造搜索引擎Scrapy精讲-将bloomfilter(布隆过滤 ...

  2. 三种去重方式——HashSet、Redis去重、布隆过滤器(BloomFilter)

    三种去重方式 去重就有三种实现方式,那有什么不同呢? HashSet 使用java中的HashSet不能重复的特点去重.优点是容易理解.使用方便. 缺点:占用内存大,性能较低. Redis去重 使用R ...

  3. Python面试必备—分布式爬虫scrapy+redis解析

    传智播客博学谷 微信号:boxuegu- get最新最全的IT技能 免费领取各种视频资料 注意:文末送书 很多小伙伴留言,最近准备跳槽,但是面试的机会比较少,好不容易得到面试机会,由于技术点的匮乏,面 ...

  4. Scrapy框架-去重原理讲解、数据收集以及信号量知识

    scrapy的去重原理 信号无处不在 [知其然且知其所以然,才能够更好的理解这个框架,而且在使用和改动的时候也能够想出更合理的方法.] (开始测试前,到settings.py中注释掉下载中间件的设置, ...

  5. redis专题:redis键值设计、性能优化以及redis连接池配置

    文章目录 1.redis键值设计 ①:key设计规范 ②:value设计规范 2. 命令使用优化 3. redis连接池配置参数设计 4. redis连接池预热 5. redis的key过期删除策略 ...

  6. Scrapy+redis+mongodb分布式爬虫抓取小说《冰与火之歌1-5》

    一年前写了python简单实战项目:<冰与火之歌1-5>角色关系图谱构建的数据库设计和数据可视化共现图谱的构建,中间唯独缺了数据的采集,因为想着只是个小爬虫,应该无关痛痒,后面也觉得这个系 ...

  7. java redis 去重_redis去重方案

    tpn(taobao push notification)在使用redis计算消息未读数的过程中,遇到了一系列的问题,下面把这个过程整理了一下,也让大家了解这个纠结的过程,供大家以后使用redis或者 ...

  8. scrapy分布式去重组件源码及其实现过程

    scrapy_redis在继承scrapy去重组件的基础上覆盖了某些方法,原scrapy去重是基于单机情况下的内部去重,但是分布式是多机条件下的多爬虫协同去重,因此需要让不同及其上的同一个爬虫能够在同 ...

  9. scrapy+redis+mongodb爬取苏宁商城图书价格

    之前的爬取苏宁图书信息的时候因为懒得分析图书的价格,所有今天把图书的价格给弄了 图书的价格是动态生成的,不过稍稍花点时间就分析出来了,本来长长的·一大串,慢慢删减慢慢试就剩一个,看下图 然后我在网页源 ...

最新文章

  1. jpa删除数据后数据库无修改_java – JPA不删除数据库行
  2. -9 逆序输出一个整数的各位数字_计算机基础知识: 信息数字化
  3. android 设置窗口透明效果,android - 如何将对话框窗口背景设置为透明,而不影响其边距...
  4. 加快信息化建设对地方发展的_加快设计师职业发展的9种方法
  5. Tooltip工具提示控件的使用
  6. 控制反转(IOC)入门
  7. 重磅!微软在 GitHub 又一开源力作面世,代号「女娲」!
  8. 填写【2fpmi2j】
  9. python单位根检验看结果_python做adf检验
  10. 10.5 Vue电商后台管理完善--订单详情页面显示商品信息,添加备注
  11. c语言圆周率小数点后500万位,圆周率小数点后500位数字是多少
  12. [BZOJ3772]精神污染(主席树+链剖)
  13. Css3中-moz、-ms、-webkit的使用
  14. python-数据分析(3-Matplotlib之各种图形应用)
  15. 根据起始日期、起始时间、终止日期、终止时间计算天数
  16. 上云节省 35%计算资源,420 个运维人天:运满满实时计算实践和思考
  17. 编程求一个9位的整数,数字由1-9构成,每个数字只能出现一次。并且这个整数的前一位能被1整除,前两位能被2整除, ......以此类推,前九位能被9整除。
  18. Web Push功能使用
  19. OE(OSA)期刊模板下载
  20. VS2019/MFC编程入门教程:组合框控件Combo Box

热门文章

  1. idea注释模版配置(吐血推荐!!!)
  2. WebAPI面试题总结03
  3. 华为手机删除用户凭据(用于 WLAN)
  4. Unhandled exception at 0x0fd238de in face_rec.exe: 0xC000001D: Illegal Instruction
  5. 如何成为月入五万的程序员,他们告诉你!
  6. 一篇搞懂python的内存管理
  7. 白血病最新研究进展(2022年2月)
  8. RDkit四:数据处理过程中smiles编码的清洗统一化
  9. 8位双向移位寄存器verilog设计
  10. 如何利用python制作几个简单的游戏?(简单易上手版)