2010-01-07 javaeye http://angeloce.iteye.com/admin/blogs/565333

============================

>>>from twisted.internet import reactor
>>>reactor
<twisted.internet.selectreactor.SelectReactor object at 0x01C5BFD0>

reactor本来是一个模块,怎么变成对象了?

查看 reactor.py, 看到就一个模块方法selectreactor.install()

查看install方法:

def install():"""Configure the twisted mainloop to be run using the select() reactor."""reactor = SelectReactor()from twisted.internet.main import installReactorinstallReactor(reactor)

这里生成了一个SelectReactor的对象,似乎就是我们要找的reactor.

再查看main.py

def installReactor(reactor):# this stuff should be common to all reactors.import twisted.internetimport sysassert not sys.modules.has_key('twisted.internet.reactor'), \"reactor already installed"twisted.internet.reactor = reactorsys.modules['twisted.internet.reactor'] = reactor

哦, reactor已经被偷梁换柱了.

回到selectreactor.py, 看看 SelectReactor 类是个什么东西.

SelectReactor 继承父类posixbase.PosixReactorBase, 本身增加了一些方法, 似乎看不出什么.那我们就去posixbase.py看看他爸爸是干什么的.

PosixReactorBase继承两个父类_SignalReactorMixin 和 ReactorBase.  先不管其他,  寻根溯源,这两个父类都来自于internet.base模块.

好吧, 找到这里算是到头了. ReactorBase 作为 "Reactor" 的基类, 提供了reactor大部分及其重要的方法, 另一些重要的方法由_SignalReactorMixin来扩展. 下面做下详细的分析.

# 一般来说, 建立一个服务器基本遵循以下几个步骤
# 以建立一个最基本的TCP服务器为例#1
reactor.listenTCP(PORT, Factory())#2
reactor.run()

对于#1比较好理解, 在posixbase.PosixReactorBase中

def listenTCP(self, port, factory, backlog=50, interface=''):"""@see: twisted.internet.interfaces.IReactorTCP.listenTCP"""p = tcp.Port(port, factory, backlog, interface, self)p.startListening()return p# 其中包括socket的建立,绑定等等一系列手续.
# 详细内容以后再表.

对于#2, 在base.SignalReactorMixin中

def run(self, installSignalHandlers=True):self.startRunning(installSignalHandlers=installSignalHandlers)self.mainLoop()def mainLoop(self):while self._started:try:while self._started:# Advance simulation time in delayed event# processors.self.runUntilCurrent()t2 = self.timeout()t = self.running and t2self.doIteration(t)except:log.msg("Unexpected error in main loop.")log.err()else:log.msg('Main loop terminated.')

reactor一直孜孜不倦地执行两个方法:self.runUntilCurrent和 self.doIteration. 看看这两个函数都是干什么的:

