第十一章 网络与WEB编程

11.1 作为客户端与HTTP服务交互

requests模块

11.2 创建 TCP 服务器

简单的应答服务器

from socketserver import BaseRequestHandler, TCPServerclass EchoHandler(BaseRequestHandler):def handle(self):print('Got conection from', self.client_address)while True:msg = self.request.recv(8192)if not msg:breakself.request.send(msg)if __name__ == '__main__':serv = TCPServer(('', 20000), EchoHandler)serv.serve_forever()

handle 用来为客户端连接服务。request 是客户端的 socket,client_address 有客户端的地址。
连接这个服务器:

from socket import socket, AF_INET, SOCK_STREAM
s = socket(AF_INET, SOCK_STREAM)
s.connect(('localhost', 20000))
s.send(b'Hello')
s.recv(8192)

把类文件接口放到底层socket:

from socketserver import StreamRequestHandler, TCPServer
class EchoHandler(StreamRequestHandler):def handle(self):print('Got connection from', self.client_address)for line in self.rfile:self.write(line)if __name__ == '__main__':serv = TCPServer(('', 20000), EchoHandler)serv.serve_forever()

默认情况下 socketserver 创建的服务器是单线程的,一次只能为一个客户端连接。使用 ForkingTCPServer 或 ThreadingTCPServer 能处理多个客户端。

from socketserver import ThreadingTCPServerif __name__ == '__main__':serv = ThreadingTCPServer('', 20000), EchoHandler)serv.serve_forever()

这种方法会为每个客户端连接创建一个进程或线程。为了避免客户端连接过多导致服务器崩溃,可以创建一个预先分配大小的工作线程池或进程池。
先创建一个非线程服务器,然后在一个线程池中使用 server_forever()方法启动这些连接。

if __name__ == '__main__':from threading import ThreadNWORKERS = 16serv = TCPSeerver(('', 20000), EchoHandler)for n in range(NWORKERS):t = Thread(target=serv.serve_forever)t.daemon = Truet.start()serv.serve_forever()

TCPServer 在实例化的时候会绑定并激活相应的 socket。如果想通过某些参数调整底层的 socket,可以设置 bind_and_activate=False。

if __name__ == '__main__':serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)serv.server_bind()serv.server_activate()serv.server_forever()

StreamRequestHandler 能通过设置类变量来支持一些特性。

import socketclass EchoHandler(StreamRequestHandler):timeout = 5     # 全部socket操作的响应时间rbufsize = -1   # 读缓冲大小wbufsize = 0    # 写缓冲大小disable_nagle_algorithm = False # 设置socket的TCP_NODELAY选项def handle(self):print('Got connection of', self.client_address)try:for line in self.rwrite:self.wfile.write(line)except socket.timeout:print('Timed out!')

大部分 Python 的高层网络模块都是建立在 socketserver 功能上的。直接使用 socket 库就能实现 TCP 服务器。

from socket import socket, AF_INET, SOCK_STREAMdef echo_handler(address, client_sock):pritn('Got connection of', address)while True:msg = client_sock.recv(8192)if not msg:breakclient_sock.sendall(msg)client_sock.close()def echo_server(address, backlog=5):sock = socket(AF_INET, SOCK_STREAM)sock.bind(address)sock.listen(backlog)while True:client_sock, client_addr = sock.accept()echo_handler(client_addr, client_address)if __name__ == '__main__':echo_server(('', 20000))

11.3 创建UDP服务器

简单的时间服务器:

from socketserver import BaseRequestHandler, UDPServer
import timeclass TimeHandler(BaseRequestHandler):def handle(self):print('Got connection of', self.client_address)msg, sock = self.requestresp = time.ctime()sock.sendto(resp.encode('ascii'), self.client_address)if __name__ == '__main__':serv = UDPServer(('', 20000), TimeHandler)serv.serve_forever()

UDP 服务器接收到达的数据报和客户端地址,需要应答的时候,服务器给客户端回发一个数据报。
UDPServer 类是单线程的。
直接使用 socket 实现 UDP 服务器:

from socket import socket, AF_INET, SOCK_DGRAM
import timedef time_server(address):sock = socket(AF_INET, SOCK_DGRAM)sock.bind(address)while True:msg, addr = sock.recvfrom(8192)print('Got connection of', addr)resp = time.ctime()sock.sendto(resp.encode('ascii'), addr)if __name__ == '__main__':time_server(('', 20000))

11.4 通过 CIDR 地址生成对应的 IP 地址集

