https://www.cnblogs.com/qiyeboy/p/7016540.html

  

本章讲的依旧是实战项目,实战内容是打造分布式爬虫,这对初学者来说,是一个不小的挑战,也是一次有意义的尝试。这次打造的分布式爬虫采用比较简单的主从模式,完全手工打造,不使用成熟框架,基本上涵盖了前六章的主要知识点,其中涉及分布式的知识点是分布式进程和进程间通信的内容,算是对Python爬虫基础篇的总结。

  现在大型的爬虫系统都是采取分布式爬取结构,通过此次实战项目,让大家对分布式爬虫有一个比较清晰地了解,为之后系统的讲解分布式爬虫打下基础,其实它并没有多么困难。实战目标:爬取2000个百度百科网络爬虫词条以及相关词条的标题、摘要和链接等信息,采用分布式结构改写第六章的基础爬虫,使功能更加强大。爬取页面请看图6.1。

7.1简单分布式爬虫结构

  本次分布式爬虫采用主从模式。主从模式是指由一台主机作为控制节点负责所有运行网络爬虫的主机进行管理,爬虫只需要从控制节点那里接收任务,并把新生成任务提交给控制节点就可以了,在这个过程中不必与其他爬虫通信,这种方式实现简单利于管理。而控制节点则需要与所有爬虫进行通信,因此可以看到主从模式是有缺陷的,控制节点会成为整个系统的瓶颈,容易导致整个分布式网络爬虫系统性能下降。

此次使用三台主机进行分布式爬取,一台主机作为控制节点,另外两台主机作为爬虫节点。爬虫结构如图7.1所示:

图7.1 主从爬虫结构

7.2 控制节点ControlNode

  控制节点主要分为URL管理器、数据存储器和控制调度器。控制调度器通过三个进程来协调URL管理器和数据存储器的工作,一个是URL管理进程,负责URL的管理和将URL传递给爬虫节点,一个是数据提取进程,负责读取爬虫节点返回的数据,将返回数据中的URL交给URL管理进程,将标题和摘要等数据交给数据存储进程,最后一个是数据存储进程,负责将数据提取进程中提交的数据进行本地存储。执行流程如图7.2所示:

图7.2 控制节点执行流程

7.2.1 URL管理器

  URL管理器查考第六章的代码,做了一些优化修改。由于我们采用set内存去重的方式,如果直接存储大量的URL链接,尤其是URL链接很长的时候,很容易造成内存溢出,所以我们采用将爬取过的URL进行MD5处理,由于字符串经过MD5处理后的信息摘要长度可以128bit,将生成的MD5摘要存储到set后,可以减少好几倍的内存消耗,Python中的MD5算法生成的是32位的字符串,由于我们爬取的url较少,md5冲突不大,完全可以取中间的16位字符串,即16位MD5加密。同时添加了save_progress和load_progress方法进行序列化的操作,将未爬取URL集合和已爬取的URL集合序列化到本地,保存当前的进度,以便下次恢复状态。URL管理器URLManager.py代码如下:

 1 #coding:utf-8
 2 import cPickle
 3 import hashlib
 4 class UrlManager(object):
 5     def __init__(self):
 6         self.new_urls = self.load_progress('new_urls.txt')#未爬取URL集合
 7         self.old_urls = self.load_progress('old_urls.txt')#已爬取URL集合
 8     def has_new_url(self):
 9         '''
10         判断是否有未爬取的URL
11         :return:
12         '''
13         return self.new_url_size()!=0
14
15     def get_new_url(self):
16         '''
17         获取一个未爬取的URL
18         :return:
19         '''
20         new_url = self.new_urls.pop()
21         m = hashlib.md5()
22         m.update(new_url)
23         self.old_urls.add(m.hexdigest()[8:-8])
24         return new_url
25
26     def add_new_url(self,url):
27         '''
28          将新的URL添加到未爬取的URL集合中
29         :param url:单个URL
30         :return:
31         '''
32         if url is None:
33             return
34         m = hashlib.md5()
35         m.update(url)
36         url_md5 =  m.hexdigest()[8:-8]
37         if url not in self.new_urls and url_md5 not in self.old_urls:
38             self.new_urls.add(url)
39
40     def add_new_urls(self,urls):
41         '''
42         将新的URLS添加到未爬取的URL集合中
43         :param urls:url集合
44         :return:
45         '''
46         if urls is None or len(urls)==0:
47             return
48         for url in urls:
49             self.add_new_url(url)
50
51     def new_url_size(self):
52         '''
53         获取未爬取URL集合的s大小
54         :return:
55         '''
56         return len(self.new_urls)
57
58     def old_url_size(self):
59         '''
60         获取已经爬取URL集合的大小
61         :return:
62         '''
63         return len(self.old_urls)
64
65     def save_progress(self,path,data):
66         '''
67         保存进度
68         :param path:文件路径
69         :param data:数据
70         :return:
71         '''
72         with open(path, 'wb') as f:
73             cPickle.dump(data, f)
74
75     def load_progress(self,path):
76         '''
77         从本地文件加载进度
78         :param path:文件路径
79         :return:返回set集合
80         '''
81         print '[+] 从文件加载进度: %s' % path
82         try:
83             with open(path, 'rb') as f:
84                 tmp = cPickle.load(f)
85                 return tmp
86         except:
87             print '[!] 无进度文件, 创建: %s' % path
88         return set()

