greenlet instances 之间的关系存在两种:

  • 仅有包含于 greenlet instances 集合的关系
  • 同步关系,即存在协作关系

第一种形式很常见,不同的 greenlet instance 之间没有交流,且没有共享数据需要进行操作,各自做各自的事情。
对于第二种形式,gevent 提供了几种数据结构便于 greenlet instances 间进行同步。

Gevent常用数据结构和多进程/线程类似,有以下7种:

  • 事件
  • 队列
  • 组和池
  • 锁和信号量
  • 线程局部变量
  • 子进程
  • Actors

1,事件

事件(event)是一个在Greenlet之间异步通信的形式。

gevent.event.Event

通过这种同步原语可以控制一个 greenlet 与多个 greenlet 通过 flag 的 True/False 值进行同步,Event instance 的初始 flag 是 False.但这种同步是 一次性的,即 set() 设置 flag 为 True 对之前因为 wait() 阻塞的 greenlets 都有效,但对 set() 之后再次设置wait() 的 greenlet 无效。
一般的使用场景是:一个 greenlet 控制何时通过 Event().set() 方法设置 flag 的值为 True 来控制其它协作的 greenlet 运行,有协作关系的 greenlet 通过 Event().wait() 方法等待 flag 被设为 True,否则一直阻塞。

代码示例:

import gevent
from gevent.event import Event'''
Illustrates the use of events
'''evt = Event()def setter():'''After 3 seconds, wake all threads waiting on the value of evt'''print('厨师开始做菜')gevent.sleep(3)print("菜做好了")evt.set() #设置event的flag为True,所有wait的greenlet解除阻塞def waiter(name):'''After 3 seconds the get call will unblock'''print("{} 服务员等菜".format(name))evt.wait()  # 阻塞,跳转到下一个greenletprint("菜好了, {} 送餐".format(name))def main():gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter, "服务员 A"),gevent.spawn(waiter, "服务员 B"),])if __name__ == '__main__':main()

运行结果:

厨师开始做菜
服务员 A 服务员等菜
服务员 B 服务员等菜
菜做好了
菜好了, 服务员 A 送餐
菜好了, 服务员 B 送餐

这里setter和waiter一共起了三个协程。分析一下运行顺序应该很容易了解evt是干嘛的:

首先回调之行到运行setter 打印str然后gevent.sleep(3);

然后执行第二个回调waitter()执行到evt.wait()的时候阻塞住然后切换到下一个greenlet实例,执行第三个回调waitter()执行到evt.wait()又被阻塞了,这个时候继续执行下一个回调就会回到setter里面,因为没有人在他前面往hub.loop里注册了;

之后这里执行【print("菜做好了")】,运行evt.set()将flag设置为True。

然后另外两个被阻塞的waitter的evt.wait()方法在看到flag已经为True之后不再继续阻塞运行并且结束。

可以看到,Event可以协同好几个Greenlet同时工作,并且一个主Greenlet在操作的时候可以让其他几个都处于等待的状态,可以实现一些特定的环境和需求。

注意:

一旦 flag 被设为了 True,之前所有的Event().wait() 操作将不再阻塞,这些 greenlet 将等待 hub greenlet 进行调度,即使之后 _flag 又被设置为了 False。可参考这个 问题,还可以再对比一个程序理解这点儿,参考 这个。

若出现 wait() 处于永远的阻塞状态,即在 wait() 设置之后,在其它 greenlet 中没有 set() 操作,则 gevent 会抛出这样的异常。

示例代码:

import randomimport gevent
from gevent.event import Eventclass TestEvent(object):def __init__(self):self.event = Event()def run(self):producers = [gevent.spawn(self._producer, i) for i in range(3)]consumers = [gevent.spawn(self._consumer, i) for i in range(3)]tasks = []tasks.extend(producers)tasks.extend(consumers)gevent.joinall(tasks)def _producer(self, pid):print("I'm producer %d and now I don't want consume to do something" % (pid,))self.event.clear()sleeptime = random.randint(0, 5) * 0.01print("Sleeping time is %f" % (sleeptime,))gevent.sleep(sleeptime)print("I'm producer %d and now consumer could do something." % (pid,))self.event.set()def _consumer(self, pid):print("I'm consumer %d and now I'm waiting for producer" % (pid,))gevent.sleep(random.randint(0, 10) * 0.1)flag = self.event.wait()print("I'm consumer %d. Flag is %r and now I can do something" % (pid, flag))self.event.clear()if __name__ == '__main__':test = TestEvent()test.run()