从 CIDR 网络地址得到它代表的所有 IP。
ipaddress模块在操作网络地址时很有用。

import ipaddress
net = ipaddress.ip_network('123.45.67.64/27')
# 可以迭代和索引
# 成员判断
a = ipaddress.ip_address('123.45.67.69')
a in net

11.5 创建一个简单的 REST 接口

不安装web框架,只用一个简单的 REST 接口通过网络远程控制或访问应用程序。
最简单的方法是创建一个基于 WSGI 标准的库:

# resty.py
import cgidef notfound_404(environ, start_response):start_response('404 Not Found', [('Content-type', 'text/plain')])class PathDispatcher():def __init__(self):self.pathmap = {}def __call__(self, environ, start_response):path = environ['PATH_INFO']params = cgi.FieldStorage(environ['wsgi.input'], environ=environ)method = environ['REQUEST_METHOD'].lower()environ['params'] = {key: params.getvalue(key) for key in params}handler = self.pathmap.get((method, path), notfound_404)return handler(environ, start_response)def register(self, method, path, function):self.pathmap[method.lower(), path] = functionreturn function

使用这个调度器,需要编写不同的处理器:

import time_hello_resp = '''\
<html><head><title>Hello {name}</title></head><body><h1>Hello {name}</h1></body>
</html>'''def hello_world(environ, start_response):start_response('200 OK', [('Content-type', 'text/html')])params = environ['params']resp = _hello_resp.format(name=params.get('name'))yield resp.encode('utf-8')_localtime_resp = '''
<?xml version="1.0"?>
<time><year>{t.tm_year}</year><month>{t.tm_mon}</month><day>{t.tm_mday}</day><hour>{t.tm_hour}</hour><minute>{t.tm_min}</minute><second>{t.tm_sec}</second>
</time>'''def localtime(environ, start_response):start_response('200 OK', [('Content-type', 'applicatin/xml')])resp = _localtime_resp.format(t=time.localtime())yield resp.encode('utf-8')if __name__ == '__main__':from resty import PathDispatcherfrom wsgiref.simple_server import make_server# 创建调度器和注册的函数dispatcher = PathDispatcher()dispatcher.register('Get', '/hello', hello_world)dispatcher.register('Get', '/localtime', localtime)# 启动基本服务http = make_server('', 8080, dispatcher)print('Serving on port 8080...')http.serve_forever()

使用浏览器或 urllib 与这个服务器交互。
REST 接口一般服务于普通的 HTTP 请求,通常只需要处理数据。
实现一个简单的 REST 接口,只需要满足 Python 的 WSGI 标准即可。

import cgidef wsgi_app(environ, start_response):method = environ['REQUEST_METHOD']path = environ['PATH_INFO']# 解析请求的参数params = cgi.FieldStorage(environ['wsgi.input'], environ=rnviron)

environ, 字典, 包含从服务器的CGI接口获取的值。
start_response 是一个为了初始化一个请求对象而必须调用的参数。第一个参数是返回的 HTTP 状态值,第二个参数是一个(名, 值)元组列表,用来构建返回的 HTTP 头。
为了返回数据,一个 WSGI 程序必须返回一个字节字符串序列。

11.6 通过 XML-RPC 实现简单的远程调用

执行运行在远程机器上的 Python 程序中的函数或方法。
一个实现了键-值存储功能的简单服务器:

from xmlrpc.server import SimpleXMLRPCServerclass KeyValueServer:_rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']def __init__(self, address):self._data = {}self._serv = SimpleXMLRPCServer(address, allow_none=True)for name in self._rpc_methods_:self._serv.register_function(getattr(self, name))def get(self, name):return self._data[name]def set(self, name, value):self._data[name] = valuedef delete(self, name):del self._data[name]def exists(self, name):return name in self._datadef keys(self):return list(self._data)def serve_forever(self):self._serv.serve_forever()if __name__ == '__main__':kvserv = KeyValueServer(('', 15000))kvserv.serve_forever
from xmlrpc.client import ServerProxy
s = ServerProxy('http://localhost:15000', allow_none=True)
s.set('foo', 'bar')

XML-RPC 能让我们构造一个简单的远程调用服务。创建一个服务器实例,通过 register_function() 注册函数,然后启动它。
通过 XML-RPC 出来的函数只适用于部分数据类型。如果传递对象实例,处理的只是这个实例的字典。
XML-RPC 把数据都序列化为 XML 格式,能被其它语言识别,但是运行会慢一些。

