一、线程基础以及守护进程

线程是CPU调度的最小单位

全局解释器锁

全局解释器锁GIL(global interpreter lock)

全局解释器锁的出现主要是为了完成垃圾回收机制的回收机制,对不同线程的引用计数的变化记录的更加精准。

全局解释器锁导致了同一个进程中的多个线程只能有一个线程真正被CPU执行。

GIL锁每执行700条指令才会进行一次(轮转)切换(从一个线程切换到另外一个线程)

节省的是IO操作(不占用CPU)的时间,而不是CPU计算的时间,因为CPU的计算速度非常快,大多数情况下,我们没有办法把一条进程中所有的IO操作都规避掉。

threading模块

import time
from threading import Thread, current_thread, enumerate, active_countdef func(i):print('start%s' % i, current_thread().ident)  # 函数中获取当前线程idtime.sleep(1)print('end%s' % i)if __name__ == '__main__':t1 = []for i in range(3):t = Thread(target=func, args=(i,))t.start()print(t.ident)  # 查看当前线程idt1.append(t)print(enumerate(), active_count())for t in t1:t.join()
print('所有线程执行完毕')

线程是不能从外部强制终止(terminate),所有的子线程只能是自己执行完代码之后就关闭。

current_thread 获取当前的线程对象

current_thread().ident 或者 线程对象.ident 获取当前线程id。

enumerate返回一个列表,存储了所有活着的线程对象,包括主线程

active_count返回一个数字,存储了所有活着的线程个数。

【注意】enumerate导入之后,会和内置函数enumerate重名,需要做特殊的处理

  • from threading import enumerate as en

  • import threading

    threading.enumerate()

面向对象方式开启一个线程

from threading import Threadclass MyThread(Thread):def __init__(self, a, b):super(MyThread, self).__init__()self.a = aself.b = bdef run(self):print(self.ident)t = MyThread(1, 3)
t.start()  # 开启线程,才在线程中执行run方法
print(t.ident)

线程之间的数据是共享的

from threading import Threadn = 100def func():global nn -= 1t_li = []
for i in range(100):t = Thread(target=func)t.start()t_li.append(t)
for t in t_li:t.join()
print(n)

结果是:0

守护线程

  • 主线程会等待子线程结束之后才结束,为什么?

因为主线程结束,进程就会结束。

  • 守护线程随着主线程的结束而结束
  • 守护进程会随着主进程的代码结束而结束,如果主进程代码之后还有其他子进程在运行,守护进程不守护。
  • 守护线程会随着主线程的结束而结束,如果主线程代码结束之后还有其他子线程在运行,守护线程也守护。
import time
from threading import Threaddef son():while True:print('in son')time.sleep(1)def son2():for i in range(3):print('in son2...')time.sleep(1)# flag a
t = Thread(target=son)
t.daemon = True
t.start()
# flag b a-->b用时0s
Thread(target=son2).start()

为什么守护线程会在主线程的代码结束之后继续守护其他子线程?

答:因为守护进程和守护线程的结束原理不同。守护进程需要主进程来回收资源,守护线程是随着主线程的结束而结束,其他子线程–>主线程结束–>主进程结束–>整个进程中所有的资源都被回收,守护线程也会被回收。

二、线程锁(互斥锁)

线程之间也存在数据不安全

import disa = 0def func():global aa += 1dis.dis(func)  # 得到func方法中的代码翻译成CPU指令
"""
结果
0 LOAD_GLOBAL              0 (a)
2 LOAD_CONST               1 (1)
4 INPLACE_ADD
6 STORE_GLOBAL             0 (a)
8 LOAD_CONST               0 (None)
10 RETURN_VALUE
"""

+=、-=、*=、/=、while、if、带返回值的方法(都是先计算后赋值,前提要涉及到全局变量或静态变量) 等都是数据不安全的,append、pop、queue、logging模块等都是数据安全的。

列表中的方法或者字典中的方法去操作全局变量的时候,数据是安全的。

