本篇将讨论gevent的两架马车-libev和greenlet如何协同工作的。

gevent事件驱动底层使用了libev,我们先看看如何单独使用gevent中的事件循环。

#coding=utf8
import socket
import gevent
from gevent.core import loopdef f():s, address = sock.accept()print addresss.send("hello world\r\n")loop = loop()
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.bind(("localhost",8000))
sock.listen(10)
io = loop.io(sock.fileno(),1) #1代表read
io.start(f)
loop.run()

代码很简单,使用core.loop新建了一个loop实例,通过io加入socket读事件,通过start设置回调,然后run启动事件循环,一个简单的helloworld服务器搭建好了,可以通过telnet localhost 8000看响应结果。

gevent的整个事件循环是在hub.run中启动的,

    def run(self):assert self is getcurrent(), 'Do not call Hub.run() directly'while True:loop = self.looploop.error_handler = selftry:loop.run()finally:loop.error_handler = None  # break the refcount cycleself.parent.throw(LoopExit('This operation would block forever'))

上面的self.loop和我们上面自己新建的loop对象是一样的,下面我们通过socket的recv函数看时间是如何注册到loop中。
gevent的socket对象被gevent重新封装,原始socket就是下面的self._sock
我们来看看gevent的socket一次recv做了什么操作。

gevent/socket.py

   def recv(self, *args):sock = self._sock  # keeping the reference so that fd is not closed during waitingwhile True:try:return sock.recv(*args) # 1.如果此时socket已经有数据,则直接returnexcept error:#没有数据将会抛出异常,且errno为EWOULDBLOCKex = sys.exc_info()[1]if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:raise# QQQ without clearing exc_info test__refcount.test_clean_exit failssys.exc_clear()#此时将该文件描述符的”读事件“加入到loop中self._wait(self._read_event)"""self._wait会调用hub.wait,def wait(self, watcher):waiter = Waiter()unique = object()watcher.start(waiter.switch, unique) #这个watcher就是上面说的loop.io()实例,waiter.switch就是回调函数try:result = waiter.get()assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)finally:watcher.stop()当loop捕获到”可读事件“时,将会回调waiter.switch方法,此时将回到这里(因为while循环)继续执行sock.recv(*args)一般来说当重新recv时肯定是可以读到数据的,将直接返回"""

上面的self._read_event = io(fileno, 1),再次回到while大循环中,将直接return sock.recv的结果。我们知道socke.recv(1024)可能返回的并没有1024字节,这要看此时缓冲区已接受多少字节,所以说数据可能一次没有读完,所以可能会触发多次

EWOULDBLOCK,多次读取,只有recv为空字符串时才代表读取结束。典型的读取整个数据一般如下所示:

    buff = []while 1:s = socket.recv(1024)if not s:breakelse:buff.append(s)buff = "".jon(buff)

你可能有点好奇,在gevent中有多处使用了assert判断waiter的返回值,如:hub.wait

class Hub(greenlet):def wait(self, watcher):waiter = Waiter()unique = object()watcher.start(waiter.switch, unique)try:result = waiter.get()assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)#这里为什么要assert?#因为正常肯定是loop调用waiter.switch(unique),那么waiter.get()获取的肯定是unique,#如果不是unique,肯定是有其它地方调用waiter.switch,这很不正常finally:watcher.stop()

这主要是为了防止回调函数被其它greenlet调用,因为greenlet通过switch传递参数,看下面代码:

def f(t):gevent.sleep(t)p = gevent.spawn(f,2)
gevent.sleep(0) # 2s后libev将回调f,所以下面p.get获取的是2
switcher = gevent.spawn(p.switch, 'hello') #强先回调p.switch,传递参数hello
result = p.get()

将返回以下异常:

将报如下异常:
AssertionError: Invalid switch into <Greenlet at 0x252c2b0: f(2)>: 'hello' (expected <object object at 0x020414E0>)
<Greenlet at 0x252c2b0: f(2)> failed with AssertionError

我们再看看gevent封装的greenlet,

class Greenlet(greenlet):"""A light-weight cooperatively-scheduled execution unit."""def __init__(self, run=None, *args, **kwargs):hub = get_hub()greenlet.__init__(self, parent=hub)if run is not None:self._run = run

我们看到所有的Greenlet的parent都是hub,这有什么好处呢?
因为当一个greenlet死掉的时候将回到父greenlet中,也就是hub中,hub将从运行上次回调的地方继续开始事件循环,这也就是为什么事件循环是在hub中运行的理由。

我们来看一个一个Greenlet的生命周期

启动Greenlet需要调用start()方法,

    def start(self):"""Schedule the greenlet to run in this loop iteration"""if self._start_event is None:self._start_event = self.parent.loop.run_callback(self.switch)

