Python twisted事件驱动网络框架 源码剖析
一、Twisted简介
Twisted是一个事件驱动的网络框架,其中包含了诸多功能,例如:网络协议、线程、数据库管理、网络操作、电子邮件等。
事件驱动简而言之,事件驱动分为二个部分:第一,注册事件;第二,触发事件。
Protocols
Protocols描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中的例子。Protocols实现了IProtocol接口,它包含如下的方法:
makeConnection 在transport对象和服务器之间建立一条连接
connectionMade 连接建立起来后调用
dataReceived 接收数据时调用
connectionLost 关闭连接时调用
Transports
Transports代表网络中两个通信结点之间的连接。Transports负责描述连接的细节,比如连接是面向流式的还是面向数据报的,流控以及可靠性。TCP、UDP和Unix套接字可作为transports的例子。它们被设计为“满足最小功能单元,同时具有最大程度的可复用性”,而且从协议实现中分离出来,这让许多协议可以采用相同类型的传输。Transports实现了ITransports接口,它包含如下的方法:
write 以非阻塞的方式按顺序依次将数据写到物理连接上
writeSequence 将一个字符串列表写到物理连接上
loseConnection 将所有挂起的数据写入,然后关闭连接
getPeer 取得连接中对端的地址信息
getHost 取得连接中本端的地址信息
将transports从协议中分离出来也使得对这两个层次的测试变得更加简单。可以通过简单地写入一个字符串来模拟传输,用这种方式来检查。
二、源码分析
EchoServer:
from twisted.internet import protocol
from twisted.internet import reactor #reactor无限循环,写好了事件,reactor自动检测,类似于selectclass Echo(protocol.Protocol):def dataReceived(self, data): #只要twisted一收到数据,就会调用dataRecevied方法self.transport.write(data) #把收到的数据返回给客户端def main():factory = protocol.ServerFactory() #定义一个基类,类似于socketserver的handler上面一层的类factory.protocol = Echo #类似于socketserver中的handler,必须定义此Echo,表明每一个客户端过来后都会调用Echo建立一个实例reactor.listenTCP(1234,factory) #reactor类似于select,是一个触发器,检测1234端口,需要把定义的基础类传进来reactor.run() #reactor执行if __name__ == '__main__':main()
EchoClient:
from twisted.internet import reactor, protocol# a client protocolclass EchoClient(protocol.Protocol):"""Once connected, send a message, then print the result."""def connectionMade(self): #只要连接一建立成功,就会自动调用此方法self.transport.write("hello!") #给服务端发送hello def dataReceived(self, data): #当有数据收到时,就会调用这个方法,自动进行"As soon as any data is received, write it back."print "Server said:", data #收到数据后打印数据self.transport.loseConnection() #数据传送完毕后,关闭连接,执行了下面的方法 |
# v
# ---------<--------------<-----------------<----------------<---
# |
# vdef connectionLost(self, reason): #client connection断开了,会执行此方法,此为自己定义的connectionLost方法print "connection lost"class EchoFactory(protocol.ClientFactory):protocol = EchoClient #在类中定义protocal,重写这个类;EchoClient相当于socketserver中的handle方法def clientConnectionFailed(self, connector, reason): #如果reactor连接不上服务端,自动调用这方法print "Connection failed - goodbye!" #打印连接失败信息reactor.stop() #关闭连接def clientConnectionLost(self, connector, reason): #如果client connection断开了,会自动调用此方法,类似于socketserver的handle后面的finish方法,和上面的connectionLost方法不同。print "Connection lost - goodbye!" #打印连接断开信息reactor.stop() #关闭连接# this connects the protocol to a server running on port 8000
def main():f = EchoFactory() #创建一个客户端的基类,与服务端的ServerFactory类似reactor.connectTCP("localhost", 1234, f) #连接'localhost',端口号,把客户端的基类传入reactorreactor.run() #运行reactor# this only runs if the module was *not* imported
if __name__ == '__main__':main() #程序入口,进入主程序
运行服务器端脚本将启动一个TCP服务器,监听端口1234上的连接。服务器采用的是Echo协议,数据经TCP transport对象写出。运行客户端脚本将对服务器发起一个TCP连接,回显服务器端的回应然后终止连接并停止reactor事件循环。这里的 Factory用来对连接的双方生成protocol对象实例。两端的通信是异步的,connectTCP负责注册回调函数到reactor事件循环中,当socket上有数据可读时通知回调处理。
三、RPC客户端向服务端发送命令源码分析:
RPC server:
#Project interpreter: 2.7
import pika, os, timedef operate(body):sys_result=os.popen(body).read()print("%s client execute \033[1;31;0m%s\033[0m result:\n%s" % (time.strftime('%Y-%m-%d %H:%M:%S'),body,sys_result))return sys_resultdef on_request(ch, method, props, body):response = operate(body)ch.basic_publish(exchange='', #basic_publish指向管道内发送数据routing_key=props.reply_to, #指定向哪个队列发数据properties=pika.BasicProperties(correlation_id = props.correlation_id),body=str(response)) #body是发送的消息内容ch.basic_ack(delivery_tag = method.delivery_tag)if __name__ == '__main__':try:connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #这是阻塞的连接channel = connection.channel() #生成一个管道channel.queue_declare(queue='rpc_queue') #在管道中创建一个队列,名字叫rpc_queuechannel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print("Server is waiting RPC requests...")channel.start_consuming() #开始接收数据,阻塞状态except KeyboardInterrupt:print("Connection lost...")
RPC client:
#Project interpreter: 2.7
import pika, uuidclass OperateRpcClient(object): #对类进行实例化def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将此queue删除self.callback_queue = result.method.queue #服务端执行完结果返回的queue名字self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #no_ack不需要确认,如果为False,当客户端消费完成后,会给服务端发送确认消息;queue参数指定了收取消息队列的名称def on_response(self, ch, method, props, body): #回调方法if self.corr_id == props.correlation_id:self.response = bodydef call(self):self.response = Noneself.corr_id = str(uuid.uuid4())self.input = raw_input("root@client>> ")self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = self.callback_queue,correlation_id = self.corr_id,),body=self.input)while self.response is None:self.connection.process_data_events() #不断的去Queue里面接收数据,而且不是阻塞的return self.responseif __name__ == '__main__':print("This program use rabbitMQ send your OS command to server, your can use common command at here, enjoy it!\n\te.g.\n\t\t1.ls\n\t\t2.pwd\n\t\t3.free -m\n\t\t4.df -Th\n\t\t5.netstat -anplute")while True:try:operate_rpc = OperateRpcClient()response = operate_rpc.call()print(response)except KeyboardInterrupt:print("Connection lost...")
转载于:https://www.cnblogs.com/stefan-liu/p/5307377.html
Python twisted事件驱动网络框架 源码剖析相关推荐
- 【java集合框架源码剖析系列】java源码剖析之ArrayList
注:博主java集合框架源码剖析系列的源码全部基于JDK1.8.0版本. 本博客将从源码角度带领大家学习关于ArrayList的知识. 一ArrayList类的定义: public class Arr ...
- SpringDataJPA+Hibernate框架源码剖析(六)@PersistenceContext和@Autowired注入EntityManager的区别
SpringDataJPA+Hibernate框架源码剖析系列文章: SpringDataJPA+Hibernate框架源码剖析(一)框架介绍 SpringDataJPA+Hibernate框架源码剖 ...
- python字符串代码对象_Python源码剖析 - Python中的字符串对象
1. 前言 我们已经在 [Python中的整数对象] 章节中对定长对象进行了详细的讲解,接下来我们将介绍变长对象,而字符串类型,则是这类对象的典型代表. 这里必须先引入一个概念: Python 中的变 ...
- 【java集合框架源码剖析系列】java源码剖析之java集合中的折半插入排序算法
注:关于排序算法,博主写过[数据结构排序算法系列]数据结构八大排序算法,基本上把所有的排序算法都详细的讲解过,而之所以单独将java集合中的排序算法拿出来讲解,是因为在阿里巴巴内推面试的时候面试官问过 ...
- Java集合框架源码剖析:LinkedHashSet 和 LinkedHashMap
Java LinkedHashMap和HashMap有什么区别和联系?为什么LinkedHashMap会有着更快的迭代速度?LinkedHashSet跟LinkedHashMap有着怎样的内在联系?本 ...
- Mina2.0框架源码剖析(八)
这篇来看看AbstractPollingIoConnector抽象类,它用于用于实现客户端连接的轮询策略.处理逻辑基本上和上一篇文章说的AbstractPollingIoAcceptor类似,它继承自 ...
- 一个完整的python项目源码-一个Python开源项目-腾讯哈勃沙箱源码剖析(上)
前言 2019年来了,2020年还会远吗? 请把下一年的年终奖发一下,谢谢... 回顾逝去的2018年,最大的改变是从一名学生变成了一位工作者,不敢说自己多么的职业化,但是正在努力往那个方向走. 以前 ...
- Python envoy 模块源码剖析
Kenneth Reitz 是公认的这个世界上 Python 代码写得最好的人之一.抱着学习的心态,我阅读了 Reitz 写的 envoy 模块的源码,将笔记记录如下. 介绍 和 requests 模 ...
- python源码剖析 豆瓣_在数据分析师的分析中豆瓣的书那些值得读
最近总是有人问我有什么书好推荐看看,特烦.但是看到那么多人问,看来挺多人有这个需求,便想了一下,如何通过数据分析找到值得看的书.通过爬取某个标签例如产品,运营获取对应已经打了标签的书,获取书对应的评分 ...
- Python源码剖析[1] —— 编译Python
[ 绝对原创,转载请注明出处] 注意 :第一部分Python总体架构采用了网络文档<The Architecture of Python>,这是网络上唯一可见的以剖析Python实现为己任 ...
最新文章
- 查看linux主机是否安装宋体码,Linux 安装宋体字体的简单办法
- 在Linux上如何安装Oracle数据库
- ArcGIS Server 部署与配置
- 【Python基础】Python列表生成式
- git 远程分支创建与推送
- 如何快速过滤出一次请求的所有日志?
- linux 安全审计功能,数据库安全审计在数据安全中的功能
- python做系统查人的信息_Python综合项目之员工信息查询
- 1000以内所有同构数java算法_使用c语言求1到1000同构数的代码
- 测试用例集-8.公交卡测试用例
- fatal: HttpRequestException encountered (附:网盘下载地址)
- 网页为什么只加载了基本html,关于HTML的那些事
- windows C盘自动清理bat脚本
- haneWIN搭建Win10 NFS服务器
- adb最新版下载地址
- 快递单号的正则提取试
- 基于HTML、CSS、JavaScript、jQuery的app小项目--简易备忘录
- VB编程:取整函数Int、CInt、Fix区别-30
- djay Pro 2 Mac(DJ混音软件) v2.0.11激活版
- python之路day3_python之路:day3
热门文章
- java 文件夹拷贝(文件夹里包含文件和文件夹) 代码
- [导入]Text To Picture
- 【WPF】对Frame控件的Content属性做绑定时出现的一个小问题
- requests 上传本地文件到服务器
- os.environ 和 keras.json
- 详解机器翻译前沿技术及落地应用
- 分类问题的评估指标一览
- 【干货】图文并茂生动详解命名实体识别NER理论与代码实战
- Ubuntu16.04下Nvidia+Cuda8.0+Dynet安装教程
- 6.6 AdaBoost实战