只有一个线程,永远不会出现线程不安全现象。

采用加锁的方式来保证数据安全。

from threading import Thread, Lockn = 0def add(lock):for i in range(500000):global nwith lock:n += 1def sub(lock):for i in range(500000):global nwith lock:n -= 1t_li = []
lock = Lock()
for i in range(2):t1 = Thread(target=add, args=(lock,))t1.start()t2 = Thread(target=sub, args=(lock,))t2.start()t_li.append(t1)t_li.append(t2)
for t in t_li:t.join()
print(n)

线程安全的单例模式

import time
from threading import Thread, Lockclass A:__instance = Nonelock = Lock()def __new__(cls, *args, **kwargs):with cls.lock:if not cls.__instance:time.sleep(0.00001)cls.__instance = super().__new__(cls)return cls.__instancedef func():a = A()print(a)for i in range(10):Thread(target=func).start()

不用考虑加锁的小技巧

  • 不要操作全局变量
  • 不要在类中操作静态变量

因为多个线程同时操作全局变量/静态变量,会产生数据不安全现象。

三、线程锁(递归锁)

from threading import Lock, RLock# Lock 互斥锁
# RLock 递归(recursion)锁l = Lock()
l.acquire()
print('希望被锁住的代码')
l.release()rl = RLock()  # 在同一个线程中可以被acquire多次
rl.acquire()
rl.acquire()
rl.acquire()
print('希望被锁住的代码')
rl.release()
from threading import Thread, RLockdef func(i, lock):lock.acquire()lock.acquire()print(i, ':start')lock.release()lock.release()print(i, ':end')lock = RLock()
for i in range(5):Thread(target=func, args=(i, lock)).start()

互斥锁与递归锁

递归锁在同一个线程中可以被acquire多次,而互斥锁不行

互斥锁效率高,递归锁效率相对低

多把互斥锁容易产生死锁现象,递归锁可以快速解决死锁

四、死锁

死锁:指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象。

死锁现象是怎么产生的?

答:有多把锁,并且在多个线程中交叉使用。与互斥锁、递归锁无关,都会发生死锁。如果是互斥锁,出现了死锁现象,最快速的解决方案是把所有的互斥锁都改成一把递归锁(noodle_lock = fork_lock = RLock()),程序的效率会降低。

from threading import Thread, Lock
import time
noodle_lock = Lock()
fork_lock = Lock()def eat1(name):noodle_lock.acquire()print(name, '抢到面了')fork_lock.acquire()print(name, '抢到叉子了')print(name, '吃面')time.sleep(0.0001)fork_lock.release()print(name, '放下叉子了')noodle_lock.release()print(name, '放下面了')def eat2(name):fork_lock.acquire()print(name, '抢到叉子了')noodle_lock.acquire()print(name, '抢到面了')print(name, '吃面')noodle_lock.release()print(name, '放下面了')fork_lock.release()print(name, '放下叉子了')Thread(target=eat1, args=('lucy',)).start()
Thread(target=eat2, args=('jack',)).start()
Thread(target=eat1, args=('rose',)).start()
Thread(target=eat2, args=('disen',)).start()

五、队列

队列:线程之间数据安全的容器

线程队列:数据安全,先进先出

原理:加锁 + 链表

Queue

fifo 先进先出的队列

get和put
import queueq = queue.Queue(3)  # fifo 先进先出的队列q.put(1)
q.put(2)
print(q.get())
print(q.get())
1
2
get_nowait
import queue# from queue import Empty  # 不是内置的错误类型,而是queue模块中的错误
q = queue.Queue()  # fifo 先进先出的队列
try:q.get_nowait()
except queue.Empty:pass
print('队列为空,继续执行其他代码')
put_nowait

用的很少,因为队列满时,抛异常,数据放不进去,丢失了。

LifoQueue

后进先出的队列,也就是栈。last in first out

from queue import LifoQueue
lq = LifoQueue()
lq.put(1)
lq.put(2)
print(lq.get())
print(lq.get())
2
1