也就是将当前的switch加入到loop事件循环中。当loop回调self.switch时将运行run方法(这是底层greenlet提供的),

继承时我们可以提供_run方法。

    def run(self):try:if self._start_event is None:self._start_event = _dummy_eventelse:self._start_event.stop() #取消之前添加的回调函数,loop将会从回调链中剔除该函数。#libev提供了一系列的对象封装,如io,timer,都有start,stop方法#而回调是通过loop.run_callback开启的,和其它有所不同try:result = self._run(*self.args, **self.kwargs) #运行自定义_run方法except:self._report_error(sys.exc_info())returnself._report_result(result) #设置返回结果,这是个比较重要的方法,下面会单独看看finally:pass

一切顺利,没有异常将调用_report_result方法,我们具体看看:

    def _report_result(self, result):self._exception = Noneself.value = result #设置返回结果,可通过get()获取,注意要获取value时#不要直接通过.value,一定要用get方法,因为get()会获取到真正的运行后结果,#而.value那是该Greenlet可能还没结束if self._links and not self._notifier: #这个是干什么的?self._notifier = self.parent.loop.run_callback(self._notify_links)

为什么说一定要通过get()才能获取最后返回结果呢,因为get()相当于异步的结果返回,那么很有可能Greenlet还没结果我们就调用
get()想获取结果,如果不是异步,肯定是获取不到的。我们看看get()操作,

    def get(self, block=True, timeout=None):"""Return the result the greenlet has returned or re-raise the exception it has raised.If block is ``False``, raise :class:`gevent.Timeout` if the greenlet is still alive.If block is ``True``, unschedule the current greenlet until the result is availableor the timeout expires. In the latter case, :class:`gevent.Timeout` is raised."""if self.ready(): #该Greenlet已经运行结束,直接返回结果if self.successful():return self.valueelse:raise self._exceptionif block: #到这里说明该Greenlet并没有结束switch = getcurrent().switchself.rawlink(switch) #将当前Greenlet.switch加到自己的回调链中"""self._links.append(callback)"""try:t = Timeout.start_new(timeout)try:result = self.parent.switch() #切换到hub,可以理解为当前get()阻塞了,当再次回调刚刚注册的switch将回到这里#可问题是好像我们没有将switch注册到hub中,那是谁去回调的呢?#幕后黑手其实就是上面的_report_result,当Greenlet结束最后会调用_report_result,#而_report_result把将_notify_links注册到loop的回调中,最后由_notify_links回调我们刚注册的switch# def _notify_links(self):#     while self._links:#     link = self._links.popleft()#     try:#         link(self) #就是这里了,我们看到还把self传给了switch,所以result结果就是self(greenlet通过switch传递结果)#     except:#         self.parent.handle_error((link, self), *sys.exc_info())assert result is self, 'Invalid switch into Greenlet.get(): %r' % (result, ) #知道为什么result是self的原因了吧finally:t.cancel()except:self.unlink(switch)raise#运行到这里,其实Greenlet已经结束了,换句话说self.ready()肯定为Trueif self.ready():if self.successful():return self.valueelse:raise self._exceptionelse: #还没结束,你又不等待,没有值返回啊,只能抛出异常了raise Timeout

通过上面我们知道其实get()就是异步返回结果的方式,当Greenelt要结束时通过run()函数最后的_report_result返回,所以_report_result还是很重要的。

其实_notify_links不只为get提供了最后回调的方法,还提供了Grenlet的link协议。所谓link协议就是Greenlet可以通过

link方法把执行结果传递给一回调函数。

def f(source):print source.value
gevent.spawn(lambda: 'gg').link(f)
gevent.sleep(1)

当Greenlet结束时就会调用f方法,并把self传给f。AsyncResult通过__callback__提供了link方法。

from gevent.event import AsyncResult
a = AsyncResult()
gevent.spawn(lambda: 'gg').link(a)
print a.get()
gevent.sleep(1)

看看AsyncEvent的__call__方法,和我们上面的f差不多

    # link protocoldef __call__(self, source):if source.successful():self.set(source.value)else:self.set_exception(source.exception)

其实Greenlet还提供了一个switch_out的方法,在gevent中switch_out是和switch相对应的一个概念,当切换到Greenlet时将

调用switch方法,切换到hub时将调用Greenlet的switch_out方法,也就是给Greenlet一个保存恢复的功能。

gevent中backdoor.py(提供了一个python解释器的后门)使用了switch,我们来看看

class SocketConsole(Greenlet):def switch(self, *args, **kw):self.saved = sys.stdin, sys.stderr, sys.stdoutsys.stdin = sys.stdout = sys.stderr = self.descGreenlet.switch(self, *args, **kw)def switch_out(self):sys.stdin, sys.stderr, sys.stdout = self.saved

