背景

前段时间学习了python的多线程爬虫,当时爬取一个图片网站,开启多线程后,并没有限制线程的数量,也就是说,如果下载1000张图片,会一次性开启1000个子线程同时进行下载

现在希望控制线程数量:例如每次只下载5张,当下载完成后再下载另外5张,直至全部完成

查了一些资料,发现在python中,threading 模块有提供 Semaphore类 和 BoundedSemaphore 类来限制线程数

详细说明可以看看下面几篇文章,写的很棒:

  • https://docs.python.org/3.5/library/threading.html?highlight=threading#semaphore-objects

  • https://www.liujiangblog.com/course/python/79

  • https://my.oschina.net/u/3524921/blog/920303

  • https://zhuanlan.zhihu.com/p/34159519

官网给出例子如下:信号量通常用于保护容量有限的资源,例如数据库服务器。在资源大小固定的任何情况下,都应使用有界信号量。在产生任何工作线程之前,您的主线程将初始化信号量:

maxconnections = 5# ...pool_sema = BoundedSemaphore(value=maxconnections)

产生后,工作线程在需要连接到服务器时会调用信号量的获取和释放方法:

with pool_sema:    conn = connectdb()    try:        # ... use connection ...    finally:        conn.close()

改造之前的多线程爬虫

首先贴出原来的代码

# -*- coding:utf-8 -*-import requestsfrom requests.exceptions import RequestExceptionimport os, timeimport refrom lxml import etreeimport threadinglock = threading.Lock()def get_html(url):    """    定义一个方法,用于获取一个url页面的响应内容    :param url: 要访问的url    :return: 响应内容    """    response = requests.get(url, timeout=10)    # print(response.status_code)    try:        if response.status_code == 200:            # print(response.text)            return response.text        else:             return None    except RequestException:        print("请求失败")        # return Nonedef parse_html(html_text):    """    定义一个方法,用于解析页面内容,提取图片url    :param html_text:    :return:一个页面的图片url集合    """    html = etree.HTML(html_text)    if len(html) > 0:        img_src = html.xpath("//img[@class='photothumb lazy']/@data-original")  # 元素提取方法        # print(img_src)        return img_src    else:        print("解析页面元素失败")def get_image_pages(url):    """    获取所查询图片结果的所有页码    :param url: 查询图片url    :return: 总页码数    """    html_text = get_html(url)  # 获取搜索url响应内容    # print(html_text)    if html_text is not None:        html = etree.HTML(html_text)  # 生成XPath解析对象        last_page = html.xpath("//div[@class='pages']//a[last()]/@href")  # 提取最后一页所在href链接        print(last_page)        if last_page:            max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group()  # 使用正则表达式提取链接中的页码数字            print(max_page)            print(type(max_page))            return int(max_page)  # 将字符串页码转为整数并返回        else:            print("暂无数据")            return None    else:        print("查询结果失败")def get_all_image_url(page_number):    """    获取所有图片的下载url    :param page_number: 爬取页码    :return: 所有图片url的集合    """    base_url = 'https://imgbin.com/free-png/naruto/'    image_urls = []    x = 1  # 定义一个标识,用于给每个图片url编号,从1递增    for i in range(1, page_number):        url = base_url + str(i)  # 根据页码遍历请求url        try:            html = get_html(url)  # 解析每个页面的内容            if html:                data = parse_html(html)  # 提取页面中的图片url                # print(data)                # time.sleep(3)                if data:                    for j in data:                        image_urls.append({                            'name': x,                            'value': j                        })                        x += 1  # 每提取一个图片url,标识x增加1        except RequestException as f:            print("遇到错误:", f)            continue    # print(image_urls)    return image_urlsdef get_image_content(url):    """请求图片url,返回二进制内容"""    # print("正在下载", url)    try:        r = requests.get(url, timeout=15)        if r.status_code == 200:            return r.content        return None    except RequestException:        return Nonedef main(url, image_name):    """    主函数:实现下载图片功能    :param url: 图片url    :param image_name: 图片名称    :return:    """    semaphore.acquire()  # 加锁,限制线程数    print('当前子线程: {}'.format(threading.current_thread().name))    save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'    try:        file_path = '{0}/{1}.jpg'.format(save_path, image_name)        if not os.path.exists(file_path):  # 判断是否存在文件,不存在则爬取            with open(file_path, 'wb') as f:                f.write(get_image_content(url))                f.close()                print('第{}个文件保存成功'.format(image_name))        else:            print("第{}个文件已存在".format(image_name))        semaphore.release()  # 解锁imgbin-多线程-重写run方法.py    except FileNotFoundError as f:        print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))        print("报错:", f)        raise    except TypeError as e:        print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))        print("报错:", e)class MyThread(threading.Thread):    """继承Thread类重写run方法创建新进程"""    def __init__(self, func, args):        """        :param func: run方法中要调用的函数名        :param args: func函数所需的参数        """        threading.Thread.__init__(self)        self.func = func        self.args = args    def run(self):        print('当前子线程: {}'.format(threading.current_thread().name))        self.func(self.args[0], self.args[1])        # 调用func函数        # 因为这里的func函数其实是上述的main()函数,它需要2个参数;args传入的是个参数元组,拆解开来传入if __name__ == '__main__':    start = time.time()    print('这是主线程:{}'.format(threading.current_thread().name))    urls = get_all_image_url(5)  # 获取所有图片url列表    thread_list = []  # 定义一个列表,向里面追加线程    semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法    for t in urls:        # print(i)        m = MyThread(main, (t["value"], t["name"]))  # 调用MyThread类,得到一个实例        thread_list.append(m)    for m in thread_list:        m.start()  # 调用start()方法,开始执行    for m in thread_list:        m.join()  # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出    end = time.time()    print(end-start)    # get_image_pages("https://imgbin.com/free-png/Naruto")

