一、背景原理

部分内容参考基于python的MySQL和redis数据同步实现(redis做缓存)
这个说的真的很清楚,但缺少实际的案例

1、MongDB 数据库

MongDB是一种非关系型数据库,主要用于存放持久化数据,将数据存储在硬盘中,读取速度较慢。每次请求访问数据库时,都存在着I/O操作,如果反复频繁的访问数据库:会在反复链接数据库上花费大量时间,从而导致运行效率过慢;反复的访问数据库也会导致数据库的负载过高。所以,针对MongDB的缺点,衍生出了缓存的概念。

2、redis数据库

redis是一款非关系型数据库,是一种缓存数据库,数据存放在内存中,用于存储使用频繁的数据,这样减少访问数据库的次数,提高运行效率。所以redis数据库读取速度比较快,运行效率高。

3、二者区别与联系

(1) 作用:MongDB用于持久化的存储数据到硬盘,功能强大,速度较慢,基于磁盘,读写redis快,但是不受空间容量限制,性价比高;redis用于存储使用较为频繁的数据到缓存中,读取速度快,基于内存,读写速度快,也可做持久化,但是内存空间有限,当数据量超过内存空间时,需扩充内存,但内存价格贵;

(3) 需求:MongDB和redis因为需求的不同,一般都是配合使用。需要高性能的地方使用Redis,不需要高性能的地方使用MongDB。存储数据在MongDB和Redis之间做同步。所以一般情形下,使用MongDB作为持久化存储数据库存储数据,使用redis作为缓存提升读取速度。

二、数据同步实现方案

把二手房小区价格数据,持久化存储在MongDB数据库中,然后利用redis作为缓存数据库,实现数据的快速读取。这样就需要保持redis和MongDB数据库的数据一致性,接下来,主要讲解查询和数据更新过程的数据库一致性实现。

1、查询一致性

查询数据时,由于redis作为缓存实现快速读取数据,所以首先查询redis中是否存在数据,若存在则返回查询结果,若不存在,则向MongDB数据库请求查询数据,然后由MongDB数据库返回结果。查询流程如下如所示。而且,由于本文中redis作为缓存使用,所以需要添加过期时间,也就是为redis的每条数据记录添加过期时间,若过期时间数据没有被查询则清除,若此时间内,数据被查询,则过期时间重置,这样可以定时清除查询不频繁的数据存在redis中,增加数据读取速度。

2、数据实时重复

redis作为MongoDB的缓存在线实时去重,可以解决在多进程、多线程、异步爬虫时的数据实时重复问题。什么叫数据实时重复呢?
数据重复主要体现在,爬虫一但需要整个库全部数据的实时更新,但并不知道对方网站数据只有一部分数据更新。但要整个库全部数据不能有重复,没法监控对方网页,就需要重新爬整个网站。用案例最能说明问题。

案例一:二手房价格更新

8月数据如下

9月数据如下

如何判断是数据库有,但真实新数据变化的?
如果有数据,看新爬取的价格时间是否在数据库价格时间列表中,如果priceMonth不在其中则插入,如果在其中则不插入。这里例如第一次爬取在8月,数据大多展示7月数据。本月9月需要把数据更新到8月,要保证数据成功更新,又不重复。就需要快速判断什么页面需要更新,什么页面不需要更新。从而减少请求次数,提高爬虫效率,并且减少对方服务器压力。

明确重复数据的特征

首先明确什么样的数据是重复的。
因为有一万五相同小区名字,但在不同城市的数据。如果拿小区+月份重复,这一万五就丢掉了。就和一个"春天花园"一样,六七个省都有这个小区名字 。那就需要思考省市区(县)+小区+月份更准确 还是 城市+小区+月份就可以。会不会有同一个城市不同的区县有相同的小区名字。
因为中国没有相同市、区的名字,不加“省”也是可以的。最终拿市+区(县)+小区名字+年月作为主键,保证数据绝对的不重复。因此这里redis表的设计采用hash类型的数据,这样可以存在多个key-value对,以用户ID作为hash表的名称。

