python 协程池

一、问题描述

现在有一段代码,需要扫描一个网段内的ip地址,是否可以ping通。

执行起来效率太慢,需要使用协程。

#!/usr/bin/env python
# -*- coding: utf-8 -*-import os
import time
import signal
import subprocess
import gevent
import gevent.pool
from gevent import monkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件:param content: 内容:param colour: 颜色:return: None"""# 颜色代码colour_dict = {'red': 31,  # 红色'green': 32,  # 绿色'yellow': 33,  # 黄色'blue': 34,  # 蓝色'purple_red': 35,  # 紫红色'bluish_blue': 36, # 浅蓝色'white': 37,  # 白色
    }choice = colour_dict.get(colour)  # 选择颜色
info = "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list:param cmd: linux命令:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒:param skip: 是否跳过超时限制:return: list"""p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)t_beginning = time.time()  # 开始时间while True:if p.poll() is not None:breakseconds_passed = time.time() - t_beginningif not skip:if seconds_passed > timeout:# p.terminate()# p.kill()# raise TimeoutError(cmd, timeout)custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")# 当shell=True时,只有os.killpg才能kill子进程try:# time.sleep(1)
                    os.killpg(p.pid, signal.SIGUSR1)except Exception as e:passreturn Falseresult = p.stdout.readlines()  # 结果输出列表return resultclass NetworkTest(object):def __init__(self):self.flag_list = []def check_ping(self,ip):"""检查ping:param ip: ip地址:return: none"""cmd = "ping %s -c 2" % ip# print(cmd)# 本机执行命令res = execute_linux2(cmd,2)# print(res)if not res:custom_print("错误, 执行命令: {} 失败".format(cmd), "red")self.flag_list.append(False)return Falseres.pop()  # 删除最后一个元素last_row = res.pop().decode('utf-8').strip()  # 再次获取最后一行结果if not last_row:custom_print("错误,执行命令: {} 异常","red")self.flag_list.append(False)return Falseres = last_row.split()  # 切割结果# print(res,type(res),len(res))if len(res) <10:custom_print("错误,切割 ping 结果异常","red")self.flag_list.append(False)return Falseif res[5] == "0%":  # 判断丢包率custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:self.flag_list.append(False)custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")def main(self):"""主程序:return:"""for num in range(1, 256):ip = '192.168.10.{}'.format(num)self.check_ping(ip)if __name__ == '__main__':startime = time.time()  # 开始时间
NetworkTest().main()endtime = time.time()take_time = endtime - startimeif take_time < 1:  # 判断不足1秒时take_time = 1  # 设置为1秒# 计算花费时间m, s = divmod(take_time, 60)h, m = divmod(m, 60)custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")

View Code

改造成,协程执行。

#!/usr/bin/env python
# -*- coding: utf-8 -*-import os
import time
import signal
import subprocess
import gevent
import gevent.pool
from gevent import monkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件:param content: 内容:param colour: 颜色:return: None"""# 颜色代码colour_dict = {'red': 31,  # 红色'green': 32,  # 绿色'yellow': 33,  # 黄色'blue': 34,  # 蓝色'purple_red': 35,  # 紫红色'bluish_blue': 36, # 浅蓝色'white': 37,  # 白色
    }choice = colour_dict.get(colour)  # 选择颜色
info = "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list:param cmd: linux命令:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒:param skip: 是否跳过超时限制:return: list"""p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)t_beginning = time.time()  # 开始时间while True:if p.poll() is not None:breakseconds_passed = time.time() - t_beginningif not skip:if seconds_passed > timeout:# p.terminate()# p.kill()# raise TimeoutError(cmd, timeout)custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")# 当shell=True时,只有os.killpg才能kill子进程try:# time.sleep(1)
                    os.killpg(p.pid, signal.SIGUSR1)except Exception as e:passreturn Falseresult = p.stdout.readlines()  # 结果输出列表return resultclass NetworkTest(object):def __init__(self):self.flag_list = []def check_ping(self,ip):"""检查ping:param ip: ip地址:return: none"""cmd = "ping %s -c 2" % ip# print(cmd)# 本机执行命令res = execute_linux2(cmd,2)# print(res)if not res:custom_print("错误, 执行命令: {} 失败".format(cmd), "red")self.flag_list.append(False)return Falseres.pop()  # 删除最后一个元素last_row = res.pop().decode('utf-8').strip()  # 再次获取最后一行结果if not last_row:custom_print("错误,执行命令: {} 异常","red")self.flag_list.append(False)return Falseres = last_row.split()  # 切割结果# print(res,type(res),len(res))if len(res) <10:custom_print("错误,切割 ping 结果异常","red")self.flag_list.append(False)return Falseif res[5] == "0%":  # 判断丢包率custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:self.flag_list.append(False)custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")def main(self):"""主程序:return:"""process_list = []for num in range(1, 256):ip = '192.168.10.{}'.format(num)# self.check_ping(ip)# 将任务加到列表中
            process_list.append(gevent.spawn(self.check_ping, ip))gevent.joinall(process_list)  # 等待所有协程结束if __name__ == '__main__':startime = time.time()  # 开始时间
