python结束线程池正在运行的线程_python之线程与线程池
#进程是资源分配的最小单位,线程是CPU调度的最小单位.每一个进程中至少有一个线程。#传统的不确切使用线程的程序称为只含有一个线程或单线程程序,而可以使用线程的程序被称为多线程程序,在程序中使用一个线程的方法#被称为多线程#线程的模块:#thread >> 实现线程的低级接口#threading>>> 可以提供高级方法
#同一进程下的各个线程是可以共享该进程的所有的资源的,各个线程之间是可以相互影响的
1.线程创建的两种方式,与进程创建的两种方式基本
from threading importThreadfrom multiprocessing importProcessimporttimedeffucn1(n):
time.sleep(1)print('XXXXXXXXXXXXX',n)if __name__ == '__main__':#p = Process(target=fucn1,args=(1,))
t = Thread(target=fucn1,args=(1,))print(t1.isAlive())#返回线程是否活动的
print(t1.getName())#返回线程名
t1.setName()#设置线程名
t.start()#开启线程的速度非常快
t.join() #等待子线程运行结束之后才进行下面的代码
print('主线程结束')
方式1
classgg(Thread):def __init__(self,n):
super().__init__()
self.n=ndefrun(self):print('xxx')print(self.n)if __name__ == '__main__':
t1= gg(66)
t1.start()
与进程创建运行相似
方式2
# threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
1.1
from threading importThreadfrom multiprocessing importProcessimporttimedefjisuan():for i in range(100000):
i+= 1
#print(i)
defpro():
p= Process(target=jisuan)
p1= Process(target=jisuan)
p.start()
p1.start()
p1.join()
p.join()defthreading():
t1= Thread(target=jisuan)
t2= Thread(target=jisuan)
t1.start()
t2.start()if __name__ == '__main__':
t1=time.time()
pro()
t2=time.time()
t3=time.time()
threading()
t4=time.time()print(t2-t1)#0.24815773963928223
print(t4-t3)#0.01202702522277832
线程与进程之间的效率对比
1.2
from threading importThreadimportthreadingfrom multiprocessing importProcessimportosdefwork():importtime
time.sleep(3)print(threading.current_thread().getName())if __name__ == '__main__':#在主进程下开启线程
t=Thread(target=work)
t.start()print(threading.current_thread())#主线程对象
print(threading.current_thread().getName()) #主线程名称
print(threading.current_thread().ident) #主线程ID
print(threading.get_ident()) #主线程ID
print(threading.enumerate()) #连同主线程在内有两个运行的线程
print(threading.active_count())print('主线程/主进程')#'''#打印结果:#<_mainthread started>#MainThread#14104#[<_mainthread started>, ]#主线程/主进程#Thread-1#'''
一些不常用的方法
1.3
#一个主线程要等待所有的非守护线程结束才结束#(主线程的代码执行完之后主线程并没有结束,而要等待所有的非守护进程执行完并返回结果后才结束)
#主进程默认是在执行完代码之后,相当于结束了,并不关心所有的子进程的执行结果,只是关心所有的子进程是否结束的的信号,#接收到所有子进程结束的信号之后,主进程(程序)才结束
importtimefrom threading importThreaddeffunc():
time.sleep(3)print('任务1')deffunc1():
time.sleep(2)print('任务2')if __name__ == '__main__':
t1= Thread(target=func)
t2= Thread(target=func1)
t1.daemon=True
t1.start()
t2.start()print('主线程结束')#结果
'''主线程结束
任务2'''
守护进程
1.4
from threading importThreadfrom multiprocessing importProcessimporttime
a= 100
deffucn1():globala
a-= 1 #等同于
temp =a
time.sleep(0.001)#验证关键点
temp = temp -1a=temp
time.sleep(5)if __name__ == '__main__':
gg=[]for i in range(100):
t= Thread(target=fucn1)
t.start()#开启线程的速度非常快
gg.append(t)print(t.is_alive())
[tt.join()for tt ingg]print(a)print('主线程结束')#线程共享进程的数据,由于数据是共享的也会有数据的不安全的情况(数据混乱),#但是由于线程的创建的速度非常快,如果加上系统的线程不多的话,#效果不明显#解决共享数据不安全: 加锁 ,对取值和修改值的的操作开始加锁(与多进程加锁一样)
验证线程之间的数据是共享的,但是也存在数据的安全的问题
信号量,事件等与进程的操作方法一样#Semaphore管理一个内置的计数器,#每当调用acquire()时内置计数器-1;#调用release() 时内置计数器+1;#计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。#
#实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):## 基本代码如下#def func(sm):#sm.acquire()## 赋值或修改的代码#sm.release()#if __name__ == '__main__':#sm=Semaphore(5)#t = Thread(target=func)
#事件与进程的一样,#event.isSet() 查看等待的状态不一样#event.wait():如果 event.isSet()==False将阻塞线程;#event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;#event.clear():恢复event的状态值为False。
与进程功能基本一样的相关说明
线程队列
#线程队列#使用import queue,用法与进程Queue一样,直接引入,不用通过threading 模块引入
#1>class queue.Queue(maxsize=0) #先进先出
importqueue#q = queue.Queue(3)#创建一个容量为3的队列#q.put(1)#q.put(2)#q.put(3)#在队列塞满三个元素后,如果继续塞元素,就会进入一个阻塞的状态## 但是如果使用q.put_nowait()塞元素的话,到塞满之后再塞的话,就会直接抛出队列已满的异常,## 不会进入阻塞的状态,与q.get_nowait()相似#print(q.get())#1#print(q.get())#2#print(q.get())#3##按照添加的顺序进行输出#print(q.get())#>>>>>>>>>取到第四个的时候,队列已经是空的了,如果使用这个的话,就会进入## 阻塞的状态#print(q.get_nowait())#>>>>>>但是如果取到第四个使用这个的话,不会进入阻塞的状态,直接##抛出异常#
#
## 2>class queue.LifoQueue(maxsize=0) #先进后出#q = queue.LifoQueue(3)#q.put(1)#q.put(2)#q.put(None)#### # 取值的时候输出为#print(q.get())#None#print(q.get())#2#print(q.get())#1#
## 3>class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列#q = queue.PriorityQueue(4)## 在设置的时候,元组的方式进行添加 如:(优先级,元素),## 优先级通过使用数字来表示,数字越小优先级越高##如果优先级一样,就会按照元素的ASCIll顺序进行输出,相同优先级的两个元素能够进行比较(同优先级的两个元素必须是同种类型的)##字典类型的东西不能进行比较#q.put((-10,1))#q.put((-10,3))#q.put((1,20))#q.put((2,'我'))#
##按照优先级进行输出#print(q.get())#(-10, 1)#print(q.get())#(-10, 3)#print(q.get())#(1, 20)#print(q.get())#(2, '我')#
#
## 这三队列是安全的,不存在多个线程抢占同一资源或数据的情况
线程三种队列的使用
线程池
submit的使用
源代码欣赏importthreadingimportosclassThreadPoolExecutor(_base.Executor):def __init__(self, max_workers=None, thread_name_prefix=''): #初始化方法,设置线程池的最大线程数量
if max_workers is None:#线程池的默认设置
max_workers = (os.cpu_count() or 1) *5默认设置的线程数是CPU核数的5倍if max_workers <=0:raise ValueError("max_workers must be greater than 0")
self._max_workers=max_workers
self._work_queue=queue.Queue()
self._threads=set()
self._shutdown=False
self._shutdown_lock= threading.Lock()#创建线程锁
self._thread_name_prefix = (thread_name_prefix or("ThreadPoolExecutor-%d" %self._counter()))def submit(self, fn, *args, **kwargs):#创建一个线程,并异步提交任务
with self._shutdown_lock:ifself._shutdown:raise RuntimeError('cannot schedule new futures after shutdown')
f=_base.Future()
w=_WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()returnf
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):#调整线程池的数量
def weakref_cb(_, q=self._work_queue):
q.put(None)
num_threads=len(self._threads)if num_threads < self._max_workers: #创建线程的过程
thread_name = '%s_%d' % (self._thread_name_prefix orself,
num_threads)
t= threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon=True
t.start()
self._threads.add(t)
_threads_queues[t]=self._work_queuedef shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown=True
self._work_queue.put(None)ifwait:for t inself._threads:
t.join()
class ThreadPoolExecutor的源码欣赏
1.1 submit的基本使用
#常用基本方法#class ThreadPoolExecutor():#def submit(self, fn, *args, **kwargs):#创建一个线程,并异步提交任务#pass#def shutdown(self,wait=True):#相当于进程池中的p.close() 和p.join()#pass##wait = True ,等待池内所有任务执行完毕回收完资源后才继续##wait = False,立即返回,并不会等待池内的任务执行完毕## 但不管wait参数为何值,整个程序都会等到所有任务执行完毕## submit和map必须在shutdown之前#通过
#import time#import threading#from concurrent.futures import ThreadPoolExecutor#def func(i):#time.sleep(2)#print('%s打印的:'%(threading.get_ident()),)#return i*i#
#tpool = ThreadPoolExecutor(max_workers= 5)#
#t_lst = []#for i in range(5):#t = tpool.submit(func,i)#异步提交任务,与apply_async 相似,返回的也是一个结果对象#t_lst.append(t)#tpool.shutdown()#for a in t_lst:#print('>>',a.result())#获取
线程池submit的基本使用
1.2 map的基本使用
1.2.1 源码欣赏
def map(self, fn, *iterables, timeout=None, chunksize=1):if timeout is notNone:
end_time= timeout +time.time()
fs= [self.submit(fn, *args) for args in zip(*iterables)]def result_iterator():#生成器
try:
fs.reverse()whilefs:#Careful not to keep a reference to the popped future
if timeout isNone:yieldfs.pop().result()else:yield fs.pop().result(end_time -time.time())finally:for future infs:
future.cancel()return result_iterator()
map方法的源码欣赏
1.2.2 map的基本使用(验证使用过程)
#简单使用
from concurrent.futures importThreadPoolExecutorimportthreadingimportos,time,randomdeftask(n):print('%s is running'%(threading.get_ident()))#time.sleep(random.randint(1,2))
time.sleep(10)#测试for循环取值的时候,如果执行的子线程还没有执行完的时候的情况
return n**2
if __name__ == '__main__':
t_pool= ThreadPoolExecutor(max_workers=3)
s= t_pool.map(task,range(1,5))#map取代for + sumbit
print(s) #.result_iterator at 0x000000C536C2B0F8>
for i ins :print(i)#print([i for i in s])#
print('主程序结束')'''#前面4个瞬间就出来
7252 is running
3084 is running
7824 is running
.result_iterator at 0x000000AF18AAA2B0>
7252 is running #延迟大概10s后后面4个瞬间出来
1
4
9
16#延迟10s后两个瞬间出来
主程序结束'''
map的简单使用
1.3 submit回调函数的应用
from concurrent.futures importThreadPoolExecutor,ProcessPoolExecutorfrom multiprocessing importPoolimportrequestsimportjsonimportosdefget_page(url):print(' get %s' %(os.getpid(),url))
respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}defparse_page(res):
res=res.result()print(' parse %s' %(os.getpid(),res['url']))
parse_res='url: size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)if __name__ == '__main__':
urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']#p=Pool(3)
#for url in urls:
#p.apply_async(get_page,args=(url,),callback=pasrse_page)
#p.close()
#p.join()
p=ProcessPoolExecutor(3)for url inurls:
p.submit(get_page,url).add_done_callback(parse_page)#parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
回调函数的应用
1.4 线程与进程之间的性能测试
#进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势
#现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,#甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
from multiprocessing importProcessfrom threading importThreadimportthreadingimportos,timedefwork():
time.sleep(2)print('===>')if __name__ == '__main__':
l=[]print(os.cpu_count()) #本机为4核
start=time.time()for i in range(400):#p=Process(target=work) #耗时12s多,大部分时间耗费在创建进程上
p=Thread(target=work) #耗时2s多
l.append(p)
p.start()for p inl:
p.join()
stop=time.time()print('run time is %s' %(stop-start))## I/O密集型:多线程效率高
from multiprocessing importProcessfrom threading importThreadimportos,timedefwork():
res=0for i in range(100000000):
res*=iif __name__ == '__main__':
l=[]print(os.cpu_count()) #本机为4核
start=time.time()for i in range(4):
p=Process(target=work) #耗时5s多
p=Thread(target=work) #耗时18s多
l.append(p)
p.start()for p inl:
p.join()
stop=time.time()print('run time is %s' %(stop-start))#
## 计算密集型:多进程效率高#多线程用于IO密集型,如socket,爬虫,web#多进程用于计算密集型,如金融分析
线程与进程之家你的性能测试
1.5 线程的使用补充
#线程提供了一种便利的能够同时处理多个请求的高效的服务器#多线程服务器基本有着同样的体系结构, :主线程负责侦听请求的线程#当它收到一个请求的时候,一个新的工作者线程就会被建立起来,处理该客户端#的请求,当客户端断开连接时候,工作者线程会终止
#线程池被设计成一个线程同时只为一个客户服务,但是在服务结束之后#线程并不终止,线程池中的线程要么是事先全部建立起来,要么是在需要的时候被建立起来#在客户端断开连接的时候,线程并不终止,而是保持着,等待为更多的连接提供服务#
#线程池通常包含:#1.一个主要的侦听线程来接收和分派客户端的连接#2.一些工作者线程用来处理客户端请求#3.一个线程管理系统用来处理那些意外终止的线程#
python结束线程池正在运行的线程_python之线程与线程池相关推荐
- 本地方法栈线程公有_Java运行时区域,哪些区域是线程私有的?哪些是共有的?...
JVM 运行时数据区域大致可以分为:程序计数器.虚拟机栈.本地方法栈.堆区.元空间.运行时常量池.直接内存等区域:就是下面这个样子的: 其中有些区域,随着 JDK 版本的升级不断调整,例如: JDK ...
- linux主线程结束 子线程还能运行么,linux主线程和子线程
"读了三遍,愣是没读懂楼主想说啥.######@java_zf 我现在在做的一个项目用到了多线程,我就在项目中子线程调用的一个函数中加了个sleep.你可以写个简单的代码试一下,应该比较简单 ...
- python可以在多平台运行 体现了_Python:使用异常处理来判断运行的平台
try:importtermios, TERMIOS 1exceptImportError:try:importmsvcrt 2exceptImportError:try:from EasyDialo ...
- python 多进程 每个进程做不同功能实例_Python 多进程并发操作中进程池Pool的实例...
{"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],&q ...
- 百度java的线程技术_自我提升(基础技术篇)——java线程简介
前言:虽然自己平时都在用多线程,也能完成基本的工作需求,但总觉得,还是对线程没有一个系统的概念,所以,查阅了一些资料,理解那些大神和官方的资料,写这么一篇关于线程的文章 本来想废话一番,讲讲自己的经历 ...
- python 中主线程结束 子线程还在运行么_「干货」python线程笔记
引言&动机 考虑一下这个场景,我们有10000条数据需要处理,处理每条数据需要花费1秒,但读取数据只需要0.1秒,每条数据互不干扰.该如何执行才能花费时间最短呢? 在多线程(MT)编程出现之前 ...
- python 中主线程结束 子线程还在运行么_python 线程之一:线程的创建、启动及运行方式
threading:这个模块在较低级的模块 _thread 基础上建立较高级的线程接口 以后我们就用 threading 模块来管理线程就可以了. Tread 类:控制线程创建.启动及运行方式 一.线 ...
- python线程池模块_python并发编程之进程池,线程池,协程
需要注意一下 不能无限的开进程,不能无限的开线程 最常用的就是开进程池,开线程池.其中回调函数非常重要 回调函数其实可以作为一种编程思想,谁好了谁就去掉 只要你用并发,就会有锁的问题,但是你不能一直去 ...
- python结束线程_2018-01-02 如何优雅地终止python线程
前言 · 零 我们知道,在python里面要终止一个线程,常规的做法就是设置/检查 --->标志或者锁方式来实现的. 这种方式好不好呢? 应该是不大好的!因为在所有的程序语言里面,突然地终止一个 ...
最新文章
- 机器学习01-定义、线性回归、梯度下降
- java web常用权限方式,java web项目的几种权限控制方法
- oracle 输出重复记录,ORACLE 去除重复记录
- VTK:可视化算法之IronIsoSurface
- Xcode8.0 删除插件路径
- 机器学习(二十二)——推荐算法中的常用排序算法, Tri-training
- hosts ip 指向ip_【好玩的网络-第4期】DNS硬核科普,你是怎么上网的?DNS里都有啥?传说中的hosts文件又是啥?...
- 为什么在2012/2013年我将在新的Enterprise Java项目中继续使用Spring *和* Java EE
- oracle中的merge into用法解析
- 北方华创:「8大报表应用场景」助力全方位数字化管理
- Windows自带渗透工具Certutil介绍(免杀、哈希计算、md5、sha256、下载文件、base64编码)
- No rule to make target ...
- docker 网桥冲突了解决
- git clone 拉取github上面的代码报错:fatal: Authentication failed for xxx解决
- php in_array性能优化
- Javascript 编程风格
- Eclipse —— 官网下载地址
- 在php里面加音乐,如何给视频添加音乐 视频配上背景音乐
- JSON.stringify初步使用
- 微信小程序原生实现日历功能
热门文章
- Codeforces Round #479 (Div. 3)【完结】
- 1003 Emergency (25 分)【难度: 中等 / 知识点: 变种的Dijkstra】
- VScode的基础设置
- 80C51单片机的最小系统
- 【Vue.js】vue用户登录功能
- Spring boot的@Value注解
- jQuery的入口函数
- python爬取地图上的经纬度_Python调用百度地图API爬取经纬度
- c#连接远程sqlserver2008_C#用代码控制网络断开与重连
- 从最新的编程语言排行看,Java真的要凉了吗?