将代码进行改造  1、下面的第8、9行表示调用 threading 的 BoundedSemaphore类,初始化信号量为5,把结果赋给变量 pool_sema

if __name__ == '__main__':    start = time.time()    print('这是主线程:{}'.format(threading.current_thread().name))    urls = get_all_image_url(5)  # 获取所有图片url列表    thread_list = []  # 定义一个列表,向里面追加线程    max_connections = 5  # 定义最大线程数    pool_sema = threading.BoundedSemaphore(max_connections) # 或使用Semaphore方法    for t in urls:        # print(i)        m = MyThread(main, (t["value"], t["name"]))  # 调用MyThread类,得到一个实例        thread_list.append(m)    for m in thread_list:        m.start()  # 调用start()方法,开始执行    for m in thread_list:        m.join()  # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出    end = time.time()    print(end-start)

2、修改main()函数

(1)方法一:通过with语句实现,第9行添加 with pool_sema:

使用 with 语句来获得一个锁、条件变量或信号量,相当于调用 acquire();离开 with 块后,会自动调用 release()

def main(url, image_name):    """    主函数:实现下载图片功能    :param url: 图片url    :param image_name: 图片名称    :return:    """    with pool_sema:        print('当前子线程: {}'.format(threading.current_thread().name))        save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'        try:            file_path = '{0}/{1}.jpg'.format(save_path, image_name)            if not os.path.exists(file_path):  # 判断是否存在文件,不存在则爬取                with open(file_path, 'wb') as f:                    f.write(get_image_content(url))                    f.close()                    print('第{}个文件保存成功'.format(image_name))            else:                print("第{}个文件已存在".format(image_name))        except FileNotFoundError as f:            print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))            print("报错:", f)            raise        except TypeError as e:            print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))            print("报错:", e)

(2)方法二:直接使用acquire() 和 release()下面的第8行调用 acquire(),第24行调用 release()

def main(url, image_name):    """    主函数:实现下载图片功能    :param url: 图片url    :param image_name: 图片名称    :return:    """    pool_sema.acquire()  # 加锁,限制线程数    # with pool_sema:    print('当前子线程: {}'.format(threading.current_thread().name))    save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'    try:        file_path = '{0}/{1}.jpg'.format(save_path, image_name)        if not os.path.exists(file_path):  # 判断是否存在文件,不存在则爬取            with open(file_path, 'wb') as f:                f.write(get_image_content(url))                f.close()                print('第{}个文件保存成功'.format(image_name))        else:            print("第{}个文件已存在".format(image_name))        pool_sema.release()  # 解锁imgbin-多线程-重写run方法.py    except FileNotFoundError as f:        print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))        print("报错:", f)        raise    except TypeError as e:        print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))        print("报错:", e)

最终效果是一样的,每次启用5个线程,完成后再启动下一批

喜欢记得来一个