keyend=item["city"]+item["region"]+item["projectName"]+item["priceMonth"]

基于Redis的数据库性质,查询插入hkeys,hmset的效率比用pandas去重,mongoDB去重,效率高很多,这是大费周章用redis的原因,不然为啥不用mongoDB呢?这里读者发现我不对,或者这种方法效率更低,我愿意有偿听你意见(我只是redis的菜鸟)

import redis
from multiprocessing.dummy import Pool as ThreadPool
import copy
pool =redis.ConnectionPool(host='localhost',port=6379,db=2)
connection = redis.Redis(connection_pool=pool)# connection.flushall()#清空数据库
list_=copy.deepcopy(mycol2_list)#list(mycol1.find())def pross_Redis(item):item["_id"]=str(item['_id'])item["center"]=str(item["center"])keyend=item["city"]+item["region"]+item["projectName"]+item["priceMonth"]
#     print(len(connection.hkeys(keyend)))if len(connection.hkeys(keyend))!=10:#如果没有print(1)connection.hmset(keyend, mapping=item)  # 批量插入elif len(connection.hkeys(keyend))==10:#如果有if item["priceMonth"] not in [connection.hget(keyend,"priceMonth").decode('utf-8')]:print(2)connection.hmset(keyend, mapping=item)  # 批量插入else:print("重复数据")
#         referencePrice=connection.hget(keyend,"referencePrice")else:print("其他数据")
#         mycol1.insert_one(item)#.update({'_id':id_}, {'$rename': {'updateDate': 'priceMonth'}}, False, True)
pool = ThreadPool(10)
pool.map(pross_Redis,list_[:30])
pool.close()
pool.join()fauture=mycol2_list[36]
item=fauture
keyend=item["city"]+item["region"]+item["projectName"]+item["priceMonth"]
isExists = connection.hexists(keyend,"projectName")
if isExists!=True:#如果新数据不存在于数据库,插入print(1)

最终效果如下

具体设置方法
方案1 缓存方案
爬"上海市"的时候,把mongoDB上海市的数据缓存到redis,然后设置30分钟或者爬取完释放缓存。
方案2 Redis过滤器快速去重
(1)刚开始爬虫 把MongDB数据库所有数据缓存到redis
(2)把新爬的数据按照keyend和redis主键对比
(3)如果keyend不在redis主键中,插入redis,再插入MongDB
(4)如果keyend在redis主键中,不插入redis
(5) connection.flushall()释放redis数据的缓存
这个拿字典也能实现,只是效率没有redis高,等到一百万数据的时候这个会很明显

最终pipeline的写法如下

from itemadapter import ItemAdapter
import pymongo as pymongo
from .items import ShellItemimport pandas as pd
import redis
from multiprocessing.dummy import Pool as ThreadPool
import copyclass ShellPipeline:
"省略数据库部分"def process_item(self, items, spider):items1= ItemAdapter(items).asdict()global connection# 用redis作为缓存来去重数据pool = redis.ConnectionPool(host='localhost', port=6379, db=2)connection = redis.Redis(connection_pool=pool)#这里必须每次插入都导入最新redisdef pross_Redis(item):# item["_id"] = str(item['_id'])item = {"province": item["province"],"city": item["city"],"spiderDate": item["spiderDate"],"projectName": item["projectName"],"referencePrice": item["referencePrice"],"region": item["region"],"priceMonth": item["priceMonth"],"deliveryDate": item["deliveryDate"],"center": item["center"]}item["center"] = str(item["center"])keyend = item["city"] + item["region"] + item["projectName"] + item["priceMonth"]#     print(len(connection.hkeys(keyend)))if len(connection.hkeys(keyend)) != 9:  # 如果没有数据,9是我数据item有9个keyprint(1)connection.hmset(keyend, mapping=item)  # 批量插入return Trueelif len(connection.hkeys(keyend)) == 9:  # 如果有if item["priceMonth"] not in [connection.hget(keyend, "priceMonth").decode('utf-8')]:print(2)connection.hmset(keyend, mapping=item)  # 批量插入else:print("重复数据")else:print("其他数据")postItem = dict(items1)keyend1= items1["city"] + items1["region"] + items1["projectName"] + items1["priceMonth"]isExists = connection.hexists(keyend1, "projectName")if isExists != True:  # 如果新数据不存在于数据库,插入# 把item转化成字典形式print(postItem)judje=pross_Redis(postItem)if judje==True:print(postItem)self.coll.insert_one(postItem)#self.coll为MongoDBreturn items1