7.2.2数据存储器

数据存储器的内容基本上和第六章的一样,不过生成文件按照当前时间进行命名避免重复,同时对文件进行缓存写入。代码如下:

 1 #coding:utf-8
 2 import codecs
 3 import time
 4 class DataOutput(object):
 5     def __init__(self):
 6         self.filepath='baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) )
 7         self.output_head(self.filepath)
 8         self.datas=[]
 9     def store_data(self,data):
10         if data is None:
11             return
12         self.datas.append(data)
13         if len(self.datas)>10:
14             self.output_html(self.filepath)
15
16
17     def output_head(self,path):
18         '''
19         将HTML头写进去
20         :return:
21         '''
22         fout=codecs.open(path,'w',encoding='utf-8')
23         fout.write("<html>")
24         fout.write("<body>")
25         fout.write("<table>")
26         fout.close()
27
28     def output_html(self,path):
29         '''
30         将数据写入HTML文件中
31         :param path: 文件路径
32         :return:
33         '''
34         fout=codecs.open(path,'a',encoding='utf-8')
35         for data in self.datas:
36             fout.write("<tr>")
37             fout.write("<td>%s</td>"%data['url'])
38             fout.write("<td>%s</td>"%data['title'])
39             fout.write("<td>%s</td>"%data['summary'])
40             fout.write("</tr>")
41             self.datas.remove(data)
42         fout.close()
43
44     def ouput_end(self,path):
45         '''
46         输出HTML结束
47         :param path: 文件存储路径
48         :return:
49         '''
50         fout=codecs.open(path,'a',encoding='utf-8')
51         fout.write("</table>")
52         fout.write("</body>")
53         fout.write("</html>")
54         fout.close()

7.2.3控制调度器

  控制调度器主要是产生并启动URL管理进程、数据提取进程和数据存储进程,同时维护4个队列保持进程间的通信,分别为url_queue,result_queue, conn_q,store_q。4个队列说明如下:

  • url_q队列是URL管理进程将URL传递给爬虫节点的通道
  • result_q队列是爬虫节点将数据返回给数据提取进程的通道
  • conn_q队列是数据提取进程将新的URL数据提交给URL管理进程的通道
  • store_q队列是数据提取进程将获取到的数据交给数据存储进程的通道