python多线程_python多线程:控制线程数量相关推荐

  1. python如何使用多线程_python多线程与线程

    进程与线程的概念 进程 考虑一个场景:浏览器,网易云音乐以及notepad++ 三个软件只能顺序执行是怎样一种场景呢?另外,假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I ...

  2. python中线程里面多线程_Python中的线程和多线程是什么

    一.线程的概念 一个进程里面至少有一个控制线程,进程的概念只是一种抽象的概念,真正在CPU上面调度的是进程里面的线程,就好比真正在地铁这个进程里面工作的实际上是地铁里面的线程,北京地铁里面至少要有一个 ...

  3. C#中的多线程-线程同步基础 (控制线程数量)

    同步要领 下面的表格列展了.NET对协调或同步线程动作的可用的工具: 简易阻止方法 构成 目的 Sleep 阻止给定的时间周期 Join 等待另一个线程完成 锁系统 构成 目的 跨进程? 速度 loc ...

  4. python多线程_Python多线程和队列结合demo

    每周一07:22,准时为你充电 一.使用场景 大家都知道python的多线程不是真正的多线程,但是对于io类型的任务,多线程还是能发挥作用的.那么多个线程之间是如何进行变量共享的呢,很多时候我们可以借 ...

  5. python 选择题 多线程_python多线程练习题

    python多线程练习题 多线程练习题目,涉及知识点较多,属于很好的练习题. 题目要求 通过多线程实现类似linux中的>>功能,也就是将日志记录到指定的文件中. 题目分析 基本为main ...

  6. python守护多线程_Python多线程Threading、子线程与守护线程实例详解

    线程Threading: python中多线程需要使用threading模块 线程的创建与运行: 1.直接调用threading的Thread类: 线程的创建:线程对象=thread.Thread(t ...

  7. python创建多线程_Python 多线程,threading模块,创建子线程的两种方式示例

    本文实例讲述了Python 多线程,threading模块,创建子线程的两种方式.分享给大家供大家参考,具体如下: GIL(全局解释器锁)是C语言版本的Python解释器中专有的,GIL的存在让多线程 ...

  8. python线程池超过最大数量_python自定义线程池控制线程数量

    1.自定义线程池import threading import Queue import time queue = Queue.Queue() def put_data_in_queue(): for ...

  9. python scrapy框架基如何实现多线程_Python实现在线程里运行scrapy的方法

    本文实例讲述了Python实现在线程里运行scrapy的方法.分享给大家供大家参考.具体如下: 如果你希望在一个写好的程序里调用scrapy,就可以通过下面的代码,让scrapy运行在一个线程里. & ...

最新文章

  1. python windows epoll_Windows 10生产力提升之WSL实践
  2. MFC 学习的基本概念
  3. python 嵌入键值数据库_PupDB 一个用Python编写基于文件的简单键值数据库
  4. 极大似然估计(Maximum Likelihood)与无监督
  5. 从域环境搭建到域渗透
  6. JAVA 技术类分享(一)
  7. 飞康任命Gartner前分析师担任亚洲区市场总监
  8. 查找-动态查找表-二叉排序树
  9. c语言不允许有常量的是,C语言试卷第10套含答案.doc-资源下载人人文库网
  10. Linux 进程间通信 --- 信号通信 --- signal --- signal(SIGINT, my_func); --- 按键驱动异步通知(转)...
  11. 因为某种原因阻止文本引擎初始化_文成县搜索引擎优化如何,神马SEO优化_万推霸屏...
  12. 和WiFi共享精灵一起成长
  13. Mapper method 'com.XXX.dao.XXXMapper.XXX' has an unsupported return type: class XXX
  14. Unity鼠标左键控制物体
  15. 域服务器文件备份,怎么备份域服务器?
  16. protocol buffers 序列化数据
  17. SpringBoot(5)响应式编程WebFlux
  18. 兔子- Exception raised during rendering: com/android/util/PropertiesMap (Details)
  19. java部分基础知识 (二):计算机组成原理 原码 补码 反码 按位符 移位符 按位与 按位或 按位抑或 非 分析hashMap的put方法原理
  20. 2018CCPC吉林赛区 题解

热门文章

  1. java中的语句有哪些_java中的循环语句有哪些
  2. python socket编程_Python学习记录-socket编程
  3. php内置的数组函数大全,php数组的内置函数大全
  4. mysql增加sort_buffer_设置sort_buffer_size
  5. arcgis导入excel字段不显示_ArcGIS从excel中导入坐标出现空白无法选择字段
  6. 关于单纤与双纤光端机的区别介绍
  7. php mysql ppt,7PHP访问数据库分析.ppt
  8. html 按钮 按下 状态_科普|你身边的手动火灾报警按钮,您了解吗?
  9. 天地一体化信息网络发展与拟态技术应用构想
  10. springboot异步注解_Spring Boot 2 :Spring Boot 中的响应式编程和 WebFlux 入门