启动函数main如下

from twisted.internet import reactor, defer
from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging
import time
import logging
from scrapy.utils.project import get_project_settings
import multiprocessing
# import psycopg2
import time# # 在控制台打印日志
# configure_logging()
# # CrawlerRunner获取settings.py里的设置信息
# runner = CrawlerRunner(get_project_settings())
import redis
from multiprocessing.dummy import Pool as ThreadPool
import copy
global connection
import pymongo
myclient = pymongo.MongoClient("mongodb://localhost:27017/")  #,username='root',password='18091471364@ch'使用MongoClient对象,连接数据库
collist  = myclient.list_database_names()  # 获取所有数据库
mydb = myclient["companyln"]  # 数据库名 esfcomunicate
datalist=[]
#for i in ["test_815_Night","test_815_Night_end","test_esf815"]:
mycol2 = mydb["company_second_hand_house_price"]# collection集合(类似SQL的表)
# datalist=datalist+list(mycol.find())# 用redis作为缓存来去重数据
pool1 = redis.ConnectionPool(host='localhost', port=6379, db=2)
connection = redis.Redis(connection_pool=pool1)
# 更新取消注释,把底库导入redis
#
# def pross_Redis(item):
#     # item["_id"] = str(item['_id'])
#     item = {"province": item["province"],
#             "city": item["city"],
#             "spiderDate": item["spiderDate"],
#             "projectName": item["projectName"],
#             "referencePrice": item["referencePrice"],
#             "region": item["region"],
#             "priceMonth": item["priceMonth"],
#             "deliveryDate": item["deliveryDate"],
#             "center": item["center"]}
#
#     item["center"] = str(item["center"])
#     keyend = item["city"] + item["region"] + item["projectName"] + item["priceMonth"]
#     #     print(len(connection.hkeys(keyend)))
#     if len(connection.hkeys(keyend)) != 9:  # 如果没有
#         print(1)
#         connection.hmset(keyend, mapping=item)  # 批量插入
#         return True
#     elif len(connection.hkeys(keyend)) == 9:  # 如果有
#         if item["priceMonth"] not in [connection.hget(keyend, "priceMonth").decode('utf-8')]:
#             print(2)
#             connection.hmset(keyend, mapping=item)  # 批量插入
#         else:
#             print("重复数据")
#     #         referencePrice=connection.hget(keyend,"referencePrice")
#     else:
#         print("其他数据")# connection.flushall()#清空数据库
# 如何判断是数据库有,但真实新数据变化的
# [connection.hget(keyend,"priceMonth").decode('utf-8')]#==xinshuji.encode('utf-8')
# 把不同月份的合并起来成一个列表,但只显示最新数据
# 如果有数据,看新爬取的价格时间是否在数据库价格时间列表中,如果priceMonth不在其中则插入,如果在其中则不插入
# list_ = copy.deepcopy(list(mycol2.find()))# from multiprocessing import Pool
# multiprocessing = Pool(processes=8)#
# def ThreadPool1(list_):
# pool = ThreadPool(10)
# pool.daemon=True
# pool.map(pross_Redis, list_)
# pool.close()
# pool.join()# multiprocessing.map(pross_Redis, list_)
# multiprocessing.close()
# multiprocessing.join()#第1行代码导入CMDLINE模块来执行命令行指令。第2行代码用split()函数根据空格拆分指令字符串,再用execute()函数输入到命令行中执行,相当于直接在终端中执行指令“scapy crawl爬虫名”。from scrapy.crawler import CrawlerProcessfrom scrapy.utils.project import get_project_settings
from scrapy.spiderloader import SpiderLoader
import time
from multiprocessing import Poollist_all=[['End_gansu','End_guangdong','End_guangxi','End_guizhou'],
['End_hainan','End_hebei','End_heilongjiang','End_henan'],
['End_hubei','End_jiangsu','End_jiangxi','End_shanghai'],
['End_shanxi','End_sichuan','End_tianjing','End_xinjiang'],
['End_fujian','End_shandong','End_yunnan','End_zhejiang'],
['End_anhui', 'End_beijing','End_chongqing',]]#['End_liaoning','End_jilin','End_neimenggu','End_ningxia'],# 根据项目配置获取 CrawlerProcess 实例
def process1(name):# try:process = CrawlerProcess(settings=get_project_settings())process.crawl(name)process.start()# except:#     pass# print(process)
# # 获取 spiderloader 对象,以进一步获取项目下所有爬虫名称
spider_loader = list_all#SpiderLoader(list_all)
if __name__ == '__main__':for P in list_all:# LIST1=Pstart_3=time.time()pool = Pool(processes=4)pool.daemon = Truepool.map(process1, P)#LIST1pool.close()pool.join()end_3=time.time()print('四个进程',end_3-start_3)