因为要和工作节点进行通信,所以分布式进程必不可少。参考1.4.4小节分布式进程中服务进程中的代码(linux版),创建一个分布式管理器,定义为start_manager方法。方法代码如下:

 1 def start_Manager(self,url_q,result_q):
 2     '''
 3     创建一个分布式管理器
 4     :param url_q: url队列
 5     :param result_q: 结果队列
 6     :return:
 7     '''
 8     #把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象,
 9     # 将Queue对象在网络中暴露
10     BaseManager.register('get_task_queue',callable=lambda:url_q)
11     BaseManager.register('get_result_queue',callable=lambda:result_q)
12     #绑定端口8001,设置验证口令‘baike’。这个相当于对象的初始化
13     manager=BaseManager(address=('',8001),authkey='baike')
14     #返回manager对象
15     return manager

  

URL管理进程将从conn_q队列获取到的新URL提交给URL管理器,经过去重之后,取出URL放入url_queue队列中传递给爬虫节点,代码如下:

def url_manager_proc(self,url_q,conn_q,root_url):url_manager = UrlManager()url_manager.add_new_url(root_url)while True:while(url_manager.has_new_url()):#从URL管理器获取新的urlnew_url = url_manager.get_new_url()#将新的URL发给工作节点
            url_q.put(new_url)print 'old_url=',url_manager.old_url_size()#加一个判断条件,当爬去2000个链接后就关闭,并保存进度if(url_manager.old_url_size()>2000):#通知爬行节点工作结束url_q.put('end')print '控制节点发起结束通知!'#关闭管理节点,同时存储set状态url_manager.save_progress('new_urls.txt',url_manager.new_urls)url_manager.save_progress('old_urls.txt',url_manager.old_urls)return#将从result_solve_proc获取到的urls添加到URL管理器之间try:if not conn_q.empty():urls = conn_q.get()url_manager.add_new_urls(urls)except BaseException,e:time.sleep(0.1)#延时休息

  数据提取进程从result_queue队列读取返回的数据,并将数据中的URL添加到conn_q队列交给URL管理进程,将数据中的文章标题和摘要添加到store_q队列交给数据存储进程。代码如下:

def result_solve_proc(self,result_q,conn_q,store_q):while(True):try:if not result_q.empty():content = result_q.get(True)if content['new_urls']=='end':#结果分析进程接受通知然后结束print '结果分析进程接受通知然后结束!'store_q.put('end')returnconn_q.put(content['new_urls'])#url为set类型store_q.put(content['data'])#解析出来的数据为dict类型else:time.sleep(0.1)#延时休息except BaseException,e:time.sleep(0.1)#延时休息

数据存储进程从store_q队列中读取数据,并调用数据存储器进行数据存储。代码如下:

 1 def store_proc(self,store_q):
 2     output = DataOutput()
 3     while True:
 4         if not store_q.empty():
 5             data = store_q.get()
 6             if data=='end':
 7                 print '存储进程接受通知然后结束!'
 8                 output.ouput_end(output.filepath)
 9
10                 return
11             output.store_data(data)
12         else:
13             time.sleep(0.1)

最后将分布式管理器、URL管理进程、 数据提取进程和数据存储进程进行启动,并初始化4个队列。代码如下:

 1 if __name__=='__main__':
 2     #初始化4个队列
 3     url_q = Queue()
 4     result_q = Queue()
 5     store_q = Queue()
 6     conn_q = Queue()
 7     #创建分布式管理器
 8     node = NodeManager()
 9     manager = node.start_Manager(url_q,result_q)
10     #创建URL管理进程、 数据提取进程和数据存储进程
11     url_manager_proc = Process(target=node.url_manager_proc, args=(url_q,conn_q,'http://baike.baidu.com/view/284853.htm',))
12     result_solve_proc = Process(target=node.result_solve_proc, args=(result_q,conn_q,store_q,))
13     store_proc = Process(target=node.store_proc, args=(store_q,))
14     #启动3个进程和分布式管理器
15     url_manager_proc.start()
16     result_solve_proc.start()
17     store_proc.start()
18     manager.get_server().serve_forever()

  

