前言

从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。

相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值:

主线程可以获取某一个线程(或者任务的)的状态,以及返回值。

当一个线程完成的时候,主线程能够立即知道。

让多线程和多进程的编码接口一致。

线程池的基本使用

# coding: utf-8

from concurrent.futures import ThreadPoolExecutor

import time

def spider(page):

time.sleep(page)

print(f"crawl task{page} finished")

return page

with ThreadPoolExecutor(max_workers=5) as t: # 创建一个最大容纳数量为5的线程池

task1 = t.submit(spider, 1)

task2 = t.submit(spider, 2) # 通过submit提交执行的函数到线程池中

task3 = t.submit(spider, 3)

print(f"task1: {task1.done()}") # 通过done来判断线程是否完成

print(f"task2: {task2.done()}")

print(f"task3: {task3.done()}")

time.sleep(2.5)

print(f"task1: {task1.done()}")

print(f"task2: {task2.done()}")

print(f"task3: {task3.done()}")

print(task1.result()) # 通过result来获取返回值

执行结果如下:

task1: False

task2: False

task3: False

crawl task1 finished

crawl task2 finished

task1: True

task2: True

task3: False

1

crawl task3 finished

1.使用 with 语句 ,通过 ThreadPoolExecutor 构造实例,同时传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。

2.使用 submit 函数来提交线程需要执行的任务到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。

3.通过使用 done() 方法判断该任务是否结束。上面的例子可以看出,提交任务后立即判断任务状态,显示四个任务都未完成。在延时2.5后,task1 和 task2 执行完毕,task3 仍在执行中。

4.使用 result() 方法可以获取任务的返回值。

主要方法

wait

wait(fs, timeout=None, return_when=ALL_COMPLETED)

wait 接受三个参数:

fs: 表示需要执行的序列

timeout: 等待的最大时间,如果超过这个时间即使线程未执行完成也将返回

return_when:表示wait返回结果的条件,默认为 ALL_COMPLETED 全部执行完成再返回

还是用上面那个例子来熟悉用法

示例:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED

import time

def spider(page):

time.sleep(page)

print(f"crawl task{page} finished")

return page

with ThreadPoolExecutor(max_workers=5) as t:

all_task = [t.submit(spider, page) for page in range(1, 5)]

wait(all_task, return_when=FIRST_COMPLETED)

print('finished')

print(wait(all_task, timeout=2.5))

# 运行结果

crawl task1 finished

finished

crawl task2 finished

crawl task3 finished

DoneAndNotDoneFutures(done={, , }, not_done={})

crawl task4 finished

1.代码中返回的条件是:当完成第一个任务的时候,就停止等待,继续主线程任务

2.由于设置了延时, 可以看到最后只有 task4 还在运行中

as_completed

上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。最好的方法是当某个任务结束了,就给主线程返回结果,而不是一直判断每个任务是否结束。

ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是这样一个方法,当子线程中的任务执行完后,直接用 result() 获取返回结果

用法如下:

# coding: utf-8

from concurrent.futures import ThreadPoolExecutor, as_completed

import time

def spider(page):

time.sleep(page)

print(f"crawl task{page} finished")

return page

def main():

with ThreadPoolExecutor(max_workers=5) as t:

obj_list = []

for page in range(1, 5):

obj = t.submit(spider, page)

obj_list.append(obj)

for future in as_completed(obj_list):

data = future.result()

print(f"main: {data}")

# 执行结果

crawl task1 finished

main: 1

crawl task2 finished

main: 2

crawl task3 finished

main: 3

crawl task4 finished

main: 4

as_completed() 方法是一个生成器,在没有任务完成的时候,会一直阻塞,除非设置了 timeout。

当有某个任务完成的时候,会 yield 这个任务,就能执行 for 循环下面的语句,然后继续阻塞住,循环到所有的任务结束。同时,先完成的任务会先返回给主线程。

map

map(fn, *iterables, timeout=None)

fn: 第一个参数 fn 是需要线程执行的函数;