执行结果:

I'm producer 0 and now I don't want consume to do something
Sleeping time is 0.020000
I'm producer 1 and now I don't want consume to do something
Sleeping time is 0.000000
I'm producer 2 and now I don't want consume to do something
Sleeping time is 0.020000
I'm consumer 0 and now I'm waiting for producer
I'm consumer 1 and now I'm waiting for producer
I'm consumer 2 and now I'm waiting for producer
I'm producer 1 and now consumer could do something.
I'm producer 0 and now consumer could do something.
I'm producer 2 and now consumer could do something.
I'm consumer 2. Flag is True and now I can do something
Traceback (most recent call last):File "coroutine.py", line 72, in <module>test.run()File "coroutine.py", line 51, in rungevent.joinall(tasks)File "src/gevent/greenlet.py", line 849, in gevent._greenlet.joinallFile "src/gevent/greenlet.py", line 859, in gevent._greenlet.joinallFile "src/gevent/_hub_primitives.py", line 198, in gevent.__hub_primitives.wait_on_objectsFile "src/gevent/_hub_primitives.py", line 235, in gevent.__hub_primitives.wait_on_objectsFile "src/gevent/_hub_primitives.py", line 149, in gevent.__hub_primitives._WaitIterator.__next__File "src/gevent/_hub_primitives.py", line 140, in gevent.__hub_primitives._WaitIterator.__next__File "src/gevent/_waiter.py", line 192, in gevent.__waiter.MultipleWaiter.getFile "src/gevent/_waiter.py", line 151, in gevent.__waiter.Waiter.getFile "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switchFile "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switchFile "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switchFile "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
gevent.exceptions.LoopExit: This operation would block foreverHub: <Hub '' at 0x1046436d8 select default pending=0 ref=0 thread_ident=0x101899340>Handles:
[]

问题分析:

任何一个producer中设置event的flag为True,都可能唤醒全部的consumer。

异常gevent.exceptions.LoopExit: This operation would block forever,是由于在第一个consumer实例执行时设置flag为false,导致后续其他consumer一直block。

生产者操作:P-A: Clear the event
P-B: Set the event消费者操作:C-A: Wait for the event to become set
C-B: Wake up when event is set
C-C: Get the event value to return it
C-D: Clear the event随机sleeptime,导致producer和consumer,或者consumers之间顺序错乱。可能造成如下执行过程:
P-A (event is clear)
C1-A
C2-A
C3-A (all three consumers are waiting)
P-B (event is set)
C1-B
C2-B
C3-B (all consumers have woken up because the event is set)
C1-C (consumer 1 sees the event as set)
C1-D (consumer 1 has cleared the event)
C2-C (consumer 2 sees event as clear)
C3-C (consumer 3 sees event as clear)
C2-D
C3-D

gevent.event.AsyncResult

类似 gevent.event.Event,但在调用 set() 方法时可以传递消息,等待的 greenlet 可以通过 wait() 或 get() 方法读取 set() 设置的值。

代码示例:

import gevent
from gevent.event import AsyncResult'''
Illustrates the use of events
'''a_evt = AsyncResult()def setter():'''After 3 seconds, wake all threads waiting on the value of evt'''print('厨师开始做菜')gevent.sleep(3)print("菜做好了")a_evt.set("鱼香肉丝") #设置event的flag为True,所有wait的greenlet解除阻塞def waiter(name):'''After 3 seconds the get call will unblock'''print("{} 服务员等菜".format(name))data = a_evt.get()  # 阻塞,跳转到下一个greenletprint("{} 菜好了, {} 送餐".format(data, name))def main():gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter, "服务员 A"),gevent.spawn(waiter, "服务员 B"),])if __name__ == '__main__':main()

运行结果:

厨师开始做菜
服务员 A 服务员等菜
服务员 B 服务员等菜
菜做好了
鱼香肉丝 菜好了, 服务员 A 送餐
鱼香肉丝 菜好了, 服务员 B 送餐

e.rawlink回调

rawlink是注册一个回调,wait是等待任务的完成之后,才能后续进行。    他的用法有些像twisted的一些思想,注册事件,事件和回调函数关联。    gevent最大的优势是用同步的写法,实现异步的功能。 所以这功能不太实用。