7.3 爬虫节点SpiderNode

爬虫节点相对简单,主要包含HTML下载器、HTML解析器和爬虫调度器。执行流程如下:

  • 爬虫调度器从控制节点中的url_q队列读取URL
  • 爬虫调度器调用HTML下载器、HTML解析器获取网页中新的URL和标题摘要
  • 最后爬虫调度器将新的URL和标题摘要传入result_q队列交给控制节点

7.3.1 HTML下载器

HTML下载器的代码和第六章的一致,只要注意网页编码即可。代码如下:

 1 #coding:utf-8
 2 import requests
 3 class HtmlDownloader(object):
 4
 5     def download(self,url):
 6         if url is None:
 7             return None
 8         user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
 9         headers={'User-Agent':user_agent}
10         r = requests.get(url,headers=headers)
11         if r.status_code==200:
12             r.encoding='utf-8'
13             return r.text
14         return None

7.3.2 HTML解析器

HTML解析器的代码和第六章的一致,详细的网页分析过程可以回顾第六章。代码如下:

 1 #coding:utf-8
 2 import re
 3 import urlparse
 4 from bs4 import BeautifulSoup
 5
 6
 7 class HtmlParser(object):
 8
 9     def parser(self,page_url,html_cont):
10         '''
11         用于解析网页内容抽取URL和数据
12         :param page_url: 下载页面的URL
13         :param html_cont: 下载的网页内容
14         :return:返回URL和数据
15         '''
16         if page_url is None or html_cont is None:
17             return
18         soup = BeautifulSoup(html_cont,'html.parser',from_encoding='utf-8')
19         new_urls = self._get_new_urls(page_url,soup)
20         new_data = self._get_new_data(page_url,soup)
21         return new_urls,new_data
22
23
24     def _get_new_urls(self,page_url,soup):
25         '''
26         抽取新的URL集合
27         :param page_url: 下载页面的URL
28         :param soup:soup
29         :return: 返回新的URL集合
30         '''
31         new_urls = set()
32         #抽取符合要求的a标签
33         links = soup.find_all('a',href=re.compile(r'/view/\d+\.htm'))
34         for link in links:
35             #提取href属性
36             new_url = link['href']
37             #拼接成完整网址
38             new_full_url = urlparse.urljoin(page_url,new_url)
39             new_urls.add(new_full_url)
40         return new_urls
41     def _get_new_data(self,page_url,soup):
42         '''
43         抽取有效数据
44         :param page_url:下载页面的URL
45         :param soup:
46         :return:返回有效数据
47         '''
48         data={}
49         data['url']=page_url
50         title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
51         data['title']=title.get_text()
52         summary = soup.find('div',class_='lemma-summary')
53         #获取tag中包含的所有文版内容包括子孙tag中的内容,并将结果作为Unicode字符串返回
54         data['summary']=summary.get_text()
55         return data

7.3.3 爬虫调度器

  爬虫调度器需要用到分布式进程中工作进程的代码,具体内容可以参考第一章的分布式进程章节。爬虫调度器需要先连接上控制节点,然后依次完成从url_q队列中获取URL,下载并解析网页,将获取的数据交给result_q队列,返回给控制节点等各项任务,代码如下:

 1 class SpiderWork(object):
 2     def __init__(self):
 3         #初始化分布式进程中的工作节点的连接工作
 4         # 实现第一步:使用BaseManager注册获取Queue的方法名称
 5         BaseManager.register('get_task_queue')
 6         BaseManager.register('get_result_queue')
 7         # 实现第二步:连接到服务器:
 8         server_addr = '127.0.0.1'
 9         print('Connect to server %s...' % server_addr)