switch_out用的非常漂亮,因为交换环境需要使用sys.stdin,sys.stdout,sys.stderr,所以当切换到我们Greenlet时,

把这三个变量都替换成我们自己的socket描述符,但当要切换到hub时需要恢复这三个变量,所以在switch中先保存,在switch_out中再恢复,switch_out是切换到hub时,与hub的switch调用实现:

class Hub(Greenlet):def switch(self):#我们看到的确是先调用先前的Greenlet.switch_outswitch_out = getattr(getcurrent(), 'switch_out', None)if switch_out is not None:switch_out()return greenlet.switch(self)

可以通过下面两句话就启动一个python后门解释器,感兴趣的童鞋可以玩玩。

from gevent.backdoor import BackdoorServer
BackdoorServer(('127.0.0.1', 9000)).serve_forever()

通过telnet,你可以为所欲为。

在gevent中基本上每个函数都有timeout参数,这主要是通过libev的timer实现。

使用如下:

Timeout对象有pending属性,判断是是否还未运行

t=Timeout(1)
t.start()
try:print 'aaa'import timeassert t.pending == Truetime.sleep(2)gevent.sleep(0.1) #注意这里不可以是sleep(0),虽然sleep(0)也切换到hub,定时器也到了,但gevent注册的回调#是优先级是高于定时器的(在libev事件循环中先调用callback,然后才是timer)
except Timeout,e:assert t.pending == Falseassert e is t #判断是否是我的定时器,和上面的assert一致,防止不是hub调用t.switchprint sys.exc_info()
finally: #取消定时器,不管定时器是否可用,都可取消t.cancel()

Timout对象还提供了with上下文支持:

with Timeout(1) as t:assert t.pendinggevent.sleep(0.5)
assert not t.pending

Timeout第二个参数可以自定义异常,如果是Fasle,with上下文将不传递异常

with Timeout(1,False) as t:assert t.pendinggevent.sleep(2)
assert not sys.exc_info()[1]
我们看到并没有抛出异常

还有一个with_timeout快捷方式:

def f():import timetime.sleep(2)gevent.sleep(0.1) #不能使用gevent.sleep(0)print 'fff't = with_timeout(1,f,timeout_value=10)
assert t == 10

注意with_timeout必须有timeout_value参数时才不会抛Timeout异常。

到这里我们对gevnet的底层应该都很熟悉了,对gevent还未介绍到的就是一些高层的东西,如Event,Pool等,后期也会单独拿出来

讲讲。我觉得还需要关注的就是libev的使用,不过这就需要我们深入分析core.pyx的libev cython扩展了,这需要cython的知识,最近我也一直在看源码,后期也会和大家分享。

至于为什么要分析libev的扩展呢?主要是在游戏中有一些定时执行的任务,通过gevent现有的实现比较蹩脚,其实libev提供的timer有两个参数,一个after,一个repeat,after是多久以后启动该定时器,repeat是多次以后再次启动,这刚好满足我的需求,

下面就是我写的一个简单的定时任务脚本,通过gfirefly启动,还提供了web接口。

#coding:utf-8
'''
Created on 2014-9-5@author: http://blog.csdn.net/yueguanghaidao
'''
import traceback
import datetime
from flask import request
from gevent.hub import get_hub
from gtwisted.utils import log
from gfirefly.server.globalobject import webserviceHandle
from app.models.role import Role'''
定时任务任务名 (运行时间(0-24),每次间隔)单位为小时,回调函数均为do_name
'''CRONTAB = {"energy": (0, 1), #恢复体力"god_surplustime": (0, 24),"arena_surplustime": (22, 24),"arena_rewrad": (21, 24),"sign_reset": (1, 24)
}def log_except(fun):def wrapper(*args):try:log.msg(fun.__name__)return fun(args)except:log.msg(traceback.format_exc())return wrapperclass Task(object):"""所有定时任务"""@classmethod@log_exceptdef do_energy(cls):"""每一个小时增加1体力(体力小于8)"""Role.objects(energy__lt=8).update(inc__energy=1)@classmethod@log_exceptdef do_god_surplustime(cls):"""财神剩余次数"""Role.objects(god__exists=True).update(set__god__surplustime=10)@webserviceHandle("/cron", methods=['GET', 'POST'])
def cron():"""提供web接口调用"""action = request.args.get("action")if not action:return "action:<br/><br/>"+"<br/>".join(( a for a in CRONTAB))else:try:f = getattr(Task, "do_"+action)try:f()except:return traceback.format_exc()return "success"except AttributeError:return "action:<br/><br/>"+"<br/>".join(( a for a in CRONTAB))def timer(after, repeat):return get_hub().loop.timer(after, repeat)def run():log.msg("cron start")#配置mongodbmongoconfig.init_Mongo()for action, t in CRONTAB.items():log.msg("%s start" % action)f = getattr(Task, "do_"+action)now = datetime.datetime.now()other = now.replace(hour=t[0],minute=0,second=0)if other > now:after = (other-now).secondselse:after = 24*3600-(now-other).seconds#after = t[0]*3600timer(after, t[1]*3600).start(f)run()