PriorityQueue

优先级队列,按照放入数据的第一位数值从小到大输出

from queue import PriorityQueuepriq = PriorityQueue()
priq.put((2, 'lucy'))
priq.put((0, 'rose'))
priq.put((1, 'jack'))
print(priq.get())
print(priq.get())
print(priq.get())
(0, 'rose')
(1, 'jack')
(2, 'lucy')

三种队列使用场景

先进先出:用于处理服务类任务(买票任务)

后进先出:算法中用的比较多

优先级队列:比如,VIP制度,VIP用户优先;

六、相关面试题

请聊聊进程队列的特点和实现原理

特点:实现进程之间的通信;数据安全;先进先出。

实现原理:基于管道 + 锁 实现的,管道是基于文件级别的socket + pickle 实现的。

你了解生产者消费者模型吗,如何实现

了解

为什么了解?工作经验

 采集图片/爬取音乐:由于要爬取大量的数据,想提高爬取效率有用过一个生产者消费者模型,这个模型是我自己写的,消息中间件,用的是xxx(redis),我获取网页的过程作为生产者,分析网页,获取所有歌曲歌曲链接的过程作为消费者。自己写监控,或者是自己写邮件报警系统,监控程序作为生产者,一旦发现有问题的程序,就需要把要发送的邮件信息交给消息中间件redis,消费者就从中间件中取值,然后来处理发邮件的逻辑。

什么时候用过?

 项目 或者 例子,结合上面一起

在python中实现生产者消费者模型可以用哪些机制

 消息中间件celery(分布式框架):定时发短信的任务

从你的角度说说进程在计算机中扮演什么角色

进程用来管理一个运行中的程序的资源,是资源分配的最小单位

进程与进程之间内存是隔离的

进程是由操作系统负责调度的,并且多个进程之间是一种竞争关系,所以我们应该对进程的三状态时刻关注,尽量减少进程中的IO操作,或者在进程里面开线程来规避IO,让我们写的程序在运行的时候能够更多的占用CPU资源。

为什么线程之间的数据不安全

线程之间数据共享

多线程的情况下,

 如果在计算某一个变量的时候,还要进行赋值操作,这个过程不是由一条完整的CPU指令完成的;如果在判断某个bool表达式之后,再做某些操作,这个过程也不是由一条完整的CPU指令完成的;在中间发生了GIL锁的切换(时间片的轮转),可能会导致数据不安全。

读程序,请确认执行到最后number的长度是否一定为 1

import threading
import time# loop = 1E7  # 10000000.
loop = int(1E7)  # 10000000def _add(loop: int = 1):global numbersfor _ in range(loop):numbers.append(0)def _sub(loop: int = 1):global numbersfor _ in range(loop):while not numbers:time.sleep(1E-8)numbers.pop()numbers = [0]
ta = threading.Thread(target=_add, args=(loop,))
ts = threading.Thread(target=_sub, args=(loop,))
# ts1 = threading.Thread(target=_sub, args=(loop,))ta.start()
ts.start()
# ts1.start()ta.join()
ts.join()
# ts1.join()

因为只开启了一个进行pop操作的线程,如果开启多个pop操作的线程,必须在while前面加锁,因为可能有两个线程,一个执行了while not numbers,发生了GIL的切换,另外一个线程执行完了代码,numbers刚好没有了数据,导致结果一个pop成功,一个pop不成功。

所以number长度一定为1,如果把注释去了,不一定为1

读程序,请确认执行到最后number的长度是否一定为 1

import threading
import timeloop = int(1E7)def _add(loop: int = 1):global numbersfor _ in range(loop):numbers.append(0)def _sub(loop: int = 1):global numbersfor _ in range(loop):while not numbers:time.sleep(1E-8)numbers.pop()numbers = [0]
ta = threading.Thread(target=_add, args=(loop,))
ts = threading.Thread(target=_sub, args=(loop,))ta.start()
ta.join()
ts.start()
ts.join()