11.7 在不同的 Python 解释器之间交互

不同机器上的多个 Python 解释器实例想通过消息交换数据。
multiprocessing.connection 模块:

from multiprocessing.connection import Listener
import tracebackdef echo_client(conn):try:while True:msg = conn.recv()conn.send(msg)except EOFError:print('Connection closed')def echo_server(address, authkey):serv = Listener(address, authkey=authkey)while True:try:client = serv.accept()echo_client(client)except Exception:traceback.print_exc()exco_server(('', 25000), authkey=b'peekaboo')

客户端连接服务器并发送消息:

from multiprocessing.connection import Client
c = Client(('localhost', 25000), authkey=b'peekaboo')
c.send('hello')
c.recv()

这个方法适合用来建立长连接,通信功能简单。如果想对底层连接做更多控制如超时、非阻塞I/O等,最好使用另外的库或在高层 socket 上实现。

11.8 实现远程方法调用

在消息传输层(socket、multiprocessing connections 或 ZeroMQ)实现简单的远程过程调用(RPC)。

在不同的解释器之间传送 pickle 字节字符串的 RPC 处理器:

# rpcserver.py
import pickle
class RPCHandler:def __init__(self):self._functions = {}def register_function(self, func):self._functions[func.__name__] = funcdef handle_connection(self, connection):try:while True:# 接收信息func_name, args, kwargs = pickle.loads(connection.recv())# 运行RPC发出响应try:r = self._functions[func_name](*args, **kwargs)connection.send(pickle.dumps(r))except Exception as e:connection.send(pickle.dumps(e))except EOFError:pass

使用 multiprocessing 实现的 RPC 服务器:

from multiprocessing.connection import Listener
from threading import Threaddef rpc_server(handler, address, authkey):sock = Listener(address, authkey=authkey)while True:client = sock.accept()t = Thread(target=handler.handle_connection, args=(client,))t.daemon = Truet.satrt()# 远程方法
def add(x, y):return x + ydef sub(x, y):return x - y# 和 handler 绑定
handler = RPCHandler()
handler.register_function(add)
handler.register_function(sub)# 运行服务
rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')

再在客户端创建一个用来传送请求的 RPC 代理类:

import pickleclass RPCProxy:def __init__(self, connection):self._connection = connectiondef __getattr__(self, name):def do_rpc(*args, **kwargs):self._connection.send(pickle.dumps((name, args, kwargs)))result = pickle.loads(self._connection.recv())if isinstance(result, Exception):raise resultreturn resultreturn do_rpc

这个代理类装在一个服务器连接上面:

from multiprocessing.connection import Client
c = Client(('localhost', 17000), authkey=b'peekaboo')
proxy = RPCProxy(c)
proxy.add(2, 3)

客户端想调用一个函数的时候,代理类 RPCProxy 创建了一个包含函数名和参数的元组,这个元组序列化后通过网络连接发出去。服务器收到后反序列化,查找并执行已注册过的函数,执行地结果或异常被序列化后返回客户端。

11.9 简单的客户端认证

在分布式系统中实现一个简单的客户端连接认证,但是不像 SSL 那样复杂。
hmac 模块实现认证:

import hmac
import osdef client_authenticate(connection, secret_key):'''客户端到远程服务的认证。connection 是网络连接。secret_key 是仅客户端和服务端知道的密钥'''message = connection.recv(32)hash = hmac.new(secret_key, message)digest = hash.digest()connection.send(digest)def server_authenticate(connection, secret_key):message = os.urandom(32)connections.send(message)hash = hmac.new(secret_key, message)digest = hash.digest()response = connection.recv(len(digest))return hmac.compare_digest(digest, response)

连接建立后,服务端给客户端发送一个随机的字节消息。客户端和服务端同时用 hmac 和一个只有双方知道的密钥计算出一个加密哈希值。客户端将它计算的摘要发送给服务器,服务器通过比较这个值是否一致决定要不要接受连接。
不用 == 号比较,使用 hmac.compare_digest() 比较能避免时间分析攻击。
将它集成到已有的网络或消息代码中:

from socket import socket, AF_INET, SOCK_STREAMserect_key = b'peekaboo'
def echo_handler(client_sock):if not server_authenticate(client_sock, screct_key):client_sock.close()return while True:msg = client_sock.recv(8192)if not msg:breakclient_sock.sendmail(msg)def echo_server(address):s = socket(AF_INET, SOCKET_STREAM)s.bind(address)s.listen(5)while True:c, a = s.accept()echo_handler(c)echo_server(('', 18000))
from socket import socket, AF_INET, SOCK_STREAMserect_key = b'peekaboo's = socket(AF_INET, SOCKET_STREAM)
s.connect(('localhost', 18000))
client_authenticate(s, secret_key)
s.send(b'Hello World')
resp = s.recv(1024)

hamc 认证的常用场景是内部消息通信系统和进程间通信。
认证成功后的消息是明文发送的。

11.10 在网络服务中加入 SSL

ssl 模块能为底层 socket 连接添加 SSL 支持。ssl.wrap_socket() 函数接受一个已存在的 socket 作为参数并使用 SSL 层来包装它。

from socket import socket, AF_INET, SOCK_STREAM
import sslKEYFILE = 'sever_key.pem'       # 服务器的私钥
CERTFILE = 'server_cert.pem'    # 服务认证(给客户端)def echo_client(s):while True:data = s.recv(8192)if data == b'':breaks.send(data)s.close()print('Connection closed')def echo_server(address):s = socket(AF_INET, SOCK_STREAM)s.bind(address)s.listen(1)# 使用一个需要客户端验证的SSl层包装s_ssl = ssl.wrap_socket(s,keyfile = KEYFILE,certfile = CERTFILE,server_side = True)# 等待连接while True:try:c, a = s_ssl.accept()print('Got conection', c, a)echo_client(c)except Exception as e:print('{}: {}'.format(e.__class__.__name__, e))echo_server(('', 20000))

客户端和这个服务器交互:

from socket import socket, AF_INET, SOCK_STREAM
import ssl
s = socket(AF_INET, SOCK_STREAM)
s_ssl = ssl.wrap_socket(s,cert_reqs = ssl.CERT_REQUIRED,ca_certs = 'server_cert.pem')
s_ssl.connect(('localhost', 20000))
s_ssl.send(b'Hello World?')
s_ssl.recv(8192)

上面这样直接处理底层的方式不能很好地和标准库中已存在的网络服务兼容。很多服务器代码是基于 socketserver 库实现的,客户端代码在更高的层级上实现。
服务器能用一个 mixin 类添加 SSL:

import sslclass SSLMixin:"""为基于socketserver模块的服务提供SSl支持的混入类"""def __init__(self, *args, keyfile=None, certfile=None, ca_certs=None, cert_reqs=ssl.NONE, **kwargs):self._keyfile = keyfileself._certfile = certfileself._ca_certs = ca_certsself._cert_reqs = cert_reqssuper().__init__(*args, **kwargs)def get_request(self):cient, addr = super().get_request()client_ssl = ssl.wrap_socket(client, keyfile=self._keyfile, certfile=self._certfile, ca_certs=self._ca_certs, cert_reqs=self._cert_reqs,server_side=True)return client_ssl, addr

使用时将这个混入类和其它服务器类混合。

# XML_RPC server with SSLimport ssl
from xmlrpc.server import SimpleXMLRPCServer
from sslmixin import SSLMixinclass SSLSimpleXMLRPCServer(SSLMixin, SimpleXMLRPCServer):passclass KeyValueServer:_rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']def __init__(self, *args, **kwargs):self._data = {}self._serv = SSLSimpleXMLRPCServer(*args, allow_none=True, **kwargs)for name in self._rpc_methods_:self._serv.register_function(getattr(self, name))def get(self, name):return self._data[name]def set(self, name, value):self._data[name] = valuedef delete(self, name):del self._data[name]def exists(self, name):return name in self._datadef keys(self):return list(self._data)def serve_forever(self):self._serv.serve_forever()if __name__ == '__main__':KEYFILE = 'server_key.pem'CERTFILE = 'server_cert.pem'kvserv = KeyValueServer(('', 15000), keyfile=KEYFILE, certfile=CERTFILE)kvserv.serve_forever()

这个服务器能使用 xmlrpc.client 模块连接:

from xmlrpc.client import ServerProxy
s = ServerProxy('https://localhost:15000', allow_none=True)
s.set('foo', 'bar')
s.set('spam', [1, 2, 3])

SSL客户端的一个问题是如何确认服务器证书或为服务器提供客户端认证。没有标准的解决方法,需要自己解决。

