I've been doing Java ever since I graduated. I recently moved to the data team, so I've started learning Python.
A recent project required extracting the celebrity-relations ("明星关系") block from Baidu Baike entry pages and storing it. There was no need to crawl Baidu Baike in real time for this: the entry pages are already sitting in our MongoDB, 17 million documents in all, with a great deal of duplication among them.
Because the data volume is so large, the job had to be processed with multiple threads. My first attempt used the futures module from Python's concurrent package, with its ThreadPoolExecutor thread pool. The code:
import zlib
import simplejson
import pymongo
import codecs
import threading
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup

host = "10.10.0.0"
port = 20000
db = "baike_db"
username = "username"
passwd = "passwd"
crawlCol = "crawlCol"
browserCol = "brawlCol"
dbClient = pymongo.MongoClient(host, port)[db]
dbClient.authenticate(username, passwd)
crawlDb = dbClient[crawlCol]
browserCol = dbClient[browserCol]
subviewUrl = ""
lock = threading.Lock()
writeLock = threading.Lock()
handledUrlSet = set()
exceptionFile = codecs.open("exceptionFile.o", "a", "utf-8")
relationFile = codecs.open("relationFile.o", "a", "utf-8")
handledUrlFile = codecs.open("handledUrlFile.o", "a", "utf-8")
def handleDocument(doc):
    subviewUrl = ""
    try:
        subviewUrl = doc['subview_url']
        print subviewUrl
        # deduplicate on the URL; the shared set must be guarded by a lock
        lock.acquire()
        if not (subviewUrl in handledUrlSet):
            handledUrlSet.add(subviewUrl)
            lock.release()
            text = zlib.decompress(doc['text'])
            htmlDoc = BeautifulSoup(text, 'html5lib')
            relations = htmlDoc.select(".star-info-block.relations")
            if relations:
                relation = relations[0]
                reList = relation.select("li")
                if reList:
                    realList = []
                    for real in reList:
                        aList = real.select("a")
                        if aList:
                            people = aList[0]
                            data = simplejson.loads("{}")
                            data['href'] = people['href']
                            data['src'] = people.img['src']
                            em = people.em.extract()
                            data['name'] = em.text.strip()
                            data['relation'] = people.text.strip()
                            realList.append(data)
                    if realList:
                        record = simplejson.loads("{}")
                        record['subviewUrl'] = subviewUrl
                        record['realList'] = realList
                        print(record)
                        writeLock.acquire()
                        relationFile.write(str(record) + "\n")
                        writeLock.release()
        else:
            lock.release()
    except BaseException as e:
        exceptionFile.write(subviewUrl + "\t" + str(e) + "\n")
# create a thread pool of size 15
pool = ThreadPoolExecutor(max_workers=15)

# handle the newer "browser" data first
docBrowserList = browserCol.find()
index = 0
for doc in docBrowserList:
    print("browser:" + str(index))
    index += 1
    pool.submit(handleDocument, doc)

index = 0
# then handle the older "crawl" data
docCrawlList = crawlDb.find()  # crawlCol is just the collection name string; crawlDb is the collection
for doc in docCrawlList:
    print("crawl:" + str(index))
    index += 1
    pool.submit(handleDocument, doc)

The idea: create a pool of 15 worker threads, and have the main thread pull documents out of MongoDB one at a time and submit them to the pool. The pool keeps an internal work queue, and any task submitted beyond what the pool can run concurrently waits in that queue. But that internal queue is unbounded, so with 17 million documents the backlog would just keep growing until memory blew up. You could still make the pool work on top of this, say by throttling submissions whenever the backlog gets too large, so in theory the thread pool is usable. But I'm lazy, and I generally pick the simplest option.
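For completeness, here is a minimal sketch of that throttling idea, assuming the same 15-worker pool; the cap of 1000 and the helper name submitThrottled are mine for illustration, not code from the project:

import threading
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=15)
# allow at most 1000 submitted-but-unfinished tasks (hypothetical cap)
slots = threading.BoundedSemaphore(1000)

def submitThrottled(func, doc):
    # blocks the producer once 1000 tasks are queued or running
    slots.acquire()
    future = pool.submit(func, doc)
    # free the slot as soon as the task completes
    future.add_done_callback(lambda f: slots.release())
    return future

With that in place, the main loop could keep calling submitThrottled(handleDocument, doc) without the backlog ever growing past the cap.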
So instead I switched to the threading module. The code:
#-*-coding:utf-8 -*-
import zlib
import Queue
import simplejson
import pymongo
import codecs
import threading
from bs4 import BeautifulSoup

host = "10.10.0.0"
port = 20000
db = "baike_db"
username = "username"
passwd = "passwd"
crawlCol = "crawlCol"
browserCol = "brawlCol"
dbClient = pymongo.MongoClient(host, port)[db]
dbClient.authenticate(username, passwd)
crawlDb = dbClient[crawlCol]
browserCol = dbClient[browserCol]
subviewUrl = ""
lock = threading.Lock()
writeLock = threading.Lock()
exceptLock = threading.Lock()
handledUrlSet = set()
docQue = Queue.Queue(maxsize=1000)
exceptionFile = codecs.open("exceptionFile.o", "a", "utf-8")
relationFile = codecs.open("relationFile.o", "a", "utf-8")
handledUrlFile = codecs.open("handledUrlFile.o", "a", "utf-8")

def handleDocument(doc):
    subviewUrl = ""
    try:
        subviewUrl = doc['subview_url']
        print subviewUrl
        # deduplicate on the URL; the shared set must be guarded by a lock
        lock.acquire()
        if not (subviewUrl in handledUrlSet):
            handledUrlSet.add(subviewUrl)
            lock.release()
            text = zlib.decompress(doc['text'])
            htmlDoc = BeautifulSoup(text, 'html5lib')
            relations = htmlDoc.select(".star-info-block.relations")
            if relations:
                relation = relations[0]
                reList = relation.select("li")
                if reList:
                    realList = []
                    for real in reList:
                        aList = real.select("a")
                        if aList:
                            people = aList[0]
                            data = simplejson.loads("{}")
                            data['href'] = people['href']
                            data['src'] = people.img['src']
                            em = people.em.extract()
                            data['name'] = em.text.strip()
                            data['relation'] = people.text.strip()
                            realList.append(data)
                    if realList:
                        record = simplejson.loads("{}")
                        record['subviewUrl'] = subviewUrl
                        record['realList'] = realList
                        print(record)
                        writeLock.acquire()
                        relationFile.write(str(record) + "\n")
                        writeLock.release()
        else:
            lock.release()
    except BaseException as e:
        exceptLock.acquire()
        exceptionFile.write(subviewUrl + "\t" + str(e) + "\n")
        exceptLock.release()

class handleThread(threading.Thread):
    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId

    def run(self):
        global docQue
        while True:
            doc = docQue.get(block=True)
            if doc:
                handleDocument(doc)
            else:
                # a None in the queue is the signal to stop
                print("thread%d run over" % self.threadId)
                break

class getDocumentThread(threading.Thread):
    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId

    def run(self):
        global docQue
        global browserCol
        global crawlDb
        docBrowserList = browserCol.find()
        index = 0
        for doc in docBrowserList:
            print("browser:" + str(index) + " to queue")
            index += 1
            docQue.put(doc, block=True)
        index = 0
        docCrawlList = crawlDb.find()
        for doc in docCrawlList:
            print("crawl:" + str(index) + " to queue")
            index += 1
            docQue.put(doc, block=True)
        # put one end-of-work marker per consumer thread
        for i in range(15):
            docQue.put(None, block=True)

getThread = getDocumentThread(1)
getThread.start()

for j in range(15):
    thread = handleThread(j)
    thread.start()

So I ran it, and it was dog slow: a few dozen documents per second. At that rate, when would my 17 million documents ever finish? Why was it so slow? Being new to Python, I asked colleagues who had used it for years, and the answer I got was this:
Python does support multiple threads, but they are all subject to the GIL, which acts like a single global lock: when several threads want to run, they have to queue up for it. For a detailed introduction to the GIL, see this article:
http://cenalulu.github.io/python/gil-in-python/
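A quick way to see the effect for yourself is the classic CPU-bound demo below (illustrative only, not project code; the loop count is arbitrary): two threads take about as long as running the same work twice sequentially, because only one thread executes Python bytecode at a time.

import time
import threading

def count(n):
    # pure-Python busy loop, so it holds the GIL almost continuously
    while n > 0:
        n -= 1

start = time.time()
count(50000000)
count(50000000)
print("sequential: %.2fs" % (time.time() - start))

start = time.time()
t1 = threading.Thread(target=count, args=(50000000,))
t2 = threading.Thread(target=count, args=(50000000,))
t1.start()
t2.start()
t1.join()
t2.join()
print("two threads: %.2fs" % (time.time() - start))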
Once I understood that this is what Python multithreading amounts to, I was stunned: that is a glaring limitation. Fortunately Python offers multiprocessing to compensate, so I rewrote the code with multiple processes:
#-*-coding:utf-8 -*-
import zlib
import simplejson
import pymongo
import codecs
import time
from multiprocessing import queues
import multiprocessing
import threading
from bs4 import BeautifulSoup
from multiprocessing import Process, Manager

host = "10.10.0.0"
port = 20000
db = "baike_db"
username = "username"
passwd = "passwd"
crawlCol = "crawlCol"
browserCol = "brawlCol"

dbClient = pymongo.MongoClient(host, port)[db]
dbClient.authenticate(username, passwd)
crawlDb = dbClient[crawlCol]
browserCol = dbClient[browserCol]

m = Manager()
lock = multiprocessing.Lock()
writeLock = multiprocessing.Lock()
exceptLock = multiprocessing.Lock()
handledUrlDict = m.dict()
docQue = queues.Queue(maxsize=1000)
exceptionFile = codecs.open("exceptionFile.o", "a", "utf-8")
relationFile = codecs.open("relationFile.o", "a", "utf-8")
#handledUrlFile = codecs.open("handledUrlFile.o", "a", "utf-8")
pCount = 15

def handleDocument(doc, threadId, lock, exceptLock, writeLock, handledUrlDict):
    subviewUrl = ""
    # start = int(time.time() * 1000)
    try:
        subviewUrl = doc['subview_url']
        print "thread" + str(threadId) + subviewUrl
        lock.acquire()
        if not (subviewUrl in handledUrlDict):
            handledUrlDict[subviewUrl] = 1
            lock.release()
            # s2 = int(time.time() * 1000)
            # print("threadId %d s2 use: %d" % (threadId, s2 - start))
            text = zlib.decompress(doc['text']).decode('utf-8')
            # s4 = int(time.time() * 1000)
            # print("threadId %d s4 use: %d" % (threadId, s4 - s2))
            htmlDoc = BeautifulSoup(text, 'lxml')
            # s5 = int(time.time() * 1000)
            # print("threadId %d s5 use: %d" % (threadId, s5 - s4))
            relations = htmlDoc.select(".star-info-block.relations")
            # s3 = int(time.time() * 1000)
            # print("threadId %d s3 use: %d" % (threadId, s3 - s5))
            if relations:
                relation = relations[0]
                reList = relation.select("li")
                if reList:
                    realList = []
                    for real in reList:
                        aList = real.select("a")
                        if aList:
                            people = aList[0]
                            data = simplejson.loads("{}")
                            data['href'] = people['href']
                            data['src'] = people.img['src']
                            em = people.em.extract()
                            data['name'] = em.text.strip()
                            data['relation'] = people.text.strip()
                            realList.append(data)
                    if realList:
                        record = simplejson.loads("{}")
                        record['subviewUrl'] = subviewUrl
                        record['realList'] = realList
                        print(record)
                        writeLock.acquire()
                        relationFile.write(str(record) + "\n")
                        writeLock.release()
                        # s4 = int(time.time() * 1000)
                        # print("threadId %d s4 use: %d" % (threadId, s4 - s3))
        else:
            lock.release()
    except BaseException as e:
        exceptLock.acquire()
        exceptionFile.write(subviewUrl + "\t" + str(e) + "\n")
        exceptLock.release()
    # print("threadId %d total use: %d" % (threadId, int(time.time() * 1000) - start))

class handleProcess(multiprocessing.Process):
    def __init__(self, threadId, docQue, lock, exceptLock, writeLock, handledUrlDict):
        multiprocessing.Process.__init__(self)
        self.threadId = threadId
        self.docQue = docQue
        self.lock = lock
        self.exceptLock = exceptLock
        self.writeLock = writeLock
        self.handledUrlDict = handledUrlDict

    def run(self):
        while True:
            doc = self.docQue.get(block=True)
            if doc:
                handleDocument(doc, self.threadId, self.lock, self.exceptLock,
                               self.writeLock, self.handledUrlDict)
            else:
                print("thread%d run over" % self.threadId)
                break

class getDocumentThread(threading.Thread):
    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId

    def run(self):
        global docQue
        global browserCol
        global crawlDb
        docBrowserList = browserCol.find()
        index = 0
        for doc in docBrowserList:
            print("browser:" + str(index) + " to queue")
            index += 1
            docQue.put(doc, block=True)
        index = 0
        docCrawlList = crawlDb.find()
        for doc in docCrawlList:
            print("crawl:" + str(index) + " to queue")
            index += 1
            docQue.put(doc, block=True)
        # end-of-work markers, one per worker process
        global pCount
        for i in range(pCount):
            docQue.put(None, block=True)

getThread = getDocumentThread(1)
getThread.start()

for j in range(pCount):
    process = handleProcess(j, docQue, lock, exceptLock, writeLock, handledUrlDict)
    process.start()
    # process.join()

Multiprocessing has one annoying problem: processes have independent memory, and by default they share nothing. So how do they communicate, and how do multiple processes share variables? Python offers several answers, all inside the multiprocessing module, such as queues and managed dicts. And what if several processes need to touch an object that is not safe for concurrent access? The locks multiprocessing provides take care of that.
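To make those primitives concrete, here is a small self-contained sketch (illustrative names, not project code): a Manager dict is visible to every process, and a multiprocessing.Lock serializes access to it.

from multiprocessing import Process, Manager, Lock

def worker(shared, lock):
    # the read-modify-write on the shared dict must happen under the lock
    with lock:
        shared['hits'] = shared.get('hits', 0) + 1

if __name__ == '__main__':
    m = Manager()
    shared = m.dict()
    lock = Lock()
    procs = [Process(target=worker, args=(shared, lock)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(shared['hits'])  # prints 4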
Once this version was running, it was still painfully slow: by my estimate, getting through the 17 million documents would take 45 hours. That I could not accept, so I went hunting for where the time was going, adding plenty of logging to record how long each step took (the commented-out timing prints left in the code above). The time turned out to be going into this line:
htmlDoc = BeautifulSoup(text, 'html5lib')

(That is the line as it read before the fix; the listing above already shows the corrected version.)
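Incidentally, a tidier way to do that kind of per-step timing than scattered prints is a small context manager. This is just a sketch, and timed is a name I made up:

import time
from contextlib import contextmanager

@contextmanager
def timed(label):
    # print how many milliseconds the wrapped block took
    start = time.time()
    try:
        yield
    finally:
        print("%s: %d ms" % (label, int((time.time() - start) * 1000)))

# usage:
# with timed("parse"):
#     htmlDoc = BeautifulSoup(text, 'lxml')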
So I checked the BeautifulSoup documentation. It supports several parsers: html.parser, lxml, html5lib, and so on. See the official docs:
https://www.crummy.com/software/BeautifulSoup/bs4/doc/index.zh.html
The docs compare these parsers and state clearly that html5lib is slow, so I switched to lxml and instantly got roughly a 4.5x speedup, which brought the runtime down to something acceptable. A quick benchmark sketch follows.
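This is illustrative only: "page.html" stands in for any saved Baike entry page, and both parsers need to be installed.

import time
import codecs
from bs4 import BeautifulSoup

text = codecs.open("page.html", "r", "utf-8").read()

# parse the same document repeatedly with each parser and compare wall time
for parser in ("html5lib", "lxml"):
    start = time.time()
    for i in range(20):
        BeautifulSoup(text, parser)
    print("%s: %.2fs" % (parser, time.time() - start))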
To wrap up, let me throw out one question I still have not figured out: exactly when do child processes and the main thread exit? Consider this test:

import codecs
import time
import threading
from multiprocessing import Process, Manager
from multiprocessing import queues
import multiprocessing
# the function each child process runs;
# its arguments include a special dict used to share data across processes
f = codecs.open("exceptionFile.o", "a", "utf-8")

def func1(d, q, f, s):
    print(d)
    f.write("11\n")
    d['k3'] = 'k3'
    s.add('33')
    print("11:" + str('11' in s))
    print("k1:" + str('k1' in d))
def func2(d, q, f, s):
    print(d)
    f.write("11\n")
    print("33:" + str('33' in s))
    print("k3:" + str('k3' in d))

q = queues.Queue(1000)
for j in range(1000):
    q.put(j)

class testProc(Process):
    def __init__(self, d):
        multiprocessing.Process.__init__(self)
        self.d = d

    def run(self):
        print("testk3:" + str('k3' in self.d))

class testThread(threading.Thread):
    def run(self):
        # time.sleep(5)
        pass

# create the special shared dict in the main process
m = Manager()
d = m.dict()
s = set()
s.add('11')
d['k1'] = 'k1'
d['k2'] = 3
t = testThread()
t.start()
p1 = Process(target=func1, args=(d,q,f,s))
p1.start()

p2 = Process(target=func2, args=(d, q, f, s))
p2.start()

Running the code above produces the following error:
<DictProxy object, typeid 'dict' at 0x7f7110976dd0; '__str__()' failed>
Process Process-2:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "test.py", line 15, in func1
    d['k3'] = 'k3'
  File "<string>", line 2, in __setitem__
  File "/usr/local/lib/python2.7/multiprocessing/managers.py", line 755, in _callmethod
    self._connect()
  File "/usr/local/lib/python2.7/multiprocessing/managers.py", line 742, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/lib/python2.7/multiprocessing/connection.py", line 169, in Client
    c = SocketClient(address)
  File "/usr/local/lib/python2.7/multiprocessing/connection.py", line 308, in SocketClient
    s.connect(address)
  File "/usr/local/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 2] No such file or directory

I looked into the cause: the main process had already exited, and its exit takes down the Manager process that serves the shared dict, so the children's proxy objects can no longer connect (hence the missing-socket error above) and they fail. In practice there are two fixes.
The first: at the very end of the code, call:
p1.join()
p2.join()
This explicitly tells the main process that it must wait for both child processes to finish before continuing, so it no longer exits early.
The second: have testThread sleep after it starts; that also makes everything run normally. But why? And if testThread's sleep runs out while p1 and p2 are still working, will the same exception be thrown anyway?
This example maps straight onto my Baike-processing code: the main thread starts one getDocumentThread that keeps pushing documents into the queue, plus 15 processes that keep pulling them out and handling them. Early on everything runs fine because getDocumentThread is still alive. But take the final 1000 records: once that thread finishes and exits first, while the 15 processes still have those 1000 documents to work through, will getDocumentThread's exit cause an exception?
If you're reading this and know the answer, please leave me a reply.