代码示例:

import gevent
import timedef event_callback(e): #e为event事件print("回调函数")def event_setter(e): #e为event事件print('开始produce任务')e.rawlink(event_callback) #注册回调函数print("数据处理中。。。。")gevent.sleep(2)e.set() #设置event flag为True,解除阻塞状态print('任务已生成')def event_waiter(e):#e为event事件print("等待任务")e.wait()print("任务执行。。。")def try_event():from gevent.event import Evente = Event()gevent.joinall([gevent.spawn(event_setter, e),gevent.spawn(event_waiter, e),gevent.spawn(event_waiter, e),])try_event()

运行结果:

开始produce任务
数据处理中。。。。
等待任务
等待任务
任务已生成
回调函数
任务执行。。。
任务执行。。。

2,队列queue

队列是一个排序的数据集合,它有常见的put / get操作, 但是它是以在Greenlet之间可以安全操作的方式来实现的。

举例来说,如果一个Greenlet从队列中取出一项,此项就不会被同时执行的其它Greenlet再取到了。

gevent.queue

这个模块中包含了几种常用的 queue,它的方法与标准库中的 Queue 类似,主要区别在于 put() 和 get() 方法中阻塞时 (对于put 是 queue 已经满了,对于 get 是 queue 已经空了) 添加了将执行权交给 hub greenlet 的操作。使用时需要注意几点:put(item, block=True, timeout=None) 往队列放入数据,可选是否阻塞和超时时间。put() 阻塞执行 (默认情况) 时,若 queue 未满,则装入,否则将执行权交给 hub greenlet,实现 并发。若之后没有代码执行 get() 操作,则 put() 操作会抛出异常,指出它会无限等待。get() 阻塞执行 (默认情况) 时与之类似。

  • put_nowait(item) 非阻塞的往队列放入数据,队列已满时抛出Full Exception。putnowait() 与标准库中的做法一样,若 queue 未满,则装入,否者抛出 Full 异常。getnowait() 与之类似
  • get(block=True, timeout=None) 从队列读出数据,可选是否阻塞和超时时间
  • get_nowait() 费阻塞地从队列读出数据,队列为空是抛出Empty Exception
  • peek(block=True, timeout=None) 和get()类似,但获取的数据不会从队列移除
  • peek_nowait() 类似get_nowait()
  • empty() 队列为空返回True
  • full() 队列已满返回True
  • qsize() 返回队列长度,即队列中的数据数,而非Queue(maxlength)初始化时的maxlength
  • put() 与 putnowait() 的区别在于如何对待 queue 满的情况,前者等待,后者直接抛出异常。get() 与 getnowait() 的区别与之类似。

示例代码:

import gevent
from gevent.queue import Queuetasks= Queue()def worker(n):while not tasks.empty():task = tasks.get()print('Worker {} got task {}'.format(n, task))gevent.sleep(0)print("没数据了")def producer():for i in range(3):tasks.put(i)print("produce: {}".format(i))gevent.spawn(producer).join()
gevent.joinall([gevent.spawn(worker("worker A")),gevent.spawn(worker("worker B")),gevent.spawn(worker("worker C"))
])

执行结果:

produce: 0
produce: 1
produce: 2
Worker worker A got task 0
Worker worker A got task 1
Worker worker A got task 2
quit
quit
quit

首先初始化一个Queue()实例。这里会先运行boss() 调用put_nowait()方法不阻塞的往队列里面放3个元素。然后下面依次从Queue里对数字进行消费,起了三个协程分别命名了不同的名字,使用get方法依次消费队列里面的数字直到消费完毕。个worker A的greenlet实例从queue中获取的数据,不会再被其他greenlet获取。

3,组Group和池Pool

组(group)是一个运行中greenlet集合,集合中的greenlet像一个组一样会被共同管理和调度。 它也兼饰了像Python的multiprocessing库那样的平行调度器的角色,主要用在在管理异步任务的时候进行分组。

我的理解是,在一个组(group)里面的greenlet会被统一管理和调度。

Group().add(greenlet)

add(greenlet)方法传入greenlet,将greenlet添加到group中

实例代码:

import gevent
from gevent.pool import Groupdef talk(msg):for i in range(2):print(msg)g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')group = Group()
group.add(g1) #将greenlet添加进group
group.add(g2) #将greenlet添加进group
group.join()

这个例子非常简单。就是spawn了好几个talk,然后都加到组里面。最后使用group.join()来等待所有spawn完成,每完成一个就会从group里面去掉。

Group().map()

map(func, iterable)由第二个参数控制迭代次数,并且传递给第一个参数值而运行,返回可迭代对象执行结果列表

示例代码:

from gevent.pool import Group
from gevent import getcurrentdef talk(data):print('Size of group %s' % len(group))print('Hello from Greenlet %s' % id(getcurrent()))return datagroup = Group()
res = group.map(talk, [1, 2, 3])
print(type(res))
print(res)

运行结果:

Size of group 3
Hello from Greenlet 4365232456
Size of group 3
Hello from Greenlet 4365232712
Size of group 3
Hello from Greenlet 4365232968
<class 'list'>
[1, 2, 3]

这里使用了group.map()这个函数来取得各spawn的返回值。map()是由第二个参数控制迭代次数,并且传递给第一个参数值而运行的。拿这个函数举例,这里会返回一个list构成这个list的对象就是将迭代的参数传进函数运行之后的返回值。这里得到的结果是[1, 2, 3]

Group().imap()

这里imap与map不一样的地方是,map返回list对象,而imap返回一个iterable对象。

代码示例:

from gevent.pool import Group
from gevent import getcurrentdef talk(data):print('Size of group %s' % len(group))print('Hello from Greenlet %s' % id(getcurrent()))return datagroup = Group()
res = group.imap(talk, [1, 2, 3])print(type(res))
print(res)for i in res:print(i)

运行结果:

<class 'gevent.__imap.IMap'>
<IMap "Greenlet-0" at 0x101c73978: _run>
Size of group 3
Hello from Greenlet 4366280776
Size of group 3
Hello from Greenlet 4366281032
Size of group 3
Hello from Greenlet 4366281288
1
2
3

这里imap与map不一样的地方可能熟悉python基础库的同学很容易看出来,map返回list对象,而imap返回一个iterable对象。所以如果要取得里面的值比如想打印就必须写成像代码最后一行那种。(或者直接包一个list让他变成map函数)。另外提一下imap的内部实现,其实是继承了Greenlet对象,在里面启了spawn()。imap里面还有一个挺好用的参数maxsize默认情况是没有设置的当设置了之后,会将迭代变成一批一批的执行,这里再举一个例子:

from gevent.pool import Group
from gevent import getcurrentdef talk(data):print('Size of group %s' % len(group))print('Hello from Greenlet %s' % id(getcurrent()))return datagroup = Group()
res = group.imap(talk, range(5), maxsize=2)print(type(res))
print(res)
for i in res:print(i)

运行结果:

<class 'gevent.__imap.IMap'>
<IMap "Greenlet-0" at 0x101e73978: _run>
Size of group 3
Hello from Greenlet 4366391368
Size of group 3
Hello from Greenlet 4366391624
Size of group 3
Hello from Greenlet 4366391880
0
1
2
Size of group 2
Hello from Greenlet 4366391880
Size of group 2
Hello from Greenlet 4366391624
3
4

这里运行的时候,会将并行控制到3个,执行也是一次执行3个,而不是不设置的时候输出所有的结果。

Group().imap_unordered()

imap_unordered()和imap的区别是,先返回的先回来。

示例代码:

import random
import gevent
from gevent.pool import Groupdef intensive(n):sleeptime = random.randint(0, 5) * 0.01print("Sleeping time is %f" % (sleeptime,))gevent.sleep(sleeptime)return 'task', n, sleeptimeigroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):print(i)

运行结果:

Sleeping time is 0.020000
Sleeping time is 0.000000
Sleeping time is 0.040000
('task', 1, 0.0)
('task', 0, 0.02)
('task', 2, 0.04)

从输出可以看出task 1最先执行完,最先返回。

gevent.pool

通过 gevent.pool.Group 和 gevent.pool.Pool 可以管理一组 greenlets,前者没有容量限制,后者可设置 Pool 中 greenlet 的最大数量。当容器中的 greenlet 任务完成了,容器会自动将其从中除去。对于 Pool,容器中的 greenlet 数超过设置的最大值时,会将执行权交给 hub greenlet 进行调度。
Pool 可以通过设置最大数量来控制并发度,在爬虫中控制并发量有帮助。