from xmlrpc.client import SafeTransport, ServerProxy
import sslclass VerifyCertSafeTransport(SafeTransport):def __init__(self, cafile, certfile=None, keyfile=None):SafeTransport.__init__(self)self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)self._ssl_context.load_verify_locations(cafile)if cert:self._ssl_context.load_cert_chain(certfile, keyfile)self._ssl_context.verify_mode = ssl.CERT_REQUIREDdef make_connection(self, host):# 参数表示的目录下各项被当为关键字参数传给 http.client.HTTPSConnection() 构造器。# context 参数可接收带有 SSL 确认的 ssl.SSLContext 实例s = super().make_connection((host, {'context': self._ssl_context}))return s# 创建客户端代理
s = ServerProxy('https://localhost:15000',transport = VerifyCertSafeTransport('server_cert.pem'),allow_none = True)

服务器把证书发送给客户端,客户端确认它的合法性。如果服务器想确认客户端,可以将服务器启动代码修改如下:

if __name__ == '__main__':KEYFILE = 'server_key.pem'CERTFILE = 'server_cert.pem'CA_CERTS = 'client_cert.pem'kvserv = KeyValueServer(('', 15000),keyfile = KEYFILE,certfile = CERTFILE,ca_certs = CA_CERTS,cert_reqs = ssl.CERT_REQUIRED,)kvserv.serve_forever()

客户端发送证书的初始化代码改为:

s = ServerProxy('https://localhost:15000',transport = VerifyCertSafeTransport('server_cert.pem', 'client_cert.pem', 'client_key.pem'),allow_none = True,)

本节最后一句话: 做好花费不少时间来测试它正长工作的准备。慢慢折腾吧~ _

11.11 进程间传递 Socket 文件描述符

多个解释器在同时运行的时候,想把打开的文件描述符从一个解释器传递给另一个解释器。
首先要把这些进程连接在一起,Unix 上使用 Unix 套接字,windows 上使用命名管道。使用 multiprocessing 模块能创建。
之后就能用 multiprocessing.reduction 中的 send_handle() 和 recv_handle() 在不同的处理器直接传递文件描述符。

import multiprocessing
from multiprocessing.reduction import recv_handle, send_handle
import socketdef worker(in_p, out_p):out_p.close()while True:fd = recv_handle(in_p)print('CHILD: GOT FD', fd)with socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) as s:while True:msg = s.recv(1024)if not msg:breakprint(f'CHILD: RECV {msg}')s.send(msg)def server(address, in_p, out_p, worker_pid):in_p.close()s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)s.bind(address)s.listen(1)while True:client, addr = s.accept()print('SERVER: Got connection from', addr)send_handle(out_p, client.fileno(), worker_pid)client.close()if __name__ == '__main__':c1, c2 = multiprocessing.Pipe()worker_p = multiprocessing.Process(target=worker, args=(c1, c2))worker_p.start()server_p = multiprocessing.Process(target=server,args=(('', 15000), c1, c2, worker_p.pid))server_p.start()c1.close()c2.close()

两个进程被创建并用管道连接。服务器进程接收客户端的请求,收到的文件描述符交给工作进程后就关闭这个连接。工作进程处理并把结果返回给客户端。
不同进程之间传递文件描述符对大部分程序并无必要。但是有时能用来构建可扩展系统。
在多核机器上可以有多个 Python 解释器实例,能用文件描述符的传递实现负载均衡。
使用 send_handle() 和 recv_handle() 代替管道的使用:

# servermp.py
from multiprocessing.connection import Listener
from multiprocessing.reduction import send_handle
import socketdef server(work_address, port):# 等待工作进程连接work_serv = Listen(work_address, authkey=b'peekaboo')worker = work_serv.accept()worker_pid = worker.recv()# 运行TCP/IP服务,把客户端发给工作进程s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)s.bind(('', port))s.listen(1)while True:client, addr = s.accept()print('SERVER: Got connection from', addr)send_handle(worker, client.fileno(), worker_pid)client.close()if __name__ == '__main__':import sysif len(sys.argv) != 3:print('Usage: server.py server_address port', file=sys.stderr)raise SystemExit(1)server(sys.argv[1], int(sys.argv[2]))

执行 python3 servermp.py /tmp/servconn 15000 来运行这个服务器。

