python 协程池
python 协程池
一、问题描述
现在有一段代码,需要扫描一个网段内的ip地址,是否可以ping通。
执行起来效率太慢,需要使用协程。
#!/usr/bin/env python # -*- coding: utf-8 -*-import os import time import signal import subprocess import gevent import gevent.pool from gevent import monkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件:param content: 内容:param colour: 颜色:return: None"""# 颜色代码colour_dict = {'red': 31, # 红色'green': 32, # 绿色'yellow': 33, # 黄色'blue': 34, # 蓝色'purple_red': 35, # 紫红色'bluish_blue': 36, # 浅蓝色'white': 37, # 白色 }choice = colour_dict.get(colour) # 选择颜色 info = "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list:param cmd: linux命令:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒:param skip: 是否跳过超时限制:return: list"""p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)t_beginning = time.time() # 开始时间while True:if p.poll() is not None:breakseconds_passed = time.time() - t_beginningif not skip:if seconds_passed > timeout:# p.terminate()# p.kill()# raise TimeoutError(cmd, timeout)custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")# 当shell=True时,只有os.killpg才能kill子进程try:# time.sleep(1) os.killpg(p.pid, signal.SIGUSR1)except Exception as e:passreturn Falseresult = p.stdout.readlines() # 结果输出列表return resultclass NetworkTest(object):def __init__(self):self.flag_list = []def check_ping(self,ip):"""检查ping:param ip: ip地址:return: none"""cmd = "ping %s -c 2" % ip# print(cmd)# 本机执行命令res = execute_linux2(cmd,2)# print(res)if not res:custom_print("错误, 执行命令: {} 失败".format(cmd), "red")self.flag_list.append(False)return Falseres.pop() # 删除最后一个元素last_row = res.pop().decode('utf-8').strip() # 再次获取最后一行结果if not last_row:custom_print("错误,执行命令: {} 异常","red")self.flag_list.append(False)return Falseres = last_row.split() # 切割结果# print(res,type(res),len(res))if len(res) <10:custom_print("错误,切割 ping 结果异常","red")self.flag_list.append(False)return Falseif res[5] == "0%": # 判断丢包率custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:self.flag_list.append(False)custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")def main(self):"""主程序:return:"""for num in range(1, 256):ip = '192.168.10.{}'.format(num)self.check_ping(ip)if __name__ == '__main__':startime = time.time() # 开始时间 NetworkTest().main()endtime = time.time()take_time = endtime - startimeif take_time < 1: # 判断不足1秒时take_time = 1 # 设置为1秒# 计算花费时间m, s = divmod(take_time, 60)h, m = divmod(m, 60)custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")
View Code
改造成,协程执行。
#!/usr/bin/env python # -*- coding: utf-8 -*-import os import time import signal import subprocess import gevent import gevent.pool from gevent import monkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件:param content: 内容:param colour: 颜色:return: None"""# 颜色代码colour_dict = {'red': 31, # 红色'green': 32, # 绿色'yellow': 33, # 黄色'blue': 34, # 蓝色'purple_red': 35, # 紫红色'bluish_blue': 36, # 浅蓝色'white': 37, # 白色 }choice = colour_dict.get(colour) # 选择颜色 info = "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list:param cmd: linux命令:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒:param skip: 是否跳过超时限制:return: list"""p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)t_beginning = time.time() # 开始时间while True:if p.poll() is not None:breakseconds_passed = time.time() - t_beginningif not skip:if seconds_passed > timeout:# p.terminate()# p.kill()# raise TimeoutError(cmd, timeout)custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")# 当shell=True时,只有os.killpg才能kill子进程try:# time.sleep(1) os.killpg(p.pid, signal.SIGUSR1)except Exception as e:passreturn Falseresult = p.stdout.readlines() # 结果输出列表return resultclass NetworkTest(object):def __init__(self):self.flag_list = []def check_ping(self,ip):"""检查ping:param ip: ip地址:return: none"""cmd = "ping %s -c 2" % ip# print(cmd)# 本机执行命令res = execute_linux2(cmd,2)# print(res)if not res:custom_print("错误, 执行命令: {} 失败".format(cmd), "red")self.flag_list.append(False)return Falseres.pop() # 删除最后一个元素last_row = res.pop().decode('utf-8').strip() # 再次获取最后一行结果if not last_row:custom_print("错误,执行命令: {} 异常","red")self.flag_list.append(False)return Falseres = last_row.split() # 切割结果# print(res,type(res),len(res))if len(res) <10:custom_print("错误,切割 ping 结果异常","red")self.flag_list.append(False)return Falseif res[5] == "0%": # 判断丢包率custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:self.flag_list.append(False)custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")def main(self):"""主程序:return:"""process_list = []for num in range(1, 256):ip = '192.168.10.{}'.format(num)# self.check_ping(ip)# 将任务加到列表中 process_list.append(gevent.spawn(self.check_ping, ip))gevent.joinall(process_list) # 等待所有协程结束if __name__ == '__main__':startime = time.time() # 开始时间 NetworkTest().main()endtime = time.time()take_time = endtime - startimeif take_time < 1: # 判断不足1秒时take_time = 1 # 设置为1秒# 计算花费时间m, s = divmod(take_time, 60)h, m = divmod(m, 60)custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")
View Code
执行输出:
... 错误, 命令: ping 192.168.10.250 -c 2,本地执行超时! 错误, 执行命令: ping 192.168.10.250 -c 2 失败 错误, 命令: ping 192.168.10.255 -c 2,本地执行超时! 错误, 执行命令: ping 192.168.10.255 -c 2 失败 本次花费时间 00:00:07
注意:切勿在windows系统中运行,否则会报错
AttributeError: module 'os' has no attribute 'setsid'
二、使用协程池
上面直接将所有任务加到列表中,然后一次性,全部异步执行。那么同一时刻,最多有多少任务执行呢?
不知道,可能有256个吧?
注意:如果这个一个很耗CPU的程序,可能会导致服务器,直接卡死。
那么,我们应该要限制它的并发数。这个时候,需要使用协程池,固定并发数。
比如:固定为100个
#!/usr/bin/env python # -*- coding: utf-8 -*-import os import time import signal import subprocess import gevent import gevent.pool from gevent import monkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件:param content: 内容:param colour: 颜色:return: None"""# 颜色代码colour_dict = {'red': 31, # 红色'green': 32, # 绿色'yellow': 33, # 黄色'blue': 34, # 蓝色'purple_red': 35, # 紫红色'bluish_blue': 36, # 浅蓝色'white': 37, # 白色 }choice = colour_dict.get(colour) # 选择颜色 info = "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list:param cmd: linux命令:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒:param skip: 是否跳过超时限制:return: list"""p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)t_beginning = time.time() # 开始时间while True:if p.poll() is not None:breakseconds_passed = time.time() - t_beginningif not skip:if seconds_passed > timeout:# p.terminate()# p.kill()# raise TimeoutError(cmd, timeout)custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")# 当shell=True时,只有os.killpg才能kill子进程try:# time.sleep(1) os.killpg(p.pid, signal.SIGUSR1)except Exception as e:passreturn Falseresult = p.stdout.readlines() # 结果输出列表return resultclass NetworkTest(object):def __init__(self):self.flag_list = []def check_ping(self,ip):"""检查ping:param ip: ip地址:return: none"""cmd = "ping %s -c 2" % ip# print(cmd)# 本机执行命令res = execute_linux2(cmd,2)# print(res)if not res:custom_print("错误, 执行命令: {} 失败".format(cmd), "red")self.flag_list.append(False)return Falseres.pop() # 删除最后一个元素last_row = res.pop().decode('utf-8').strip() # 再次获取最后一行结果if not last_row:custom_print("错误,执行命令: {} 异常","red")self.flag_list.append(False)return Falseres = last_row.split() # 切割结果# print(res,type(res),len(res))if len(res) <10:custom_print("错误,切割 ping 结果异常","red")self.flag_list.append(False)return Falseif res[5] == "0%": # 判断丢包率custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:self.flag_list.append(False)custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")def main(self):"""主程序:return:"""process_list = []pool= gevent.pool.Pool(100) # 协程池固定为100个for num in range(1, 256):ip = '192.168.10.{}'.format(num)# self.check_ping(ip)# 将任务加到列表中 process_list.append(pool.spawn(self.check_ping, ip))gevent.joinall(process_list) # 等待所有协程结束if __name__ == '__main__':startime = time.time() # 开始时间 NetworkTest().main()endtime = time.time()take_time = endtime - startimeif take_time < 1: # 判断不足1秒时take_time = 1 # 设置为1秒# 计算花费时间m, s = divmod(take_time, 60)h, m = divmod(m, 60)custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")
View Code
再次执行,效果如下:
... 错误, 执行命令: ping 192.168.10.254 -c 2 失败 错误, 命令: ping 192.168.10.255 -c 2,本地执行超时! 错误, 执行命令: ping 192.168.10.255 -c 2 失败 本次花费时间 00:00:15
可以,发现花费的时间,明显要比上面慢了!
其实,还有一种写法,使用pool.map,语法如下:
pool.map(func,iterator)
比如:
pool.map(self.get_kernel, NODE_LIST)
注意:func是一个方法,iterator是一个迭代器。比如:list就是一个迭代器
使用map时,func只能接收一个参数。这个参数就是,遍历迭代器的每一个值。
使用map,完整代码如下:
#!/usr/bin/env python # -*- coding: utf-8 -*-import os import time import signal import subprocess import gevent import gevent.pool from gevent import monkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件:param content: 内容:param colour: 颜色:return: None"""# 颜色代码colour_dict = {'red': 31, # 红色'green': 32, # 绿色'yellow': 33, # 黄色'blue': 34, # 蓝色'purple_red': 35, # 紫红色'bluish_blue': 36, # 浅蓝色'white': 37, # 白色 }choice = colour_dict.get(colour) # 选择颜色 info = "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list:param cmd: linux命令:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒:param skip: 是否跳过超时限制:return: list"""p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)t_beginning = time.time() # 开始时间while True:if p.poll() is not None:breakseconds_passed = time.time() - t_beginningif not skip:if seconds_passed > timeout:# p.terminate()# p.kill()# raise TimeoutError(cmd, timeout)custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")# 当shell=True时,只有os.killpg才能kill子进程try:# time.sleep(1) os.killpg(p.pid, signal.SIGUSR1)except Exception as e:passreturn Falseresult = p.stdout.readlines() # 结果输出列表return resultclass NetworkTest(object):def __init__(self):self.flag_list = []def check_ping(self,ip):"""检查ping:param ip: ip地址:return: none"""cmd = "ping %s -c 2" % ip# print(cmd)# 本机执行命令res = execute_linux2(cmd,2)# print(res)if not res:custom_print("错误, 执行命令: {} 失败".format(cmd), "red")self.flag_list.append(False)return Falseres.pop() # 删除最后一个元素last_row = res.pop().decode('utf-8').strip() # 再次获取最后一行结果if not last_row:custom_print("错误,执行命令: {} 异常","red")self.flag_list.append(False)return Falseres = last_row.split() # 切割结果# print(res,type(res),len(res))if len(res) <10:custom_print("错误,切割 ping 结果异常","red")self.flag_list.append(False)return Falseif res[5] == "0%": # 判断丢包率custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:self.flag_list.append(False)custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")def main(self):"""主程序:return:"""pool= gevent.pool.Pool(100) # 协程池固定为100个ip_list = ["192.168.10.{}".format(i) for i in range(1, 256)]# 使用pool.map,语法:pool.map(func,iterator) pool.map(self.check_ping, ip_list)if __name__ == '__main__':startime = time.time() # 开始时间 NetworkTest().main()endtime = time.time()take_time = endtime - startimeif take_time < 1: # 判断不足1秒时take_time = 1 # 设置为1秒# 计算花费时间m, s = divmod(take_time, 60)h, m = divmod(m, 60)custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")
View Code
注意:方法只有一个参数的情况下,才可以使用pool.map。这样代码,看起来,比较精简!
如果有多个参数,还是得使用上面的方法。
比如:
process_list = [] pool= gevent.pool.Pool(100) # 协程池固定为100个 for num in range(1, 256):ip = '192.168.10.{}'.format(num)# self.check_ping(ip)# 将任务加到列表中 process_list.append(pool.spawn(self.check_ping, ip))gevent.joinall(process_list) # 等待所有协程结束
View Code
python 协程池相关推荐
- python 协程池和pool.map用法
一.问题描述 现在有一段代码,需要扫描一个网段内的ip地址,是否可以ping通. 执行起来效率太慢,需要使用协程. #!/usr/bin/env python # -*- coding: utf-8 ...
- python协程池操作mysql_在python中使用aiomysql异步操作mysql
之前一直在使用mongo与redis,最近在项目中开始使用mysql数据库,由于现在的项目是全程异步的操作,所以在在网上查了下关于在python中异步的操作mysql,找来找去最后发现aiomysql ...
- python 协程池gevent.pool_进程池\线程池,协程,gevent
目录 1. 进程池与线程池 2. 协程 3. gevent 4. 单线程下实现并发的套接字通信 首先写一个基于多线程的套接字 服务端: from socket import * from thread ...
- python协程池_python3下multiprocessing、threading和gevent性能对比—-暨进程池、线程池和协程池性能对比 | 学步园...
目前计算机程序一般会遇到两类I/O:硬盘I/O和网络I/O.我就针对网络I/O的场景分析下python3下进程.线程.协程效率的对比.进程采用multiprocessing.Pool进程池,线程是自己 ...
- python协程池爬虫_Python之协程爬虫 小说网协程爬虫案例
在Gevent协程的使用中我们已经学会简单的使用协程,这篇文章我们通过协程爬虫来测试一下具体的效果.Gevent遇到IO阻塞时会自动切换任务: from gevent import monkey mo ...
- python协程池操作mysql_python_协程方式操作数据库
#!/usr/bin/python3 # -*- coding: utf-8 -*- import requests import gevent import pymysql from gevent ...
- Python进程池,线程池,协程池
线程池 import threading import time def myThread():for i in range(10):time.sleep()print('d') sep=thread ...
- Python协程(真才实学,想学的进来)
真正有知识的人的成长过程,就像麦穗的成长过程:麦穗空的时候,麦子长得很快,麦穗骄傲地高高昂起,但是,麦穗成熟饱满时,它们开始谦虚,垂下麦芒. --蒙田<蒙田随笔全集> 在这里还是要推荐下我 ...
- 5分钟完全掌握Python协程
1. 协程相关的概念 1.1 进程和线程 进程(Process)是应用程序启动的实例,拥有代码.数据和文件和独立的内存空间,是操作系统最小资源管理单元.每个进程下面有一个或者多个线程(Thread), ...
最新文章
- java随机产生字母排序_Java生成含字母和数字的6位随机字符串
- prim算法_最小生成树的本质是什么?Prim算法道破天机
- IcmpBackDoor
- 大数据为智慧城市建设添砖加瓦
- (二)原生JS实现 - 事件类方法
- android 数组排重方法,js数组去重方法集合 - osc_779ncf3o的个人空间 - OSCHINA - 中文开源技术交流社区...
- JavaScript-Date类的getMonth方法释疑
- 医院各领域榜单。22个科室、100种常见疾病
- 总结:KPCB中国合伙人周炜
- df pd 属性_pd.DataFrame()函数解析
- 重装系统是否影响计算机,电脑重装系统会对电脑造成伤害吗? 答案说出来你可能都不信...
- android 重力模拟,android的模拟器怎样仿真重力感应器
- 20190118 阿耐---《艰难的制造》
- 高新科技企业税收优惠有多少
- 4个图片无损压缩工具,快速批量压缩图片,图片高清不糊
- 实用的shell脚本合集
- 计算机基础课件第三章ppt,计算机应用基础(windows 7+office 2010)课件 第三章 Windows7操作系统.ppt.pdf-汇文网...
- 医院计算机网络安全应急预案,医院(妇幼保健院)计算机网络信息管理系统故障应急预案(技术保障实施细则)...
- 笔记本内存和台式机内存的区别
- mysql语句添加、删除索引(转)