案例 二 小程序 显示受限数据实时更新

遇到每次展示150条,但每次请求返回不同的数据,这种数据库看似深不见底,因为不知道到底有多少数据,就需要尽可能多地爬取。

确定enterpriseName公司名称为主键,作为去重keyend

具体设置方法
方案1 缓存方案
爬"上海市"的时候,把mongoDB的数据缓存到redis,然后设置30分钟或者爬取完释放缓存。

方案2 Redis过滤器快速去重
(1)刚开始爬虫 把MongDB数据库所有数据缓存到redis
(2)把新爬的数据按照keyend和redis主键对比
(3)如果keyend不在redis主键中,插入redis,再插入MongDB
(4)如果keyend在redis主键中,不插入redis
(5) connection.flushall()释放redis数据的缓存
这个拿字典也能实现,只是效率没有redis高,等到一百万数据的时候这个会很明显

    def process_item(self, item, spider):items1= ItemAdapter(item).asdict()global connection# 用redis作为缓存来去重数据pool = redis.ConnectionPool(host='localhost', port=6379, db=3)connection = redis.Redis(connection_pool=pool)def pross_Redis(item):# item["_id"] = str(item['_id'])item.pop('_id', None)item["certList"] = str(item["certList"])keyend = item["enterpriseName"]#     print(len(connection.hkeys(keyend)))isExists = connection.hexists(keyend, "certList")if isExists != True:  # 如果新数据不存在于数据库,插入print(1)connection.hmset(keyend, mapping=item)  # 批量插入return Trueelif isExists == True:  # 如果有print("重复数据")else:print("其他数据")postItem = dict(items1)keyend1=  postItem["enterpriseName"]isExists = connection.hexists(keyend1, "certList")if isExists != True:  # 如果新数据不存在于数据库,插入# 把item转化成字典形式print(postItem)judje=pross_Redis(postItem)if judje==True:print(postItem)self.coll.insert_one(postItem)return items1