示例代码:

from gevent.pool import Pool
pool = Pool(2)
def hello_from(n):print('Size of pool %s' % len(pool))return nres = pool.imap(hello_from, range(5))
for i in res:print(i)

运行结果:

Size of pool 2
Size of pool 2
0
1
Size of pool 2
Size of pool 2
2
3
Size of pool 1
4

通过 gevent.spawn() 生成的 greenlet 是已经加入到调度队列中的,当 Pool 中 greenlet 数量达到最大值时,阻塞自身的 add()操作,执行权交给 hub greenlet,此时已经生成的 greenlet 都将被调度执行,当 Pool 再次被调度到时,会先检查已有的 greenlet 状态,若这些 greenlet 的 ready() 方法返回 True (即表示运行完),则删除该 greenlet,然后加入新的 greenlet。

示例代码:

class TestPool(object):def __init__(self, maxsize=2):self.pool = gevent.pool.Pool(maxsize)def run(self):for i in range(5):gworker = gevent.spawn(self._worker, i)self.pool.add(gworker)print('sizeof pool is %d' % (len(self.pool),))print('worker %d in? %r' % (i, gworker in self.pool))# add this statement to trap into the loopself.pool.join()print('All tasks done.')def _worker(self, pid):print('My pid is %d' % (pid,))gevent.sleep(1)print('%d has done.' % (pid,))if __name__ == '__main__':test = TestPool()test.run()

运行结果:

sizeof pool is 1
worker 0 in? True
sizeof pool is 2
worker 1 in? True
My pid is 0
My pid is 1
My pid is 2
0 has done.
1 has done.
2 has done.
sizeof pool is 1
worker 2 in? True
sizeof pool is 2
worker 3 in? True
My pid is 3
My pid is 4
sizeof pool is 2
worker 4 in? True
3 has done.
4 has done.
All tasks done.

4,锁和信号量

semaphore信号量是一个允许greenlet相互合作,限制并发访问互斥资源。 信号量有两个方法,acquire和release。

在信号量是否已经被 acquire或release,和拥有资源的数量之间不同,被称为此信号量的范围 (the bound of the semaphore)。如果一个信号量的范围已经降低到0,它会 阻塞acquire操作直到另一个已经获得信号量的greenlet作出释放。

锁(lock)是范围为1的信号量。它向单个greenlet提供了互斥访问。 信号量和锁常被用来保证资源只在程序上下文被单次使用。

示例代码:

from gevent import sleep
from gevent.pool import Pool
from gevent.lock import BoundedSemaphoresem = BoundedSemaphore(2)def worker1(n):sem.acquire()print('Worker1 %i acquired semaphore' % n)sleep(0)sem.release()print('Worker1 %i released semaphore' % n)def worker2(n):with sem:print('Worker2 %i acquired semaphore' % n)sleep(0)print('Worker2 %i released semaphore' % n)pool = Pool()
pool.map(worker1, range(0,2))
pool.map(worker2, range(3,6))

运行结果:

Worker1 0 acquired semaphore
Worker1 1 acquired semaphore
Worker1 0 released semaphore
Worker1 1 released semaphore
Worker2 3 acquired semaphore
Worker2 4 acquired semaphore
Worker2 3 released semaphore
Worker2 4 released semaphore
Worker2 5 acquired semaphore
Worker2 5 released semaphore

从以上输出可以看出,每次同时只有两个worker能获取到信号量执行。

当我们使用底层同步元语对象BoundSemaphore(2)初始化两个范围之后,就意味着,同时只能有两个greenlet去拿到这两个锁,在这两个锁被aquire之后,其他人必须等待其被relase,否则无法再继续并发更多的greenlet。

第一组代码的运行先spawn两个greenlet去拿到锁然后释放锁,这里似乎没有什么不一样。但是work2中你会发现,有锁的话你只能看到3,4一起运行然后再运行5。如果没有sem,3,4,5将会同时运行。而且从代码来看,semaphore还支持上下文管理器。

5,线程局部变量(Thread Locals)

Gevent也允许你指定局部于greenlet上下文的数据。 在内部,它被实现为以greenlet的getcurrent()为键, 在一个私有命名空间寻址的全局查找。它只在自己的greenlet的命名空间中有效.