# 在ReactorBase中, runUntilCurrent方法主要做了两件事,
# 把self.threadCallQueue和self.pendingTimedCalls 里的对象执行一遍
def runUntilCurrent(self):if self.threadCallQueue:# Keep track of how many calls we actually make, as we're# making them, in case another call is added to the queue# while we're in this loop.count = 0total = len(self.threadCallQueue)for (f, a, kw) in self.threadCallQueue:try:f(*a, **kw)except:log.err()count += 1if count == total:breakdel self.threadCallQueue[:count]if self.threadCallQueue:if self.waker:self.waker.wakeUp()# insert new delayed calls nowself._insertNewDelayedCalls()now = self.seconds()while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):call = heappop(self._pendingTimedCalls)if call.cancelled:self._cancellations-=1continueif call.delayed_time > 0:call.activate_delay()heappush(self._pendingTimedCalls, call)continuetry:call.called = 1call.func(*call.args, **call.kw)except:log.deferr()if hasattr(call, "creator"):e = "\n"e += " C: previous exception occurred in " + \"a DelayedCall created here:\n"e += " C:"e += "".join(call.creator).rstrip().replace("\n","\n C:")e += "\n"log.msg(e)if (self._cancellations > 50 andself._cancellations > len(self._pendingTimedCalls) >> 1):self._cancellations = 0self._pendingTimedCalls = [x for x in self._pendingTimedCallsif not x.cancelled]heapify(self._pendingTimedCalls)if self._justStopped:self._justStopped = Falseself.fireSystemEvent("shutdown")
# 回到SelectReactor中,查看 doSelect(doIteration)方法
# _select既是select.select函数
# self._reads和self._writes内存储的应该都是类文件操作符,比如socket..
# 再看下self._doReadOrWrite方法,会发现所有的reader/writer都执行自身
# 的 doRead/doWrite方法.def doSelect(self, timeout):"""Run one iteration of the I/O monitor loop.This will run all selectables who had input or output readinesswaiting for them."""while 1:try:r, w, ignored = _select(self._reads.keys(),self._writes.keys(),[], timeout)breakexcept ValueError, ve:# Possibly a file descriptor has gone negative?log.err()self._preenDescriptors()except TypeError, te:# Something *totally* invalid (object w/o fileno, non-integral# result) was passedlog.err()self._preenDescriptors()except (select.error, IOError), se:# select(2) encountered an errorif se.args[0] in (0, 2):# windows does this if it got an empty listif (not self._reads) and (not self._writes):returnelse:raiseelif se.args[0] == EINTR:returnelif se.args[0] == EBADF:self._preenDescriptors()else:# OK, I really don't know what's going on.  Blow up.raise_drdw = self._doReadOrWrite_logrun = log.callWithLoggerfor selectables, method, fdset in ((r, "doRead", self._reads),(w,"doWrite", self._writes)):for selectable in selectables:# if this was disconnected in another thread, kill it.# ^^^^ --- what the !@#*?  serious!  -exarkunif selectable not in fdset:continue# This for pausing input when we're not ready for more._logrun(selectable, _drdw, selectable, method, dict)

好吧,从上面基本可以看出, reactor在run循环里做了两件事, 执行线程队列和延迟对象队列,操作类文件对象符.

对于线程队列和延迟对象队列, 还比较好理解.

对于类文件对象的队列, reactor 是什么时候把它们加进的呢?

写道
# 插播 ReactorBase.callLater方法 
# 执行callLater后reactor把DelayedCall对象存放在_newTimedCalls队列中 
# 在执行ReactorBase.runUntilCurrent时, 
# reactor执行了_insertNewDelayedCalls 方法 
# 把_newTimedCalls内的数据存入_pendingTimedCalls队列中 
def callLater(self, _seconds, _f, *args, **kw): 
tple = DelayedCall(self.seconds() + _seconds, _f, args, kw, 
self._cancelCallLater, 
self._moveCallLaterSooner, 
seconds=self.seconds) 
self._newTimedCalls.append(tple) 
return tple

# 同样对于thread 
# callFromThread方法也是把thread存入到threadCallQueue中 
# 直到在runUntilCurrent中执行

def callFromThread(self, f, *args, **kw): 
self.threadCallQueue.append((f, args, kw))

观看上面的代码, reactor似乎没有主动加入过 reader/writer, reactor如何操作socket的呢?

重新想象reactor在run之前还做过什么?

对了, 连接/建立连接!

就如reactor.listenTCP

def listenTCP(self, port, factory, backlog=50, interface=''):p = tcp.Port(port, factory, backlog, interface, self)p.startListening()return p

看看tcp.Port的设计

tcp.Port继承于base.BasePort 和 tcp._SocketCloser,

而base.BasePort 继承于abstract.FileDescriptor, 一个抽象的文件操作符类

tcp.Port实例化时没有做太多动作, 我们聚焦在方法 startListening 上

# tcp.Port.startListening 生成并绑定了一个socket
# 也没有做什么过多的动作, 直接看看最下面的startReadingdef startListening(self):try:skt = self.createInternetSocket()skt.bind((self.interface, self.port))except socket.error, le:raise CannotListenError, (self.interface, self.port, le)# Make sure that if we listened on port 0, we update that to# reflect what the OS actually assigned us.self._realPortNumber = skt.getsockname()[1]log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))# The order of the next 6 lines is kind of bizarre.  If no one# can explain it, perhaps we should re-arrange them.self.factory.doStart()skt.listen(self.backlog)self.connected = Trueself.socket = sktself.fileno = self.socket.filenoself.numberAccepts = 100self.startReading()
# 一直找到abstract.FileDescriptor.startReading
# 执行了reactor.addReaderdef startReading(self):"""Start waiting for read availability."""self.reactor.addReader(self)# selectreactor.SelectReactor.addReader指明了
# 一个tcp.Port对象被作为reader加入到了reactor的reads队列中
def addReader(self, reader):"""Add a FileDescriptor for notification of data available to read."""self._reads[reader] = 1

