python websocket 的异步实现:websockets
官方文档: https://websockets.readthedocs.io/
Getting started
环境:python3.7及以上
安装
(Version: 8.1)
pip install websockets
基本示例
服务端:
不断监听客户端连接,当客户端发送一个name字符串后,返回一条 ‘hello’ + name 的问候消息。
# WS server exampleimport asyncio
import websocketsasync def hello(websocket, path):# path标识请求路径,可以来自定义需求name = await websocket.recv()print(f"< {name}")greeting = f"Hello {name}!"await websocket.send(greeting)print(f"> {greeting}")start_server = websockets.serve(hello, "localhost", 8765)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
客户端:
客户端连接到server,执行协程处理器 handler,当协程退出后与server断开连接。
# WS client exampleimport asyncio
import websocketsasync def hello():uri = "ws://localhost:8765"async with websockets.connect(uri) as websocket:name = input("What's your name? ")await websocket.send(name)print(f"> {name}")greeting = await websocket.recv()print(f"< {greeting}")asyncio.get_event_loop().run_until_complete(hello())
connect()通过 上下文管理器保证了协程退出之前关闭socket连接。
SSL安全连接示例
安全的WebSocket连接提高了保密性和可靠性,降低了了使用不安全的proxy代理服务的风险。
WSS协议之于WS就像HTTPS之于HTTP: 连接是用传输层安全(TLS)加密的,TLS通常被称为安全套接字层(SSL)。WSS需要类似HTTPS的TLS证书。
下面介绍如改写上面的服务器示例以提供安全连接。请参阅ssl
模块的文档配置上下文。
# WSS (WS over TLS) server example, with a self-signed certificateimport asyncio
import pathlib
import ssl
import websocketsasync def hello(websocket, path):name = await websocket.recv()print(f"< {name}")greeting = f"Hello {name}!"await websocket.send(greeting)print(f"> {greeting}")ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
ssl_context.load_cert_chain(localhost_pem)start_server = websockets.serve(hello, "localhost", 8765, ssl=ssl_context
)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
改写相应的客户端安全连接实现:
# WSS (WS over TLS) client example, with a self-signed certificateimport asyncio
import pathlib
import ssl
import websocketsssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("localhost.pem")
ssl_context.load_verify_locations(localhost_pem)async def hello():uri = "wss://localhost:8765"async with websockets.connect(uri, ssl=ssl_context) as websocket:name = input("What's your name? ")await websocket.send(name)print(f"> {name}")greeting = await websocket.recv()print(f"< {greeting}")asyncio.get_event_loop().run_until_complete(hello())
注意:
此处客户端需要一个ssl上下文,因为此处使用的是一个 自签名的CA证书。
如果使用有效的证书(即由Python安装信任的CA签名)连接到安全的WebSocket服务器的客户端可以简单地将ssl=True传递给connect(),而不必构建ssl上下文。
使用浏览器连接到服务器
从浏览器连接到我们自己实现的服务器:
从console里运行以下程序:
# WS server that sends messages at random intervalsimport asyncio
import datetime
import random
import websocketsasync def time(websocket, path):while True:now = datetime.datetime.utcnow().isoformat() + "Z"await websocket.send(now)await asyncio.sleep(random.random() * 3)start_server = websockets.serve(time, "127.0.0.1", 5678)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
将以下代码写入html文件,并从浏览器里打开
<!DOCTYPE html>
<html><head><title>WebSocket demo</title></head><body><script>var ws = new WebSocket("ws://127.0.0.1:5678/"),messages = document.createElement('ul');ws.onmessage = function (event) {var messages = document.getElementsByTagName('ul')[0],message = document.createElement('li'),content = document.createTextNode(event.data);message.appendChild(content);messages.appendChild(message);};document.body.appendChild(messages);</script></body>
</html>
多客户端同步
一个websocket服务器可以从多个客户端接收消息,处理消息并同步状态到所有客户端。
以下示例展示了任意一客户端将一变量counter进行增加或减少后,所有已连接的客户端都能同步更新counter的实时值。
协程模块 asyncio
保证了变量的更新是按先后顺序的。
服务端代码:
# WS server example that synchronizes state across clientsimport asyncio
import json
import logging
import websocketslogging.basicConfig()STATE = {"value": 0}# 保存所有在线客户端
USERS = set()def state_event():return json.dumps({"type": "state", **STATE})def users_event():return json.dumps({"type": "users", "count": len(USERS)})# 更新所有客户端显示的counter值
async def notify_state():if USERS: # asyncio.wait doesn't accept an empty listmessage = state_event()await asyncio.wait([user.send(message) for user in USERS])# 通知客户端在线数量
async def notify_users():if USERS: # asyncio.wait doesn't accept an empty listmessage = users_event()await asyncio.wait([user.send(message) for user in USERS])# 注册客户端
async def register(websocket):USERS.add(websocket)await notify_users()# 注销客户端
async def unregister(websocket):USERS.remove(websocket)await notify_users()async def counter(websocket, path):# register(websocket) sends user_event() to websocketawait register(websocket)try:await websocket.send(state_event())# 迭代websocket以不断接收消息,此处要求对象实现了 __iter__()、__await__()、 __aenter__()、 __aexit__() 方法。async for message in websocket:data = json.loads(message)if data["action"] == "minus":STATE["value"] -= 1await notify_state()elif data["action"] == "plus":STATE["value"] += 1await notify_state()else:logging.error("unsupported event: {}", data)finally:# 客户端断开后,退出上面的 for 循环,即客户端协程退出后await unregister(websocket)start_server = websockets.serve(counter, "localhost", 6789)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
将以下代码写入html文件,并用浏览器打开多个页面模拟多个ws客户端
<html><head><title>WebSocket demo</title><style type="text/css">body {font-family: "Courier New", sans-serif;text-align: center;}.buttons {font-size: 4em;display: flex;justify-content: center;}.button, .value {line-height: 1;padding: 2rem;margin: 2rem;border: medium solid;min-height: 1em;min-width: 1em;}.button {cursor: pointer;user-select: none;}.minus {color: red;}.plus {color: green;}.value {min-width: 2em;}.state {font-size: 2em;}</style></head><body><div class="buttons"><div class="minus button">-</div><div class="value">?</div><div class="plus button">+</div></div><div class="state"><span class="users">?</span> online</div><script>var minus = document.querySelector('.minus'),plus = document.querySelector('.plus'),value = document.querySelector('.value'),users = document.querySelector('.users'),websocket = new WebSocket("ws://127.0.0.1:6789/");minus.onclick = function (event) {websocket.send(JSON.stringify({action: 'minus'}));}plus.onclick = function (event) {websocket.send(JSON.stringify({action: 'plus'}));}websocket.onmessage = function (event) {data = JSON.parse(event.data);switch (data.type) {case 'state':value.textContent = data.value;break;case 'users':users.textContent = (data.count.toString() + " user" +(data.count == 1 ? "" : "s"));break;default:console.error("unsupported event", data);}};</script></body>
</html>
Server常用模板
你要在连接的生命周期里处理多条消息,你必须实现一个loop,以下为你提供了一个构建Websocket Server的基础模板。
消费者
接收消息并且传递到消费者协程中。
async def consumer_handler(websocket, path):async for message in websocket:await consumer(message)
当客户端断开连接后终止迭代。
生产者
从生产者协程生成消息,并且发送出去
async def producer_handler(websocket, path):while True:message = await producer()await websocket.send(message)
此处生产者代表你的产生消息的业务逻辑。
注意:当客户端断开连接后,send() 会引发 ConnectionClosed
异常,从而从 while True
的 loop 中 退出。
生产者+消费者
你可以将上面两种模式结合起来,两个协程任务并行。
async def handler(websocket, path):consumer_task = asyncio.ensure_future(consumer_handler(websocket, path))producer_task = asyncio.ensure_future(producer_handler(websocket, path))done, pending = await asyncio.wait([consumer_task, producer_task],return_when=asyncio.FIRST_COMPLETED,)for task in pending:task.cancel()
注册客户端
参考上面多客户端同步的代码,你需要记录所有已连接的客户端,当他们连接到server时进行注册,断开时注销。
connected = set()async def handler(websocket, path):# Register.connected.add(websocket)try:# Implement logic here.await asyncio.wait([ws.send("Hello!") for ws in connected])await asyncio.sleep(10)finally:# Unregister.connected.remove(websocket)
这个简单的示例展示了如何在内存中跟踪连接的客户端,这只在运行单个进程时有效。在实际应用程序中,handler可以注册到消息代理broker 上的某些channels。
Cheat sheet
Server
handler处理器: 实现一个协程处理单个连接,接收两个参数(WebSocket协议实例和url 路径参数)
- 随时调用
recv()
和send()
来接收和发送消息。 - 当
recv()
或send()
引发ConnectionClosed
时,清除并退出。如果您启动了其他异步任务,在退出之前终止它们。 - 如果您不是在awaiting
recv()
,可以考虑awaitingwait_closed()
,以便在连接关闭时快速检测。 - 如果你愿意,你可以
ping()
或pong()
,但一般不需要
- 随时调用
使用
serve()
创建一个服务器,它类似于asyncio
中loop的create_server()
。您还可以将它用作异步上下文管理器。- 服务器负责建立连接,然后让处理程序执行应用程序逻辑,最后在处理程序正常或异常退出后关闭连接
- 对于高级定制,您可以继承
WebSocketServerProtocol
的子类,并将这个子类或工厂函数作为create_protocol
参数传递。
Client
使用
connect()
创建一个客户端,它类似于asyncio
中loop的create_connection()
。您还可以将它用作异步上下文管理器。- 对于高级定制,您可以继承
WebSocketClientProtocol
的子类,并将这个子类或工厂函数作为create_protocol
参数传递。
- 对于高级定制,您可以继承
随时调用recv()和send()来接收和发送消息。
如果你愿意,你可以ping()或pong(),但一般不需要。
如果没有使用
connect()
作为上下文管理器,请调用close()
来终止连接。
Debugging
如果你不了解websocket的工作原理,请打开日志调试
import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
如果你不了解asyncio
库, 建议查看官方文档develop with asyncio.
传递额外的参数到handler
import asyncio
import functools
import websocketsasync def handler(websocket, path, extra_argument):...bound_handler = functools.partial(handler, extra_argument='spam')
start_server = websockets.serve(bound_handler, '127.0.0.1', 8765)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
实现此结果的另一种方法是在存在extra_argument
变量的范围内定义handler
协程,而不是通过参数注入它。
Deployment
如何关闭server
- 使用异步上下文管理器 async with
- 调用
close()
方法,然后等待它自身的wait_closed()
方法执行结束。
在Unix系统上(windows就不要试了),退出通常是通过发送一个信号来触发的。
import asyncio
import signal
import websocketsasync def echo(websocket, path):async for message in websocket:await websocket.send(message)async def echo_server(stop):async with websockets.serve(echo, "localhost", 8765):await stoploop = asyncio.get_event_loop()# The stop condition is set when receiving SIGTERM.
stop = loop.create_future()
loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)# Run the server until the stop condition is met.
loop.run_until_complete(echo_server(stop))
如果你的server不是运行在主线程上,可以使用 call_soon_threadsafe()
.
端口共享
Websocket是HTTP/1.1.的扩展,在同一个端口上同时提供HTTP和WebSocket是ok的。
WebSocket的作者并不认为这是一个好主意,因为HTTP和WebSocket的操作特性有很大的不同。
websockets
使用process_request
参数钩子,为响应HTTP请求提供了最低限度的支持。典型的用例包括健康检查。这里有一个例子
# WS echo server with HTTP endpoint at /health/import asyncio
import http
import websocketsasync def health_check(path, request_headers):if path == "/health/":return http.HTTPStatus.OK, [], b"OK\n"async def echo(websocket, path):async for message in websocket:await websocket.send(message)start_server = websockets.serve(echo, "localhost", 8765, process_request=health_check
)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Extensions
websockets 支持扩展
RFC 7692
WebSocket Per-Message Deflate
serve()
and connect()
默认支持Per-Message Deflate, 你可以
通过参数禁用compression=None
.
如果希望自定义Per-Message Deflate参数,还可以显式配置每个消息的Deflate扩展。
- 服务端
import websockets
from websockets.extensions import permessage_deflatewebsockets.serve(...,extensions=[permessage_deflate.ServerPerMessageDeflateFactory(server_max_window_bits=11,client_max_window_bits=11,compress_settings={'memLevel': 4},),],
)
- 客户端
from websockets.extensions import permessage_deflatewebsockets.connect(...,extensions=[permessage_deflate.ClientPerMessageDeflateFactory(server_max_window_bits=11,client_max_window_bits=11,compress_settings={'memLevel': 4},),],
)
参考API文档ServerPerMessageDeflateFactory 和 ClientPerMessageDeflateFactory 了解更多细节。
API
python websocket 的异步实现:websockets相关推荐
- python websocket爬虫_Python如何爬取实时变化的WebSocket数据
一.前言 作为一名爬虫工程师,在工作中常常会遇到爬取实时数据的需求,比如体育赛事实时数据.股市实时数据或币圈实时变化的数据.如下图: Web 领域中,用于实现数据'实时'更新的手段有轮询和 WebSo ...
- python协程详解_对Python协程之异步同步的区别详解
一下代码通过协程.多线程.多进程的方式,运行代码展示异步与同步的区别. import gevent import threading import multiprocessing # 这里展示同步和异 ...
- python processpoolexector 释放内存_一起看看python 中日志异步发送到远程服务器
在python中使用日志最常用的方式就是在控制台和文件中输出日志了,logging模块也很好的提供的相应的类,使用起来也非常方便,但是有时我们可能会有一些需求,如还需要将日志发送到远端,或者直接写入数 ...
- Python爬虫获取异步加载站点pexels并下载图片(Python爬虫实战3)
Python爬虫获取异步加载站点pexels并下载图片(Python爬虫实战3) 1. 异步加载爬虫 对于静态页面爬虫很容易获取到站点的数据内容,然而静态页面需要全量加载站点的所有数据,对于网站的访问 ...
- python websocket实时消息推送
python websocket实时消息推送 十分想念顺店杂可... 本人写的渣,大神勿喷. 转载请附带本文链接,谢谢. 服务端代码 # -*- coding: utf-8 -*- # @Time : ...
- 『Python学习笔记』Python中的异步Web框架之fastAPI介绍RestAPI
Python中的异步Web框架之fastAPI介绍&RestAPI 文章目录 一. fastAPI简要介绍 1.1. 安装 1.2. 创建 1.3. get方法 1.4. post方法 1.5 ...
- python消息队列celery_【干货分享】NTI任务管理之django+python篇celery异步任务使用...
阅读: 3,538 新浪微博的新鲜事推送如何实现?大规模的服务器如何实现Crontab管理?里面的秘密就在于消息队列.Celery是一个使用Python开发的分布式任务调度模块,是一个简单.灵活.可靠 ...
- fastapi python 并发_FastAPI 异步代码、并发和并行
作者:麦克煎蛋 出处:https://www.cnblogs.com/mazhiyong/ 转载请保留这段声明,谢谢! 我们这里探讨下关于异步代码.并行和并发的一些概念. 一.初探 1.如果我们使 ...
- python websocket异步高并发_Python3.5异步和多个websocket服务器
我在Ubuntu上使用pythonwebsockets4.0.1.我想有2个websocket服务器运行.我可以通过为每个线程创建2个线程和独立的事件循环来实现这一点.我所说的"某种工作&q ...
最新文章
- 去掉字符串两端的全角空格和半角空格(含源代码)
- 添加MySql数据库超时设置的相关问题
- 基础-计算机及操作系统和应用程序的概念
- 【计算机网络】第五章 数据链路层(3)
- 查看mysql,apache,php,nginx编译参数
- SQL SERVER大话存储结构(2)
- php+mysql分库分表的哈希(hash)算法
- 网站不大但加载很慢怎么优化_博客网站首页加载优化
- python中with as用法_python 中关于with...as的用法
- 7 centos 查看程序文件数量_CentOS之使用Systemd添加自定义系统服务
- [css] 说说响应式设计(responsive design)和自适应设计(adaptive design)的区别?
- 神经网络激活函数=生物转换器?
- 操作系统 cpu调度_CPU调度| 操作系统
- python元组和列表字典_python:列表、元组和字典
- birt预览能有内容发布后没内容_VS Code 1.52 发布!
- Mac基础知识:Mac日历如何添加提醒事件的教程
- 转大白话系列之C#委托与事件讲解大结局
- 2019年苏大计算机考研872真题及解析
- 删除下拉框只找23火星软件_下拉框找20火星软件
- 华为任正非写的《致新员工书》