Python基础入门教程:使用 Python 3 协程快速获得一个代理池

前言

在执行 IO 密集型任务的时候,程序会因为等待 IO 而阻塞。比如我们使用 requests 库来进行网络爬虫请求的话,如果网站响应速度过慢,程序会一直等待网站响应,最终导致其爬取效率十分低下。本文以爬取 IP 代理池为例,演示 Python 中如何利用异步协程来加速网络爬虫。

注:本文示例代码,需要 Python 3.7 及以上版本。

协程

协程(Coroutine),又称微线程,纤程,协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存,在调度回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合。

协程本质上是个单进程,协程相对于多进程来说,无需进程间上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。

我们可以使用协程来实现异步操作,比如在网络爬虫场景下,在发出一个请求之后,需要等待一定的时间才能得到响应。其实在这个等待过程中,程序可以干许多其他的事情,等到响应返回之后再切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是异步协程的优势。

Python 中的协程

从 Python 3.4 开始,Python 中加入了协程的概念,这个版本的协程是通过生成器对象来实现的,在 Python 3.5 中增加了 asyncio 库和 async、await 关键字,使得协程的实现更加方便。

asyncio 库

首先我们先来看一个不使用协程的程序,代码如下:

import time
​
​
def job(t):print('Start job {}'.format(t))time.sleep(t)  # 等待 t 秒print('Job {0} takes {0}s'.format(t))
​
​
def main():[job(t) for t in range(1, 3)]
​
​
start = time.time()
main()
print("total time: {}".format(time.time() - start))复制代码

运行结果:

Start job 1
Job 1 takes 1s
Start job 2
Job 2 takes 2s
total time: 3.001577138900757复制代码

从运行结果可以看出,我们的 job 是按顺序执行的。必须执行完 job 1 才能开始执行 job 2, job 1 需要 1 秒的执行时间,job 2 需要 2 秒的执行时间,所以总时间是 3 秒多。

如果我们使用协程的方式,job 1 在等待 time.sleep(t) 执行结束的时候(可以看做是等待一个网页的下载成功),是可以切换到 job 2 执行的。

我们再来看一下使用协程改造后的代码:

import time
import asyncio
​
​
async def job(t):  # 使用 async 关键字将一个函数定义为协程print('Start job {}'.format(t))await asyncio.sleep(t)  # 等待 t 秒, 期间切换执行其他任务print('Job {0} takes {0}s'.format(t))
​
​
async def main(loop):  # 使用 async 关键字将一个函数定义为协程tasks = [loop.create_task(job(t)) for t in range(1, 3)]  # 创建任务, 不立即执行await asyncio.wait(tasks)  # 执行并等待所有任务完成
​
​
start = time.time()
loop = asyncio.get_event_loop()  # 建立 loop
loop.run_until_complete(main(loop))  # 执行 loop
loop.close()  # 关闭 loop
print("total time: {}".format(time.time() - start))复制代码

运行结果:

Start job 1
Start job 2
Job 1 takes 1s
Job 2 takes 2s
total time: 2.0033459663391113复制代码

从运行结果可以看出,我们没有等待 job 1 执行结束再开始执行 job 2,而是 job 1 触发 await 的时候切换到了 job 2 。 这时 job 1 和 job 2 同时在执行 await asyncio.sleep(t),所以最终程序的执行时间取决于执行时间最长的那个 job,也就是 job 2 的执行时间:2 秒。

aiohttp 库

在对 asyncio 库做了简单了解之后,我们来看一下如何通过协程来改造我们的爬虫程序。

安装 aiohttp 库:

pip install aiohttp复制代码

我们先来看一下使用 reqeusts 库实现一个网页的爬取:

import time
​
import requests
​
def fetch(url):r = requests.get(url)return r.url
​
​
def main():results = [fetch('http://www.baidu.com') for _ in range(2)]print(results)
​
​
start = time.time()
main()
print("total time: {}".format(time.time() - start))复制代码

