基本示例

from gevent import monkey
monkey.patch_all()  # 记住一定放在第一行,这里是打补丁的意思,time模块在使用协程gevent模块的时候,必须打补丁才行,记得放在第一行。
import gevent
import timedef eat(name):print(f"{name} eat first")time.sleep(3)print(f"{name} eat second")def play(name):print(f"{name} play phone 1")time.sleep(2)print(f"{name} play phone 2")g1 = gevent.spawn(eat, "lily")
g2 = gevent.spawn(play, name="lily")
g1.join()
g2.join()

1,gevent介绍

gevent是第三方库,通过 greenlet 实现 coroutine,创建、调度的开销比 线程(thread) 还小,因此程序内部的 执行流 效率高。

gevent 实现了 python 标准库中一些阻塞库的非阻塞版本,如 socket、os、select 等 (全部的可参考 gevent1.0 的 monkey.py 源码),可用这些非阻塞的库替代 python 标准库中的阻塞的库。

gevent 提供的 API 与 python 标准库中的用法和名称类似。

其基本思想是:当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

gevent是基于协程的Python网络库。特点:

  • 基于libev的快速事件循环(Linux上epoll,FreeBSD上kqueue)。
  • 基于greenlet的轻量级执行单元。
  • API的概念和Python标准库一致(如事件,队列)。
  • 可以配合socket,ssl模块使用。
  • 能够使用标准库和第三方模块创建标准的阻塞套接字(gevent.monkey)。
  • 默认通过线程池进行DNS查询,也可通过c-are(通过GEVENT_RESOLVER=ares环境变量开启)。
  • TCP/UDP/HTTP服务器
  • 子进程支持(通过gevent.subprocess)
  • 线程池

gevent常用方法:

gevent.spawn()                              创建一个普通的Greenlet对象并切换
gevent.spawn_later(seconds=3)    延时创建一个普通的Greenlet对象并切换
gevent.spawn_raw()                       创建的协程对象属于一个组
gevent.getcurrent()                         返回当前正在执行的greenlet
gevent.joinall(jobs)                          将协程任务添加到事件循环,接收一个任务列表
gevent.wait()                                    可以替代join函数等待循环结束,也可以传入协程对象列表
gevent.kill()                                      杀死一个协程
gevent.killall()                                   杀死一个协程列表里的所有协程
monkey.patch_all()                            非常重要,会自动将python的一些标准模块替换成gevent框架

greenlet常用实例方法:

# Greenlet对象
from gevent import Greenlet# Greenlet对象创建
job = Greenlet(target0, 3)
Greenlet.spawn() # 创建一个协程并启动
Greenlet.spawn_later(seconds=3) # 延时启动# 协程启动
job.start() # 将协程加入循环并启动协程
job.start_later(3) # 延时启动# 等待任务完成
job.join() # 等待任务完成
job.get() # 获取协程返回的值# 任务中断和判断任务状态
job.dead() # 判断协程是否死亡
job.kill() # 杀死正在运行的协程并唤醒其他的协程,这个协程将不会再执行,可以
job.ready() # 任务完成返回一个真值
job.successful() # 任务成功完成返回真值,否则抛出错误# 获取属性
job.loop # 时间循环对象
job.value # 获取返回的值# 捕捉异常
job.exception # 如果运行有错误,获取它
job.exc_info # 错误的详细信息# 设置回调函数
job.rawlink(back) # 普通回调,将job对象作为回调函数的参数
job.unlink() # 删除回调函数
# 执行成功的回调函数
job.link_value(back)
# 执行失败的回调函数
job.link_exception(back)

gevent.Pool的特殊方法:

pool.wait_available():等待直到有一个协程有结果
pool.dd(greenlet):向进程池添加一个方法并跟踪,非阻塞
pool.discard(greenlet):停止跟踪某个协程
pool.start(greenlet):加入并启动协程
pool.join():阻塞等待结束
pool.kill():杀死所有跟踪的协程
pool.killone(greenlet):杀死一个协程

2,什么时候用/不用gevent

gevent 的优势:

  • 可以通过同步的逻辑实现并发操作,大大降低了编写并行/并发程序的难度
  • 在一个进程中使用 gevent 可以有效避免对 临界资源 的互斥访问

如果程序涉及较多的 I/O,可用 gevent 替代多线程来提高程序效率。但由于

  • gevent 中 coroutine 的调度是由使用者而非操作系统决定
  • 主要解决的是 I/O 问题,提高 IO-bound 类型的程序的效率
  • 由于是在一个进程中实现 coroutine,且操作系统以进程为单位分配处理机资源 (一个进程分配一个处理机)

