1.进程和线程的概念

计算机有5大基本组成部分,运算器,控制器,存储器,输入和输出。运算器和控制器封装到一起,加上寄存器组和cpu内部总线构成中央处理器(CPU)。cpu的根本任务,就是执行指令,对计算机来说,都是0,1组成的序列,cpu从逻辑上可以划分为3个模块:控制单元、运算单元和存储单元。这三个部分由cpu总线连接起来。

CPU的运行原理就是:控制单元在时序脉冲的作用下,将指令计数器里所指向的指令地址(这个地址是在内存里的)送到地址总线上去,然后CPU将这个地址里的指令读到指令寄存器进行译码。对于执行指令过程中所需要用到的数据,会将数据地址也送到地址总线,然后CPU把数据读到CPU的内部存储单元(就是内部寄存器)暂存起来,最后命令运算单元对数据进行处理加工。周而复始,一直这样执行下去。

并发:并发当有多个线程在操作时,如果系统只有一个CPU,则它根本不可能真正同时进行一个以上的线程,它只能把CPU运行时间划分成若干个时间段,再将时间 段分配给各个线程执行,在一个时间段的线程代码运行时,其它线程处于挂起状。.这种方式我们称之为并发(Concurrent)。

并行:当系统有一个以上CPU时,则线程的操作有可能非并发。当一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。

1.1并发和并行的区别

  • 并发性:又称共行性,是指处理多个同时性活动的能力,
  • 并行:指同时发生两个并发事件,具有并发的含义。并发不一定并行,也可以说并发事件之间不一定要同一时刻发生。
  • 并行指两个或两个以上事件或活动在同一时刻发生,在多道程序环境下,并行使多个程序同一时刻可在不同CPU上同时执行。
  • 并发是在同一个cpu上同时(不是真正的同时,而是看来是同时,因为CPU要在多个程序之间切换)运行多个程序。
  • 并行是每一个CPU运行一个程序。
  • 并发是指一个处理器同时处理多个任务。
  • 并行是指多个处理器或者是多核的处理器同时处理多个不同的任务。
  • 并发是逻辑上的同时发生(simultaneous),而并行是物理上的同时发生。
  • 在处理器层面的理解:并行需要两个或两个以上的线程跑在不同的处理器上,并发可以跑在一个处理器上通过时间片进行切换。
  • 在核层面的理解:并行需要两个或两个以上的线程跑在不同的核上,并发可以跑在一个核上通过时间片进行切换。【可以忽略这个,会比较绕】
  • 并发不是同时发生,并行是同时发生。

再次强调:

  • 并行是指两个或者多个事件在同一时刻发生;而并发是指两个或多个事件在同一时间间隔发生。
  • 并行是在不同实体上的多个事件,并发是在同一实体上的多个事件。

1.2理清多核、多CPU与多线程、多进程的对应关系

  • 进程是资源分配的最小单位,一个程序有至少一个进程。线程是程序执行的最小单位。一个进程有至少一个线程。
  • 进程有自己的独立地址空间,每启动一个进程,系统就会为它分配地址空间,建立数据表来维护代码段、堆栈段和数据段,这种操作非常昂贵。而线程是共享进程中的数据的,使用相同的地址空间,因此CPU切换一个线程的花费远比进程要小很多,同时创建一个线程的开销也比进程要小很多。
  • 线程之间的通信更方便,同一进程下的线程共享全局变量、静态变量等数据,而进程之间的通信需要以通信的方式(IPC)进行。不过如何处理好同步与互斥是编写多线程程序的难点。
  • 多cpu的运行,对应进程的运行状态;多核cpu的运行,对应线程的运行状态。
  • 单CPU中进程只能是并发,多CPU计算机中进程可以并行。
  • 单CPU单核中线程只能并发,单CPU多核中线程可以并行。
  • 一个进程中可以有多条执行路径同时执行,一个线程就是进程中的一条执行路径。

1.3多进程(multi-processing) 和多线程(multi-threading)

  • 多进程是各个并行任务之间“不使用“共同的内存空间;
  • 而多线程的各个并行任务”使用“共同的内存空间。

多进程

  • 优点:独立内存空间;实现代码直观简单;充分利用多核多CPU;避免全局解释器锁的限制;
  • 缺点:无法实现对象和内容共享;需要较大的内存空间

多线程

  • 优点:轻量,需要的额外内存较小;共享内存,方便访问;对于CPython解释器,可以通过全局解释器锁使用C扩展;适合I/O密集型任务
  • 缺点:全局解释器锁的限制;并行任务不能杀掉;实现代码较为复杂

1.4全局解释器锁GIL

Python 全局解释器锁或GIL,简单来说,是一个互斥锁(或锁),它只允许一个线程控制 Python 解释器。这意味着在任何时间点都只能有一个线程处于执行状态。执行单线程程序的开发人员看不到 GIL 的影响,但它可能是 CPU 密集型和多线程代码中的性能瓶颈。由于即使在具有多个 CPU 内核的多线程架构中,GIL 也只允许一次执行一个线程,因此 GIL 被称为 Python 的“臭名昭著”的特性,但是它确实为Python内存处理提供了方便。。

如果一个对象同时被多个线程来引用,那么引用计数可能同时增加或减少,每个线程按照自己的方式进行计数,对象在整个内存中的引用变得十分混乱,很容易造成内存泄漏或者其他很多不可预见的Bug。一个解决办法给就是每个线程都给引用计数加一个锁,阻止别人修改,不过这样会造成锁死现象(比如一个对象有多个锁时),另外,大量的资源会浪费在加锁解锁的过程了,严重拖慢了程序运行速度。