NetworkTest().main()endtime = time.time()take_time = endtime - startimeif take_time < 1:  # 判断不足1秒时take_time = 1  # 设置为1秒# 计算花费时间m, s = divmod(take_time, 60)h, m = divmod(m, 60)custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")

View Code

执行输出:

...
错误, 命令: ping 192.168.10.250 -c 2,本地执行超时!
错误, 执行命令: ping 192.168.10.250 -c 2 失败
错误, 命令: ping 192.168.10.255 -c 2,本地执行超时!
错误, 执行命令: ping 192.168.10.255 -c 2 失败
本次花费时间 00:00:07

注意:切勿在windows系统中运行,否则会报错

AttributeError: module 'os' has no attribute 'setsid'

二、使用协程池

上面直接将所有任务加到列表中,然后一次性,全部异步执行。那么同一时刻,最多有多少任务执行呢?

不知道,可能有256个吧?

注意:如果这个一个很耗CPU的程序,可能会导致服务器,直接卡死。

那么,我们应该要限制它的并发数。这个时候,需要使用协程池,固定并发数。

比如:固定为100个

#!/usr/bin/env python
# -*- coding: utf-8 -*-import os
import time
import signal
import subprocess
import gevent
import gevent.pool
from gevent import monkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件:param content: 内容:param colour: 颜色:return: None"""# 颜色代码colour_dict = {'red': 31,  # 红色'green': 32,  # 绿色'yellow': 33,  # 黄色'blue': 34,  # 蓝色'purple_red': 35,  # 紫红色'bluish_blue': 36, # 浅蓝色'white': 37,  # 白色
    }choice = colour_dict.get(colour)  # 选择颜色
info = "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list:param cmd: linux命令:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒:param skip: 是否跳过超时限制:return: list"""p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)t_beginning = time.time()  # 开始时间while True:if p.poll() is not None:breakseconds_passed = time.time() - t_beginningif not skip:if seconds_passed > timeout:# p.terminate()# p.kill()# raise TimeoutError(cmd, timeout)custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")# 当shell=True时,只有os.killpg才能kill子进程try:# time.sleep(1)
                    os.killpg(p.pid, signal.SIGUSR1)except Exception as e:passreturn Falseresult = p.stdout.readlines()  # 结果输出列表return resultclass NetworkTest(object):def __init__(self):self.flag_list = []def check_ping(self,ip):"""检查ping:param ip: ip地址:return: none"""cmd = "ping %s -c 2" % ip# print(cmd)# 本机执行命令res = execute_linux2(cmd,2)# print(res)if not res:custom_print("错误, 执行命令: {} 失败".format(cmd), "red")self.flag_list.append(False)return Falseres.pop()  # 删除最后一个元素last_row = res.pop().decode('utf-8').strip()  # 再次获取最后一行结果if not last_row:custom_print("错误,执行命令: {} 异常","red")self.flag_list.append(False)return Falseres = last_row.split()  # 切割结果# print(res,type(res),len(res))if len(res) <10:custom_print("错误,切割 ping 结果异常","red")self.flag_list.append(False)return Falseif res[5] == "0%":  # 判断丢包率custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:self.flag_list.append(False)custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")def main(self):"""主程序:return:"""process_list = []pool= gevent.pool.Pool(100)  # 协程池固定为100个for num in range(1, 256):ip = '192.168.10.{}'.format(num)# self.check_ping(ip)# 将任务加到列表中
            process_list.append(pool.spawn(self.check_ping, ip))gevent.joinall(process_list)  # 等待所有协程结束if __name__ == '__main__':startime = time.time()  # 开始时间
NetworkTest().main()endtime = time.time()take_time = endtime - startimeif take_time < 1:  # 判断不足1秒时take_time = 1  # 设置为1秒# 计算花费时间m, s = divmod(take_time, 60)h, m = divmod(m, 60)custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")

View Code

再次执行,效果如下:

...
错误, 执行命令: ping 192.168.10.254 -c 2 失败
错误, 命令: ping 192.168.10.255 -c 2,本地执行超时!
错误, 执行命令: ping 192.168.10.255 -c 2 失败
本次花费时间 00:00:15

可以,发现花费的时间,明显要比上面慢了!

其实,还有一种写法,使用pool.map,语法如下:

pool.map(func,iterator)

比如:

pool.map(self.get_kernel, NODE_LIST)

注意:func是一个方法,iterator是一个迭代器。比如:list就是一个迭代器

使用map时,func只能接收一个参数。这个参数就是,遍历迭代器的每一个值。

使用map,完整代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-import os
import time
import signal
import subprocess
import gevent
import gevent.pool
from gevent import monkey;monkey.patch_all()def custom_print(content,colour='white'):"""写入日志文件:param content: 内容:param colour: 颜色:return: None"""# 颜色代码colour_dict = {'red': 31,  # 红色'green': 32,  # 绿色'yellow': 33,  # 黄色'blue': 34,  # 蓝色'purple_red': 35,  # 紫红色'bluish_blue': 36, # 浅蓝色'white': 37,  # 白色
    }choice = colour_dict.get(colour)  # 选择颜色
info = "\033[1;{};1m{}\033[0m".format(choice, content)print(info)def execute_linux2(cmd, timeout=10, skip=False):"""执行linux命令,返回list:param cmd: linux命令:param timeout: 超时时间,生产环境, 特别卡, 因此要3秒:param skip: 是否跳过超时限制:return: list"""p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)t_beginning = time.time()  # 开始时间while True:if p.poll() is not None:breakseconds_passed = time.time() - t_beginningif not skip:if seconds_passed > timeout:# p.terminate()# p.kill()# raise TimeoutError(cmd, timeout)custom_print('错误, 命令: {},本地执行超时!'.format(cmd),"red")# 当shell=True时,只有os.killpg才能kill子进程try:# time.sleep(1)
                    os.killpg(p.pid, signal.SIGUSR1)except Exception as e:passreturn Falseresult = p.stdout.readlines()  # 结果输出列表return resultclass NetworkTest(object):def __init__(self):self.flag_list = []def check_ping(self,ip):"""检查ping:param ip: ip地址:return: none"""cmd = "ping %s -c 2" % ip# print(cmd)# 本机执行命令res = execute_linux2(cmd,2)# print(res)if not res:custom_print("错误, 执行命令: {} 失败".format(cmd), "red")self.flag_list.append(False)return Falseres.pop()  # 删除最后一个元素last_row = res.pop().decode('utf-8').strip()  # 再次获取最后一行结果if not last_row:custom_print("错误,执行命令: {} 异常","red")self.flag_list.append(False)return Falseres = last_row.split()  # 切割结果# print(res,type(res),len(res))if len(res) <10:custom_print("错误,切割 ping 结果异常","red")self.flag_list.append(False)return Falseif res[5] == "0%":  # 判断丢包率custom_print("正常, ip: {} ping正常 丢包率0%".format(ip), "green")else:self.flag_list.append(False)custom_print("错误, ip: {} ping异常 丢包率100%".format(ip), "red")def main(self):"""主程序:return:"""pool= gevent.pool.Pool(100)  # 协程池固定为100个ip_list = ["192.168.10.{}".format(i) for i in range(1, 256)]# 使用pool.map,语法:pool.map(func,iterator)
        pool.map(self.check_ping, ip_list)if __name__ == '__main__':startime = time.time()  # 开始时间
NetworkTest().main()endtime = time.time()take_time = endtime - startimeif take_time < 1:  # 判断不足1秒时take_time = 1  # 设置为1秒# 计算花费时间m, s = divmod(take_time, 60)h, m = divmod(m, 60)custom_print("本次花费时间 %02d:%02d:%02d" % (h, m, s),"green")

View Code

注意:方法只有一个参数的情况下,才可以使用pool.map。这样代码,看起来,比较精简!

如果有多个参数,还是得使用上面的方法。

比如:

process_list = []
pool= gevent.pool.Pool(100)  # 协程池固定为100个
for num in range(1, 256):ip = '192.168.10.{}'.format(num)# self.check_ping(ip)# 将任务加到列表中
    process_list.append(pool.spawn(self.check_ping, ip))gevent.joinall(process_list)  # 等待所有协程结束

View Code

posted @ 2019-05-18 16:38 肖祥 阅读(...) 评论(...) 编辑 收藏