一定为1,因为是同步的。

读程序,请确认执行到最后number是否一定为 0

import threadingloop = int(1E7)def _add(loop: int = 1):global numbersfor _ in range(loop):numbers += 1def _sub(loop: int = 1):global numbersfor _ in range(loop):numbers -= 1numbers = 0
ta = threading.Thread(target=_add, args=(loop,))
ts = threading.Thread(target=_sub, args=(loop,))ta.start()
ta.join()
ts.start()
ts.join()

一定等于0,因为是同步的。

读程序,请确认执行到最后number是否一定为 0

import threadingloop = int(1E7)def _add(loop: int = 1):global numbersfor _ in range(loop):numbers += 1def _sub(loop: int = 1):global numbersfor _ in range(loop):numbers -= 1numbers = 0
ta = threading.Thread(target=_add, args=(loop,))
ts = threading.Thread(target=_sub, args=(loop,))ta.start()
ts.start()
ta.join()
ts.join()

不一定为0,因为是异步的且存在 += 操作

七、判断数据是否安全

是否数据共享,是同步还是异步(数据共享并且异步的情况下)

  • +=、-=、*=、/=、a = 计算之后赋值给变量
  • if、while 条件,这两个判断是由多个线程完成的

这两种情况下,数据不安全。

八、进程池 & 线程池

以前,有多少个任务就开多少个进程或线程。

什么是池

要在程序开始的时候,还没有提交任务,先创建几个线程或者进程,放在一个池子里,这就是池

为什么要用池

如果先开好进程/线程,那么有任务之后就可以直接使用这个池中的数据了;并且开好的进程/线程会一直存在在池中,可以被多个任务反复利用,这样极大的减少了开启/关闭/调度进程/调度线程的时间开销。

池中的线程/进程个数控制了操作系统需要调用的任务个数,控制池中的单位,有利于提高操作系统的效率,减轻操作系统的负担。

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor# threading模块 没有提供池
# multiprocessing模块 仿照threading增加了Pool(逐渐被淘汰)
# concurrent.futures模块 线程池,进程池都能够用相似的方式开启/使用
ThreadPoolExecutor()  # 参数代表开启多少个线程,线程的个数一般起cpu个数*4(或者*5)
ProcessPoolExecutor()  # 参数代表开启多少个进程,进程的个数一般起cpu个数+1

创建线程池并提交任务

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import timedef func(a, b):print(current_thread().ident, a, b)time.sleep(1)tp = ThreadPoolExecutor(4)  # 创建线程池对象
for i in range(20):# tp.submit(func, i, i + 1)# 向池中提交任务tp.submit(func, a=i, b=i + 1)  # 位置传参,关键字传参都可以

创建进程池并提交任务

from concurrent.futures import ProcessPoolExecutor
import os
import timedef func(a, b):print(os.getpid(), 'start', a, b)time.sleep(1)print(os.getpid(), 'end', a, b)if __name__ == '__main__':tp = ProcessPoolExecutor(4)  # 创建进程池对象for i in range(20):# tp.submit(func, i, i + 1)# 向池中提交任务tp.submit(func, a=i, b=i + 1)  # 位置传参,关键字传参都可以

获取任务结果

from concurrent.futures import ProcessPoolExecutor
import os
import timedef func(a, b):print(os.getpid(), 'start', a, b)time.sleep(1)print(os.getpid(), 'end', a, b)return a * bif __name__ == '__main__':tp = ProcessPoolExecutor(4)  # 创建进程池对象future_d = {}for i in range(20):  # 异步非阻塞的ret = tp.submit(func, a=i, b=i + 1)  # future未来对象# print(ret)  # <Future at 0x1ad918e1148 state=running># print(ret.result())  # 这样需要等待,同步的future_d[i] = retfor key in future_d:  # 同步阻塞的print(key, future_d[key].result())

tp对象的map

map 只适合传递简单的参数,并且必须是一个可迭代的类型