因此,gevent 不适合在以下场景中使用:

  • 对任务延迟有要求的场景,如交互式程序中 (此时需要操作系统进行 公平调度)
  • CPU-bound 任务
  • 当需要使用多处理机时 (可通过运行多个进程,每个进程内实现 coroutine 来解决这个问题)

3,gevent操作

如何生成 greenlet instance

一般有两种方法:

  • 使用 gevent.spawn() API
  • subclass Greenlet

第一种方法是调用了 Greenlet class 中的 spawn 类方法,且生成 greenlet instance 后将其放入 coroutine 的调度队列中。第二种方法需要手动通过 instance.start() 方法手动将其加入到 coroutine 的调度队列中。
代码示例:

import gevent
from gevent import Greenletclass MyGreen(Greenlet):def __init__(self, timeout, msg):Greenlet.__init__(self)self.timeout = timeoutself.msg = msgdef _run(self):print("I'm from subclass of Greenlet and want to say: %s" % (self.msg,))gevent.sleep(self.timeout)print("I'm from subclass of Greenlet and done!")class TestMultigreen(object):def __init__(self, timeout=0):self.timeout = timeoutdef run(self):green0 = gevent.spawn(self._task, 0, 'just 0 test') #方式一:使用gevent的spawn方法创建greenlet实例green1 = Greenlet.spawn(self._task, 1, 'just 1 test') #方式一:使用Greenlet的spawn方法创建greenlet实例green2 = MyGreen(self.timeout, 'just 2 test') #方式二:使用自定义的Greenlet子类创建实例,需要调用start()手动将greenlet实例加入到 coroutine 的调度队列中green2.start()gevent.joinall([green0, green1, green2])print('Tasks done!')def _task(self, pid, msg):print("I'm task %d and want to say: %s" % (pid, msg))gevent.sleep(self.timeout)print("Task %d done." % (pid,))if __name__ == '__main__':test = TestMultigreen()test.run()

需要注意:

  • 若仅是想生成 greenlet instance 并置于调度队列中,最好采用 gevent.spawn() API
  • 若想仅生成 greenlet instance 且暂时不想加入到调度队列,则可采用第二种方法。之后若想将其加入到调度队列,则手动执行 instance.start() 方法。

如何进行主线程到 hub greenlet instance 的切换

  • gevent.sleep()
  • Greenlet 或 Greenlet 子类的 instance 的 join() 方法
  • monkey patch 的库或方法 (参见 monkey.py):
  •     socket
  •     ssl
  •     os.fork
  •     time.sleep
  •     select.select
  •     thread
  •     subprocess
  •     sys.stdin,sys.stdout,sys.stderr

4,gevent核心功能

  • Greenlets
  • 同步和异步执行
  • 确定性
  • 创建Greenlets
  • Greenlet状态
  • 程序停止
  • 超时
  • 猴子补丁

4.1,Greenlets

在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

一个 “greenlet” 是一个小型的独立伪线程。可以把它想像成一些栈帧,栈底是初始调用的函数,而栈顶是当前greenlet的暂停位置。你使用greenlet创建一堆这样的堆栈,然后在他们之间跳转执行。跳转必须显式声明的:一个greenlet必须选择要跳转到的另一个greenlet,这会让前一个挂起,而后一个在此前挂起处恢复执行。不同greenlets之间的跳转称为切换(switching) 。

greenlet不是一种真正的并发机制,而是在同一线程内,在不同函数的执行代码块之间切换,实施“你运行一会、我运行一会”,并且在进行切换时必须指定何时切换以及切换到哪。

greenlet类主要有两个方法:

  • switch:用来切换协程;
  • throw():用来抛出异常同时终止程序;
from greenlet import greenlet
import timedef test1(gr,g):for i in range(100):print("---A--")gr.switch(g, gr) # 切换到另一个协程执行time.sleep(0.5)def test2(gr, g):for i in range(100):print("---B--")gr.switch(g, gr)# gr.throw(AttributeError)time.sleep(0.5)if __name__ == '__main__':# 创建一个协程1gr1 = greenlet(test1)# 创建一个协程2gr2 = greenlet(test2)# 启动协程gr1.switch(gr2, gr1)

4.2,同步和异步执行

