python 多线程 异步_python 多线程异步
最近做了个爬取代理的爬虫,使用了python的aysncio及concurrent.futures的ThreadPoolExecutor(线程池)技术,最终完成了多线程下的异步抓取,在此mark下,以作备忘,代码在gitee上,是看到一位同道中人的go语言项目后比较感兴趣,于是用python加以改进并实现了相同的功能基本思路就是配置好要爬取的免费代理地址,然后按照分页规则生成对应的地址,在组合成任务单元,提交给线程池,线程池则把任务分配给单一空闲线程,线程下把任务分为爬去数据,结果解析,有效性检验,存入数据库几个耗时操作,利用异步类将各操作组合起来,完成功能,篇幅限制就只列出主要代码了,可以当伪代码看下,希望对你有帮助异步编程主要就是要把任务细分下来,分的好和分的坏差别是比较大的
废话不多说,上代码:
1.异步任务类import asyncio,requestsfrom db.mysql import db #自己封装的sql包import pymysql.errclass asyncWorker(object):
loop = None #事件循环
threadId = None #线程id
def __init__(self,loop,tid):
self.loop = loop
self.threadId = tid
'''
协程任务,爬取到的结果入库
'''
async def execRes(self,sign,res):
print('Thread#%d task#%d %s start' % (self.threadId,sign,json.dumps(res)))
check = await self.check(res,sign) if check: await self.insertDB(**res,sign=sign) #爬取到的ip通过验证后插入数据库
'''
协程任务,对爬取到的结果进行检查
'''
async def check(self,proxyData,sign):
sign = self.ipVerify(**proxyData,sign=sign) return sign
'''
代理ip验证方法
'''
def ipVerify(self, ip , port , protocol,sign):
url = protocol+'://httpbin.org/get'
headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36'} if protocol in ['socks4','socks5']:
url = 'https://httpbin.org/get'
proxy = {'https':protocol+'://'+ip+':'+port} elif protocol in ['anonymous','distorting','transparent']:
url = 'https://httpbin.org/get'
proxy = {'https':'https://'+ip+':'+port} else:
proxy = { protocol:protocol+'://'+ip+':'+port} try:
res = requests.get(url,headers=headers,proxies=proxy,timeout=10)
res.encoding = 'utf-8'
res.raise_for_status()
print('Thread#%d task#%d verify %s://%s:%s httpcode:%s' % (self.threadId,sign,protocol,ip,port,res.status_code)) return True
except requests.exceptions.RequestException as e:
print('Thread#%d task#%d verify %s' % (self.threadId,sign,e)) # AppLog('verify').error(e)
return False
'''
协程任务,入库操作
'''
async def insertDB(self,ip,port,protocol,sign):
try:
database = db()
database.insert(ip,port,protocol)
print('Thread#%d task#%d insertDB ip:%s port:%s protocal:%s' % (self.threadId,sign,ip,port,protocol)) except pymysql.MySQLError as e:
print('Thread#%d task#%d insertDB %s' % (self.threadId,sign,e))
'''
并行执行方法
'''
def parallelrun(self,*task):
asyncio.set_event_loop(self.loop) #设置对应线程下的事件循环
# loop = asyncio.get_event_loop() #如果不使用多线程,可以直接用这个来获取事件循环,上面的loop就可以删掉了
self.loop.run_until_complete(asyncio.gather(*task)) #事件循环直到所有任务完成
self.loop.close() #关闭事件循环
2.主函数线程池import asyncio from asyncTask import asyncWorker import concurrent.futures #这是python封装的异步模块(python3.x内置),简单易用,人生苦短,我用python
import time '''
调度任务封装,线程下的工作单元
param: url-爬取代理的网址;parse-结果解析handle;loop-事件循环
'''
def work_url(tid,url,parse,loop):
start = time.time()
s = getter(url) #爬取网页的原始内容
res = s.getWeb(parse) #使用对应的解析函数对原始数据进行解析
worker = asyncWorker(loop,tid) #调用异步任务类
task = (worker.execRes(res.index(item),item) for item in res) #构造任务队列
worker.parallelrun(*task) #异步执行
end = time.time()
print('[thread_id:%d]%s:%s page has been completed usage time:[%s]' % (tid,parse,url,end-start)) return '%r has been completed' % url
urlIter = map(lambda item: {'parse':item[0],'url':item[1]},cfg.items('url')) #这里是爬取地址的生成器,cfg是配置类
#多线程,正片开始
with concurrent.futures.ThreadPoolExecutor(15) as exector: #先开15个线程试试,哈哈
future_to_url = {} #这个字典是用来存储对应的url和task的关联关系的,一一映射
channel = 1 #线程编号,就是asyncWorker中的threadId
for item in urlIter: if item['parse'] == 'proxylists': for param in map(lambda page: {'url':item['url'].format(page=page),'parse':'proxylists','loop':asyncio.new_event_loop()}, range(10)):
task = {exector.submit(work_url,tid=channel,**param):param['url']} #调用上面的工作单元并传参,提交给线程池
future_to_url.update(task) #更新关系字典
channel += 1
else:
item['loop'] = asyncio.new_event_loop() #这里使用asyncio来新建事件循环
task = {exector.submit(work_url,tid=channel,**item):item['url']}
future_to_url.update(task)
channel += 1
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future] try:
data = future.result() #获取上面的工作单元的返回值
except Exception as e:
print('%r generated an exception: %s' % (url,e)) else:
print('[%r] %s' % (url,data))
exit()
备注:
1.python的多线程无法利用多核,只是靠cpu的时间分配造成的并发的错觉,当然,可以换用jython解释器来利用多核,默认的cython是存在GIL全局锁的
2.协程是一种用户态的线程,由用户来发出中断请求以及处理响应,省去了线程的切换开销,并且所有的处理是在同一个线程下,所以也不会造成读写冲突,省去了加锁操作
3.asyncio的async关键字等同于@asyncio.coroutine,await相当于yield from,两者区别就是后者的兼容性更好一些
4.吐槽一下,还是swoole的异步更方便
作者:_luka_
链接:https://www.jianshu.com/p/c5166df8e3b7
python 多线程 异步_python 多线程异步相关推荐
- python多线程库_python多线程库
广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! python 多线程 多线程类似于同时执行多个不同程序,多线程运行有如下优点:使 ...
- python线程唤醒_python 多线程
python 多线程 真正的多线程吗? 对于多核处理器,在同一时间确实可以多个线程独立运行,但在Python中确不是这样的了.原因在于,python虚拟机中引入了GIL这一概念.GIL(Global ...
- python多线程实现异步_python多线程实现异步
import time from threading import Thread def long_io(cb): def func(callback): print("开始耗时操作io&q ...
- python socket发包_python 多线程tcp udp发包 Dos工具。
[使用 异步多线程TCP Socket 实现进程间通信(VC 6.0 , BCB6.0调试通过) 进程间通信有很多种方式,比如说 Pipe,共享内存,DDE,Socket等,关于 ...
- python多线程挂了_python多线程输入的问题 python高效编程技巧13(如何在线程之间实现...
python3 创建线程时不用args传参,执行线程时为什如果创建线程时在target里就传入了参数,为什么在启动线程时,线程不是在Python多线程下,每个线程的执行方式: 有什么了不起,大不了继续 ...
- python多线程输出_Python多线程
多线程基础概念 并行与并发 并行:同时处理多个任务,必须在多核环境下 一段时间内同时处理多个任务,单核也可以并发 并发手段 线程:内核空间的调度 进程:内核空间的调度 协程:用户空间的调度 线程可以允 ...
- python 多线程 模块_Python多线程threading和multiprocessing模块实例解析
本文研究的主要是Python多线程threading和multiprocessing模块的相关内容,具体介绍如下. 线程是一个进程的实体,是由表示程序运行状态的寄存器(如程序计数器.栈指针)以及堆栈组 ...
- python多线程执行_python多线程实现同时执行两个while循环
如果想同时执行两个while True循环,可以使用多线程threading来实现. 完整代码 #coding=gbk from time import sleep, ctime import thr ...
- python多线程模块_python 多线程模块参考
threading.active_count() 返回当前处于 active 状态的线程的数目 threading.current_thread() 返回调用者当前的 Thread 对象 thread ...
最新文章
- 事件 ID 6008问题
- 2017 ACM/ICPC Asia Regional Xian Online 记录
- 中断处理函数中自旋锁的应用
- 【教学课件】IT教学课件和课程考试资料汇总
- php内存映射,如何用ZwMapViewOfSection将Driver分配的内存映射到App空间?
- matlab用socket线程发送数据,使用Python Twisted和Autobahn从Matlab通过WebSocket发送JSON数据...
- 话筒好坏测试软件,如何简单地判断麦克风的质量好坏?
- php mysql登陆页面完整代码_求助:PHP实现登陆注册的代码是什么啊(主要是数据库那块)?...
- Web 服务器错误代码
- Pycharm中SQL语句提示SQL Dialect is Not Configured
- JavaScript键盘鼠标监听功能
- 如何计算某一天是星期几?—— 蔡勒(Zeller)公式
- 第三方SSD问题引起电脑频繁重启问题IONVMeController.cpp:5499
- 从零开始自制实现WebServer(一)---- 万丈高楼平地起 步子得一步一步慢慢走
- Ant下载安装及使用详解
- c语言自动输入0到1000,c语言实现输入一组数自动从大到小排列
- 【spark运行报错】
- ubuntu下完全卸载 opencv库 详细教程
- 2013年国内最具技术影响力公司TOP10
- 宋朝记载的超新星爆发和光速不变
热门文章
- HWDB数据集gnt格式转为png格式
- 在LaTex中插入代码块
- JavaScript很牛
- [转载] Python的exec
- [转载] Python time sleep()方法如何使用?
- [转载] python 中NumPy和Pandas工具包中的函数使用笔记(方便自己查找)
- [转载] 如何使用 Python 生成酷炫的二维码?
- hdfs居然无法正常停止
- JS === 实现通过点击td 跳转相应的图片
- HTML5网页录音和上传到服务器,支持PC、Android,支持IOS微信