10         # 端口和验证口令注意保持与服务进程设置的完全一致:
11         self.m = BaseManager(address=(server_addr, 8001), authkey='baike')
12         # 从网络连接:
13         self.m.connect()
14         # 实现第三步:获取Queue的对象:
15         self.task = self.m.get_task_queue()
16         self.result = self.m.get_result_queue()
17         #初始化网页下载器和解析器
18         self.downloader = HtmlDownloader()
19         self.parser = HtmlParser()
20         print 'init finish'
21
22     def crawl(self):
23         while(True):
24             try:
25                 if not self.task.empty():
26                     url = self.task.get()
27
28                     if url =='end':
29                         print '控制节点通知爬虫节点停止工作...'
30                         #接着通知其它节点停止工作
31                         self.result.put({'new_urls':'end','data':'end'})
32                         return
33                     print '爬虫节点正在解析:%s'%url.encode('utf-8')
34                     content = self.downloader.download(url)
35                     new_urls,data = self.parser.parser(url,content)
36                     self.result.put({"new_urls":new_urls,"data":data})
37             except EOFError,e:
38                 print "连接工作节点失败"
39                 return
40             except Exception,e:
41                 print e
42                 print 'Crawl  fali '
43
44 if __name__=="__main__":
45     spider = SpiderWork()
46     spider.crawl()

  在爬虫调度器设置了一个本地IP:127.0.0.1,大家可以将在一台机器上测试代码的正确性。当然也可以使用三台VPS服务器,两台运行爬虫节点程序,将IP改为控制节点主机的公网IP,一台运行控制节点程序,进行分布式爬取,这样更贴近真实的爬取环境。下面图7.3为最终爬取的数据,图7.4为new_urls.txt内容,图7.5为old_urls.txt内容,大家可以进行对比测试,这个简单的分布式爬虫还有很大发挥的空间,希望大家发挥自己的聪明才智进一步完善。

图7.3 最终爬取的数据

图7.4 new_urls.txt

图7.5 old_urls.txt

7.4小结

  本章讲解了一个简单的分布式爬虫结构,主要目的是帮助大家将Python爬虫基础篇的知识进行总结和强化,开拓大家的思维,同时也让大家知道分布式爬虫并不是这么高不可攀。不过当你亲手打造一个分布式爬虫后,就会知道分布式爬虫的难点在于节点的调度,什么样的结构能让各个节点稳定高效的运作才是分布式爬虫要考虑的核心内容。到本章为止,Python爬虫基础篇已经结束,这个时候大家基本上可以编写简单的爬虫,爬取一些静态网站的内容,但是Python爬虫开发不仅如此,大家接着往下学习吧。

转载于:https://www.cnblogs.com/mapu/p/8400114.html