redis作为MongoDB的缓存在线实时去重相关推荐

  1. 常用数据库 知识点大全 (Mysql,Redis,MongoDB)

    目录 Mysql 1.1 5大引擎 共10个 1.2 事务 1.3 锁1 锁2 1.4 Btree/B+tree 1.5 mysql进阶 Redis 2.1 Redis介绍 2.2 redis的五大数 ...

  2. HBase、Redis、MongoDB、Couchbase、LevelDB 五款主流NoSQL数据库大比拼

    在 HBase.Redis.MongoDB.Couchbase.LevelDB 五款较主流的数据库产品中,本文将主要对它们进行分析对比. 鉴于缺乏项目中的实战经验沉淀,本文内容和观点主要还是从各平台资 ...

  3. HBase、Redis、MongoDB、Couchbase、LevelDB主流 NoSQL 数据库的对比

    最近小组准备启动一个 node 开源项目,从前端亲和力.大数据下的IO性能.可扩展性几点入手挑选了 NoSql 数据库,但具体使用哪一款产品还需要做一次选型. 我们最终把选项范围缩窄在 HBase.R ...

  4. memcache、Redis与MongoDB的学习-1

    除此接触这三个词的概念,对今天看的资料最了一些整理. 之前经常有看到memcache.Redis与MongoDB相关的数据库,最开始意味这些都只是用来做数据库优化的缓存工具,后来具体看了一些资料之后才 ...

  5. 基于Redis+MySQL+MongoDB存储架构应用

    摘  要: Redis+MySQL+MongoDB技术架构实现了本项目中大数据存储和实时云计算的需求.使用MongoDB切片的水平动态添加,可在不中断平台业务系统的同时保障扩容后的查询速度和云计算效能 ...

  6. Redis,MongoDB面试题

    什么是Redis Redis(Remote Dictionary Server) 是一个使用 C 语言编写的,开源的(BSD许可)高性能非关系型(NoSQL)的键值对数据库. Redis 可以存储键和 ...

  7. 基于 Flink 的超大规模在线实时反欺诈系统的建设与实践

    在大数据时代,金融科技公司通常借助消费数据来综合评估用户的信用和还款能力.这个过程中,某些中介机构会搜集大量的号并进行"养号"工作,即在一年周期里让这些号形成正常的消费.通讯记录, ...

  8. 基于Flink的超大规模在线实时反欺诈系统的建设与实践

    作者:关贺宇 在大数据时代,金融科技公司通常借助消费数据来综合评估用户的信用和还款能力.这个过程中,某些中介机构会搜集大量的号并进行"养号"工作,即在一年周期里让这些号形成正常的消 ...

  9. Redis和MongoDB的区别

    Redis和MongoDB的区别(面试受用) 项目中用的是MongoDB,但是为什么用其实当时选型的时候也没有太多考虑,只是认为数据量比较大,所以采用MongoDB. 最近又想起为什么用MongoDB ...

最新文章

  1. SVM-支持向量机原理详解与实践之一
  2. 巧用Linux命令完成统计排序功能yes2
  3. 设有n个正整数,将他们连接成一排,组成一个最大的多位整数
  4. raspberry pi_如何将Raspberry Pi配置为微控制器
  5. nc 发布元数据_智联科发布新一代连续式NC膜粘膜划膜一体机
  6. jQuery 选择器简介
  7. c# textbox和listbox多行显示
  8. Java基础之continue与break区别
  9. 阿里钉钉向1000万企业组织免费开放在家办公系统
  10. linux安装离线docker包教程,Centos7离线安装Docker环境
  11. 小米 MIUI 12 Magisk root教程(无需刷REC)
  12. 永磁同步电机矢量控制(四)——simulink仿真搭建
  13. 关于lua加密luac的有关问题
  14. 解决Symantec卸载需要密码问题又一新招
  15. div+css实现圆形loading动画,渐变拖尾动画
  16. Trainning 1 DAY
  17. Flutter安装后出现HTTP host not reachable.
  18. 【解决】Android 腾讯地图 选点定位组件,获取当前位置有偏差所遇到的坑!!
  19. 京东到家话费券系统NIO实战
  20. 抓包工具Fiddler抓取手机包和修改接口数据

热门文章

  1. SQL之分配、收回权限
  2. Dev C++中文乱码问题
  3. Drupal9笔记之block权限踩坑
  4. 如何利用Python写一个病毒,去逗逗室友!毕竟他新买的电脑
  5. 【Nmap使用详解】黑客渗透工具Nmap下载安装/实战使用,保姆级Namp教程
  6. Mysql 监控 performance_schema 拿得起,放不下(2)
  7. MySql8.0.22数据库安装教程
  8. 《巴菲特致股东的信(第4版)》笔记——那些不变的
  9. 360安卓_数据 | TalkingData:360手机助手份额32.27%居第一
  10. GD32F130之FMC用户非易失存储器