并发的核心思想在于,大的任务可以分解成一系列的子任务,后者可以被调度成 同时执行或异步执行,而不是一次一个地或者同步地执行。两个子任务之间的 切换也就是上下文切换。在gevent里面,上下文切换是通过yielding来完成的.

当我们在受限于网络或IO的函数中使用gevent,这些函数会被协作式的调度, gevent的真正能力会得到发挥。Gevent处理了所有的细节, 来保证你的网络库会在可能的时候,隐式交出greenlet上下文的执行权。

示例如下:

例子中的select()函数通常是一个在各种文件描述符上轮询的阻塞调用。

import time
import gevent
start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)
def gr1():print('Started Polling: %s' % tic())select.select([], [], [], 1)print('Ended Polling: %s' % tic())
def gr2():print('Started Polling: %s' % tic())select.select([], [], [], 2)print('Ended Polling: %s' % tic())
def gr3():print("Hey lets do some stuff while the greenlets poll, %s" % tic())gevent.sleep(3)print('Ended Polling: %s' % tic())
gevent.joinall([gevent.spawn(gr1),gevent.spawn(gr2),gevent.spawn(gr3),
])

输出:

Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 1.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 3.0 seconds

同步vs异步

下面是另外一个多少有点人造色彩的例子,定义一个非确定性的(non-deterministic) 的task函数(给定相同输入的情况下,它的输出不保证相同)。 此例中执行这个函数的副作用就是,每次task在它的执行过程中都会随机地停某些秒。

import gevent
import randomdef task(pid):gevent.sleep(random.randint(0,2)*0.001)print('task {} done'.format(pid))def synchronous():for i in range(5):task(i)def asynchronous():gev_list = [gevent.spawn(task, i) for i in range(5)]gevent.joinall(gev_list)print("synchronous:")
synchronous()print("asynchronous:")
asynchronous()

运行结果:

synchronous:
task 0 done
task 1 done
task 2 done
task 3 done
task 4 done
asynchronous:
task 4 done
task 3 done
task 0 done
task 1 done
task 2 done

上例中,在同步的部分,所有的task都同步的执行, 结果当每个task在执行时主流程被阻塞(主流程的执行暂时停住)。

程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。

要重点留意的是,异步的部分本质上是随机的,而且异步部分的整体运行时间比同步 要大大减少。事实上,同步部分的最大运行时间,即是每个task停0.002秒,结果整个 队列要停0.02秒。而异步部分的最大运行时间大致为0.002秒,因为没有任何一个task会 阻塞其它task的执行。

4.3,确定性

greenlet具有确定性。在相同配置相同输入的情况下,它们总是会产生相同的输出。

下面是另外一个多少有点人造色彩的例子,定义一个非确定性的(non-deterministic) 的task函数(给定相同输入的情况下,它的输出不保证相同)。 此例中执行这个函数的副作用就是,每次task在它的执行过程中都会随机地停某些秒。

import time
def echo(i):time.sleep(0.001)return i
# Non Deterministic Process Pool
from multiprocessing.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, range(10))]
run2 = [a for a in p.imap_unordered(echo, range(10))]
run3 = [a for a in p.imap_unordered(echo, range(10))]
run4 = [a for a in p.imap_unordered(echo, range(10))]
print(run1 == run2 == run3 == run4)
# Deterministic Gevent Pool
from gevent.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, range(10))]
run2 = [a for a in p.imap_unordered(echo, range(10))]
run3 = [a for a in p.imap_unordered(echo, range(10))]
run4 = [a for a in p.imap_unordered(echo, range(10))]
print(run1 == run2 == run3 == run4)

执行结果

False
True

即使gevent通常带有确定性,当开始与如socket或文件等外部服务交互时, 不确定性也可能溜进你的程序中。因此尽管gevent线程是一种“确定的并发”形式, 使用它仍然可能会遇到像使用POSIX线程或进程时遇到的那些问题。

涉及并发长期存在的问题就是竞争条件(race condition)(当两个并发线程/进程都依赖于某个共享资源同时都尝试去修改它的时候, 就会出现竞争条件),这会导致资源修改的结果状态依赖于时间和执行顺序。 这个问题,会导致整个程序行为变得不确定。

解决办法: 始终避免所有全局的状态.

4.4,创建Greenlets

gevent对Greenlet初始化提供了一些封装.

import gevent
from gevent import Greenlet
def foo(message, n):gevent.sleep(n)print(message)
thread1 = Greenlet.spawn(foo, "Hello", 1)
thread2 = gevent.spawn(foo, "I live!", 2)
thread3 = gevent.spawn(lambda x: (x+1), 2)
threads = [thread1, thread2, thread3]
gevent.joinall(threads)