原来在这里, 在reactor.listenTCP时候就被加入到了reader队列中.

赶紧回头看看, 在 selectreactor.SelectReactor.doSelect中,如果一个类文件操作符状态改变了,会执行其doRead/doWriter方法.那去看看作为reader的tcp.Port的doRead方法.

# tcp.Port的socket接受了一个连接,
# 并执行了self.factory.buildProtocol方法生成一个portocol
# 通过self.transport生成了一个tcp.Server对象def doRead(self):try:if platformType == "posix":numAccepts = self.numberAcceptselse:# win32 event loop breaks if we do more than one accept()# in an iteration of the event loop.numAccepts = 1for i in range(numAccepts):# we need this so we can deal with a factory's buildProtocol# calling our loseConnectionif self.disconnecting:returntry:skt, addr = self.socket.accept()except socket.error, e:if e.args[0] in (EWOULDBLOCK, EAGAIN):self.numberAccepts = ibreakelif e.args[0] == EPERM:# Netfilter on Linux may have rejected the# connection, but we get told to try to accept()# anyway.continueelif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):log.msg("Could not accept new connection (%s)" % (errorcode[e.args[0]],))breakraiseprotocol = self.factory.buildProtocol(self._buildAddr(addr))if protocol is None:skt.close()continues = self.sessionnoself.sessionno = s+1transport = self.transport(skt, protocol, addr, self, s, self.reactor)transport = self._preMakeConnection(transport)protocol.makeConnection(transport)else:self.numberAccepts = self.numberAccepts+20except:log.deferr()

虽然还有点迷糊, 不过知道了protocol对象产生于此处.那这个产生的transport实例具体作用是什么呢?

先看下 protocol.makeConnection

# protocol.BaseProtocol
def makeConnection(self, transport):self.connected = 1self.transport = transportself.connectionMade()

看到了一个熟悉的方法connectionMade!

protocol的三个事件方法 connectionMade, dataReceived, connectionLost是protocol最重要的三个方法了.

其一出现了, 剩下的两个是在何处被触发的呢?

先不急, 先看看transport 是怎么回事:

tcp.Server 来自于 父类 tcp.Connection. 而Connection继承于abstract.FileDescriptor,又是一个类文件符.

tcp.Server实例时还是做了点小动作的

# tcp.Server
def __init__(self, sock, protocol, client, server, sessionno, reactor):Connection.__init__(self, sock, protocol, reactor)self.server = serverself.client = clientself.sessionno = sessionnoself.hostname = client[0]self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__,sessionno,self.hostname)self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,self.sessionno,self.server._realPortNumber)self.startReading()self.connected = 1

self.startReading从 abstract.FileDescriptor上知晓是把 该实例作为reader加入到reactor队列中的.

那我们就看看tcp.Server的doRead方法

# tcp.Connection
def doRead(self):"""Calls self.protocol.dataReceived with all available data.This reads up to self.bufferSize bytes of data from its socket, thencalls self.dataReceived(data) to process it.  If the connection is notlost through an error in the physical recv(), this function will returnthe result of the dataReceived call."""try:data = self.socket.recv(self.bufferSize)except socket.error, se:if se.args[0] == EWOULDBLOCK:returnelse:return main.CONNECTION_LOSTif not data:return main.CONNECTION_DONEreturn self.protocol.dataReceived(data)

眼前一亮, dataReceived方法!

转载于:https://www.cnblogs.com/c9com/archive/2013/01/05/2845552.html

