异步爬虫模块aiohttp实战之infoq
点击上方蓝字关注
异步爬虫模块aiohttp实战之infoq
之前写过很多的关于异步的文章,介绍了asyncio的使用,大多的只是通过简单的例子说明了asyncio的使用,但是没有拿出具体的使用例子,所以这次特意用一个具体例子来说明异步模块aiohttp配合asyncio的使用。
这次以infoq网站为例子进行讲解,infoq是一个怎样的网站,这里引用官网的一句话
「InfoQ 是一个实践驱动的社区资讯站点,致力于促进软件开发领域知识与创新的传播。… InfoQ 每周精要 订阅InfoQ 每周精要,加入资深开发者汇聚的庞大技术社区。」 很明显了infoq是一个技术型的网站,这次我们就对该网站的推荐内容进行讲解。
思路分析
这里先说下思路,这里分了两步,首先爬取列表页,然后再去爬详情页的内容。
爬取列表页的数据保存到数据库,这里我使用的是mongodb,同时加一个字段标注状态,这个状态是为了后面详情页可以做到一个续爬,这里我设置三种状态0,1,2其中0表示初始状态也就是还没开始爬,1表示开始下载,2表示下载成功。通过状态我们可以直到已经完成了多少链接的爬取了。进而达到的一个续爬的效果。爬取详情页的时候就去读取数据库中状态为0的数据。根据状态码我们就可以作出对应的操作了,例如我们任务都完了,查看列表页的数据发现状态是1,可以知道这个爬取的过程中出现了问题,所以我们将状态改为0,再运行详情页的程序即可。
网站分析之列表部分
首先让我分析一下,这个推荐内容部分都有哪些请求,经过查看发现在链接https://www.infoq.cn/public/v1/my/recommond 里面有我们需要的内容
很明显这是一个ajax请求的页面,然后就是分析这个链接的请求方式和请求参数了
请求链接我们已经知道了,然后看请求参数部分经过观察我们发现请求参数的格式为
{"size":12,"score":1549328400000}
然后我们只要直到参数来源就好了,一般通过多翻几页去总结规律,观察发现size的值都是12,也就是一页显示的内容,然后是score内容,我们发现每一页的score内容是不同的,通过搜索 可以发现再上一页的json内容中包含着这个字段。
于是我们就可以写代码了。
列表部分的代码
因为列表页部分是一步接一步的不适合并发所以我们用requests模块常规的方式爬取即可。
文件名infoq_seed_spider.py
网络请求部分
def get_req(self, data=None): req = self.session.post(self.start_url, data=json.dumps(data)) if req.status_code in [200, 201]: return req
数据解析部分
def save_data(self, data): tasks = [] for item in data: try: dic = {} uuid = item.get("uuid") dic["uuid"] = uuid# 经过分析发现uuid就是详情页链接的组成部分。 dic["url"] = f"https://www.infoq.cn/article/{uuid}" dic["title"] = item.get("article_title") dic["cover"] = item.get("article_cover") dic["summary"] = item.get("article_summary") author=item.get("author") if author: dic["author"] = author[0].get("nickname") else: dic["author"]=item.get("no_author","").split(":")[-1] score=item.get("publish_time") dic["publish_time"] = datetime.datetime.utcfromtimestamp(score/1000).strftime("%Y-%m-%d %H:%M:%S") dic["tags"] = ",".join([data.get("name") for data in item.get("topic")]) translate = item.get("translator") dic["translator"] = dic["author"] if translate: dic["translator"] = translate[0].get("nickname") dic["status"] = 0 dic["update_time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") tasks.append(dic) except IndexError as e: crawler.error("解析出错") Mongo().save_data(tasks) crawler.info(f"add {len(tasks)} datas to mongodb") return score
程序入口
def start(self): i = 0 post_data = {"size": 12} while i < 10: #这里只爬取了10页,这个值可以自己设置 req = self.get_req(post_data) data = req.json().get("data") score = self.save_data(data) post_data.update({"score": score}) #通过上一页的内容得知下一次要请求的参数。 i += 1 time.sleep(random.randint(0, 5))
列表页逻辑简单代码也是同步,就没有什么需要强调的地方了
新手需要注意是访问量和频率,这里只是做一个学习范例所以请求量和请求频率相应的比较低。
网站分析之详情部分
这里分为两部分存储,一个是详情内容的封面我将图片保存到了本地。另外一个就是详情内容的爬取保存到数据库里,保存图片的同时获取图片的路径一起保存在数据库,方便找到文章的配图。
详情页的思考
一开始我以为详情页的内容请求类似https://www.infoq.cn/article/cY3lZj1G-cR2iJ3ONUd6链接就行了,但是后来事实证明我想简单了,详情页其实也是通过ajax加载的,经过和列表页类似的分析,发现请求链接为:https://www.infoq.cn/public/v1/article/getDetail,请求参数为 uuid,也就是详情页的链接后面的部分,我们在爬取列表页的时候已经获取了。
也就是说详情页的爬虫我们是通过访问数据库的形式去进行爬取的,这里我们就可以做到并发去访问了。这里就让我们熟悉一下aiohttp的使用已经其他异步库的使用。
详情页的代码部分
首先说下我们需要的包,aiohttp异步网络请求包,motor异步mongodb请求包,aiofiles异步文件操作包,async_timeout请求timeout设置包。以上包我们都可以通过pip进行安装,下面我们对这些包进行一一了解。
数据读取
首先我们需要读取数据的数据,这里我们使用pymongo。
首先是导入必备的包pip install pymongo安装即可。
读取部分没有涉及太多的操作,我们就直接获取一个生成器格式的数据即可
def find_data(self, col="infoq_seed"): # 获取状态为0的数据 data = self.db[col].find({"status": 0}) gen = (item for item in data) return gen
入口函数
async def run(data): crawler.info("Start Spider") async with aiohttp.connector.TCPConnector(limit=300, force_close=True, enable_cleanup_closed=True) as tc: #限制tcp连接数 async with aiohttp.ClientSession(connector=tc) as session: #创建一个可持续链接的session,传递下去, coros = (asyncio.ensure_future(bound_fetch(item, session)) for item in data) await start_branch(coros)
协程分流
这里主要做一个任务的均分,对于同步中的迭代器我们可以使用itertools的islice模块来实现
# -*- coding: utf-8 -*-# @Time : 2019/1/2 11:52 AM# @Author : cxa# @File : 切片.py# @Software: PyCharmfrom itertools import islice
la = (x for x in range(20))print(type(la))for item in islice(la, 5, 9): # 取下标5-9的元素 print(item)
但是异步生成器没有这中方法所以定义了如下方式进行分流。
下面代码的作用就是每次并发10个。通过修改limited_as_completed
方法的第二个参数可以设置不同的并发量。
async def start_branch(tasks): # 分流 [await _ for _ in limited_as_completed(tasks, 10)]
async def first_to_finish(futures, coros): while True: await asyncio.sleep(0.01) for f in futures: if f.done(): futures.remove(f) try: new_future = next(coros) futures.append(asyncio.ensure_future(new_future)) except StopIteration as e: pass return f.result()
def limited_as_completed(coros, limit): futures = [asyncio.ensure_future(c) for c in islice(coros, 5, limit)]
while len(futures) > 0: yield first_to_finish(futures, coros)
一般对于并发100万以及更大的数据量时,可以使用此方案。
下面具体说下网络请求部分的逻辑。
网络请求
这里分了两部分进行并行抓取,图片部分和详情内容部分
async def bound_fetch(item, session): md5name = item.get("md5name") file_path = os.path.join(os.getcwd(), "infoq_cover") image_path = os.path.join(file_path, f"{md5name}.jpg")
item["md5name"] = md5name item["image_path"] = image_path item["file_path"] = file_path async with sema: await fetch(item, session) #内容抓取部分协程 await get_buff(item, session) #图片抓去部分协程
内容部分核心内容
async def fetch(item, session, retry_index=0): try: refer = item.get("url") name = item.get("title") uuid = item.get("uuid") md5name = hashlib.md5(name.encode("utf-8")).hexdigest() # 图片的名字 item["md5name"] = md5name data = {"uuid": uuid} headers["Referer"] = refer if retry_index == 0: await MotorBase().change_status(uuid, item, 1) #开始下载并修改列表页的状态 with async_timeout.timeout(60): async with session.post(url=base_url, headers=headers, data=json.dumps(data)) as req: res_status = req.status
if res_status == 200: jsondata = await req.json() await get_content(jsondata, item) #获取内容 await MotorBase().change_status(uuid, item, 2) #修改状态下载成功 except Exception as e: jsondata = None if not jsondata: crawler.error(f'Retry times: {retry_index + 1}') retry_index += 1 return await fetch(item, session, retry_index)
图片抓取部分核心内容
async def get_buff(item, session): url = item.get("cover") with async_timeout.timeout(60): async with session.get(url) as r: if r.status == 200: buff = await r.read() if len(buff): crawler.info(f"NOW_IMAGE_URL:, {url}") await get_img(item, buff)
数据库的异步读取操作
首先导入mongo的异步库motor。
from motor.motor_asyncio import AsyncIOMotorClient
然后创建数据库链接
class MotorBase(): def __init__(self): self.__dict__.update(**db_configs) if self.user: self.motor_uri = f"mongodb://{self.user}:{self.passwd}@{self.host}:{self.port}/{self.db_name}?authSource={self.user}" self.motor_uri = f"mongodb://{self.host}:{self.port}/{self.db_name}" self.client = AsyncIOMotorClient(self.motor_uri) self.db = self.client.spider_data
上面的代码可以根据是否需要用户名来创建不同的链接方式
其中读取的配置内容格式如下
# 数据库基本信息db_configs = { 'type': 'mongo', 'host': '127.0.0.1', 'port': '27017', "user": "", "password": "", 'db_name': 'spider_data'}
状态修改
async def change_status(self, uuid, item, status_code=0): # status_code 0:初始,1:开始下载,2下载完了 try: # storage.info(f"修改状态,此时的数据是:{item}") item["status"] = status_code await self.db.infoq_seed.update_one({'uuid': uuid}, {'$set': item}, upsert=True) except Exception as e: if "immutable" in e.args[0]: await self.db.infoq_seed.delete_one({'_id': item["_id"]}) storage.info(f"数据重复删除:{e.args},此时的数据是:{item}") else: storage.error(f"修改状态出错:{e.args}此时的数据是:{item}")
数据保存
async def save_data(self, item): try: await self.db.infoq_details.update_one({ 'uuid': item.get("uuid")}, {'$set': item}, upsert=True) except Exception as e: storage.error(f"数据插入出错:{e.args}此时的item是:{item}")
图片保存
async def get_img(item, buff): # 题目层目录是否存在 file_path = item.get("file_path") image_path = item.get("image_path") if not os.path.exists(file_path): os.makedirs(file_path)
# 文件是否存在 if not os.path.exists(image_path): storage.info(f"SAVE_PATH:{image_path}") async with aiofiles.open(image_path, 'wb') as f: await f.write(buff)
motor,aiofiles的使用类似pymongo,os.open只不过使用async/await关键字实现而已.大家有什么不太懂地方可以加我微信italocxa,进群探讨。后台回复infoq可以得到完整的项目代码哦
更多异步操作
1
PEP525--异步生成器
2
异步网络库aiohttp学习
3
asyncio之Coroutines,Tasks&Future
4
PEP530--异步推导式
致转行AI的在校大学生的一封信
转行AI需要看的一些文章
转行学AI,具体细分方向如何选,来自一线工程师的感悟
用法律武器,痛击腾讯侵权行为!!!湾区人工智能可以改善知识产权现状吗?
【送书PDF】Python编程从入门到实践
Python从入门到精通,深度学习与机器学习资料大礼包!
【免费】某机构最新3980元机器学习/大数据课程高速下载,限量200份
觉得不错, 请随意转发,麻烦点个在看!
异步爬虫模块aiohttp实战之infoq相关推荐
- 异步爬虫爬取实战-asyncio
第一步,导入模块 import requests import asyncio import aiohttp import time 第二步创建类绑定内容,写入口函数 class WZ(object) ...
- python链家网高并发异步爬虫asyncio+aiohttp+aiomysql异步存入数据
python链家网二手房异步IO爬虫,使用asyncio.aiohttp和aiomysql 很多小伙伴初学python时都会学习到爬虫,刚入门时会使用requests.urllib这些同步的库进行单线 ...
- aiohttp保存MySQL_python链家网高并发异步爬虫asyncio+aiohttp+aiomysql异步存入数据
python链家网二手房异步IO爬虫,使用asyncio.aiohttp和aiomysql 很多小伙伴初学python时都会学习到爬虫,刚入门时会使用requests.urllib这些同步的库进行单线 ...
- 前端调用mysql异步_python链家网高并发异步爬虫asyncio+aiohttp+aiomysql异步存入数据...
python链家网二手房异步IO爬虫,使用asyncio.aiohttp和aiomysql 很多小伙伴初学python时都会学习到爬虫,刚入门时会使用requests.urllib这些同步的库进行单线 ...
- aiohttp mysql_python异步爬虫asyncio+aiohttp+aiomysql异步存入数据
异步IO爬虫,使用asyncio.aiohttp和aiomysql 很多小伙伴初学python时都会学习到爬虫,刚入门时会使用requests.urllib这些同步的库进行单线程爬虫,速度是比较慢的, ...
- 利用aiohttp实现异步爬虫
asyncio可以实现单线程并发IO操作,是Python中常用的异步处理模块.关于asyncio模块的介绍,笔者会在后续的文章中加以介绍,本文将会讲述一个基于asyncio实现的HTTP框架--a ...
- 第17讲:aiohttp 异步爬虫实战
在上一课时我们介绍了异步爬虫的基本原理和 asyncio 的基本用法,另外在最后简单提及了 aiohttp 实现网页爬取的过程,这一可是我们来介绍一下 aiohttp 的常见用法,以及通过一个实战案例 ...
- 异步爬虫-aiohttp库、Twisted库简介
为什么要用异步爬虫? 爬虫本质上就是模拟客户端与服务端的通讯过程.以浏览器端的爬虫为例,我们在爬取不同网页过程中,需要根据url构建很多HTTP请求去爬取,而如果以单个线程为参考对象,平常我们所采取 ...
- python 协程可以嵌套协程吗_Python实战异步爬虫(协程)+分布式爬虫(多进程)
引言:我们在写爬虫时常会遇到这样的问题,当需要爬取多个URL时,写一个普通的基于requests库的爬虫程序爬取时间会很长.因为是顺序请求网页的,而网页请求和获得响应过程比较耗费时间,程序不得不等待获 ...
最新文章
- 10474 - Where is the Marble?
- 如何将zipoutputstream返回_性能问题|如何正确使用“缓存”?
- 花生增产对话万书波-农业大健康·万祥军:获山东科技最高奖
- 正则表达式匹配单行注解
- 在macOS搭建React Native for IOS开发环境
- sphinx随笔记了一下
- 深度学习算法简要综述(下)
- 网络爬虫--26.Scrapy中下载器中间件Downloader Middlewares的使用
- 【今日CV 视觉论文速览】21 Nov 2018
- 写论文的用到的常用技巧
- Delphi自动清除临时文件及备份文件的批处理
- jrebel离线激活_jrebel激活
- 制作MHDD启动U盘
- 数学建模番外篇2:作图练习-美赛2020E题
- Hive字符串函数-空格处理
- 图像处理-图像边缘处理
- 目标检测算法——YOLOv5/YOLOv7改进之结合CBAM
- (leetcode)1723. 完成所有工作的最短时间 -2021/5/8
- 坐月子 请月嫂吗?如何请月嫂?
- kubernetes中的PV、PVC