# workermp.pyfrom multiprocessing.connection import Client
from multiprocessing.reduction import recv_handle
import os
from socket import socket, AF_INET, SOCK_STREAMdef worker(server_address):serv = Client(server_address, authkey=b'peekaboo')serv.send(os.getpid())while True:fd = recv_handle(serv)print('WORKER: GOT FD', fd)with socket(AF_INET, SOCK_STREAM, fileno=fd) as client:while True:msg = client.recv(1024)if not msg:breakprint(f'WORKER: RECV {msg}')client.send(msg)if __name__ == '__main__':import sysif len(sys.argv) != 2:print('Usage: worker.py server_address', file=sys.stderr)raise SystemExit(1)worker(sys.argv[1])

执行 python3 workermp.py /tmp/servconn 运行工作者,效果跟 Pipe() 一样。
文件描述符的传递会涉及 UNIX 域套接字的创建和套接字的 sendmsg() 方法。使用套接字传递描述符的实现:

# server.pyimport socket
import structdef send_fd(sock, fd):"""发送单个文件描述符"""sock.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('i', fd))])ack = sock.recv(2)assert ack == b'OK'def server(work_address, port):# 等待工作进程连接work_serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)work_serv.bind(work_address)work_serv.listen(1)worker, addr = work_serv.accept()# 运行TCP/IP服务,把客户端发送给工作进程s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)s.setsockopt(socket.SOL_SCKET, socket.SO_REUSEADDR, True)s.bind(('', port))s.listen(1)while True:client, addr = s.accept()print('SERVER: Got connection from', addr)send_fd(worker, client.fileno())client.close()if __name__ == '__main__':import sysif len(sys.argv) != 3:print('Usage: server.py server_address port', file=sys.stderr)raise SystemExit(1)server(sys.argv[1], int(sys.argv[2]))
# worker.py
import socket
import structdef recv_fd(sock):"""接受单个文件描述符"""msg. ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(struct.calcsize('i')))cmsg_level, cmsg_type, cmsg_data = ancdata[0]assert cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTSsock.sendall(b'OK')return struct.unpack('i', cmsg_data)[0]def worker(server_address):serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)serv.connect(server_address)while True:fd = recv_fd(serv)print('WORKER: GOT FD', fd)with socket.socket(socket.AF_INETM socket.SOCK_STREAM, fileno=fd) as client:while True:msg = client.recv(1024)if not msg:breakprint(f'WORKER: RECV {msg}')client.send(msg)if __name__ == '__main__':import sysif len(sys.argv) != 2:  print('USAGE: worker.py server_address', file=sys.stderr)raise SystemExit(1)worker(sys.argv[1])

11.12 理解事件驱动的 IO

事件驱动 I/O 本质上是把基本 I/O 操作转化为你程序需要处理的事件。
数据在某个 socket 上被接收后,转化为一个 receive 事件,然后被自定义的回调方法处理。
一个事件驱动的框架可能会以一个实现了一系列基本事件处理器方法的基类开始:

class EventHandler():def fileno(self):"""返回关联的文件描述符"""raise NotImplemented('must implement')def wants_to_raceive(self):"""返回真如果允许接收"""return Falsedef handle_receive(self):"""执行接收操作"""passdef wants_to_send(self):"""返回真如果需要发送"""return Falsedef handle_send(self):"""向外发送数据"""pass

这个类的实例作为插件被放入类似下面这样的事件循环中:

import selectdef event_lop(handlers):while True:wants_recv = [h for h in handlers if h.wants_to_receive()]wants_send = [h for h in handlers if h.wants_to_send()]can_recv, can_send, _ = select.select(wants_recv, wants_send, [])for h in can_recv:h.handle_receive()for h in can_send:h.handle_send()

select() 会不断轮询文件描述符从而激活它。在调用 select() 之前,时间循环会询问所有的处理器来决定哪一个想接收或发送,然后将它的结果列表给 select()。select() 返回真被接收货发送的对象的列表,相应的 handle_receive() 或 handle_send() 方法被触发。下面是基于 UDP 网络服务的处理器例子:

import socket
import timeclass UDPServer(EventHandler):def __init__(self, address):self.sock = socket(socket.AF_INET, socket.SOCK_DGRAM)self.sock.bind(address)def fileno(self):return self.sock.fileno()def wants_to_receive(self):return Trueclass UDPTimeServer(UDPServer):def handle_receive(self):msg, addr = self.sock.recvfrom(1)self.sock.sendto(time.ctime().encode('ascii'), addr)class UDPEchoServer(UDPServer):def handle_receive(self):msg, addr = self.sock.recvfrom(8192)self.sock.sendto(msg, addr)if __name__ == '__main__':handlers = [UDPTimeServer(('', 14000)), UDPEchoServer(('', 15000))]event_loop(handlers)