运行结果:

['http://www.baidu.com/', 'http://www.baidu.com/']
total time: 1.5445010662078857复制代码

使用 requests 库,访问两次 www.baidu.com,共耗时 1.5 秒

我们用 aiohttp 库来改造上面的代码:

import time
import asyncio
​
import aiohttp
​
​
async def fetch(session, url):response = await session.get(url)  # await 等待网络 IO 并切换协程return str(response.url)
​
​
async def main(loop):async with aiohttp.ClientSession() as session:tasks = [loop.create_task(fetch(session, 'http://www.baidu.com'))for _ in range(2)]done, pending = await asyncio.wait(tasks)  # 执行并等待所有任务完成results = [r.result() for r in done]  # 获取所有返回结果print(results)
​
​
start = time.time()
loop = asyncio.get_event_loop()  # 建立 事件循环
loop.run_until_complete(main(loop))  # 在 事件循环 中执行协程
loop.close()  # 关闭 事件循环
print("total time: {}".format(time.time() - start))复制代码

运行结果:

['http://www.baidu.com', 'http://www.baidu.com']
total time: 0.10848307609558105复制代码

使用 aiohttp 的代码执行时间较使用 reqeusts 的代码有大幅的提升。

上例中,我们使用官方推荐的方式创建 session,并通过 session 执行 get 操作。aiohttp 官方建议一个 application 中共享使用一个 session,不要为每个请求都创建 session。

使用 asyncio 和 aiohttp 快速获得一个代理池