iterables:第二个参数接受一个可迭代对象;

timeout: 第三个参数 timeout 跟 wait() 的 timeout 一样,但由于 map 是返回线程执行的结果,如果 timeout小于线程执行时间会抛异常 TimeoutError。

用法如下:

import time

from concurrent.futures import ThreadPoolExecutor

def spider(page):

time.sleep(page)

return page

start = time.time()

executor = ThreadPoolExecutor(max_workers=4)

i = 1

for result in executor.map(spider, [2, 3, 1, 4]):

print("task{}:{}".format(i, result))

i += 1

# 运行结果

task1:2

task2:3

task3:1

task4:4

使用 map 方法,无需提前使用 submit 方法,map 方法与 python 高阶函数 map 的含义相同,都是将序列中的每个元素都执行同一个函数。

上面的代码对列表中的每个元素都执行 spider() 函数,并分配各线程池。

可以看到执行结果与上面的 as_completed() 方法的结果不同,输出顺序和列表的顺序相同,就算 1s 的任务先执行完成,也会先打印前面提交的任务返回的结果。

多线程实战

以某网站为例,演示线程池和单线程两种方式爬取的差异

# coding: utf-8

import requests

from concurrent.futures import ThreadPoolExecutor, as_completed

import time

import json

from requests import adapters

from proxy import get_proxies

headers = {

"Host": "splcgk.court.gov.cn",

"Origin": "https://splcgk.court.gov.cn",

"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",

"Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",

}

url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):

data = {

"bt": "",

"fydw": "",

"pageNum": page,

}

for _ in range(5):

try:

response = requests.post(url, headers=headers, data=data, proxies=get_proxies())

json_data = response.json()

except (json.JSONDecodeError, adapters.SSLError):

continue

else:

break

else:

return {}

return json_data

def main():

with ThreadPoolExecutor(max_workers=10) as t:

obj_list = []

begin = time.time()

for page in range(1, 15):

obj = t.submit(spider, page)

obj_list.append(obj)

for future in as_completed(obj_list):

data = future.result()

print(data)

print('*' * 50)

times = time.time() - begin

print(times)

if __name__ == "__main__":

main()

运行结果:

单线程实战

下面我们可以使用单线程来爬取,代码基本和上面的一样,加个单线程函数

代码如下:

# coding: utf-8

import requests

from concurrent.futures import ThreadPoolExecutor, as_completed

import time

import json

from requests import adapters

from proxy import get_proxies

headers = {

"Host": "splcgk.court.gov.cn",

"Origin": "https://splcgk.court.gov.cn",

"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",

"Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",

}

url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):

data = {

"bt": "",

"fydw": "",

"pageNum": page,

}

for _ in range(5):

try:

response = requests.post(url, headers=headers, data=data, proxies=get_proxies())

json_data = response.json()

except (json.JSONDecodeError, adapters.SSLError):

continue

else:

break

else:

return {}

return json_data

def single():

begin = time.time()

for page in range(1, 15):

data = spider(page)

print(data)

print('*' * 50)

times = time.time() - begin

print(times)

if __name__ == "__main__":

single()

运行结果:

可以看到,总共花了 19 秒。真是肉眼可见的差距啊!如果数据量大的话,运行时间差距会更大!

以上就是python线程池 ThreadPoolExecutor 的用法示例的详细内容,更多关于python线程池 ThreadPoolExecutor 的用法及实战的资料请关注我们其它相关文章!

本文标题: python线程池 ThreadPoolExecutor 的用法示例

本文地址: http://www.cppcns.com/jiaoben/python/352744.html