from concurrent.futures import ProcessPoolExecutor
import os
import timedef func(a):print(os.getpid(), 'start', a[0], a[1])time.sleep(1)print(os.getpid(), 'end', a[0], a[1])return a[0] * a[1]if __name__ == '__main__':tp = ProcessPoolExecutor(4)ret = tp.map(func, ((i, i + 1) for i in range(20)))  # 一般函数只接收一个参数,要想传入多个,使用元组方式for r in ret:print(r)

回调函数

当有一个结果需要进行处理时,都会绑定一个回调函数来处理,除非是得到所有结果之后才做处理,我们使用 把结果存入列表 遍历列表 的方式。

回调函数效率最高的。

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import timedef func(a, b):print(current_thread().ident, 'start', a, b)time.sleep(1)print(current_thread().ident, 'end', a)return a * bif __name__ == '__main__':tp = ThreadPoolExecutor(4)future_d = {}for i in range(20):  # 异步非阻塞的ret = tp.submit(func, a=i, b=i + 1)future_d[i] = retfor key in future_d:  # 同步阻塞的print(key, future_d[key].result())

上述代码,打印结果是按照顺序(0,1,2,3……),并不是谁先结束就打印谁。

使用回调函数以后,谁先执行完就打印谁,代码如下:

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import timedef func(a, b):print(current_thread().ident, 'start', a, b)time.sleep(1)print(current_thread().ident, 'end', a)return a, a * bdef print_func(ret):  # 异步阻塞 每个任务都是各自阻塞各自,谁先执行完谁先打印print(ret.result())if __name__ == '__main__':tp = ThreadPoolExecutor(4)for i in range(20):  # 异步非阻塞的ret = tp.submit(func, a=i, b=i + 1)  # [ret0, ret1, ..., ret19]ret.add_done_callback(print_func)  # 异步阻塞 [print_func, print_func,...,print_func]# 回调机制# 回调函数 给ret对象绑定一个回调函数,等待ret对应的任务有了结果之后立即调用print_func函数# 就可以对结果立即进行处理,而不用按照顺序接收结果处理结果

ret这个任务会在执行完毕的瞬间立即触发print_func函数,并且把任务的返回值对象传递到print_func做参数。

回调函数的例子

from concurrent.futures import ThreadPoolExecutor
import requestsdef get_page(url):  # 访问网页,获取网页源代码,用线程池中的线程来操作respone = requests.get(url)if respone.status_code == 200:return {'url': url, 'text': respone.text}def parse_page(res):  # 获取到字典结果之后,计算网页源代码的长度,把'https://www.baidu.com : 长度值'写到文件里,线程任务执行完毕之后绑定回调函数res = res.result()parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))with open('db.txt', 'a') as f:f.write(parse_res)if __name__ == '__main__':urls = ['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://www.tencent.com/zh-cn','http://www.sina.com.cn/']tp = ThreadPoolExecutor(4)for url in urls:ret = tp.submit(get_page, url)ret.add_done_callback(parse_page)  # 谁先回来谁就先把结果写进文件# 不用回调函数:# 按照顺序获取网页,baidu python openstack tencent sina# 也只能按照顺序写
# 用上了回调函数# 按照顺序获取网页,baidu python openstack tencent sina# 哪一个网页先返回结果,就先执行哪个网页对应的回调函数(parse_page)

进程池线程池的应用场景

进程池:

场景:高计算的场景,没有IO操作(没有文件操作,没有数据库操作,没有网络操作,没有input);

进程的个数:[cpu_count*1, cpu_count*2]

线程池:

场景:爬虫

线程的个数:一般根据IO的比例定制,cpu_count*5