[gevent源码分析] gevent两架马车-libev和greenlet相关推荐

  1. gevent源码初探-wsgi例子解析

    gevent源码分析 本文环境gevent-0.9.0. gevent简介 gevent是Python的一个并发框架,以协程库greenlet为基础,基于libev的高性能IO复用机制,其中可以使用m ...

  2. Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

    Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...

  3. v45.05 鸿蒙内核源码分析(Fork) | 一次调用 两次返回 | 百篇博客分析HarmonyOS源码

    孔子于乡党,恂恂如也,似不能言者.其在宗庙朝廷,便便言,唯谨尔. <论语>:乡党篇 百篇博客系列篇.本篇为: v45.xx 鸿蒙内核源码分析(Fork篇) | 一次调用 两次返回 进程管理 ...

  4. celery源码分析-wroker初始化分析(上)

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery与Django的配合使用 首先,在安装有django的环境中创建一个django ...

  5. celery源码分析:multi命令分析

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery简介 celery是一款异步任务框架,基于AMQP协议的任务调度框架.使用的场景 ...

  6. 深入理解Spark:核心思想与源码分析

    大数据技术丛书 深入理解Spark:核心思想与源码分析 耿嘉安 著 图书在版编目(CIP)数据 深入理解Spark:核心思想与源码分析/耿嘉安著. -北京:机械工业出版社,2015.12 (大数据技术 ...

  7. celery源码分析-worker初始化分析(下)

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的worker启动 在上文中分析到了Hub类的初始化,接下来继续分析Pool类的 ...

  8. YYCache 源码分析(一)

    iOS 开发中总会用到各种缓存,YYCache或许是你最好的选择.性能上有优势,用法也很简单.作者ibireme曾经对比过同类轮子:http://blog.ibireme.com/2015/10/26 ...

  9. Docker源码分析(一):Docker架构

    1 背景 1.1 Docker简介 Docker是Docker公司开源的一个基于轻量级虚拟化技术的容器引擎项目,整个项目基于Go语言开发,并遵从Apache 2.0协议.目前,Docker可以在容器内 ...

最新文章

  1. DARPA新局长维多利亚·科尔曼展望未来发展
  2. 很抱歉,这场大会我们没法卖票给你了
  3. 鸿蒙系统明年上市巧,鸿蒙系统官网下载-鸿蒙系统官网下载手机版 v2.0下载-955游戏网...
  4. Nginx服务测试时的一些配置:wireshark、常用搜索URL格式、关闭防火墙、siege
  5. 【PAT乙级】1077 互评成绩计算 (20 分)
  6. linux 系统迁移到固态硬盘,windows 和 Linux 系统 从硬盘迁移到SSD
  7. Micro-CMS v1
  8. ZJOI2005午餐
  9. 华为服务器芯片总在pc,服务器芯片 华为
  10. Java I/O体系详细讲解
  11. eclipse adt bundle不显示Android SDK菜单
  12. 程序员有话说 | 我成了敲代码的“佛教徒”
  13. Centos 7.2 安装Docker CE实践并配置加速器
  14. 【高级PDF库】上海道宁为您提供先进的.Net库,完全控制您的PDF创建工作流程,在WEB或任何服务器系统上创建动态PDF
  15. nginx过滤HttpHeader的 中划线
  16. 请编写函数实现自然底数 e=2.718281828
  17. Linux文件管理及用户命令
  18. 【数学建模和matlab】反思与总结(1)
  19. (二)海思3519av100开发:开发板环境搭建
  20. configure: error: Cannot find ldap libraries in /usr/lib

热门文章

  1. Bright Star - John Keats
  2. sim808模块收发送短信
  3. 20200404-斜坡补偿
  4. Word中如何创建自动编号的标题?
  5. 多智能体强化学习:鼓励共享多智能体强化学习中的多样性
  6. 中关村互联网教育创新中心:这里是互联网教育的主场
  7. 网络安全星球推荐,进入后从小白变大神
  8. js网页简繁体切换cookie记录状态
  9. 虚幻4 虚拟漫游场景 制作过程
  10. emq查看状态“node emqx@127.0.0.1 not responding to pings”