Python网络爬虫3 - 生产者消费者模型爬取某金融网站数据
博客首发于www.litreily.top
应一位金融圈的朋友所托,帮忙写个爬虫,帮他爬取中国期货行业协议网站中所有金融机构的从业人员信息。网站数据的获取本身比较简单,但是为了学习一些新的爬虫方法和技巧,即本文要讲述的生产者消费者模型,我又学习了一下Python中队列库queue
及线程库Thread
的使用方法。
生产者消费者模型
生产者消费者模型非常简单,相信大部分程序员都知道,就是一方作为生产者不断提供资源,另一方作为消费者不断消费资源。简单点说,就好比餐馆的厨师和顾客,厨师作为生产者不断制作美味的食物,而顾客作为消费者不断食用厨师提供的食物。此外,生产者与消费者之间可以是一对一、一对多、多对一和多对多的关系。
那么这个模型和爬虫有什么关系呢?其实,爬虫可以认为是一个生产者,它不断从网站爬取数据,爬取到的数据就是食物;而所得数据需要消费者进行数据清洗,把有用的数据吸收掉,把无用的数据丢弃。
在实践过程中,爬虫爬取和数据清洗分别对应一个Thread
,两个线程之间通过顺序队列queue
传递数据,数据传递过程就好比餐馆服务员从厨房把食物送到顾客餐桌上的过程。爬取线程负责爬取网站数据,并将原始数据存入队列,清洗线程从队列中按入队顺序读取原始数据并提取出有效数据。
以上便是对生产者消费者模型的简单介绍了,下面针对本次爬取任务予以详细说明。
分析站点
http://www.cfachina.org/cfainfo/organbaseinfoServlet?all=personinfo
我们要爬取的数据是主页显示的表格中所有期货公司的从业人员信息,每个公司对应一个机构编号(G01001~G01198
)。从上图可以看到有主页有分页,共8页。以G01001
方正中期期货公司为例,点击该公司名称跳转至对应网页如下:
从网址及网页内容可以提取出以下信息:
- 网址
- http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+¤tPage=1&pageSize=20&selectType=personinfo
organid
: 机构编号,+G01001+
~+G01198+
currentPage
: 该机构从业人员信息当前页面编号pageSize
: 每个页面显示的人员个数,默认20selectType
: 固定为personinfo
- 机构名称
mechanism_name
,在每页表格上方可以看到当前机构名称 - 从业人员信息,即每页的表格内容,也是我们要爬取的对象
- 该机构从业人员信息总页数
page_cnt
我们最终爬取的数据可以按机构名称存储到对应的txt文件或excel文件中。
获取机构名称
获取到某机构的任意从业信息页面后,使用BeautifulSoup
可快速提取机构名称。
mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
复制代码
那么有人可能会问,既然主页表格都已经包含了所有机构的编号和名称,为何还要多此一举的再获取一次呢?这是因为,我压根就不想爬主页的那些表格,直接根据机构编号的递增规律生成对应的网址即可,所以获取机构名称的任务就放在了爬取每个机构首个信息页面之后。
获取机构信息对应的网页数量
每个机构的数据量是不等的,幸好每个页面都包含了当前页面数及总页面数。使用以下代码即可获取页码数。
url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')
page_cnt = url_re.search(html).group(1)
复制代码
从每个机构首页获取页码数后,便可for
循环修改网址参数中的currentPage
,逐页获取机构信息。
获取当前页面从业人员信息
针对如上图所示的一个特定信息页时,人员信息被存放于一个表中,除了固定的表头信息外,人员信息均被包含在一个带有id
的tr
标签中,所以使用BeautifulSoup
可以很容易提取出页面内所有人员信息。
soup.find_all('tr', id=True)
复制代码
确定爬取方案
一般的想法当然是逐页爬取主页信息,然后获取每页所有机构对应的网页链接,进而继续爬取每个机构信息。
但是由于该网站的机构信息网址具有明显的规律,我们根据每个机构的编号便可直接得到每个机构每个信息页面的网址。所以具体爬取方案如下:
- 将所有机构编号网址存入队列
url_queue
- 新建生产者线程
SpiderThread
完成抓取任务
- 循环从队列
url_queue
中读取一个编号,生成机构首页网址,使用requests
抓取之 - 从抓取结果中获取页码数量,若为0,则返回该线程第1步
- 循环爬取当前机构剩余页面
- 将页面信息存入队列
html_queue
- 新建消费者线程
DatamineThread
完成数据清洗任务
- 循环从队列
html_queue
中读取一组页面信息 - 使用
BeautifulSoup
提取页面中的从业人员信息 - 将信息以二维数组形式存储,最后交由数据存储类
Storage
存入本地文件
代码实现
生成者SpiderThread
爬虫线程先从队列获取一个机构编号,生成机构首页网址并进行爬取,接着判断机构页面数量是否为0,如若不为0则继续获取机构名称,并根据页面数循环爬取剩余页面,将原始html数据以如下dict
格式存入队列html_queue
:
{'name': mechanismId_mechanismName,'num': currentPage,'content': html
}
复制代码
爬虫产生的数据队列html_queue
将由数据清洗线程进行处理,下面是爬虫线程的主程序,整个线程代码请看后面的源码。
def run(self):while True:mechanism_id = 'G0' + self.url_queue.get()# the first page's urlurl = self.__get_url(mechanism_id, 1)html = self.grab(url)page_cnt = self.url_re.search(html.text).group(1)if page_cnt == '0':self.url_queue.task_done()continuesoup = BeautifulSoup(html.text, 'html.parser')mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))# put data into html_queueself.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})for i in range(2, int(page_cnt) + 1):url = self.__get_url(mechanism_id, i)html = self.grab(url)self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})self.url_queue.task_done()
复制代码
消费者DatamineThread
数据清洗线程比较简单,就是从生产者提供的数据队列html_queue
逐一提取html
数据,然后从html
数据中提取从业人员信息,以二维数组形式存储,最后交由存储模块Storage
完成数据存储工作。
class DatamineThread(Thread):"""Parse data from html"""def __init__(self, html_queue, filetype):Thread.__init__(self)self.html_queue = html_queueself.filetype = filetypedef __datamine(self, data):'''Get data from html content'''soup = BeautifulSoup(data['content'].text, 'html.parser')infos = []for info in soup.find_all('tr', id=True):items = []for item in info.find_all('td'):items.append(item.get_text())infos.append(items)return infosdef run(self):while True:data = self.html_queue.get()print('Datamine Thread: get %s_%d' % (data['name'], data['num']))store = Storage(data['name'], self.filetype)store.save(self.__datamine(data))self.html_queue.task_done()
复制代码
数据存储Storage
我写了两类文件格式的存储函数,write_txt
, write_excel
,分别对应txt
,excel
文件。实际存储时由调用方确定文件格式。
def save(self, data):{'.txt': self.write_txt,'.xls': self.write_excel}.get(self.filetype)(data)
复制代码
存入txt文件
存入txt
文件是比较简单的,就是以附加(a
)形式打开文件,写入数据,关闭文件。其中,文件名称由调用方提供。写入数据时,每个人员信息占用一行,以制表符\t
分隔。
def write_txt(self, data):'''Write data to txt file'''fid = open(self.path, 'a', encoding='utf-8')# insert the header of tableif not os.path.getsize(self.path):fid.write('\t'.join(self.table_header) + '\n')for info in data:fid.write('\t'.join(info) + '\n')fid.close()
复制代码
存入Excel文件
存入Excel
文件还是比较繁琐的,由于经验不多,选用的是xlwt
, xlrd
和xlutils
库。说实话,这3个库真心不大好用,勉强完成任务而已。为什么这么说,且看:
- 修改文件麻烦:
xlwt
只能写,xlrd
只能读,需要xlutils
的copy
函数将xlrd
读取的数据复制到内存,再用xlwt
修改 - 只支持
.xls
文件:.xlsx
经读写也会变成.xls
格式 - 表格样式易变:只要重新写入文件,表格样式必然重置
所以后续我肯定会再学学其它的excel
库,当然,当前解决方案暂时还用这三个。代码如下:
def write_excel(self, data):'''write data to excel file'''if not os.path.exists(self.path):header_style = xlwt.easyxf('font:name 楷体, color-index black, bold on')wb = xlwt.Workbook(encoding='utf-8')ws = wb.add_sheet('Data')# insert the header of tablefor i in range(len(self.table_header)):ws.write(0, i, self.table_header[i], header_style)else:rb = open_workbook(self.path)wb = copy(rb)ws = wb.get_sheet(0)# write dataoffset = len(ws.rows)for i in range(0, len(data)):for j in range(0, len(data[0])):ws.write(offset + i, j, data[i][j])# When use xlutils.copy.copy function to copy data from exist .xls file,# it will loss the origin style, so we need overwrite the width of column,# maybe there some other good solution, but I have not found yet.for i in range(len(self.table_header)):ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]# save to filewhile True:try:wb.save(self.path)breakexcept PermissionError as e:print('{0} error: {1}'.format(self.path, e.strerror))time.sleep(5)finally:pass
复制代码
说明:
- 一个文件对应一个机构的数据,需要多次读取和写入,所以需要计算文件写入时的行数偏移量
offset
,即当前文件已包含数据的行数 - 当被写入文件被人为打开时,会出现
PermissionError
异常,可以在捕获该异常然后提示错误信息,并定时等待直到文件被关闭。
main
主函数用于创建和启动生产者线程和消费者线程,同时为生产者线程提供机构编号队列。
url_queue = queue.Queue()
html_queue = queue.Queue()def main():for i in range(1001, 1199):url_queue.put(str(i))# create and start a spider threadst = SpiderThread(url_queue, html_queue)st.setDaemon(True)st.start()# create and start a datamine threaddt = DatamineThread(html_queue, '.xls')dt.setDaemon(True)dt.start()# wait on the queue until everything has been processedurl_queue.join()html_queue.join()
复制代码
从主函数可以看到,两个队列都调用了join
函数,用于阻塞,直到对应队列为空为止。要注意的是,队列操作中,每个出队操作queue.get()
需要对应一个queue.task_done()
操作,否则会出现队列数据已全部处理完,但主线程仍在执行的情况。
至此,爬虫的主要代码便讲解完了,下面是完整源码。
源码
#!/usr/bin/python3
# -*-coding:utf-8-*-import queue
from threading import Threadimport requestsimport re
from bs4 import BeautifulSoupimport os
import platformimport xlwt
from xlrd import open_workbook
from xlutils.copy import copyimport time# url format ↓
# http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+¤tPage=1&pageSize=20&selectType=personinfo&all=undefined
# organid: +G01001+, +G01002+, +G01003+, ...
# currentPage: 1, 2, 3, ...
# pageSize: 20(default)
#
# Algorithm design:
# 2 threads with 2 queues
# Thread-1, get first page url, then get page_num and mechanism_name from first page
# Thread-2, parse html file and get data from it, then output data to local file
# url_queue data -> 'url' # first url of each mechanism
# html_queue data -> {'name':'mechanism_name', 'html':data}url_queue = queue.Queue()
html_queue = queue.Queue()class SpiderThread(Thread):"""Threaded Url Grab"""def __init__(self, url_queue, html_queue):Thread.__init__(self)self.url_queue = url_queueself.html_queue = html_queueself.page_size = 20self.url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'}def __get_url(self, mechanism_id, current_page):return 'http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+%s+¤tPage=%d&pageSize=%d&selectType=personinfo&all=undefined' \% (mechanism_id, current_page, self.page_size)def grab(self, url):'''Grab html of url from web'''while True:try:html = requests.get(url, headers=self.headers, timeout=20)if html.status_code == 200:breakexcept requests.exceptions.ConnectionError as e:print(url + ' Connection error, try again...')except requests.exceptions.ReadTimeout as e:print(url + ' Read timeout, try again...')except Exception as e:print(str(e))finally:passreturn htmldef run(self):'''Grab all htmls of mechanism one by oneSteps:1. grab first page of each mechanism from url_queue2. get number of pages and mechanism name from first page3. grab all html file of each mechanism4. push all html to html_queue'''while True:mechanism_id = 'G0' + self.url_queue.get()# the first page's urlurl = self.__get_url(mechanism_id, 1)html = self.grab(url)page_cnt = self.url_re.search(html.text).group(1)if page_cnt == '0':self.url_queue.task_done()continuesoup = BeautifulSoup(html.text, 'html.parser')mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))# put data into html_queueself.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})for i in range(2, int(page_cnt) + 1):url = self.__get_url(mechanism_id, i)html = self.grab(url)self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})self.url_queue.task_done()class DatamineThread(Thread):"""Parse data from html"""def __init__(self, html_queue, filetype):Thread.__init__(self)self.html_queue = html_queueself.filetype = filetypedef __datamine(self, data):'''Get data from html content'''soup = BeautifulSoup(data['content'].text, 'html.parser')infos = []for info in soup.find_all('tr', id=True):items = []for item in info.find_all('td'):items.append(item.get_text())infos.append(items)return infosdef run(self):while True:data = self.html_queue.get()print('Datamine Thread: get %s_%d' % (data['name'], data['num']))store = Storage(data['name'], self.filetype)store.save(self.__datamine(data))self.html_queue.task_done()class Storage():def __init__(self, filename, filetype):self.filetype = filetypeself.filename = filename + filetypeself.table_header = ('姓名', '性别', '从业资格号', '投资咨询从业证书号', '任职部门', '职务', '任现职时间')self.path = self.__get_path()def __get_path(self):path = {'Windows': 'D:/litreily/Documents/python/cfachina','Linux': '/mnt/d/litreily/Documents/python/cfachina'}.get(platform.system())if not os.path.isdir(path):os.makedirs(path)return '%s/%s' % (path, self.filename)def write_txt(self, data):'''Write data to txt file'''fid = open(self.path, 'a', encoding='utf-8')# insert the header of tableif not os.path.getsize(self.path):fid.write('\t'.join(self.table_header) + '\n')for info in data:fid.write('\t'.join(info) + '\n')fid.close()def write_excel(self, data):'''write data to excel file'''if not os.path.exists(self.path):header_style = xlwt.easyxf('font:name 楷体, color-index black, bold on')wb = xlwt.Workbook(encoding='utf-8')ws = wb.add_sheet('Data')# insert the header of tablefor i in range(len(self.table_header)):ws.write(0, i, self.table_header[i], header_style)else:rb = open_workbook(self.path)wb = copy(rb)ws = wb.get_sheet(0)# write dataoffset = len(ws.rows)for i in range(0, len(data)):for j in range(0, len(data[0])):ws.write(offset + i, j, data[i][j])# When use xlutils.copy.copy function to copy data from exist .xls file,# it will loss the origin style, so we need overwrite the width of column,# maybe there some other good solution, but I have not found yet.for i in range(len(self.table_header)):ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]# save to filewhile True:try:wb.save(self.path)breakexcept PermissionError as e:print('{0} error: {1}'.format(self.path, e.strerror))time.sleep(5)finally:passdef save(self, data):'''Write data to local file.According filetype to choose function to save data, filetype can be '.txt' or '.xls', but '.txt' type is saved more faster then '.xls' typeArgs:data: a 2d-list array that need be save'''{'.txt': self.write_txt,'.xls': self.write_excel}.get(self.filetype)(data)def main():for i in range(1001, 1199):url_queue.put(str(i))# create and start a spider threadst = SpiderThread(url_queue, html_queue)st.setDaemon(True)st.start()# create and start a datamine threaddt = DatamineThread(html_queue, '.xls')dt.setDaemon(True)dt.start()# wait on the queue until everything has been processedurl_queue.join()html_queue.join()if __name__ == '__main__':main()
复制代码
爬取测试
写在最后
- 测试发现,写入
txt
的速度明显高于写入excel
的速度 - 如果将页面网址中的
pageSize
修改为1000
或更大,则可以一次性获取某机构的所有从业人员信息,而不用逐页爬取,效率可以大大提高。 - 该爬虫已托管至github Python-demos
Python网络爬虫3 - 生产者消费者模型爬取某金融网站数据相关推荐
- 爬虫python爬取页面请求_03 Python网络爬虫第三弹《爬取get请求的页面数据》,urllib...
一.urllib库 urllib是Python自带的一个用于爬虫的库,其主要作用就是可以通过代码模拟浏览器发送请求.其常被用到的子模块在Python3中的为urllib.request和urllib. ...
- Python网络爬虫(九):爬取顶点小说网站全部小说,并存入MongoDB
前言:本篇博客将爬取顶点小说网站全部小说.涉及到的问题有:Scrapy架构.断点续传问题.Mongodb数据库相关操作. 背景: Python版本:Anaconda3 运行平台:Windows IDE ...
- Python网络爬虫,Appuim+夜神模拟器爬取得到APP课程数据
一.背景介绍 随着生产力和经济社会的发展,温饱问题基本解决,人们开始追求更高层次的精神文明,开始愿意为知识和内容付费.从2016年开始,内容付费渐渐成为时尚. 罗辑思维创始人罗振宇全力打造" ...
- Python网络爬虫:利用正则表达式方法爬取‘’豆瓣读书‘’中‘’新书速递‘’条目
1.简述:正则表达式是爬虫的方法之一,这里利用Requests库进行爬取,尽管Urllib也能进行爬取,但过程过于繁琐,在了解Urllib的爬取过程后,没有进行实战演练,但在学习了Requests的爬 ...
- Python网络爬虫之基本项目:爬取网易新闻排行榜
1. 最基本的抓取 抓取大多数情况属于get请求,即直接从对方服务器上获取数据. 首先,Python中自带urllib及urllib2这两个模块,基本上能满足一般的页面抓取.另外,requests也是 ...
- [Python3网络爬虫开发实战] --分析Ajax爬取今日头条街拍美图
[Python3网络爬虫开发实战] --分析Ajax爬取今日头条街拍美图 学习笔记--爬取今日头条街拍美图 准备工作 抓取分析 实战演练 学习笔记–爬取今日头条街拍美图 尝试通过分析Ajax请求来抓取 ...
- Python新手爬虫训练小项目《爬取彼岸图网》(超详细讲解版)
Python新手爬虫训练小项目<爬取彼岸图网>(超详细讲解版) 这是我的第一篇文章,作为一名新手爬虫,这个算是我这几天来的努力成果,虽然代码寥寥几行但花费了大半天,新手上路还是不能只看视频 ...
- Python网络爬虫,pyautogui与pytesseract抓取新浪微博数据,OCR
Python网络爬虫,pyautogui与pytesseract抓取新浪微博数据,OCR方案 用ocr与pyautogui,以及webbrowser实现功能:设计爬虫抓取新浪微博数据,比如,抓取微博用 ...
- 爬虫python爬取页面请求_Python网络爬虫第三弹《爬取get请求的页面数据》
一.urllib库 urllib是Python自带的一个用于爬虫的库,其主要作用就是可以通过代码模拟浏览器发送请求.其常被用到的子模块在Python3中的为urllib.request和urllib. ...
最新文章
- 关于Windows 7的64位系统不兼容某些控件的问题
- 聊聊LettucePoolingConnectionProvider
- 神经网络与机器学习 笔记—改善反向传播的性能试探法
- 云服务器 cvm操作系统选择,云服务器cvm操作系统选择
- qpython3使用手册图_qpython图形
- word 代码块_如何优雅的写好 Pythonic 代码?
- c语言函数实现strstr,C语言实现strstr函数模拟
- 超过马云!中国第二大富豪诞生:年仅40岁,财富已达3200亿
- [.NET] : 使用自定义对象当作报表数据源
- 连接服务器显示句柄无效,win10打印机句柄无效怎么解决?_网站服务器运行维护,win10,打印机,句柄无效...
- Java –显示所有ZoneId及其UTC偏移量
- csv文件转换成xlsx文件方法
- Java--深入理解字符串的String#intern()方法奥妙之处
- 简信CRM:什么是在线CRM?在线CRM有什么好处?
- Appium 连接夜神模拟器并启动
- PUTTY中永久更改字体大小
- Facebook 新款AR眼镜都有哪些技术创新?
- 富斯i6航模遥控器通过usb-ttl串口工具刷改中文系统
- 网络安全渗透测试的常用工具
- 圆形比例分布图怎么做_比例的秘密,教你如何在设计中运用黄金比例