TCP 服务器,每个客户端都需要初始化一个新的处理器对象:

class TCPServer(EventHandler):def __init__(self, address, client_handler, handler_list):self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)self.sock.bind(address)self.sock.listen(1)self.client_handler = client_handlerself.handler_list = handler_listdef fileno(self):   return self.sock.filenodef wants_to_receive(self):return Truedef handle_receive(self):client, addr = self.sock.accept()self.handler_list.append(self.client_handler(client, self.handler_list))class TCPClient(EventHandler):def __init__(self, sock, handler_list):self.sock = sockself.handler_list = handler_listself.outgoing = bytearray()def fileno(self):return self.sock.fileno()def close(self):self.sock.close()self.handler_list.remove(self)def wants_to_send(self):return True if self.outgoing else Falsedef handle_send(self):nsent = self.sock.send(self.outgoing)self.outgoing = self.outgoing[nsent:]class TCPEchoClient(TCPClient):def wants_to_receive(self):return Truedef handle_receive(self):data = self.sock.recv(8192)if not data:self.close()else:self.outgoing.extend(data)if __name__ == '__main__':handlers = []handlers.append(TCPServer(('', 16000), TCPEchoClient, handlers))event_loop(handlers)

每发起一个连接,就创建处理器并填到处理器列表中。连接关闭后,客户端自行把对应的处理器从列表中删除。

这样的事件驱动能处理很大的并发连接而不需要使用多线程或多进程。???循环中一次处理一个事件,不需要并发机制。
事件驱动的缺点是没有同步机制,遇到一个耗时的计算会阻塞所有的处理进程。遇到阻塞或耗时计算的时候能把该事件发送给其它单独的进程处理。
事件驱动中引入多进程和多线程很麻烦,下面用 concurrent.futures 模块实现:

from concurrent.futures import ThreadPoolExecutor
import osclass ThreadPoolHandler(EventHandler):def __init__(self, nworkers):if os.name == 'posix':self.signal_done_sock, self.done_sock = socket.socketpair()else:server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.bind(('127.0.0.1', 0))server.listen(1)self.signal_done_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.signal_done_sock.connect(server.getsockname())self.done_sock, _ = server.getsockname()server.close()self.pending = []self.pool = ThreadPoolExcutor(nworkers)def fileno(self):return self.done_sock.fileno()# 线程结束时回传执行def _complete(self, callback, r):self.pending.append((calback, r,result()))self.signal_done_sock.send()# 在线程池运行一个函数def run(self, func, args=(), kwargs={}, *, callback):r = self.pool.submit(func, *args, **kwargs)r.add_done_callback(lambda r: self._complete(callback, r))def wants_to_receive(self):return True# 运行已完成进程的回传函数def handle_receive(self):for callback, result in self.pending:callback(result)self.done_sock.recv(1)self.pending = []

run() 方法被提交给回调函数池,处理完被激发。
???创建了一对 socket 并将其作为信号量机制使用。
线程池完成工作后,会执行类中的 _complete() 方法,这个方法在往 socket 上写入前会把挂起的回调函数和结果放入队列中。

11.13 发送与接收大型数组

网络连接发送和接收连续数据,并减少复制操作。
memoryviews

# zerocopy.pydef send_from(arr, dest):view = memoryviews(arr).cast('B')while len(view):nsent = dest.send(view)view = view[nsent:]def recv_into(arr, source):view = memoryviews(arr).cast('B')while len(view):nrecv = source.recv_into(view)view = view[nrecv:]

数据需要先转化成原始字节以便网络函数使用,还要切分为多个块。
把数据序列化为字节字符串会创建复制。
内存视图本质上是已存在数组的覆盖层,能以不同的方式转换数据,并可以被相关函数使用。sock.send() 和 send.recv_into() 会直接操作内存区域。
每个 send() 和 recv_into() 操作会把视图通过发送或接受的字节量且分为新的视图。