Python并发编程之线程的玩法相关推荐

  1. 《转载》Python并发编程之线程池/进程池--concurrent.futures模块

    本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...

  2. Python并发编程之线程池/进程池

    引言 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我 ...

  3. python 并发编程 多线程 目录

    线程理论 python 并发编程 多线程 开启线程的两种方式 python 并发编程 多线程与多进程的区别 python 并发编程 多线程 Thread对象的其他属性或方法 python 并发编程 多 ...

  4. python网络编程基础(线程与进程、并行与并发、同步与异步、阻塞与非阻塞、CPU密集型与IO密集型)...

    python网络编程基础(线程与进程.并行与并发.同步与异步.阻塞与非阻塞.CPU密集型与IO密集型) 目录 线程与进程并行与并发同步与异步阻塞与非阻塞CPU密集型与IO密集型 线程与进程 进程 前言 ...

  5. Python并发编程理论篇

    Python并发编程理论篇 前言 很多人学习python,不知道从何学起. 很多人学习python,掌握了基本语法过后,不知道在哪里寻找案例上手. 很多已经做案例的人,却不知道如何去学习更加高深的知识 ...

  6. 深入浅出讲解Python并发编程

    微信公众号:运维开发故事,作者:素心 Python并发编程 本文比较长,绕的也比较快,需要慢慢跟着敲代码并亲自运行一遍,并发编程本身来说就是编程里面最为抽象的概念,单纯的理论确实很枯燥,但这是基础,基 ...

  7. python并发编程之协程

    python并发编程之协程 1.协程: 单线程实现并发 在应用程序里控制多个任务的切换+保存状态 优点: 应用程序级别速度要远远高于操作系统的切换 缺点: 多个任务一旦有一个阻塞没有切,整个线程都阻塞 ...

  8. java并发编程与线程安全

    2019独角兽企业重金招聘Python工程师标准>>> 什么是线程安全 如果对象的状态变量(对象的实例域.静态域)具有可变性,那么当该对象被多个线程共享时就的考虑线程安全性的问题,否 ...

  9. python并发编程调优_Python并发编程-并发解决方案概述

    Python并发编程-并发解决方案概述 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.并发和并行区别 1>.并行(parallel) 同时做某些事,可以互不干扰的同一个时 ...

最新文章

  1. Python 初学者进阶的九大技能(附代码)
  2. WebService 学习之路(一):了解并使用webService
  3. Attachment assignment block里选择的文件是如何传到application server
  4. 使用C#体验函数式编程之——Partial application(局部应用)
  5. 解决Office系列安装不上的办法
  6. LNK2019 无法解析的外部符号 __imp_CommandLineToArgvW,该符号在函数 WinMain 中被引用
  7. Linux中出现 -bash: unzip: command not found
  8. git clone github源码 下载速度很慢的解决方法
  9. A simple BBS demo including(CRUD) - 1
  10. 出现在嵌入式DSP上可用于实现各种编解码器
  11. Weblogic安装部署步骤
  12. windows下基于selenium保存网页为图片
  13. mac系统克隆不能启动_如何制作Mac硬盘的可启动克隆
  14. 打开win10应用商店,提示管理员已阻止这个应用
  15. 面试中常考的数学题——截木棍、圆上取点、赛马、红蓝墨水,测试毒药、坐到正确座位问题
  16. c语言产生式系统动物识别系统,简单动物识别系统的知识表示实验报告
  17. JZOJ3426. 【NOIP2013模拟】封印一击 (2017.8B组)
  18. matlab显示英文字母,#EXCEL函数判断是数字还是字母#excel表格列显示字母
  19. Day02 - CSS
  20. erc20 php,无需gas即可归集ERC20的PHP开发包【SmartWallet】

热门文章

  1. 跨境电商属于外贸吗,Starday跨境电商靠谱吗?
  2. 茂名天源石化宣传“世界急救日”活动 普及急救知识
  3. 物体长度测量---------C#+Emgucv
  4. Python操作excel基础
  5. CoinUp开启GameFi新世界—魔法元世界(MAC)
  6. 好未来谢华亮:AI 在教育行业中的应用
  7. 查看英伟达Nvidia显卡、cuda版本
  8. 英语计算机单词怎么记,英语单词快速记忆法
  9. 有趣的动态壁纸如何制作
  10. OFD格式文如何打开,可以转成PDF吗?