通过爬虫解析免费的代理发布网站页面,来生成代理池。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
@author: qfedu.com
"""
import os
import re
import time
import asyncio
​
import aiohttp
​
HEADERS = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0.3 Safari/605.1.15'
}
​
OUTPUT_FILE = 'proxies.txt'  # 代理池输出文件
SITES = ['http://www.live-socks.net', 'http://www.proxyserverlist24.top']  # 代理发布网站
CHECK_URL = 'http://www.baidu.com'
LOCAL_PROXY = None  # 在本地发起请求时的代理
​
​
# http get 协程
async def fetch(session, url, proxy=None):proxy_headers = HEADERS if proxy else Nonetry:async with session.get(url, headers=HEADERS, proxy=proxy,proxy_headers=proxy_headers,timeout=aiohttp.ClientTimeout(total=5)) as response:if response.status == 200:return await response.text()else:return ''except:return ''
​
​
# 从代理发布网站获取代理发布页面链接
async def get_page_links(loop, session):tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))for url in SITES]  # 创建协程任务done, _ = await asyncio.wait(tasks)  # 执行并等待所有任务完成htmls = [f.result() for f in done]  # 获取所有返回结果
​# 解析出 html 页面中的代理发布链接def parse(html):return re.findall(r'<h3[\s\S]*?<a.*?(http.*?\.html).*?</a>', html)
​results = map(parse, htmls)  # 逐个解析 html 页面
​return [y for x in results for y in x]
​
​
# 从代理发布页面获取代理 IP
async def get_proxies(loop, session, page_links):tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))for url in page_links]  # 创建协程任务done, _ = await asyncio.wait(tasks)  # 执行并等待所有任务完成htmls = [f.result() for f in done]  # 获取所有返回结果
​# 解析出 html 页面中的代理 IPdef parse(html):return re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}', html)
​results = map(parse, htmls)  # 逐个解析 html 页面
​return list(set([y for x in results for y in x]))
​
​
# 验证代理 IP
async def check_proxy(session, proxy):html = await fetch(session, CHECK_URL, proxy=proxy)
​# 如果返回通过代理 IP 访问的页面,则说明代理 IP 有效return proxy if html else ''
​
​
# 通过协程批量验证代理 IP,每次同时发起 200 个验证请求
async def check_proxies(loop, session, proxies):checked_proxies = []for i in range(0, len(proxies), 200):_proxies = [proxy.strip() if proxy.strip().startswith('http://')else 'http://' + proxy.strip() for proxy in proxies[i:i + 200]]tasks = [loop.create_task(check_proxy(session, proxy))for proxy in _proxies]done, _ = await asyncio.wait(tasks)  # 执行并等待所有任务完成checked = [f.result() for f in done]  # 获取所有返回结果checked_proxies += [p for p in checked if p]  # 获取不为空的返回值,即验证成功的代理 IP
​return checked_proxies
​
​
# 将代理 IP 逐行保存到文件
def save_proxies(proxies):# 创建新文件,如果文件已存在,则清空文件内容with open(OUTPUT_FILE, 'w') as f:f.write('')
​# 通过追加写模式,逐行写入文件with open(OUTPUT_FILE, 'a') as f:for proxy in proxies:f.write(proxy + '\n')
​
​
async def main(loop):async with aiohttp.ClientSession() as session:page_links = await get_page_links(loop, session)  # 获得代理发布页面链接# 从代理发布页面获得代理 IPproxies = await get_proxies(loop, session, page_links)print('total proxy: {}'.format(len(proxies)))  # 解析出的代理 IP 总量proxies = await check_proxies(loop, session, proxies)  # 验证代理 IP
​print('total checked proxy: {}'.format(len(proxies)))  # 验证后的代理 IP 总量save_proxies(proxies)  # 保存代理 IP 到文件
​
​
start = time.time()
loop = asyncio.get_event_loop()  # 建立 事件循环
loop.run_until_complete(main(loop))  # 在 事件循环 中执行协程
loop.close()  # 关闭 事件循环
total_time = time.time() - start
print(f'total time: {total_time}')复制代码

运行结果:

total proxy: 15675
total checked proxy: 4503
total time: 487.2807550430298复制代码

更加高效的爬虫

在爬虫程序中,通常有网络请求任务、页面解析任务、数据清洗任务和数据入库任务。

网络请求任务、数据入库任务属于 IO 密集型任务,在 Python 中通常使用多线程模型来提高这类任务的性能,现在还可以通过 aiohttp,Motor(MongoDB 的异步 Python 驱动)等异步框架将性能进一步提升。

页面解析任务、数据清洗任务这类 CPU 密集型的任务我们该如何来提高性能?在 Python 中针对 CPU 密集型任务可以通过 multiprocessing 模块来提升性能,通过 multiprocessing 模块可以使程序运行在多核 CPU 中,增加 CPU 的利用率以提升计算性能。

给代理池爬虫示例增加多核计算支持:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
@author: qfedu.com
"""
import os
import re
import time
import asyncio
from multiprocessing import Pool
​
import aiohttp
​
HEADERS = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0.3 Safari/605.1.15'
}
​
OUTPUT_FILE = 'proxies.txt'  # 代理池输出文件
SITES = ['http://www.live-socks.net', 'http://www.proxyserverlist24.top']  # 代理发布网站
CHECK_URL = 'http://www.baidu.com'
LOCAL_PROXY = 'http://127.0.0.1:1087'  # •在本地发起请求时的代理
​
​
# http get 协程
async def fetch(session, url, proxy=None):proxy_headers = HEADERS if proxy else Nonetry:async with session.get(url, headers=HEADERS, proxy=proxy,proxy_headers=proxy_headers,timeout=aiohttp.ClientTimeout(total=5)) as response:if response.status == 200:return await response.text()else:return ''except:return ''
​
# 解析出 html 页面中的代理发布链接
​
​
def parse_page_link(html):return re.findall(r'<h3[\s\S]*?<a.*?(http.*?\.html).*?</a>', html)
​
# 从代理发布网站获取代理发布页面链接
​
​
async def get_page_links(loop, session):tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))for url in SITES]  # 创建协程任务done, _ = await asyncio.wait(tasks)  # 执行并等待所有任务完成htmls = [f.result() for f in done]  # 获取所有返回结果
​# 利用多核 CPU 的计算能力提升页面解析性能with Pool(processes=os.cpu_count() * 2) as pool:results = pool.map(parse_page_link, htmls)
​return [y for x in results for y in x]
​
# 解析出 html 页面中的代理 IP
​
​
def parse_proxy(html):return re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}', html)
​
# 从代理发布页面获取代理 IP
​
​
async def get_proxies(loop, session, page_links):tasks = [loop.create_task(fetch(session, url, proxy=LOCAL_PROXY))for url in page_links]  # 创建协程任务done, _ = await asyncio.wait(tasks)  # 执行并等待所有任务完成htmls = [f.result() for f in done]  # 获取所有返回结果
​# 利用多核 CPU 的计算能力提升页面解析性能with Pool(processes=os.cpu_count() * 2) as pool:results = pool.map(parse_proxy, htmls)
​return list(set([y for x in results for y in x]))
​
​
# 验证代理 IP
async def check_proxy(session, proxy):html = await fetch(session, CHECK_URL, proxy=proxy)
​# 如果返回通过代理 IP 访问的页面,则说明代理 IP 有效return proxy if html else ''
​
​
# 通过协程批量验证代理 IP,每次同时发起 200 个验证请求
async def check_proxies(loop, session, proxies):checked_proxies = []for i in range(0, len(proxies), 200):_proxies = [proxy.strip() if proxy.strip().startswith('http://')else 'http://' + proxy.strip() for proxy in proxies[i:i + 200]]tasks = [loop.create_task(check_proxy(session, proxy))for proxy in _proxies]done, _ = await asyncio.wait(tasks)  # 执行并等待所有任务完成checked = [f.result() for f in done]  # 获取所有返回结果checked_proxies += [p for p in checked if p]  # 获取不为空的返回值,即验证成功的代理 IP
​return checked_proxies
​
​
# 将代理 IP 逐行保存到文件
def save_proxies(proxies):# 创建新文件,如果文件已存在,则清空文件内容with open(OUTPUT_FILE, 'w') as f:f.write('')
​# 通过追加写模式,逐行写入文件with open(OUTPUT_FILE, 'a') as f:for proxy in proxies:f.write(proxy + '\n')
​
​
async def main(loop):async with aiohttp.ClientSession() as session:page_links = await get_page_links(loop, session)  # 获得代理发布页面链接# 从代理发布页面获得代理 IPproxies = await get_proxies(loop, session, page_links)print('total proxy: {}'.format(len(proxies)))  # 解析出的代理 IP 总量proxies = await check_proxies(loop, session, proxies)  # 验证代理 IP
​print('total checked proxy: {}'.format(len(proxies)))  # 验证后的代理 IP 总量save_proxies(proxies)  # 保存代理 IP 到文件
​
​
start = time.time()
loop = asyncio.get_event_loop()  # 建立 事件循环
loop.run_until_complete(main(loop))  # 在 事件循环 中执行协程
loop.close()  # 关闭 事件循环
total_time = time.time() - start
print(f'total time: {total_time}')复制代码