python的线程池_python线程池 ThreadPoolExecutor 的用法示例相关推荐

  1. python循环语句画图_Python基础之循环语句用法示例【for、while循环】

    本文实例讲述了Python基础之循环语句用法.分享给大家供大家参考,具体如下: while 循环 Python中while语句的一般形式: while 判断条件: statements 同样需要注意冒 ...

  2. python线程池wait_python线程池 ThreadPoolExecutor 的用法示例

    前言 从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进 ...

  3. python 线程池_Python线程池及其原理和使用(超级详细)

    系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互.在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 线程池在系统启动时即 ...

  4. python 线程池_python线程池

    线程池概念 什么是线程池? 诸如web服务器.数据库服务器.文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务. 构建服务器应用程序的一个过于简单的模型是:每当一个请求到 ...

  5. python 判断线程状态_Python线程指南

    Python线程指南 本文介绍了Python对于线程的支持,包括"学会"多线程编程需要掌握的基础以及Python两个线程标准库的完整介绍及使用示例. 注意:本文基于Python2. ...

  6. python线程状态_Python线程

    1. 线程基础 1.1. 线程状态 线程有5种状态,状态转换的过程如下图所示: 1.2. 线程同步(锁) 多线程的优势在于可以同时运行多个任务(至少感觉起来是这样).但是当线程需要共享数据时,可能存在 ...

  7. python 线程退出_python线程退出

    广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! 如果某线程并未使用很多 io 操作, 它会在自己的时间片内一直占用处理器(和 g ...

  8. python进程池_python进程池剖析(一)

    python中两个常用来处理进程的模块分别是subprocess和multiprocessing,其中subprocess通常用于执行外部程序,比如一些第三方应用程序,而不是Python程序.如果需要 ...

  9. python 判断线程状态_Python 线程和进程

    前言 学编程,谁没有为线程折腾过啊. 目录 线程与进程 线程与进程是操作系统里面的术语,简单来讲,每一个应用程序都有一个自己的进程. 操作系统会为这些进程分配一些执行资源,例如内存空间等. 在进程中, ...

  10. python结束线程类_Python线程指南(转)

    1. 线程基础 1.1. 线程状态 线程有5种状态,状态转换的过程如下图所示: 1.2. 线程同步(锁) 多线程的优势在于可以同时运行多个任务(至少感觉起来是这样).但是当线程需要共享数据时,可能存在 ...

最新文章

  1. mysql导入sas文件夹_MYSQL导出HTML格式数据如何导入到SAS中
  2. 【算法】QuickSort
  3. IntelliJ IDEA 2020修改菜单显示大小
  4. django-分页自带的分页-自定义分页
  5. Assemby 打包并启动jar包
  6. 【声学基础】概述——传播
  7. 漫画家Neal Adams推出以蝙蝠侠等超级英雄为主题的NFT
  8. bzoj4172: 弹珠
  9. linux 系统如何启动服务,如何查看和停止Linux启动的服务
  10. 配置JDK环境变量详细步骤
  11. 浅析芝麻信用分征信体系
  12. 联想本装系统stop:0X000007B错误[转]
  13. Android开发项目管理7宗罪之五——项目组个性文档文件的管理
  14. 专访美团外卖曹振团:天下武功唯快不破
  15. Vue安装element ui踩坑
  16. 中科大统计学习(刘东)作业1
  17. Matlab求时变微分方程组解,Matlab求常微分方程组的解析解
  18. ElasticSearch_高级_(集群+分片)
  19. java处理1927 12 31_java - 为什么将这两次相减(在1927年)会得出奇怪的结果? - 堆栈内存溢出...
  20. leetcode LCP2 分式化简(C++)

热门文章

  1. js 设计模式(23种)
  2. cv2.cvtColor() 的使用
  3. 2021年中国上市公司发明授权数量及分布:发明授权数量连续5年增长,广东省位居全国第一[图]
  4. idea中安装uml工具插件
  5. php公众号模板在哪,微信公众号页面模板在哪里设置的?微信公众号页面模板怎么设置?...
  6. windows服务器连接教程-手机连接电脑连接
  7. 【CRC】CRC推导(二)模二除法
  8. Evernote推出实体笔记本Evernote Business Notebook,支持搜索手写笔记,用实体便签将笔记同步到Evernote上的相应分类
  9. 纹理分析方法:共生矩阵的计算
  10. item_search - 按关键字搜索EBAY商品