这个时候,就需要一个统一来管理引用计数的机制,以确保对象引用计数准确、安全。全局解释器锁就是用来处理这种情况的。它既避免了不同线程带来的引用计数混乱,又避免了过多线程锁带来的死锁和运行效率低的问题。虽然全局解释器锁解决了对象引用计数的问题,但随之而来的是,很多CPU密集型任务在全局解释器锁的作用下,实际上变成了单线程,不能充分发挥CPU的算力,影响程序速度。

全局解释器锁并不是Python独有的,其他一些语言,比如Ruby也存在全局解释器锁。还有一些语言没有使用引用计数的方式来管理内容,而是使用垃圾回收机制(GC)来管理内存。虽然这样避免了全局解释器锁,但是在单线程处理上,GC并不占有优势。

历史发展

Python在设计之初,就选择了全局解释器锁用来管理内存引用计数,在当时,操作系统还没有线程的概念,所以全局解释器锁并没有带了弊端,反而给开发者带来了很多方便。很多Python的扩展都是使用C语言库来编写的,这些C编写的扩展需要全局解释器锁来确保线程的内存安全。即便是有些C语言库内存处理上不是很安全,那么在Python中全局解释器锁的作用下,也能很好的发挥作用。

所以,全局解释器锁对早期使用CPython做解释器的开发者来说,解决了很多内存管理的问题。

全局解释器锁对多线程的影响

首先应该区分不同性质的任务,有一些是CPU密集型的任务,有一些是I/O密集型的任务。

  • CPU密集型的任务在最大程度上使用了CPU,比如数学矩阵的计算、图像处理、文件解压缩等。【一般使用多进程应对】
  • I/O密集型任务在需要花费很多时间来等待信息的输入和输出(读写),比如从网址下载内容,大量访问磁盘进行读写等。【一般使用多线程应对】