进程间的调度及上下文切换是非常消耗资源的。上面例子中解析任务比较简单,解析量也非常少,增加多核计算支持后,性能几乎没有提升还有可能降低。在实际爬虫项目中需要根据实际情况来衡量和选择。

Python基础入门教程:使用 Python 3 协程快速获得一个代理池相关推荐

  1. python免费入门手册-Python 基础入门教程

    Python是一种解释型.面向对象.动态数据类型的高级程序设计语言. Python由Guido van Rossum于1989年底发明,第一个公开发行版发行于1991年. <Python 基础入 ...

  2. 超全面Python基础入门教程【十天课程】博客笔记汇总表

    目录 1.学习地址 2.下载地址[视频.源码.课件.软件] 3.详细博客笔记 Day 01 Day 02 Day 03 Day 04 Day 05 Day 06 Day 07 Day 08 Day 0 ...

  3. Python基础入门教程:Day21-30/Web前端概述

    Python基础入门教程:Web前端概述 说明:本文使用的部分插图来自 Jon Duckett 先生的*HTML and CSS: Design and Build Websites*一书,这是一本非 ...

  4. Python基础入门教程

    Python基础入门教程 Python基础教程 Python 简介 Python环境搭建 Python 基础语法 Python 变量类型 Python 运算符 Python 条件语句 Python 循 ...

  5. python菜鸟基础教程-python基础菜鸟教程,Python的基础语法

    原标题:python基础菜鸟教程,Python的基础语法 什么是Python?Python是一门简单直观的编程语言,并且目前是开源的,可以方便任何人使用. Python的开发哲学:用一种方法,最好是只 ...

  6. python基础教程菜鸟教程-python基础菜鸟教程,Python的基础语法

    原标题:python基础菜鸟教程,Python的基础语法 什么是Python?Python是一门简单直观的编程语言,并且目前是开源的,可以方便任何人使用. Python的开发哲学:用一种方法,最好是只 ...

  7. 【全网力荐】堪称最易学的Python基础入门教程

    目录 数据的名字和种类--变量和类型 初探数据种类 数据类型 数值运算 比较运算 变量和赋值 变量的好处 用赋值更新变量 变量和数据类型的关系 总结 数据类型 数值运算 数值比较 变量和赋值 一串数据 ...

  8. python新手入门教程思路-Python新手入门教程_教你怎么用Python做数据分析

    Python新手入门教程_教你怎么用Python做数据分析 跟大家讲了这么多期的Python教程,有小伙伴在学Python新手教程的时候说学Python比较复杂的地方就是资料太多了,比较复杂.很多网上 ...

  9. 转:量化交易零基础入门教程之——python基本语法与变量

    感谢原作者:JoinQuant-TWist 转自:JoinQuant 重要提示:聚宽提供了非常好的数据库接口,愁没研究数据的小伙伴可以加微信(jqdata01)详细了解 原文链接:https://ww ...