执行结果:

Hello
I live!

除使用基本的Greenlet类之外,你也可以子类化Greenlet类,重载它的_run方法。

import gevent
from gevent import Greenlet
class MyGreenlet(Greenlet):def __init__(self, message, n):Greenlet.__init__(self)self.message = messageself.n = ndef _run(self):print(self.message)gevent.sleep(self.n)
g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

执行结果

Hi there!

4.5,Greenlet状态

greenlet的状态通常是一个依赖于时间的参数:

  • started – Boolean, 指示此Greenlet是否已经启动
  • ready() – Boolean, 指示此Greenlet是否已经停止
  • successful() – Boolean, 指示此Greenlet是否已经停止而且没抛异常
  • value – 任意值, 此Greenlet代码返回的值
  • exception – 异常, 此Greenlet内抛出的未捕获异常

代码示例:

import geventdef win():return 'win game'
def fail():raise Exception('You failed.')winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print(winner.started)
print(loser.started)
# Greenlet异常会保存在Greenlet,不会上抛给主进程.
try:gevent.joinall([winner, loser])
except Exception as e:print('This will never be reached') #此处不能捕获Greenlet异常,永远不会触发print(loser.exception) #Greenlet异常print(winner.value)  # 'You win!'
print(loser.value)  # Noneprint(winner.ready())  # True
print(loser.ready())  # True
print(winner.successful())  # True
print(loser.successful())  # False

执行结果

True
True
You failed.
win game
None
True
True
True
False
Traceback (most recent call last):File "src/gevent/greenlet.py", line 716, in gevent._greenlet.Greenlet.runFile "coroutine.py", line 121, in failraise Exception('You failed.')
Exception: You failed.
2019-01-22T09:05:05Z <Greenlet "Greenlet-0" at 0x103d02848: fail> failed with Exception

4.6,程序停止
当主程序(main program)收到一个SIGQUIT信号时,不能成功做yield操作的 Greenlet可能会令意外地挂起程序的执行。这导致了所谓的僵尸进程, 它需要在Python解释器之外被kill掉。

通用的处理模式就是在主程序中监听SIGQUIT信号,调用gevent.shutdown退出程序。

import gevent
import signal
def run_forever():gevent.sleep(1000)if __name__ == '__main__':gevent.signal(signal.SIGQUIT, gevent.shutdown)thread = gevent.spawn(run_forever)thread.join()

4.7,超时

通过超时可以对代码块儿或一个Greenlet的运行时间进行约束。

import gevent
from gevent import Timeout
seconds = 3
timeout = Timeout(seconds)
timeout.start()def wait():gevent.sleep(4)try:gevent.spawn(wait).join()
except Timeout:print('Could not complete')

执行结果:

Could not complete

超时类

import gevent
from gevent import Timeouttime_to_wait = 5class TimeLong(Exception):passwith Timeout(time_to_wait, TimeLong):gevent.sleep(6)

4.8,猴子补丁(Monkey patching)

我们现在来到gevent的死角了. 在此之前,我已经避免提到猴子补丁(monkey patching) 以尝试使gevent这个强大的协程模型变得生动有趣,但现在到了讨论猴子补丁的黑色艺术 的时候了。你之前可能注意到我们提到了monkey.patch_socket()这个命令,这个 纯粹副作用命令是用来改变标准socket库的。

import socket
print(socket.socket)
print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)import select
print(select.select)
monkey.patch_select()
print("After monkey patch")
print(select.select)

执行结果:

<class 'socket.socket'>
After monkey patch
<class 'gevent._socket3.socket'>
<built-in function select>
After monkey patch
<function select at 0x1074631e0>

Python的运行环境允许我们在运行时修改大部分的对象,包括模块,类甚至函数。 这是个一般说来令人惊奇的坏主意,因为它创造了“隐式的副作用”,如果出现问题 它很多时候是极难调试的。虽然如此,在极端情况下当一个库需要修改Python本身 的基础行为的时候,猴子补丁就派上用场了。在这种情况下,gevent能够修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行。

例如,Redis的python绑定一般使用常规的tcp socket来与redis-server实例通信。 通过简单地调用gevent.monkey.patch_all(),可以使得redis的绑定协作式的调度 请求,与gevent栈的其它部分一起工作。

这让我们可以将一般不能与gevent共同工作的库结合起来,而不用写哪怕一行代码。 虽然猴子补丁仍然是邪恶的(evil),但在这种情况下它是“有用的邪恶(useful evil)”。

