python多线程_python多线程:控制线程数量
背景
前段时间学习了python的多线程爬虫,当时爬取一个图片网站,开启多线程后,并没有限制线程的数量,也就是说,如果下载1000张图片,会一次性开启1000个子线程同时进行下载
现在希望控制线程数量:例如每次只下载5张,当下载完成后再下载另外5张,直至全部完成
查了一些资料,发现在python中,threading 模块有提供 Semaphore类 和 BoundedSemaphore 类来限制线程数
详细说明可以看看下面几篇文章,写的很棒:
https://docs.python.org/3.5/library/threading.html?highlight=threading#semaphore-objects
https://www.liujiangblog.com/course/python/79
https://my.oschina.net/u/3524921/blog/920303
https://zhuanlan.zhihu.com/p/34159519
官网给出例子如下:信号量通常用于保护容量有限的资源,例如数据库服务器。在资源大小固定的任何情况下,都应使用有界信号量。在产生任何工作线程之前,您的主线程将初始化信号量:
maxconnections = 5# ...pool_sema = BoundedSemaphore(value=maxconnections)
产生后,工作线程在需要连接到服务器时会调用信号量的获取和释放方法:
with pool_sema: conn = connectdb() try: # ... use connection ... finally: conn.close()
改造之前的多线程爬虫
首先贴出原来的代码
# -*- coding:utf-8 -*-import requestsfrom requests.exceptions import RequestExceptionimport os, timeimport refrom lxml import etreeimport threadinglock = threading.Lock()def get_html(url): """ 定义一个方法,用于获取一个url页面的响应内容 :param url: 要访问的url :return: 响应内容 """ response = requests.get(url, timeout=10) # print(response.status_code) try: if response.status_code == 200: # print(response.text) return response.text else: return None except RequestException: print("请求失败") # return Nonedef parse_html(html_text): """ 定义一个方法,用于解析页面内容,提取图片url :param html_text: :return:一个页面的图片url集合 """ html = etree.HTML(html_text) if len(html) > 0: img_src = html.xpath("//img[@class='photothumb lazy']/@data-original") # 元素提取方法 # print(img_src) return img_src else: print("解析页面元素失败")def get_image_pages(url): """ 获取所查询图片结果的所有页码 :param url: 查询图片url :return: 总页码数 """ html_text = get_html(url) # 获取搜索url响应内容 # print(html_text) if html_text is not None: html = etree.HTML(html_text) # 生成XPath解析对象 last_page = html.xpath("//div[@class='pages']//a[last()]/@href") # 提取最后一页所在href链接 print(last_page) if last_page: max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group() # 使用正则表达式提取链接中的页码数字 print(max_page) print(type(max_page)) return int(max_page) # 将字符串页码转为整数并返回 else: print("暂无数据") return None else: print("查询结果失败")def get_all_image_url(page_number): """ 获取所有图片的下载url :param page_number: 爬取页码 :return: 所有图片url的集合 """ base_url = 'https://imgbin.com/free-png/naruto/' image_urls = [] x = 1 # 定义一个标识,用于给每个图片url编号,从1递增 for i in range(1, page_number): url = base_url + str(i) # 根据页码遍历请求url try: html = get_html(url) # 解析每个页面的内容 if html: data = parse_html(html) # 提取页面中的图片url # print(data) # time.sleep(3) if data: for j in data: image_urls.append({ 'name': x, 'value': j }) x += 1 # 每提取一个图片url,标识x增加1 except RequestException as f: print("遇到错误:", f) continue # print(image_urls) return image_urlsdef get_image_content(url): """请求图片url,返回二进制内容""" # print("正在下载", url) try: r = requests.get(url, timeout=15) if r.status_code == 200: return r.content return None except RequestException: return Nonedef main(url, image_name): """ 主函数:实现下载图片功能 :param url: 图片url :param image_name: 图片名称 :return: """ semaphore.acquire() # 加锁,限制线程数 print('当前子线程: {}'.format(threading.current_thread().name)) save_path = os.path.dirname(os.path.abspath('.')) + '/pics/' try: file_path = '{0}/{1}.jpg'.format(save_path, image_name) if not os.path.exists(file_path): # 判断是否存在文件,不存在则爬取 with open(file_path, 'wb') as f: f.write(get_image_content(url)) f.close() print('第{}个文件保存成功'.format(image_name)) else: print("第{}个文件已存在".format(image_name)) semaphore.release() # 解锁imgbin-多线程-重写run方法.py except FileNotFoundError as f: print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url)) print("报错:", f) raise except TypeError as e: print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url)) print("报错:", e)class MyThread(threading.Thread): """继承Thread类重写run方法创建新进程""" def __init__(self, func, args): """ :param func: run方法中要调用的函数名 :param args: func函数所需的参数 """ threading.Thread.__init__(self) self.func = func self.args = args def run(self): print('当前子线程: {}'.format(threading.current_thread().name)) self.func(self.args[0], self.args[1]) # 调用func函数 # 因为这里的func函数其实是上述的main()函数,它需要2个参数;args传入的是个参数元组,拆解开来传入if __name__ == '__main__': start = time.time() print('这是主线程:{}'.format(threading.current_thread().name)) urls = get_all_image_url(5) # 获取所有图片url列表 thread_list = [] # 定义一个列表,向里面追加线程 semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法 for t in urls: # print(i) m = MyThread(main, (t["value"], t["name"])) # 调用MyThread类,得到一个实例 thread_list.append(m) for m in thread_list: m.start() # 调用start()方法,开始执行 for m in thread_list: m.join() # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出 end = time.time() print(end-start) # get_image_pages("https://imgbin.com/free-png/Naruto")
将代码进行改造 1、下面的第8、9行表示调用 threading 的 BoundedSemaphore类,初始化信号量为5,把结果赋给变量 pool_sema
if __name__ == '__main__': start = time.time() print('这是主线程:{}'.format(threading.current_thread().name)) urls = get_all_image_url(5) # 获取所有图片url列表 thread_list = [] # 定义一个列表,向里面追加线程 max_connections = 5 # 定义最大线程数 pool_sema = threading.BoundedSemaphore(max_connections) # 或使用Semaphore方法 for t in urls: # print(i) m = MyThread(main, (t["value"], t["name"])) # 调用MyThread类,得到一个实例 thread_list.append(m) for m in thread_list: m.start() # 调用start()方法,开始执行 for m in thread_list: m.join() # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出 end = time.time() print(end-start)
2、修改main()函数
(1)方法一:通过with语句实现,第9行添加 with pool_sema:
使用 with 语句来获得一个锁、条件变量或信号量,相当于调用 acquire();离开 with 块后,会自动调用 release()
def main(url, image_name): """ 主函数:实现下载图片功能 :param url: 图片url :param image_name: 图片名称 :return: """ with pool_sema: print('当前子线程: {}'.format(threading.current_thread().name)) save_path = os.path.dirname(os.path.abspath('.')) + '/pics/' try: file_path = '{0}/{1}.jpg'.format(save_path, image_name) if not os.path.exists(file_path): # 判断是否存在文件,不存在则爬取 with open(file_path, 'wb') as f: f.write(get_image_content(url)) f.close() print('第{}个文件保存成功'.format(image_name)) else: print("第{}个文件已存在".format(image_name)) except FileNotFoundError as f: print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url)) print("报错:", f) raise except TypeError as e: print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url)) print("报错:", e)
(2)方法二:直接使用acquire() 和 release()下面的第8行调用 acquire(),第24行调用 release()
def main(url, image_name): """ 主函数:实现下载图片功能 :param url: 图片url :param image_name: 图片名称 :return: """ pool_sema.acquire() # 加锁,限制线程数 # with pool_sema: print('当前子线程: {}'.format(threading.current_thread().name)) save_path = os.path.dirname(os.path.abspath('.')) + '/pics/' try: file_path = '{0}/{1}.jpg'.format(save_path, image_name) if not os.path.exists(file_path): # 判断是否存在文件,不存在则爬取 with open(file_path, 'wb') as f: f.write(get_image_content(url)) f.close() print('第{}个文件保存成功'.format(image_name)) else: print("第{}个文件已存在".format(image_name)) pool_sema.release() # 解锁imgbin-多线程-重写run方法.py except FileNotFoundError as f: print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url)) print("报错:", f) raise except TypeError as e: print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url)) print("报错:", e)
最终效果是一样的,每次启用5个线程,完成后再启动下一批
喜欢记得来一个
python多线程_python多线程:控制线程数量相关推荐
- python如何使用多线程_python多线程与线程
进程与线程的概念 进程 考虑一个场景:浏览器,网易云音乐以及notepad++ 三个软件只能顺序执行是怎样一种场景呢?另外,假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I ...
- python中线程里面多线程_Python中的线程和多线程是什么
一.线程的概念 一个进程里面至少有一个控制线程,进程的概念只是一种抽象的概念,真正在CPU上面调度的是进程里面的线程,就好比真正在地铁这个进程里面工作的实际上是地铁里面的线程,北京地铁里面至少要有一个 ...
- C#中的多线程-线程同步基础 (控制线程数量)
同步要领 下面的表格列展了.NET对协调或同步线程动作的可用的工具: 简易阻止方法 构成 目的 Sleep 阻止给定的时间周期 Join 等待另一个线程完成 锁系统 构成 目的 跨进程? 速度 loc ...
- python多线程_Python多线程和队列结合demo
每周一07:22,准时为你充电 一.使用场景 大家都知道python的多线程不是真正的多线程,但是对于io类型的任务,多线程还是能发挥作用的.那么多个线程之间是如何进行变量共享的呢,很多时候我们可以借 ...
- python 选择题 多线程_python多线程练习题
python多线程练习题 多线程练习题目,涉及知识点较多,属于很好的练习题. 题目要求 通过多线程实现类似linux中的>>功能,也就是将日志记录到指定的文件中. 题目分析 基本为main ...
- python守护多线程_Python多线程Threading、子线程与守护线程实例详解
线程Threading: python中多线程需要使用threading模块 线程的创建与运行: 1.直接调用threading的Thread类: 线程的创建:线程对象=thread.Thread(t ...
- python创建多线程_Python 多线程,threading模块,创建子线程的两种方式示例
本文实例讲述了Python 多线程,threading模块,创建子线程的两种方式.分享给大家供大家参考,具体如下: GIL(全局解释器锁)是C语言版本的Python解释器中专有的,GIL的存在让多线程 ...
- python线程池超过最大数量_python自定义线程池控制线程数量
1.自定义线程池import threading import Queue import time queue = Queue.Queue() def put_data_in_queue(): for ...
- python scrapy框架基如何实现多线程_Python实现在线程里运行scrapy的方法
本文实例讲述了Python实现在线程里运行scrapy的方法.分享给大家供大家参考.具体如下: 如果你希望在一个写好的程序里调用scrapy,就可以通过下面的代码,让scrapy运行在一个线程里. & ...
最新文章
- python windows epoll_Windows 10生产力提升之WSL实践
- MFC 学习的基本概念
- python 嵌入键值数据库_PupDB 一个用Python编写基于文件的简单键值数据库
- 极大似然估计(Maximum Likelihood)与无监督
- 从域环境搭建到域渗透
- JAVA 技术类分享(一)
- 飞康任命Gartner前分析师担任亚洲区市场总监
- 查找-动态查找表-二叉排序树
- c语言不允许有常量的是,C语言试卷第10套含答案.doc-资源下载人人文库网
- Linux 进程间通信 --- 信号通信 --- signal --- signal(SIGINT, my_func); --- 按键驱动异步通知(转)...
- 因为某种原因阻止文本引擎初始化_文成县搜索引擎优化如何,神马SEO优化_万推霸屏...
- 和WiFi共享精灵一起成长
- Mapper method 'com.XXX.dao.XXXMapper.XXX' has an unsupported return type: class XXX
- Unity鼠标左键控制物体
- 域服务器文件备份,怎么备份域服务器?
- protocol buffers 序列化数据
- SpringBoot(5)响应式编程WebFlux
- 兔子- Exception raised during rendering: com/android/util/PropertiesMap (Details)
- java部分基础知识 (二):计算机组成原理 原码 补码 反码 按位符 移位符 按位与 按位或 按位抑或 非 分析hashMap的put方法原理
- 2018CCPC吉林赛区 题解
热门文章
- java中的语句有哪些_java中的循环语句有哪些
- python socket编程_Python学习记录-socket编程
- php内置的数组函数大全,php数组的内置函数大全
- mysql增加sort_buffer_设置sort_buffer_size
- arcgis导入excel字段不显示_ArcGIS从excel中导入坐标出现空白无法选择字段
- 关于单纤与双纤光端机的区别介绍
- php mysql ppt,7PHP访问数据库分析.ppt
- html 按钮 按下 状态_科普|你身边的手动火灾报警按钮,您了解吗?
- 天地一体化信息网络发展与拟态技术应用构想
- springboot异步注解_Spring Boot 2 :Spring Boot 中的响应式编程和 WebFlux 入门