python 协程池相关推荐

  1. python 协程池和pool.map用法

    一.问题描述 现在有一段代码,需要扫描一个网段内的ip地址,是否可以ping通. 执行起来效率太慢,需要使用协程. #!/usr/bin/env python # -*- coding: utf-8 ...

  2. python协程池操作mysql_在python中使用aiomysql异步操作mysql

    之前一直在使用mongo与redis,最近在项目中开始使用mysql数据库,由于现在的项目是全程异步的操作,所以在在网上查了下关于在python中异步的操作mysql,找来找去最后发现aiomysql ...

  3. python 协程池gevent.pool_进程池\线程池,协程,gevent

    目录 1. 进程池与线程池 2. 协程 3. gevent 4. 单线程下实现并发的套接字通信 首先写一个基于多线程的套接字 服务端: from socket import * from thread ...

  4. python协程池_python3下multiprocessing、threading和gevent性能对比—-暨进程池、线程池和协程池性能对比 | 学步园...

    目前计算机程序一般会遇到两类I/O:硬盘I/O和网络I/O.我就针对网络I/O的场景分析下python3下进程.线程.协程效率的对比.进程采用multiprocessing.Pool进程池,线程是自己 ...

  5. python协程池爬虫_Python之协程爬虫 小说网协程爬虫案例

    在Gevent协程的使用中我们已经学会简单的使用协程,这篇文章我们通过协程爬虫来测试一下具体的效果.Gevent遇到IO阻塞时会自动切换任务: from gevent import monkey mo ...

  6. python协程池操作mysql_python_协程方式操作数据库

    #!/usr/bin/python3 # -*- coding: utf-8 -*- import requests import gevent import pymysql from gevent ...

  7. Python进程池,线程池,协程池

    线程池 import threading import time def myThread():for i in range(10):time.sleep()print('d') sep=thread ...

  8. Python协程(真才实学,想学的进来)

    真正有知识的人的成长过程,就像麦穗的成长过程:麦穗空的时候,麦子长得很快,麦穗骄傲地高高昂起,但是,麦穗成熟饱满时,它们开始谦虚,垂下麦芒. --蒙田<蒙田随笔全集> 在这里还是要推荐下我 ...

  9. 5分钟完全掌握Python协程

    1. 协程相关的概念 1.1 进程和线程 进程(Process)是应用程序启动的实例,拥有代码.数据和文件和独立的内存空间,是操作系统最小资源管理单元.每个进程下面有一个或者多个线程(Thread), ...

最新文章

  1. java随机产生字母排序_Java生成含字母和数字的6位随机字符串
  2. prim算法_最小生成树的本质是什么?Prim算法道破天机
  3. IcmpBackDoor
  4. 大数据为智慧城市建设添砖加瓦
  5. (二)原生JS实现 - 事件类方法
  6. android 数组排重方法,js数组去重方法集合 - osc_779ncf3o的个人空间 - OSCHINA - 中文开源技术交流社区...
  7. JavaScript-Date类的getMonth方法释疑
  8. 医院各领域榜单。22个科室、100种常见疾病
  9. 总结:KPCB中国合伙人周炜
  10. df pd 属性_pd.DataFrame()函数解析
  11. 重装系统是否影响计算机,电脑重装系统会对电脑造成伤害吗? 答案说出来你可能都不信...
  12. android 重力模拟,android的模拟器怎样仿真重力感应器
  13. 20190118 阿耐---《艰难的制造》
  14. 高新科技企业税收优惠有多少
  15. 4个图片无损压缩工具,快速批量压缩图片,图片高清不糊
  16. 实用的shell脚本合集
  17. 计算机基础课件第三章ppt,计算机应用基础(windows 7+office 2010)课件 第三章 Windows7操作系统.ppt.pdf-汇文网...
  18. 医院计算机网络安全应急预案,医院(妇幼保健院)计算机网络信息管理系统故障应急预案(技术保障实施细则)...
  19. 笔记本内存和台式机内存的区别
  20. mysql语句添加、删除索引(转)

热门文章

  1. 网络安全课第七节 文件上传漏洞的检测与防御
  2. 用python测测你身体是否健康
  3. JavaScript很少为人所知的玩法
  4. SSM3---SpringMVC
  5. 小小蜜蜂蜇壮汉不死,注射打针反致死
  6. ctf实战第一节:kali环境的熟悉:最新ZSH,初始化root密码,环境配置
  7. linux下重启邮件服务,Linux的postfix邮件服务
  8. 链路追踪jaeger
  9. Java-多线程-Future、FutureTask、CompletionService、CompletableFuture解决多线程并发中归集问题的效率对比
  10. 搜狗总收录批量查询教程 如何有效提高搜狗总收录的方法详解