note-PythonCookbook-第十一章 网络与WEB编程相关推荐

  1. Python(十一)网络与web编程

    1.作为客户端与HTTP服务交互 (1)发送一个简单的HTTP GET请求到远程的服务上 from urllib import request, parse#url的get请求 请求信息连同url一起 ...

  2. CSAPP:第十一章 网络编程

    CSAPP:第十一章 网络编程 11.1 客户端服务器模型11.2 全球IP因特网11.3 套接字接口 11.1 客户端服务器模型   每个网络应用都是基于客户端-服务器模型.采用这个模型,一个应用是 ...

  3. 泄漏计算机网络安全法情节,第十一章网络安全法第十二章电子商务纠纷的法律解决详细分解.doc...

    第十一章 网络安全法 第一节 计算机网络安全概述 第二节 计算机与网络犯罪概述 第三节 计算机网络安全法律规范 (一)名词解释 1,计算机信息网络安全:"为数据处理系统建立和采用的技术和管理 ...

  4. 【正点原子Linux连载】第十一章 网络编程 摘自【正点原子】I.MX6U嵌入式Qt开发指南V1.0.2

    1)实验平台:正点原子阿尔法Linux开发板 2)平台购买地址:https://item.taobao.com/item.htm?id=603672744434 2)全套实验源码+手册+视频下载地址: ...

  5. 《jQuery与JavaScript入门经典》——第 1 章 动态Web编程简介 1.1理解Web服务器浏览器范式...

    本节书摘来自异步社区<jQuery与JavaScript入门经典>一书中的第1章,第1.1节,作者:[美]Brad Dayley著,更多章节内容可以访问云栖社区"异步社区&quo ...

  6. 信安软考 第十一章 网络物理隔离技术与应用

       目录汇总 一.网络物理隔离概述 1.1 网络物理隔离概念   随着网络攻击技术不断增强,恶意入侵内部网络的风险性也相应急剧提高.同时,内部网的用户因为安全意识薄弱,可能有意或者无意地建敏感数据泄 ...

  7. CSAPP第十一章 网络编程

    客户端-服务器编程模型 套接字对  套接字接口 基于套接字接口的网络应用概述

  8. Spring基础专题——第十一章(高级注解编程完结)

    前言:去年到现在一直没有很好的时间完成这个spring基础+源码的博客目标,去年一年比较懒吧,所以今年我希望我的知识可以分享给正在奋斗中的互联网开发人员,以及未来想往架构师上走的道友们我们一起进步,从 ...

  9. 第十一、十二、十三、十四章 网络配置管理、归档和远程复制同步文件、软件包管理、创建访问linux文件系统

    第十一章 网络配置管理 网络地址获取方式: 1)DHCP自动获取 2)手动配置 1.网卡配置文件: /etc/sysconfig/network-scripts/ [root@server0 Desk ...

最新文章

  1. SAP JAM活跃度统计
  2. Xampp修改默认端口号
  3. jquery ajax 设置header的方式
  4. 第13天:页面布局实例-博雅主页
  5. [C#]用Forms.TreeView显示Icon会有黑边
  6. ajax请求 cache,JavaScript_解析jquery中的ajax缓存问题,jquery的ajax请求默认请求cache是t - phpStudy...
  7. SQLSERVER中判断表中的某列是否存在两个方法
  8. 汇川PLC软件下载及安装
  9. 带计算机来学校检讨,校园检讨书
  10. 一脸懵逼学习oracle(图形化界面操作---》PLSQL图形化界面)
  11. 电机matlab程序计算公式,MATLAB用于电机电磁计算的计算机编程
  12. KeyError: 2
  13. 坐标转换:将imu坐标系下的角速度、线速度转换到车体坐标系,参考Autoware
  14. Multisim14.0 简易交通灯设计
  15. 领导要我6点下班前创建1000个有效的手机号,现在5点半了!random模块10分钟搞定!
  16. 关于magic leap-magic leap 核心技术解密-谷歌到底看中了magic leap的什么
  17. 【论文阅读】【逐字翻译】 爱丁堡大学IEEE TPAMI 2021年最新元学习综述 《Meta-Learning in Neural Networks: A Survey》
  18. 武汉唯众智创科技有限公司通过2020第二批产学合作协同育人项目申报指南
  19. jsp标签自定义属性取值问题
  20. 两招提高孩子识字兴趣和效果

热门文章

  1. 张飞的流水帐(无厘头式的搞笑)【
  2. 华硕双路服务器主板装系统,华硕双路服务器主板Z8PE-D12X
  3. saas系统需要什么样的云服务器,SAAS系统和云服务器的区别
  4. 王煜全老师谈SNS创业
  5. 电子齿轮 电子凸轮
  6. 魔性!Python生成全网爆火的“蚂蚁呀嘿”
  7. 微信抢票应用开发总结
  8. 正大国际期货:外盘黄金期货怎么做,需要注意什么?
  9. buildroot制作树莓派CM3的系统
  10. 京东微信、手机QQ引领社交化购物趋势