全局解释器锁存在于Cpython的解释器中,如果你使用其他的Python解释器,比如Jython(使用Java编写), IronPython(使用C#编写)和PyPy(使用Python编写),可能不会遇到全局解释器锁的情况。

1.5 线程与进程

1.5.1进程

进程就是运行着的程序。写的python程序(或者其他应用程序比如画笔、qq等),运行起来,就称之为一个进程;在windows下面打开任务管理器,里面显示了当前系统上运行着的进程。这些程序还没有运行的时候,它们的程序代码文件存储在磁盘中,就是那些扩展名为 .exe 文件。双击它们,这些 .exe 文件就被os加载到内存中,运行起来,成为进程。

1.5.2线程

而系统中每个进程里面至少包含一个 线程 。线程是操作系统创建的,每个线程对应一个代码执行的数据结构,保存了代码执行过程中的重要的状态信息。没有线程,操作系统没法管理和维护 代码运行的状态信息。所以没有创建线程之前,操作系统是不会执行我们的代码的。

前面写的Python程序,里面虽然没有创建线程的代码,但实际上,当Python解释器程序运行起来(成为一个进程),OS就自动的创建一个线程,通常称为主线程,在这个主线程里面执行代码指令。当解释器执行我们python程序代码的时候。 我们的代码就在这个主线程中解释执行。

1.5.3进程池

进程池:可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。

使用进程池的优点

  • 提高效率,节省开辟进程和开辟内存空间的时间及销毁进程的时间
  • 节省内存空间

当需要创建的⼦进程数量不多时, 可以直接利⽤multiprocessing.Process动态生成多个进程, 但如果要创建很多进程时,⼿动创建的话⼯作量会非常大,此时就可以⽤到multiprocessing模块提供的Pool去创建一个进程池。

multiprocessing.Pool常⽤函数:

  • apply_async(func, args, kwds):使⽤⾮阻塞⽅式调⽤func(任务并⾏执⾏),args为传递给func的参数列表,kwds为传递给func的关键字参数列表
  • apply(func, args, kwds):使⽤阻塞⽅式调⽤func,必须等待上⼀个进程执行完任务后才能执⾏下⼀个进程,了解即可,几乎不用
  • close():关闭Pool,使其不再接受新的任务
  • terminate():不管任务是否完成,⽴即终⽌
  • join():主进程阻塞,等待⼦进程的退出,必须在close或terminate之后使⽤

初始化Pool时,可以指定⼀个最⼤进程数,当有新的任务提交到Pool中时,如果进程池还没有满,那么就会创建⼀个新的进程⽤来执⾏该任务,但如果进程池已满(池中的进程数已经达到指定的最⼤值),那么该任务就会等待,直到池中有进程结束才会创建新的进程来执⾏。

import os
import time
from multiprocessing import Pooldef func(n):print('start func%s'%n,os.getpid())time.sleep(1)print('end func%s'%n, os.getpid())if __name__ == '__main__':p = Pool(4)for i in range(10):p.apply_async(func,args=(i,))p.close()   #结束进程池接收任务p.join()    #感知进程池中任务执行结束

1.5.4线程池

线程池只有在py3.2才内置的,2版本只能自己维护一个线程池很麻烦。在Python3.2中的concurrent_futures,其可以实现线程池,进程池,不必再自己使用管道传数据造成死锁的问题。并且这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能,但是平时用的最多的还是用来构建线程池和进程池。

在线程池中,主线程可以获取任意一个线程的状态以及返回结果,并且当一个线程完成后,主线程能立即得到结果。

1.5.5 同步和异步

  • 同步,就是调用某个东西时,调用方得等待这个调用返回结果才能继续往后执行。
  • 异步,和同步相反调用方不会等待得到结果,而是在调用发出后调用者可用继续执行后续操作,被调用者通过状体来通知调用者,或者通过回掉函数来处理这个调用

1.6 多线程要解决的问题

比如,我们需要到 前程无忧 网站 抓取 python 开发相关的职位信息。

要抓取几百个网页的内容, 执行这些抓取信息的任务的代码,时间主要耗费在等待网站返回信息上面。 等待信息返回的时候CPU是空闲的。如果我们像以前那样 在一个线程里面,用一个循环 依次 获取100个网页的信息,如下:

# 抓取 网页的职位信息
def  grabOnePage(url):print('代码发起请求,抓取网页信息,具体代码省略')for pageIdx in range(1,101):url = f'https://search.51job.com/list/020000,000000,0000,00,9,99,python,2,{pageIdx}.html'grabOnePage(url)

就会有很长的时间耗费在 等待服务器返回信息上面。

如果我们能用100个线程,同时运行 获取网页信息的代码, 理论上,可以100倍的减少执行时间。

2.队列(线程间通信)

在一个进程中,不同子线程负责不同的任务,t1子线程负责获取到数据,t2子线程负责把数据保存的本地,那么他们之间的通信使用Queue来完成。因为再一个进程中,数据变量是共享的,即多个子线程可以对同一个全局变量进行操作修改,Queue是加了锁的安全消息队列。

Python的Queue队列,主要用于多生产者和消费者模式下的队列实现,特别适合多线程时的消息交换。通过使用队列,把生产者和消费者分解开来,作为其中的中间件,比如生产者产生一个数据,然后放到queue队列中,queue队列在把这个数据放到消费者线程中。使用单线程不必用队列,但是队列对于多线程来说是不可或缺的。它实现了常见的锁语法,临时阻塞线程,防止竞争,这有赖于Python对线程的支持。

1.导入类库
import queue2.创建Queue对象
q = queue.Queue()3.添加元素
q.put(item)4.获取元素
item = q.get()5.查询状态
#查看元素的状态
q.qsize()
#判断是否为空
q.empty()
# 判断是否已满
q.full()

Queue三种模式

  • FIFO模式:
    FIFO模式就是管道模式(先进先出),就像一根管道一样,数据一个一个的进去,然后一个一个的出来,你可以设置这个管道运行进入数据的个数,然后数据进满了之后,一个一个数据出来,出来后又进数据。
  • LIFO模式:
    LIFO模式是后进先出模式,也就是堆栈,元素只能在栈顶堆入。就是后进的数据先出去,先进的数据最后出去。递归函数就是用的堆栈。
  • 优先级队列:
    优先级队列的每个数据都带有一个优先值,优先值越小的越早出去,优先值相同的先放入队列的先出去。
"""
FIFO模式使用方法:【先进先出,常用】
maxsize表示队列中最多有多少个数据,数据堆满了后,堵塞队列,然后数据一个一个的出去。
如果设置0表示队列的元素不设上限,可以有无数个数据,但是需要当心内存溢出。
"""
import queue
q1=queue.Queue(maxsize=0)
"""
LIFO模式使用方法
这里的maxsize和上面的意义一样,但是LifoQueue()是先进后出模式,也就是说先进去的最后出来,有点像倒序的意思。
"""
q2=queue.LifoQueue(maxsize=0)
"""
优先级模式使用方法
优先级模式,这里的元素优先顺序是按照sorted(list(entries))[0]的接过来定义的,而元素的结构形式通常是(priority_number,data)这样的元祖。"""q3=queue.PriorityQueue(maxsize=0)

2.1 放入数据与取出数据

# 首先确定使用的模式,内部最多存储10个元素
q=queue.Queue(maxsize=10)
x = 10
"""
# 这里放入一个数据x,字符串数字都可以,一般来说不要修改配置,使用默认的就可以,直接q.put(x)。
block=True表示队列堵塞,直到队列有空的地方出来再把元素传进去,timeout=10表示10秒后如果没有新的元素传进去就报错。
"""
q.put(x,block=True,timeout=None)"""
# 这里是把数据x取出来,一般使用默认配置就好,知己q.get(x)。
# 如果设置block=True,timeout=10,就是说如果10秒没有接收到数据就直接报错;
# 如果设置block=False的话,timeout参数会被忽略,此时就是只要队列有元素就直接弹出这个元素,没有元素就直接报错。block参数只有在队列中有多个值的时候有效。
"""# q.get(x,block=True,timeout=None)
q.get(x)

2.2判断队列是否为空

# 向队列添加一个数据
q.put(x,block=True,timeout=None)print(q.empty())
# 如果队列是空的就返回TRUE"""
判断队列是否为空这种很常用,如果队列的数据如果不为空,那么就执行后面的消费者,取出数据给消费者对象。
"""
while not q.empty():# 如果队列中还有数据的话print(abs(q.get(x)))# 获取队列的元素,打印出元素的绝对值。这里只是一个演示,一般会把q.get(x)传入一个函数中。

2.3获取队列元素个数

print(q.qsize())

2.4 判断队列是否满了

q.full()
# 如果满了返回TRUE

2.5堵塞调用进程

阻塞调用线程,直到队列中的所有任务被处理掉。

只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

q.join()

2.6队列任务完成

意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

q.task_done()

2.7队列实例1:不设置maxsize

import queuelist1=[1,2,3,4,5,6,7,8,9]q=queue.Queue()
#管道模式正序:先进先出
for x in list1:q.put(x)# 把元素一个一个加载进管道中
while not q.empty():
# 当管道的元素不为空的时候print(q.get(x))# 打印出元素print ('queue size:' + str(q.qsize()))# 打印队列中还剩下多少元素
#此种task_done和join()不能加上去,否则会一直堵塞,导致无法结束。

2.8队列实例2,设置maxsize

前面说过maxsize表示队列中最多有多少个数据,数据堆满了后,堵塞队列,然后数据一个一个的出去。

import queue
list1=[1,2,3,4,5,6,7,8,9]
q=queue.Queue(maxsize=5)
#管道内最多存在5个
for x in list1:q.put(x)# 管道内载入5个元素了while not q.empty():print (q.get(x))print ('queue size:' + str(q.qsize()))# 执行完了这5个元素q.task_done()q.join()#task_done和join()可以不加上去,至于两种用法看看上面写的

管道内最多存在5个,循环往管道里面添加数据的时候加了5个之后,由于管道满了之后会一直等到管道释放前面的数据然后才能继续添加后续的数据。这个过程是一个拥堵的过程,可以理解为程序无限循环卡死在这了。

2.9 实例3:多线程中的队列

import threading
import time
import queueq = queue.Queue(maxsize=5)def t1(q):# 由于该函数的写法是死循环,所以该操作会一直发生下去。while 1:for i in range(10):q.put(i)def t2(q):while not q.empty():print('队列中的数据量:'+str(q.qsize()))# q.qsize()是获取队列中剩余的数量print('取出值:'+str(q.get()))# q.get()是一个堵塞的,会等待直到获取到数据print('-----')time.sleep(0.1)t1 = threading.Thread(target=t1,args=(q,))
t2 = threading.Thread(target=t2,args=(q,))
t1.start()
t2.start()

2.10 实例4:多线程中的队列

import threading
import time
import queue
'''
模拟包子店卖包子
厨房每一秒钟制造一个包子
顾客每三秒吃掉一个包子
厨房一次性最多存放100个包子
'''
q = queue.Queue(maxsize=100)
# 厨房一次性最多存放100个包子def produce(q):
# 这个函数专门产生包子for i in range(1,7):q.put('第{}个包子'.format(str(i)))# 生产出包子,表明包子的id号time.sleep(1)# 要一秒才能造出一个包子def consume(q):while not q.empty():# 只要包子店里有包子print('包子店的包子剩余量:'+str(q.qsize()))# q.qsize()是获取队列中剩余的数量print('小桃红吃了:'+str(q.get()))# q.get()是一个堵塞的,会等待直到获取到数据print('------------')time.sleep(3)t1 = threading.Thread(target=produce,args=(q,))
t2 = threading.Thread(target=consume,args=(q,))
t1.start()
t2.start()
包子店的包子剩余量:1
小桃红吃了:第1个包子
------------
包子店的包子剩余量:2
小桃红吃了:第2个包子
------------
包子店的包子剩余量:4
小桃红吃了:第3个包子
------------
包子店的包子剩余量:3
小桃红吃了:第4个包子
------------
包子店的包子剩余量:2
小桃红吃了:第5个包子
------------
包子店的包子剩余量:1
小桃红吃了:第6个包子
------------
import threading
import time
import queue
'''
模拟包子店卖包子
厨房每一秒钟制造一个包子
顾客每三秒吃掉一个包子
厨房一次性最多存放100个包子
'''
q = queue.Queue(maxsize=3)
# 厨房一次性最多存放100个包子def produce(q):
# 这个函数专门产生包子for i in range(1,7):q.put('第{}个包子'.format(str(i)))# 生产出包子,表明包子的id号time.sleep(1)# 要一秒才能造出一个包子def consume(q):while not q.empty():# 只要包子店里有包子print('包子店的包子剩余量:'+str(q.qsize()))# q.qsize()是获取队列中剩余的数量print('小桃红吃了:'+str(q.get()))# q.get()是一个堵塞的,会等待直到获取到数据print('------------')time.sleep(3)t1 = threading.Thread(target=produce,args=(q,))
t2 = threading.Thread(target=consume,args=(q,))
t1.start()
t2.start()
包子店的包子剩余量:1
小桃红吃了:第1个包子
------------
包子店的包子剩余量:2
小桃红吃了:第2个包子
------------
包子店的包子剩余量:3
小桃红吃了:第3个包子
------------
包子店的包子剩余量:3
小桃红吃了:第4个包子
------------
包子店的包子剩余量:2
小桃红吃了:第5个包子
------------
包子店的包子剩余量:1
小桃红吃了:第6个包子
------------

2.11实例5:队列爬虫

import threading
import requests
from lxml import etree
import queue
from faker import Factory
import json
import logging
# 生产网址者--------------------1
def prepareUr1Queue():urlQueue=queue.Queue()baseUrl= 'https://movie.douban.com/top250?start={}&filter='for start in range(0,10):url = baseUrl.format(start*20)#放入队列urlQueue.put(url)return urlQueue# urlQueue = prepareUr1Queue()
# print(urlQueue.get())
# print(urlQueue.get())
# print(urlQueue.get())
urlQueue = prepareUr1Queue()
print(urlQueue.get())
print(urlQueue.get())
print(urlQueue.get())
# 采集线程者--------------------1
class CrawlerThread(threading.Thread):def __init__(self,name,urlQueue,f):#初始化父类,name是线程的名字super().__init__(name=name)# 拿到网址,初始化网址队列self.urlQueue = urlQueueself.f = fself.name = namedef get_header(self):headers = {'User-Agent': self.f.user_agent()}return headers#运行线程的方法  rundef run(self):# 这这里实现数据的采集,即获取responseprint(f"{self.name}---采集线程正在执行")#循环提取网址,不空就执行,就出来元素# self.urlQueue.empty()在队列不空的时候返回while not self.urlQueue.empty():try:#block=False表示为空的时候不堵塞。url = self.urlQueue.get(block=False)headers = self.get_header()response = requests.get(url,headers=headers)
#                 print(response.text)responseQueue.put(response.text)except queue.Empty:passprint(self.name + "执行结束")
# 生产者产生网址
# urlQueue = prepareUr1Queue()
# responseQueue = queue.Queue()
# responseQueue.get()
# 解析线程组
class ParserThread(threading.Thread):def __init__(self,name,responseQueue,fp,lock):#初始化父类,name是线程的名字super().__init__(name=name)# 拿到网址,初始化网址队列self.responseQueue= responseQueueself.fp = fpself.lock = lockself.name = namedef parserResponse(self,response):html = etree.HTML(response)#使用xpath解析出电影条目的列表,每个条目本身也是一种树结构moive_items = html.xpath('//li//div[@class="item"]')moives = []# 遍历电影条目for moive_item in moive_items:# 取出电影名字title = moive_item.xpath('.//span[@class="title"]/text()')[0]#电彩讦分rating = movie_item.xpath('.//span[@class="rating_num"]/text()')[0]# 总评论人数review_num = movie_item.xpath( './/span[4]/text()"')[0]moive = {"title":title,"rating":rating," review_num": review_num}# 载入json文件#with self.lock:#self.fp.write(json.dumps(moive,ensure_ascii=False))self.lock.acquire()#获得锁self.fp.write(json.dumps(moive))self.lock.release()moives.append(moive)return moives#运行线程的方法  rundef run(self):# 这这里实现数据的采集,即获取responseprint(f"{self.name}---解析线程正在执行")#循环提取数据,不空就执行,就出来元素while not parse_exit_flag:try:# 获取到responseQueue了(即response.text),block=False是防止队列为空的时候堵塞response = self.responseQueue.get(block=False)#解析数据的方法data = self.parserResponse(response)print(data)except:passprint(self.name + "执行结束")if __name__ == '__main__':print("主线程开始")# 第一步启动生产者 生产网址-----------------------urlQueue = prepareUr1Queue()responseQueue = queue.Queue()with open('movie250.json','a',encoding='utf-8') as fp:f = Factory.create()parse_exit_flag = Falselock = threading.Lock()# 第二步 启动采集线程组------------------------crawlerThreads = []# 循环4个采集线程,创建线程组for i in range(6):# 掺入不同的名字,网址队列全部传递过去thread = CrawlerThread(f'crawler-{i}',urlQueue,f)crawlerThreads.append(thread)#线程组# 启动线程组for t in crawlerThreads:# 启动t.start()# 第三步  启动解析线程组----------------------------parserThreads = []for i in range(6):# 掺入不同的名字,网址队列全部传递过去thread = ParserThread(f'parser-{i}',responseQueue,fp,lock)parserThreads.append(thread)#线程组for t in parserThreads:# 启动t.start()# 判断 网址队列 urlQueue 是否为空while not urlQueue.empty():pass# 等待关闭线程 ---------------------------按照顺序关闭:先关闭采集再关闭解析# 采集线程的join:等待任务完成然后关闭for t in crawlerThreads:t.join()# 判断 采集 uresponseQueue 是否为空while not responseQueue.empty():pass# 在这个位置以上,前面的线程组都已经结束了parse_exit_flag = True# 解析线程的joinfor t in parserThreads:t.join()print("主线程退出")

3.线程同步

如果没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。

"同"字从字面上容易理解为一起动作,其实不是,"同"字应是指协同、协助、互相配合。如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。

3.1线程锁实现同步控制

线程锁使用threading.Lock()实例化,使用acquire()上锁,使用release()释放锁,牢记acquire与release()必须要同时成对存在。它提供一些如下方法:

  • acquire():上锁,这个时候只能运行上锁后的代码
  • release():解锁,解锁后把资源让出来,给其他线程使用

# 由于这里是死循环,所以这里会无限循环下去。
def run1():while 1:if l1.acquire():# 如果第一把锁上锁了print('我是老大,我先运行')l2.release()# 释放第二把锁
def run2():while 1:if l2.acquire():# 如果第二把锁上锁了print('我是老二,我第二运行')l3.release()# 释放第三把锁def run3():while 1:if l3.acquire():# 如果第三把锁上锁了print('我是老三,我最后运行')l1.release()# 释放第一把锁t1 = threading.Thread(target=run1)
t2 = threading.Thread(target=run2)
t3 = threading.Thread(target=run3)l1 = threading.Lock()
l2 = threading.Lock()
l3 = threading.Lock()
# 实例化三把锁l2.acquire()
l3.acquire()t1.start()
t2.start()
t3.start()

3.2条件变量实现同步精准控制

条件变量,用于复杂的线程间同步。在一些对线程间通信要求比较精准的需求下,使用简单的lock加锁解锁已经没法实现需求,这个时候condition条件控制就派上用场了。

"""
由于线程的不安全性,每次生成和获取的数字都并非同时是按顺序索取要得到的,
这个时候condition就派上用场了(其实如果设置消息队列的q=queue.Queue(size=1)就能解决这个问题)。
"""import threading
import randomdef produce():global qwhile 1:con.acquire()# 必须在有锁的前提下才能使用条件变量q = str(random.randint(1,100))print('我生成了一个随机数字:'+q)con.notify()# 发起一个信号,释放一个被堵塞的线程con.wait()# 发起一个信号,堵塞当前线程,等待另一个notify出现的时候就执行下面的代码con.release()# 必须要解锁def consume():global qwhile 1:con.acquire()# 必须在有锁的前提下才能使用条件变量print('我获取到你生成的随机数字:'+q)con.notify()# 发起一个信号,释放一个被堵塞的线程con.wait()# 堵塞当前线程con.release()t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
con = threading.Condition()
t1.start()
t2.start()

3.3信号量实现定量的线程同步

semaphore适用于控制进入数量的锁,好比文件的读写操作,写入的时候一般只用一个线程写,如果多个线程同时执行写入操作的时候,就会造成写入数据混乱。 但是读取的时候可以用多个线程来读取,可以看到写与写是互斥的,读与写不是互斥的,读与读不是互斥的。文件读写只是个例子,在一些日常业务中比如爬虫读取网址的线程数量控制等。

BoundedSemaphore。这种锁允许一定数量的线程同时更改数据,它不是互斥锁。比如地铁安检,排队人很多,工作人员只允许一定数量的人进入安检区,其它的人继续排队。

import time
import threadingdef run(n, se):se.acquire()print("run the thread: %s" % n)time.sleep(1)se.release()# 设置允许5个线程同时运行
semaphore = threading.BoundedSemaphore(5)
for i in range(20):t = threading.Thread(target=run, args=(i,semaphore))t.start()

3.4事件实现线程锁同步

事件线程锁的运行机制:

全局定义了一个Flag,如果Flag的值为False,那么当程序执行wait()方法时就会阻塞,如果Flag值为True,线程不再阻塞。这种锁,类似交通红绿灯(默认是红灯),它属于在红灯的时候一次性阻挡所有线程,在绿灯的时候,一次性放行所有排队中的线程。

事件主要提供了四个方法set()、wait()、clear()和is_set()。

  • 调用wait()方法将等待信号。
  • is_set():判断当前是否状态
  • 调用set()方法会将Flag设置为True。
  • 调用clear()方法会将事件的Flag设置为False。
import threading
import time
import randomboys = ['此时一位捡瓶子的靓仔路过\n------------','此时一位没钱的网友路过\n------------','此时一位推着屎球的屎壳郎路过\n------------']
event = threading.Event()
def lighter():event.set()while 1:ti = (random.randint(1, 10))time.sleep(ti)print('等待 {} 秒后'.format(str(ti)))event.clear()time.sleep(ti)event.set()def go(boy):while 1:if event.is_set():# 如果事件被设置print('在辽阔的街头')print(boy)time.sleep(random.randint(1, 5))else:print('在寂静的田野')print(boy)event.wait()print('突然,一辆火车驶过')time.sleep(5)t1 = threading.Thread(target=lighter)
t1.start()for boy in boys:t2 = threading.Thread(target=go,args=(boy,))t2.start()

4.生产者与消费者

生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者消耗数据或者资料。

举一个寄信的例子,假设要寄一封信,大致过程如下:

  • 1.你把信写好-―相当于生产者制造数据;
  • 2.把信放入邮简――相当于生产者把数据放入缓冲区;
  • 3.递员把信从邮简取出—一相当于消费者把数据取出缓冲区;
  • 4.递员把信拿去邮局做相应的处理――相当于消费者处理数据;

4.1基于生产者消费者的多线程爬虫

import requests
from lxml import etreeheaders = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36","referer": "https://www.cnblogs.com/"
}json_export= []for connect in range(1,50+1):payload = {"CategoryId": "808","CategoryType": "SiteHome","ItemListActionName": "AggSitePostList","PageIndex": connect,"ParentCategoryId": "0","TotalPostCount": "4000"}json_export.append(payload)def craw(payloads):s = requests.session()rep = s.get("https://www.cnblogs.com/#p",headers=headers)rpost = s.post("https://www.cnblogs.com/AggSite/AggSitePostList",json=payloads,headers=headers)# print(rpost.text)return rpost.textdef parse(txt):html = etree.HTML(txt)title = html.xpath("//article[@class='post-item']//a[@class='post-item-title']/text()")href_u = html.xpath("//article[@class='post-item']//a/@href")resource_json = list(zip(title,href_u))         #将2个列表zip压缩,title元素1对应href_u 元素1依次整列。再转为list类型return resource_json
import queue
import time
import random
import threadingdef do_craw(url_q: queue.Queue,html_q:queue.Queue):while True:         # 保证全局不停止payload = url_q.get()html = craw(payload)html_q.put(html)print(threading.current_thread().name,f"craw {payload}","url_q.size=",url_q.qsize())      # 打印生产线程状态time.sleep(random.randint(1,2))     # 随机延迟1-2sdef do_parse(html_q:queue.Queue,fout):while True:         # 保证全局不停止html = html_q.get()results = parse(html)for result in results:# print(result)fout.write(str(result) +"\n")print(threading.current_thread().name,f"results.size", len(results),"html_q.size=",html_q.qsize())        # 打印消费线程状态time.sleep(random.randint(1,2))     # 随机延迟1-2sif __name__ == "__main__":url_q = queue.Queue()   # 该对象存放所有post加密参数html_q = queue.Queue()  # 该对象存放所有请求网站后返回的text资源for payload in json_export:url_q.put(payload)      # 添加psot参数for idx in range(3):        # 创建3个线程进行生产# 多线程应用在do_craw函数下,args对函数传参,name给线程起名字t = threading.Thread(target=do_craw,args=(url_q,html_q),name=f"craw{idx}")t.start()fout = open("BKdata.txt",'w',encoding="utf-8")      # 创建文件流对象,写入数据for idx in range(2):        # 创建2个线程进行消费t = threading.Thread(target=do_parse,args=(html_q,fout),name=f"parse{idx}")t.start()

5.Python中创建新线程

应用程序必须 通过操作系统提供的 系统调用,请求操作系统分配一个新的线程。python3 将 系统调用创建线程 的功能封装在 标准库 threading 中。

print('主线程执行代码') # 从 threading 库中导入Thread类
from threading import Thread
from time import sleep# 定义一个函数,作为新线程执行的入口函数
def threadFunc(arg1,arg2):print('子线程 开始')print(f'线程函数参数是:{arg1}, {arg2}')sleep(5)print('子线程 结束')# 创建 Thread 类的实例对象
thread = Thread(# target 参数 指定 新线程要执行的函数# 注意,这里指定的函数对象只能写一个名字,不能后面加括号,# 如果加括号就是直接在当前线程调用执行,而不是在新线程中执行了target=threadFunc, # 如果 新线程函数需要参数,在 args里面填入参数# 注意参数是元组, 如果只有一个参数,后面要有逗号,像这样 args=('参数1',)args=('参数1', '参数2')
)# 执行start 方法,就会创建新线程,
# 并且新线程会去执行入口函数里面的代码。
# 这时候 这个进程 有两个线程(子线程)了。start()会创建新线程,然后执行新线程。
thread.start()# 主线程的代码执行 :子线程对象的join方法,
# 就会等待子线程结束,才继续执行下面的代码。(等待一个子线程结束才会继续下一个子线程)
thread.join()
print('主线程结束')

thread = Thread(target=threadFunc('参数1', '参数2'))thread = Thread(target=threadFunc,args=('参数1', '参数2'))的代码虽然结果一样,但是存在本质区别!

thread = Thread(target=threadFunc('参数1', '参数2'))是将函数执行结果即返回值None传给target,而thread = Thread(target=threadFunc,args=('参数1', '参数2'))是将函数实例传给target。前者在主线程已经运行完了,后者是将函数传给target然后创建了Thread实例对象但是并没有执行。

6.解决共享数据的访问控制问题-多线程

一般在操作数据库的时候需要注意这个问题。
做多线程开发,经常遇到这样的情况:多个线程里面的代码 需要访问 同一个 公共的数据对象。这个公共的数据对象可以是任何类型, 比如一个 列表、字典、或者自定义类的对象。有的时候,程序 需要 防止线程的代码 同时操作 公共数据对象。 否则,就有可能导致 数据的访问互相冲突影响

# 用一个简单的程序模拟一个银行系统,用户可以往自己的帐号上存钱。
from threading import Thread,Lock
from time import sleepbank = {'byhy' : 0
}bankLock = Lock()# 定义一个函数,作为新线程执行的入口函数
def deposit(theadidx,amount):# 操作共享数据前,申请获取锁bankLock.acquire()balance =  bank['byhy']# 执行一些任务,耗费了0.1秒sleep(0.1)bank['byhy']  = balance + amountprint(f'子线程 {theadidx} 结束')# 操作完共享数据后,申请释放锁bankLock.release()theadlist = []
for idx in range(10):# 创建 Thread 类的实例对象thread = Thread(target = deposit,args = (idx,1))# 创建新线程thread.start()# 把线程对象都存储到 threadlist中theadlist.append(thread)for thread in theadlist:thread.join()print('主线程结束')
print(f'最后我们的账号余额为 {bank["byhy"]}')
子线程 0 结束
子线程 1 结束
子线程 2 结束
子线程 3 结束
子线程 4 结束
子线程 5 结束
子线程 6 结束
子线程 7 结束
子线程 8 结束
子线程 9 结束
主线程结束
最后我们的账号余额为 10

7.daemon和non-daemon线程

Python 中,构造线程的时候,可以设置daemon属性,这个属性必须再start()方法前设置好。线程daemon属性,如果设定就是用户的设置,否则就取当前线程的daemon值。

主线程是non-daemon线程,即daemon = False。线程具有一个daemon属性,可以手动设置为True或者False,也可以不设置,则取默认值为None

如果除主线程之外还有non-daemon线程的时候,主线程退出时,也不会杀掉所有daemon线程,直到所有non-daemon线程全部结束,如果还有daemon线程,主线程需要退出,会结束所有daemon线程,程序退出。

from threading import Thread
from time import sleep"""
可以发现,主线程先结束,要过个2秒钟,等子线程运行完,整个程序才会结束退出。Python程序中当所有的 `非daemon线程` 结束了,整个程序才会结束。
主线程是非daemon线程,启动的子线程默认也是 非daemon 线程。所以,要等到 主线程和子线程 都结束,程序才会结束。
"""def threadFunc():sleep(5)print('子线程 结束')# 不设置的话,默认就是non-daemon线程。
thread = Thread(target=threadFunc)
thread.start()
print('主线程结束')
"""
可以在创建线程的时候,设置daemon参数值为True。
再次运行,可以发现,只要主线程结束了,整个程序就结束了,不需要停顿5秒。因为只有主线程是非daemon线程。
"""from threading import Thread
from time import sleepdef threadFunc():sleep(5)print('子线程 结束')
# 设置新线程为daemon线程
thread = Thread(target=threadFunc,daemon=True)
thread.start()
print('主线程结束')

Python多线程和多进程:初步了解相关推荐

  1. python多线程和多进程的使用_python多线程与多进程

    python多线程与多进程 python多线程 python中提供两个标准库thread和threading用于对线程的支持,python3中已放弃对前者的支持,后者是一种更高层次封装的线程库,接下来 ...

  2. python 多进程_说说Python多线程与多进程的区别?

    公众号新增加了一个栏目,就是每天给大家解答一道Python常见的面试题,反正每天不贪多,一天一题,正好合适,只希望这个面试栏目,给那些正在准备面试的同学,提供一点点帮助! 小猿会从最基础的面试题开始, ...

  3. python多线程,多进程,线程池,进程池

    https://blog.csdn.net/somezz/article/details/80963760 python 多线程 线程(Thread)也叫轻量级进程,是操作系统能够进行运算调度的最小单 ...

  4. Python——多线程与多进程

    Python--多线程与多进程 学习python进阶能力,多进程与多线程的能力是必须的,不然真out了.以下内容部分摘自博客:Python 多线程与多进程.Python:多线程及多进程的使用. 一.线 ...

  5. Python多线程与多进程微信公众号后台开发

    目录 前言 一.线程与进程 1.什么是线程 2.什么是进程 3.进程与线程的关系 4.总结 5.CPU密集型与IO密集型 二.Python的多线程和多进程 1.GIL(Global Interpret ...

  6. Python - 多线程与多进程

    Python - 多线程与多进程 多线程 程序默认都是单线程(这个默认线程又叫主线程,其他的线程都叫子线程) Thread类的对象就是线程对象,程序需要多少个子线程就创建多少个Thread的对象 im ...

  7. python多线程_干货|理解python多线程和多进程

    点击上方"AI遇见机器学习",选择"星标"公众号 原创干货,第一时间送达 一.多线程与多进程 在介绍Python多线程编程之前,先给大家复习一下进程和线程的概念 ...

  8. Python多线程和多进程编程

    原文地址:https://tracholar.github.io/wiki/python/python-multiprocessing-tutorial.html 简介 早已进入多核时代的计算机,怎能 ...

  9. python多线程与多进程

    参考链接:python并行任务技巧 python多线程到底有没有用 1 from multiprocessing import Pool 2 from multiprocessing.dummy im ...

最新文章

  1. 【9704】【9109】麦森数
  2. jvm性能调优实战 - 27亿级数据量的实时分析引擎,为啥频繁发生Full GC
  3. python 读取sqlite存入文件_如何通过python读取sqlite数据文件
  4. linux系统/opt目录和/usr/local目录有什么区别
  5. [UE4]把枪抽象为一个类
  6. 物联网的几大开源操作系统
  7. 【二分答案】【Heap-Dijkstra】bzoj2709 [Violet 1]迷宫花园
  8. JQuery插件iScroll实现下拉刷新,滚动翻页特效
  9. 离散数学(第2版)屈婉玲版知识点小结(用于个人快速复习)-1
  10. 华为交换机初始化_华为S5700交换机初始化和配置TELNET远程登录
  11. 用protues作RC桥式振荡电路仿真,无法形成正弦波,求解惑
  12. WIFI提示“已连接,但无法访问互联网”
  13. 泛微OA流程附件在服务器磁盘中的位置
  14. python父亲节礼物送什么_父亲节送什么礼物好
  15. 解决安装webrtcvad出现错误的方法
  16. PUM-main makefile报错 #error -- unsupported GNU version gcc versions later than 6 are not supported
  17. 如何避免手机失窃后倾家荡产——手把手教你设置SIM卡密码(也就是PIN密码)
  18. 人工智能证书有什么作用?
  19. 信息技术学考 这题不会做!!
  20. “山寨机、猪流感”的英文说法(及更多热词——每天更新)

热门文章

  1. 2008.09.25 半夜爬起来
  2. psutil详细使用
  3. jmockit教程_java unit test Mock框架jMockit示例教程 - Mock 构造方法,基于状态的Mock
  4. android音频hal层简介
  5. python用逗号隔开输出_关于for循环:Python – 打印出用逗号分隔的列表
  6. js中以多个字符拆分字符串
  7. 7.HDFS之——NameNode的概述、自动Name的概述、NameNode HA 集群搭建
  8. Stable Diffusion攻略集(Stable Diffusion官方文档、kaggle notebook、webui资源帖)
  9. 【Java】JVM学习(一)
  10. LeetCode 676. 实现一个魔法字典