最新文章

  1. 网页截图工具CutyCapt
  2. python数据存储与读取_【Python爬虫】数据保存与读取
  3. 百度空间互踩_贝壳联手百度地图 整合新房信息找房更便捷
  4. MOSS推荐之1-WSS V3服务器架构
  5. 阿里云今日发布数据库产品HybridDB
  6. Linux make menuconfig查找并快速跳转指定驱动选项
  7. 静态static的内存图
  8. linux下su和sudo的区别 (/libtool: arm-linux-gcc command not found )
  9. (六)ElasticSearch 6.1.1聚合查询
  10. (摘要)新基建风口下,今年工业互联网平台将呈现十大新特征
  11. 批量文件替换_CAD图形文件中如何快速批量替换文字?【AutoCAD教程】
  12. Setup Factory 9安装前卸载旧版本的方法
  13. 在Ubuntu上面使用华为EC3372
  14. android打开ofd文件
  15. 小米手机无需刷入Recovery获取Root权限,卡刷包payload.bin提取boot.img文件
  16. 生物信息常用文件格式
  17. 第八周——重载运算符——项目一(1)实现复数的+-*/
  18. linux批量删除文件或者文件夹
  19. 2018年美亚杯电子数据取证大赛-资格赛wp
  20. python 读写HDF5

热门文章

  1. 构建ASP.NET MVC5+EF6+EasyUI 1.4.3+Unity4.x注入的后台管理系统(61)-如何使用框架来开发?...
  2. 基于Case的MIS系统 - 总账模块
  3. IE与Firefox的CSS兼容
  4. Ubuntu 取消 Apache及MySQL等自启动
  5. 基于Centos7.2搭建Cobbler自动化批量部署操作系统服务
  6. zabbix自动发现
  7. request获取各种路径 转,记下来免得 以后忘记了。
  8. Getting Started with Node.js LoopBack Framework and IBM Cloudant
  9. 【Shell】fix 1032报错信息的脚本
  10. Helios与Katana的区别