参考文献:

https://blog.csdn.net/xumesang/article/details/53288363

http://blog.chinaunix.net/uid-9162199-id-4738168.html

https://www.cnblogs.com/cwp-bg/p/9593405.html

Python并发之协程gevent基础相关推荐

  1. Python并发之协程gevent基础(5)

    1,gevent介绍 gevent是第三方库,通过 greenlet 实现 coroutine,创建.调度的开销比 线程(thread) 还小,因此程序内部的 执行流 效率高. gevent 实现了 ...

  2. Python并发之协程gevent数据结构和实践(6)

    greenlet instances 之间的关系存在两种: 仅有包含于 greenlet instances 集合的关系 同步关系,即存在协作关系 第一种形式很常见,不同的 greenlet inst ...

  3. python并发之协程_python并发编程之协程

    一 引子 本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态 cpu正在运行一个任务,会在两种情况下切走去 ...

  4. python中的协程:greenlet和gevent

    协程是一中多任务实现方式,它不需要多个进程或线程就可以实现多任务. 1.通过yield实现协程: 代码: import time def A():while 1:print('------A----- ...

  5. python中的协程(二)

    协程 1.协程: 单线程实现并发 在应用程序里控制多个任务的切换+保存状态 优点: 应用程序级别速度要远远高于操作系统的切换 缺点: 多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地,该线程内的其他 ...

  6. python进程线程协程区别_Python3多线程与协程

    python中的多线程非常的常用,之前一直糊里糊涂地使用,没有一些系统性的概念,记录一下~ 0x001 多线程的优势:可将长时间占用的程序放到后台 可能会加速程序执行速度 能够实现一些类似同步执行的效 ...

  7. Python与Golang协程异同

    背景知识 这里先给出一些常用的知识点简要说明,以便理解后面的文章内容. 进程的定义: 进程,是计算机中已运行程序的实体.程序本身只是指令.数据及其组织形式的描述,进程才是程序的真正运行实例. 线程的定 ...

  8. Python中的协程

    Python中的协程 文章目录 Python中的协程 一.什么是协程 1.概念 2.协程的好处 3.缺点 二.了解协程的过程 1.yield工作原理 2.协程在运行过程中有四个状态: 3.预激协程的装 ...

  9. Python 中 异步协程 的 使用方法介绍

    静觅 崔庆才的个人博客:Python中异步协程的使用方法介绍:https://cuiqingcai.com/6160.html Python 异步 IO .协程.asyncio.async/await ...

最新文章

  1. mysql AB 的基本搭建
  2. bzoj千题计划153:bzoj2431: [HAOI2009]逆序对数列
  3. PHP专题-开发基础(七)
  4. CM3计算板安装硬件时钟DS3231
  5. HTML的格式化应用
  6. visio 模具_Visio2013 自定义模具 简单公式
  7. Java回文数的判断与生成
  8. close与volume的相关性
  9. Atiit 如何手写词法解析器
  10. Axure实战002:APP原型设计思路
  11. andriod studio 运行项目时没有NDK(Android Studio如何更改JDK和SDK或者DNK的路径)
  12. 数据分类算法-朴素贝叶斯
  13. discuz 论坛配置 QQ/163 网易邮箱
  14. 台式cpu温度过高的两个原因及解决方法
  15. awk使用手册(全)
  16. c++ string 易语言,如何把C++变成易语言
  17. 2019年南京大学计算机系暨人工智能学院开放日和九月推免全记录
  18. python 基于itchat详解微信防撤回程序
  19. HDOJ2000题Java代码
  20. 【挑战学习一百天冲刺实习面试】第二十一天:全面理解BIO、NIO、AIO

热门文章

  1. 文章用图的修改和排版(2)
  2. java遍历子目录_Java遍历文件夹及子目录代码实例
  3. 信息学奥赛一本通 提高篇 第六部分 数学基础 相关的真题
  4. 2.2基本算法之递归和自调用函数_7592 求最大公约数问题
  5. Vue第一部分(6):Vue的生命周期
  6. python去重复字符串_python3取出重复3次的字符串保存为3列
  7. C++工作笔记-仿大佬使用枚举类型
  8. staf工作笔记-扩展stax官方实例的补坑说明
  9. 谁动了你的主机-Windows“唤醒”和“开机”时自动拍照-狩猎者项目
  10. php判断绝对路径文件是否存在,php – 如何确定文件路径是否绝对?