纯手工打造简单分布式爬虫(Python)相关推荐

  1. 【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)

    疯狂撸码中ing 实现组件效果 src/components/sgTopo.vue <template><!-- 强哥的拓扑图2021.09.14 --><divclas ...

  2. 原创:纯手工打造CSS像素画--笨笨熊系列图标

    纯手工打造CSS像素画--笨笨熊系列图标 作者:冰极峰 转载请注明出处 在cssplay网站看到有一组CSS像素画,于是也想摩仿一下,于是在网络上找到一组头像图标,看其结构比较简单,就拿它开刀吧!先看 ...

  3. html格子像素画,HTML_纯手工打造CSS像素画,在cssplay网站看到有一组CSS像素 - phpStudy...

    纯手工打造CSS像素画 在cssplay网站看到有一组CSS像素画,于是也想摩仿一下,于是在网络上找到一组头像图标,看其结构比较简单,就拿它开刀吧!先看看预览图 图一 基本原理: 没有什么技术含量,主 ...

  4. 广工学生“纯手工”打造赛车 将出征F1赛道

    转播到腾讯微博 一辆红银色流线型方程式赛车昨日亮相广东工业大学图书馆广场.这辆赛车为该校二十余名学生"纯手工"打造,本月将登陆上海国际赛车场.南都记者邹卫摄 南都讯 记者刘黎霞 实 ...

  5. IOS学习之道:使用UIButton纯手工打造的黑白快小游戏.

    由于代码量比较多,有兴趣的同学可以去我的资源页进行下载. http://download.csdn.net/detail/tx874828503/8637445 // // RootViewContr ...

  6. 后渗透篇:纯手工打造服务器自解压shift后门【详细演示】

    当你的才华 还撑不起你的野心时 那你就应该静下心来学习 目录 纯手工打造服务器自解压shift后门 0x01 介绍 0x02 实例演示 纯手工打造服务器自解压shift后门 0x01 介绍 很多时候我 ...

  7. 消防管件做的机器人图片_报废消防器材变身“机器人” 由消防官兵纯手工打造(图)...

    原标题:报废消防器材变身"机器人" 由消防官兵纯手工打造(图) 由报废的消防零配件组成的机器人模型. 厦门网讯 (厦门日报记者林路然通讯员阙凤芳曾德猛)远看好似变形金刚,凑近还会说 ...

  8. 纯手工打造漂亮的垂直时间轴,使用最简单的HTML+CSS+JQUERY完成100个版本更新记录的华丽转身!...

    前言 FineUI控件库发展至今已经有 5 个年头,目前论坛注册的QQ会员 5000 多人,捐赠用户 500 多人(捐赠用户转化率达到10%以上,在国内开源领域相信这是一个梦幻数字!也足以证明Fine ...

  9. 这个B站up主太硬核了!纯手工打造AI小电视:硬件自己焊接,驱动代码全手写...

    点击上方"视学算法",选择加"星标"或"置顶" 重磅干货,第一时间送达 晓查 发自 凹非寺  本文转自自:量子位(QbitAI) 一个低调的 ...

最新文章

  1. 普通域账号客户端计算无关机选项
  2. 传统的Linux中IPC通信原理
  3. Kali Linux缺少ifconfig命令
  4. Oracle等待事件说明
  5. 【视频课】超全目标检测课程!超30小时理论与4大案例实践,检测框架使用,长期更新中...
  6. 腾讯Techo开发者大会揭晓云存储发展趋向:高性能、高可用、高性价比
  7. Google发布全球首个72量子比特处理器Bristlecone预览
  8. Starling实现的硬皮翻书效果
  9. Spark实现协同过滤CF算法实践
  10. js批量向html容器内的元素赋值
  11. ORACLE10g中 ora-12638:身份证明检索失败(
  12. Cesium:结合canvas实现自定义气泡点
  13. Textview属性Kotlin.Android
  14. python三维数据欠采样_数据分析:使用Imblearn处理不平衡数据(过采样、欠采样)...
  15. unity Screen.orientation
  16. 码农与程序员,就好比哈士奇与狼
  17. 如何用计算机自动回复微信,10分钟教你用Python实现微信自动回复功能
  18. 一个C#写的爬虫程序
  19. 达梦数据库(DM)——表空间管理命令大全
  20. SQL笔试题:某团数分岗笔试真题详解

热门文章

  1. html中两个div垂直居中对齐,在div中垂直居中的两个元素
  2. go java jsonrpc_使用golang 实现JSON-RPC2.0
  3. vue 生成发布包_Vue 3.0 终于正正正正正式发布了!
  4. 华为S5024p交换机配端口镜像
  5. 《构建之法》第8,9,10章
  6. 推荐一款自己的软件作品[豆约翰博客备份专家],新浪博客,QQ空间,CSDN,cnblogs博客备份,导出CHM,PDF(转载)...
  7. 北京林业大学计算机科学与技术考研科目,北京林业大学计算机科学与技术考研经验-北林信息学院考研辅导班...
  8. Activiti 流程查询出的结果封装为 JSON 时出现的异常
  9. java format 补足空格_11 个简单的 Java 性能调优技巧
  10. 笔试算法题(26):顺时针打印矩阵 求数组中数对差的最大值