twisted reactor解剖相关推荐

  1. twisted reactor 实现源码解析

    twisted reactor 实现源码解析 1.      reactor源码解析 1.1.    案例分析代码: from twisted.internet import protocol fro ...

  2. python twised系列教程四–twisted Poetry client

    我们第一个twisted client 尽管twisted 经常被用来写server端的,但client往往会比较简单些,我们就以最简单的client 开始.源代码在twisted-client-1/ ...

  3. (转) Twisted 第四部分: 由Twisted支持的诗歌客户端

    2019独角兽企业重金招聘Python工程师标准>>> 第一个twisted支持的诗歌服务器 尽管Twisted大多数情况下用来写服务器代码,为了一开始尽量从简单处着手,我们首先从简 ...

  4. 对twisted诗歌服务器的总结和笔记

    差不多两个月之前的时候看过一段时间的twisted源码和诗歌服务器的教程,但是当时的笔记都记在笔记本,两个月之后想要再用的时候印象又已经模糊了.况且当时对于事件驱动和异步回调的理解没有现在深,系统地看 ...

  5. Twisted 框架简介

    Twisted 框架简介 Twisted 框架介绍 创建Twisted Reactor TCP 服务器 创建Twisted Reactor TCP 客户端 Twisted 框架介绍 Twisted 是 ...

  6. Twisted 入门 教程

    GitHub 地址:https://github.com/likebeta/twisted-intro-cn/tree/master/zh             https://github.com ...

  7. scrapy之settings参数

    #==>第一部分:基本配置<=== #1.项目名称,默认的USER_AGENT由它来构成,也作为日志记录的日志名 BOT_NAME = 'Amazon'#2.爬虫应用路径 SPIDER_M ...

  8. python 关闭窗口事件_关于python:如何在Tkinter中处理窗口关闭事件?

    如何在Python Tkinter程序中处理窗口关闭事件(用户单击" X"按钮)? Tkinter支持一种称为协议处理程序的机制.在这里,术语协议是指应用程序和窗口管理器之间的交互 ...

  9. Scrapy源码阅读分析_5_Scrapy-settings源码分析

    From:https://blog.csdn.net/weixin_37947156/article/details/74972642 The global defaults are located ...

最新文章

  1. Elasticsearch环境搭建
  2. Java中的Annotation(2)----Annotation工作原理
  3. 2017前端框架何去何从
  4. 下拉列表select显示ng-options
  5. 【操作系统】进程的创建与终止过程中的父子进程
  6. 微软取消被指下流的Windows 10更新方法
  7. 帝国列表页分开调取年份和月份单独调用的方法?
  8. ElasticSearch从入门到精通:Logstash妙用
  9. 原神 - 米游社 每日签到
  10. dest在C语言什么作用,dest(车的dest是什么意思)
  11. 你必须认识的五名网络女红人
  12. 书名带冒号_冒号_冒号的用法和作用_冒号怎么打_标点符号网
  13. 正规蓝牙耳机一般多少钱?音质好又便宜的蓝牙耳机
  14. Bootstrap【第二章】全局CSS之排版代码表格
  15. 中国人民解放军郑州计算机学院官网,解放军信息工程大学录取分数线2021
  16. php不是当前时间,php取得时间与当前时间不一样
  17. 嵌入式软件设计第7次实验报告-140201235-陈宇
  18. 新年上班第一天生产环境分布式文件系统崩了!!
  19. web服务器 ---nginx 虚拟主机的创建(基于 域名 . 端口 . ip )以及nginx访问控制
  20. 基于用户协同过滤算法的电影打分与推荐

热门文章

  1. 盘点微软 .NET 技术八年发展历程转
  2. 入门数据挖掘(二手车交易价格预测案例)(二):特征工程
  3. 如何在 Linux 中检查打开的端口?
  4. kindeditor编辑器代码过滤解决方法.
  5. 计算基本统计值,输出一组数据的平均值,方差,众数和中位数
  6. 马里奥制造2正在维护服务器,超级马里奥制造
  7. 西餐和计算机专业哪个好,烹饪专业学校前十排名有哪些
  8. axios封装 —— 数据缓存、防止重复请求、动态加载
  9. 叹服!华为高工手写344页高性能Java架构核心原理实战大神手册
  10. 春秋云镜 CVE-2022-24112