示例代码:

import gevent
from gevent.local import localdata = local()def f1():data.x = 1print(data.x)print(data.__dict__)def f2():data.y = 2print(data.y)print(data.__dict__)try:print(data.x)except AttributeError:print("x is not local to f2")gevent.spawn(f1).join()
gevent.spawn(f2).join()

运行结果:

1
{'x': 1}
2
{'y': 2}
x is not local to f2

这里先初始化了一个线程本地对象local,给data。然后data会把保存给它的属性当作线程本地变量给存储起来。当其他greenlet去访问它的时候是无法访问到的,它只在自己的greenlet的命名空间中有效。这样可以让我们用来做一些有趣的事情,比如打印属于该greenlet的log日志,将日志存储在greenlet本地local()中从而与其它greenlet互不影响,在协程切换的时候也能打出完整日志。还有这是不是很容易让我们联想到常用python web应用框架flask的requests实现?一个requests就是一个http访问,在整个访问过程中我们可以从requests对象里面拿到很多参数,但是它和其他的requests互不影响,这就是线程本地变量的作用。

另外再提一点,genvent.local还可以被继承实现基于当前greenlet能访问的一组属性的自己的类,来看代码:

import gevent
from gevent.local import localclass MyLocal(local):__slots__ = ('x') '''在常规的类里面指定__slots__的意思往往是只允许该类下的属性只允许有__slots__里面这些,超出的就会报出Attribute error的错误。但是继承了local的__slots__在这里却是指,申明了的属性将会穿透所有greenlet变成一个全局可读的,并不再是线程本地的.'''initialized = Falsedef __init__(self, **kw):if self.initialized:raise SystemError('__init__ called too many times')self.initialized = Trueself.__dict__.update(kw)def squared(self):return self.number ** 2def f1():stash.x = 1stash.number = 3print(stash.x)print(stash.number)def f2():stash.y = 2print(stash.y)try:print(stash.x) #x是全局变量except AttributeError:print("x is not local to f2")try:print(stash.number) #number是local变量except AttributeError:print("number is not local to f2")stash = MyLocal()
g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)gevent.joinall([g1, g2])

运行结果:

1
3
2
1
number is not local to f2

注意:

我们知道在常规的类里面指定__slots__的意思往往是只允许该类下的属性只允许有__slots__里面这些,超出的就会报出Attribute error的错误。但是继承了local的__slots__在这里却是指,申明了的属性将会穿透所有greenlet变成一个全局可读的,并不再是线程本地的

6,子进程

gevent 1.0支持子线程gevent.subprocess;它支持协作式的等待子进程。

示例代码:

import gevent
from gevent.subprocess import PIPE, Popendef cron_job(n):while True:print("cron_job is running")gevent.sleep(n)g1 = gevent.spawn(cron_job, 0.1)
sub = Popen(['sleep 0.3; uptime'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g1.kill()
print(out.rstrip())

运行结果:

cron_job is running
cron_job is running
cron_job is running
cron_job is running
b'16:16  up 36 days,  6:43, 1 user, load averages: 5.19 4.64 4.68'

7,Actors

actor模型是一个由于Erlang变得普及的更高层的并发模型。 简单的说它的主要思想就是许多个独立的Actor,每个Actor有一个可以从 其它Actor接收消息的收件箱。Actor内部的主循环遍历它收到的消息,并 根据它期望的行为来采取行动。

Gevent没有原生的Actor类型,但在一个子类化的Greenlet内使用队列, 我们可以定义一个非常简单的。

示例代码:

定义Actor类:

import gevent
from gevent.queue import Queueclass Actor(gevent.Greenlet):def __init__(self):self.inbox = Queue()super(Actor).__init__()def receive(self, message):"""Define in your subclass."""raise NotImplemented()def _run(self):self.running = Truewhile self.running:message = self.inbox.get()self.receive(message)

使用actor示例:

import gevent
from gevent.queue import Queue
from gevent import Greenletclass Pinger(Actor):def receive(self, message):print(message)pong.inbox.put('ping')gevent.sleep(0)class Ponger(Actor):def receive(self, message):print(message)ping.inbox.put('pong')gevent.sleep(0)ping = Pinger()
pong = Ponger()
ping.start()
pong.start()
ping.inbox.put('start')
gevent.joinall([ping, pong])

参考文献:

https://www.cnblogs.com/piperck/p/5719733.html

http://blog.chinaunix.net/uid-9162199-id-4738168.html

Python并发之协程gevent数据结构和实践(6)相关推荐

  1. Python并发之协程gevent基础(5)

    1,gevent介绍 gevent是第三方库,通过 greenlet 实现 coroutine,创建.调度的开销比 线程(thread) 还小,因此程序内部的 执行流 效率高. gevent 实现了 ...

  2. Python并发之协程gevent基础

    基本示例 from gevent import monkey monkey.patch_all() # 记住一定放在第一行,这里是打补丁的意思,time模块在使用协程gevent模块的时候,必须打补丁 ...

  3. python并发之协程_python并发编程之协程

    一 引子 本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态 cpu正在运行一个任务,会在两种情况下切走去 ...

  4. python中的协程:greenlet和gevent

    协程是一中多任务实现方式,它不需要多个进程或线程就可以实现多任务. 1.通过yield实现协程: 代码: import time def A():while 1:print('------A----- ...

  5. python中的协程(二)

    协程 1.协程: 单线程实现并发 在应用程序里控制多个任务的切换+保存状态 优点: 应用程序级别速度要远远高于操作系统的切换 缺点: 多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地,该线程内的其他 ...

  6. python进程线程协程区别_Python3多线程与协程

    python中的多线程非常的常用,之前一直糊里糊涂地使用,没有一些系统性的概念,记录一下~ 0x001 多线程的优势:可将长时间占用的程序放到后台 可能会加速程序执行速度 能够实现一些类似同步执行的效 ...

  7. Python中的协程

    Python中的协程 文章目录 Python中的协程 一.什么是协程 1.概念 2.协程的好处 3.缺点 二.了解协程的过程 1.yield工作原理 2.协程在运行过程中有四个状态: 3.预激协程的装 ...

  8. python 实现多任务协程下载斗鱼平台图片

    python 实现多任务协程下载斗鱼平台图片 import re import gevent from gevent import monkey, pool import time, random i ...

  9. Python与Golang协程异同

    背景知识 这里先给出一些常用的知识点简要说明,以便理解后面的文章内容. 进程的定义: 进程,是计算机中已运行程序的实体.程序本身只是指令.数据及其组织形式的描述,进程才是程序的真正运行实例. 线程的定 ...

最新文章

  1. mysql数据库备份、恢复文档
  2. Linux系统调用--getrlimit()与setrlimit()函数详解【转】
  3. 自增符号++(自减符号--)带来的部分影响
  4. 【Python面试】谈谈对 Python 和其他语言的区别?​
  5. 数据库设置_CentOS7 - 设置MySQL数据库
  6. 今年后,再见Excel,你好python
  7. IBM Holosofx 进行业务流程管理
  8. Makefile之eval与call用法
  9. ping不同的网卡方法
  10. weblogic系列漏洞整理 -- 1. weblogic安装
  11. 阿里云服务器连接ftp服务(软件的使用)
  12. 如何使用WinDbg查找蓝屏原因
  13. java读取服务器文件_JAVA读取服务器端文件
  14. 我的毕业设计历程——基于Unity3D的MOBA游戏设计(二)
  15. matlab在三维人体及服装建模上的应用,Matlab在三维人体及服装建模上的应用
  16. C# 串口接收含有asic2=0x3f时发生的故障
  17. 俗语“手握金鱼骨,富贵不用愁”,是啥意思?金鱼骨怎么形成的?
  18. WorkBench简介
  19. 利用python爬取网易云歌手top50歌曲歌词
  20. js逆向-逆向部分技巧总结

热门文章

  1. matlab park clark,[转载]park,clark和ipark浅析
  2. 活动创意来源和感想思考
  3. 试用MarkDown标记法和Haroopad编辑器
  4. c语言教改课题开题报告,教改项目结题及新项目开题报告会简讯
  5. CSRF 跨站请求伪造攻击
  6. U盘、打印机泄密的隐患
  7. ACM-ICPC 2018 南京赛区网络预赛 AC Challenge
  8. TTDP、TRDP协议
  9. 目标码格式解析之DSP目标码Cinit段
  10. C.H. Robinson罗宾逊全球物流在Gartner实时交通可视化平台魔力象限报告中被评为“挑战者”