____tz_zs
本次,我将从主流的三方框架使用出发,带大家熟悉和使用 Python 中常见的 websocket 库。

一、websocket-client 库

websocket-client 库是一个简单好用的同步的 websocket 的客户端的库,基于回调的方式使用。

pypi地址: https://pypi.org/project/websocket-client/
GitHub地址: https://github.com/websocket-client/websocket-client
文档地址: https://websocket-client.readthedocs.io/en/latest/

websocket-client 库也是我们诸多项目中正在使用的 websocket 库,这个库开箱即用,非常的方便,其 WebSocketApp 适合于建立长期连接。

我们使用对象的函数作为 WebSocketApp 的回调函数,可以在全局变量中缓存和共享数据,持有 websocket 引用以便使用和关闭,维护长链接和心跳等,这种方式在构建应用程序时具有更大的灵活性。

下方为 WebSocketApp 的一个使用例子,创建 WebSocketApp 对象,传入 url 地址,指定 on_open 等回调函数,调用 run_forever 运行。

tips:
1、 websocket.enableTrace(True) 可以开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。
2、也可以先创建对象,之后再使用如 self.ws.on_open = self.on_open 来指定回调函数。只需在 run_forever() 之前指定回调函数即可。
3、控制台输出的 send: b'\x8a\x80\xf4-\xd9\x8b' 等形式的信息,是框架的开启运行状态追踪的输出信息(框架的输出log)

(一)简单的使用demo

# -*- coding:utf-8 -*-"""
@author:    tz_zs
"""import websocket
from websocket import WebSocketApptry:import thread
except ImportError:import _thread as thread
import timeclass Test(object):def __init__(self):super(Test, self).__init__()self.url = "ws://echo.websocket.org/"self.ws = Nonedef on_message(self, message):print("####### on_message #######")print("message:%s" % message)def on_error(self, error):print("####### on_error #######")print("error:%s" % error)def on_close(self):print("####### on_close #######")def on_ping(self, message):print("####### on_ping #######")print("ping message:%s" % message)def on_pong(self, message):print("####### on_pong #######")print("pong message:%s" % message)def on_open(self):print("####### on_open #######")thread.start_new_thread(self.run, ())def run(self, *args):while True:time.sleep(1)input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n")if input_msg == "close":self.ws.close()  # 关闭print("thread terminating...")breakelse:self.ws.send(input_msg)def start(self):websocket.enableTrace(True)  # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。self.ws = WebSocketApp(self.url,on_open=self.on_open,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)# self.ws.on_open = self.on_open  # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。self.ws.run_forever()if __name__ == '__main__':Test().start()"""
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: AXR9yvs3Ucn9LE35KkhXfw==
Sec-WebSocket-Version: 13
Connection: upgrade-----------------------
--- response header ---
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 06:29:05 GMT
Sec-WebSocket-Accept: WoOPLeAQpWaV2Bqd4sDOFkSpUuw=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
####### on_open #######
输入要发送的消息(ps:输入关键词 close 结束程序):
aaadbbbbb
send: b'\x81\x89\x82-\xdfj\xe3L\xbe\x0e\xe0O\xbd\x08\xe0'
####### on_message #######
message:aaadbbbbb
输入要发送的消息(ps:输入关键词 close 结束程序):
sakdnakjf
send: b'\x81\x89\xa8\xe0g\x8b\xdb\x81\x0c\xef\xc6\x81\x0c\xe1\xce'
####### on_message #######
message:sakdnakjf
输入要发送的消息(ps:输入关键词 close 结束程序):
123456
send: b'\x81\x86(\x84>\xb7\x19\xb6\r\x83\x1d\xb2'
####### on_message #######
message:123456
输入要发送的消息(ps:输入关键词 close 结束程序):
send: b'\x8a\x80.\xf3`+'
send: b'\x8a\x80P\x0c\xc6W'
send: b'\x8a\x807j\x03l'
send: b'\x8a\x80\xd0\xac%v'
send: b'\x8a\x80\xb9\x9do\x08'
send: b'\x8a\x80s\xbb\xad\x8f'
send: b'\x8a\x80\xf4-\xd9\x8b'
close
send: b'\x88\x82\xf5L>\xc4\xf6\xa4'
####### on_close #######Process finished with exit code 0"""

(二)通过 HTTP 或 SOCKS 代理进行连接

很多时候,特别是在线下测试环境中时,我们常常需要代理去访问某些服务器。如下 demo 中分别使用 http 和 socks5。
http_proxy_host 参数传入代理的 host 地址。
http_proxy_port 参数传入代理的端口号。
proxy_type 参数如果不填,默认为 "http",可选参数值有 'http', 'socks4', 'socks5', 'socks5h'

使用 socks5 可能会少包 PySocks module not found,注意不要下错包了 pip install PySocks (No module named ‘socks’)

# -*- coding:utf-8 -*-"""
@author:    tz_zs
"""import websocket
from websocket import WebSocketApptry:import thread
except ImportError:import _thread as thread
import timeclass Test(object):def __init__(self):super(Test, self).__init__()self.url = "ws://echo.websocket.org/"self.ws = Nonedef on_message(self, message):print("####### on_message #######")print("message:%s" % message)def on_error(self, error):print("####### on_error #######")print("error:%s" % error)def on_close(self):print("####### on_close #######")def on_ping(self, message):print("####### on_ping #######")print("ping message:%s" % message)def on_pong(self, message):print("####### on_pong #######")print("pong message:%s" % message)def on_open(self):print("####### on_open #######")thread.start_new_thread(self.run, ())def run(self, *args):# for i in range(3):#     time.sleep(1)#     self.ws.send("Hello %d" % i)while True:time.sleep(1)input_msg = input("输入要发送的消息(ps:输入关键词 close 结束程序):\n")if input_msg == "close":self.ws.close()  # 关闭print("thread terminating...")breakelse:self.ws.send(input_msg)def start(self):websocket.enableTrace(True)  # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。self.ws = WebSocketApp(self.url,on_open=self.on_open,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)# self.ws.on_open = self.on_open  # 也可以先创建对象再这样指定回调函数。run_forever 之前指定回调函数即可。self.ws.run_forever(http_proxy_host="192.168.1.110", http_proxy_port=8123, proxy_type='http')# self.ws.run_forever(http_proxy_host="192.168.1.110", http_proxy_port=1080, proxy_type='socks5')if __name__ == '__main__':Test().start()"""
Connecting proxy...
--- request header ---
CONNECT echo.websocket.org:80 HTTP/1.0-----------------------
--- response header ---
HTTP/1.1 200 Connection established
-----------------------
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: JvORoQC9F6tb639eFo5s+Q==
Sec-WebSocket-Version: 13
Connection: upgrade-----------------------
--- response header ---
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 07:23:32 GMT
Sec-WebSocket-Accept: LyeriRX6MoTWDOUIwu3T1AhurSQ=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
####### on_open #######
输入要发送的消息(ps:输入关键词 close 结束程序):
124
send: b'\x81\x83b\x83c\xd5S\xb1W'
####### on_message #######
message:124
输入要发送的消息(ps:输入关键词 close 结束程序):
743
send: b'\x81\x83*\\\xe8d\x1dh\xdb'
####### on_message #######
message:743
输入要发送的消息(ps:输入关键词 close 结束程序):
close
send: b'\x88\x82\xaeS\xd6\x94\xad\xbb'
####### on_close #######thread terminating...Process finished with exit code 0
"""

源码 site-packages/websocket/_http.py

class proxy_info(object):def __init__(self, **options):self.type = options.get("proxy_type") or "http"if not(self.type in ['http', 'socks4', 'socks5', 'socks5h']):raise ValueError("proxy_type must be 'http', 'socks4', 'socks5' or 'socks5h'")self.host = options.get("http_proxy_host", None)if self.host:self.port = options.get("http_proxy_port", 0)self.auth = options.get("http_proxy_auth", None)self.no_proxy = options.get("http_no_proxy", None)else:self.port = 0self.auth = Noneself.no_proxy = None

(三)ping 和 pong

WebSocket 规范将 ping 和 pong 消息操作码定义为协议的一部分。使即使服务器和客户端之间没有传输数据,也可以保持长期连接处于活动状态。

1、自动响应

框架接收到服务器发来的 ping 帧时,会立刻自动调用下方的函数,将数据使用 pong 帧原样发送给服务器,然后才回调 on_ping 将数据给使用者。所以,服务器发送到 ping 帧,我们一般不需要处理,框架自动回应了。

源码 site-packages/websocket/_core.py 297行

def pong(self, payload):"""send pong data.payload: data payload to send server."""if isinstance(payload, six.text_type):payload = payload.encode("utf-8")self.send(payload, ABNF.OPCODE_PONG)

2、ping_interval

设置参数 ping_interval 框架将每间隔时间后自动发送空内容的 ping 帧。如果不设置参数(默认参数为 0),则不自动发送。

源码 site-packages/websocket/_core.py 287行

def ping(self,payload=""):"""send ping data.payload: data payload to send server."""if isinstance(payload, six.text_type):payload = payload.encode("utf-8")self.send(payload, ABNF.OPCODE_PING)

3、ping_timeout

但是,因为框架不是异步的,如果发生阻塞事件,ping/pong 可能会出现一些问题。所以一般需要设置 ping_timeout 超时时间。
注意,当 ping_interval 和 ping_timeout 参数都设置了的时候,框架要求参数 ping_interval 的值需大于参数 ping_timeout。

if ping_timeout and ping_interval and ping_interval <= ping_timeout:raise WebSocketException("Ensure ping_interval > ping_timeout")

源码中发送空 ping 帧时,会保存发送时间 self.last_ping_tm ,接收到服务器返回的 pong 帧时,会保存接收时间 self.last_pong_tm,如果同时满足(1)当前的时间距离上次发送 ping 的时间间隔大于参数 ping_timeout,(2)上次发送 ping 之后没有收到 pong,或接收到 pong 的时间距离上次发送 ping 的时间间隔大于参数 ping_timeout。

源码 site-packages/websocket/_app.py 294行

def check():if (ping_timeout):has_timeout_expired = time.time() - self.last_ping_tm > ping_timeouthas_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeoutif (self.last_ping_tmand has_timeout_expiredand (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):raise WebSocketTimeoutException("ping/pong timed out")return True

4、例子

如下 demo 中,设置 ping_interval 为 20, ping_timeout 为 10。

# -*- coding:utf-8 -*-"""
@author:    tz_zs
"""import websocket
from websocket import WebSocketApp, ABNFtry:import thread
except ImportError:import _thread as thread
import timeclass Test(object):def __init__(self):super(Test, self).__init__()self.url = "ws://echo.websocket.org/"self.ws = Nonedef on_message(self, message):print("####### on_message #######")print("message:%s" % message)def on_error(self, error):print("####### on_error #######")print("error:%s" % error)def on_close(self):print("####### on_close #######")def on_ping(self, message):print("####### on_ping #######")print("ping time:%s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))print("ping message:%s" % message)def on_pong(self, message):print("####### on_pong #######")print("pong time:%s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))print("pong message:%s" % message)def on_open(self):print("####### on_open #######")thread.start_new_thread(self.run, ())def run(self, *args):# for i in range(3):#     time.sleep(1)#     self.ws.send("Hello %d" % i)while True:time.sleep(1)input_msg = input("输入要发送的ping消息(ps:输入关键词 close 结束程序):\n")if input_msg == "close":self.ws.close()  # 关闭print("thread terminating...")breakelse:self.ws.send(input_msg, ABNF.OPCODE_PING)# self.ws.send(input_msg)def start(self):websocket.enableTrace(True)  # 开启运行状态追踪。debug 的时候最好打开他,便于追踪定位问题。self.ws = WebSocketApp(self.url,on_open=self.on_open,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close,on_ping=self.on_ping,on_pong=self.on_pong)self.ws.run_forever(ping_interval=20, ping_timeout=10)if __name__ == '__main__':Test().start()

框架每 20 秒发送空内容的 ping,控制台输出如下:

"""
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: MDsoARDQEI8/4YILiLGwHw==
Sec-WebSocket-Version: 13
Connection: upgrade-----------------------
--- response header ---
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 08:20:42 GMT
Sec-WebSocket-Accept: QiNgZdxqfcpJIeKlV6Byezls2Gw=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
####### on_open #######
输入要发送的ping消息(ps:输入关键词 close 结束程序):
send: b'\x89\x80J\x1cn\x8c'
####### on_pong #######
pong time:2021-08-04 16:21:12
pong message:b''
send: b'\x89\x80_\xad\xa1\xd9'
####### on_pong #######
pong time:2021-08-04 16:21:32
pong message:b''
send: b'\x89\x80\xb6$\xc9\x9d'
####### on_pong #######
pong time:2021-08-04 16:21:52
pong message:b''
send: b'\x89\x80)\xc3\x1f\xdc'
####### on_pong #######
pong time:2021-08-04 16:22:12
pong message:b''
send: b'\x89\x80\xcf|\xfa&'
####### on_pong #######
pong time:2021-08-04 16:22:32
pong message:b''
send: b'\x89\x80\xe5\x19/\xf9'
####### on_pong #######
pong time:2021-08-04 16:22:52
pong message:b''
send: b'\x89\x80\xfb\x9bA8'
####### on_pong #######
pong time:2021-08-04 16:23:12
pong message:b''
send: b'\x89\x80\xfc\xaa\xa6}'
####### on_pong #######
pong time:2021-08-04 16:23:32
pong message:b''
send: b'\x89\x80O\xa0\x0e\xb6'
####### on_pong #######
pong time:2021-08-04 16:23:52
pong message:b''"""

手动发送有内容的 ping,控制台输出如下:

"""
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: echo.websocket.org
Origin: http://echo.websocket.org
Sec-WebSocket-Key: Ovizd9gdVze4BCLhymQ92Q==
Sec-WebSocket-Version: 13
Connection: upgrade-----------------------
--- response header ---
####### on_open #######
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Date: Wed, 04 Aug 2021 08:26:07 GMT
Sec-WebSocket-Accept: vTtPyDWKViUA88UgBaLep+qi+CI=
Server: Kaazing Gateway
Upgrade: websocket
-----------------------
输入要发送的ping消息(ps:输入关键词 close 结束程序):
ping_test
send: b'\x89\x89+\x81\x19$[\xe8wCt\xf5|W_'
####### on_pong #######
pong time:2021-08-04 16:26:24
pong message:b'ping_test'
输入要发送的ping消息(ps:输入关键词 close 结束程序):
ping1111111111
send: b'\x89\x8e\xe3\x05\xbck\x93l\xd2\x0c\xd24\x8dZ\xd24\x8dZ\xd24'
####### on_pong #######
pong time:2021-08-04 16:26:32
pong message:b'ping1111111111'
输入要发送的ping消息(ps:输入关键词 close 结束程序):
send: b'\x89\x80#\x0e\xa8\xdd'
####### on_pong #######
pong time:2021-08-04 16:26:37
pong message:b''
close
send: b'\x88\x82-0\xd6\xbc.\xd8'
####### on_close #######Process finished with exit code 0"""

Python websocket之 websocket-client 库的使用相关推荐

  1. python websocket库有什么_常用Python爬虫与Web开发库有哪些?

    Python爬虫和Web开发均是与网页相关的知识技能,无论是自己搭建的网站还是爬虫爬去别人的网站,都离不开相应的Python库,以下是常用的Python爬虫与Web开发库. **1.爬虫库** bea ...

  2. WebSocket 入门及开源库

    转载自公众号:FightingCoder WebSocket 协议和知识 WebSocket是一种在单个TCP连接上进行全双工通信的协议.WebSocket通信协议于2011年被IETF定为标准RFC ...

  3. 基于python以及AIUI WebSocket,WeChatPYAPI实现的微信聊天机器人

    基于python以及AIUI WebSocket,WeChatPYAPI实现的微信聊天机器人 做此文的目的首先是学习Markdown的用法哈哈哈哈,其实也是记录自己学习的一个过程. 以后我也会将自己在 ...

  4. python socketio_flask-socketio实现WebSocket的方法

    [flask-socektio] 之前不知道在哪个场合下提到过如何从web后台向前台推送消息.听闻了反向ajax技术这种模式之后,大呼神奇,试了一下之后发现也确实可以用.不过,反向ajax的代价也很明 ...

  5. websocket python unity_Unity中Websocket的简单使用

    首先我们需要一个websocket服务器,之前的博文中有做 Tomcat架设简单Websocket服务器 用的时候打开就行了,先不管它 Unity中新建场景 建UI(UGUI) 有一个连接按钮Butt ...

  6. Python Django使用websocket channels3.0.3

    Channels3.0.3 网上的教程是2.4.0,本文照着官方文档整合. Installation - Channels 3.0.3 documentation https://channels.r ...

  7. python常用的量化金融库

    下面是常用的量化金融常用的库,以及与量化金融有关的支持库,有些需要科学上网才能打开. 文章目录 python基本的数值库和数据结构 金融工具和定价 指标 交易和回溯测试 风险分析 因素分析 时间序列( ...

  8. Python中运用的基础库

    库名称简介 Chardet字符编码探测器,可以自动检测文本.网页.xml的编码. colorama主要用来给文本添加各种颜色,并且非常简单易用. Prettytable主要用于在终端或浏览器端构建格式 ...

  9. 用python画股票分时图 github_用python的matplotlib和numpy库绘制股票K线均线和成交量的整合效果(含量化验证交易策略代码)...

    在用python的matplotlib和numpy库绘制股票K线均线的整合效果(含从网络接口爬取数据和验证交易策略代码)一文里,我讲述了通过爬虫接口得到股票数据并绘制出K线均线图形的方式,在本文里,将 ...

  10. Python读写Excel文件第三方库汇总

    常见库简介 xlrd xlrd是一个从Excel文件读取数据和格式化信息的库,支持.xls以及.xlsx文件. http://xlrd.readthedocs.io/en/latest/     1. ...

最新文章

  1. RxJava 中的Map函数原理分析
  2. C#微信公众号开发系列教程三(消息体签名及加解密)
  3. 表格布局页面_对于表格布局管理器的回顾以及接下来的目标
  4. java使用zmodem_SecureCRT 中使用zmodem和Linux服务器交换文件
  5. C++ 11 深度学习(一)auto、头文件防卫、引用、常量
  6. BZOJ1823[JSOI2010]满汉全席——2-SAT+tarjan缩点
  7. TextView内可以有多种样式吗?
  8. 8代cpu能装linux 系统吗,Intel支持八九代酷睿的B365芯片组将登场亮相
  9. 构造方法(设计一个Fan类来表示一个风扇)
  10. ansible meta目录
  11. 按头安利 好看又实用的毛笔书法字体素材看这里
  12. Keep your fork synced
  13. 云洲智能,能否成为科创板无人船艇第一股?
  14. 前端知识-CSS定位机制:标准流、浮动、定位
  15. java代码中实现excel表下载
  16. Sata接口读取新硬盘读不出问题解决
  17. idea设置代码提示
  18. Android java.lang.RuntimeException: Canvas: trying to use a recycled bitmap android.graphics.Bitmap@
  19. 大厂可能真不像你想象的那样系列之阿里
  20. 植物蛋白食品的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告

热门文章

  1. 微信小游戏-海盗来了打金初体验
  2. puzzle(0914)方块识途、拾穗方块
  3. 2020年程序员互联网薪资出炉!你能猜到有多少呢?
  4. HTML5+CSS3基础学习笔记:2
  5. 经典分频器——奇数分频(3分频,5分频,任意分频)
  6. 显示网格(grid)
  7. 【C语言】BC64牛牛的快递(DAY 5)
  8. Ubuntu好用的截图软件推荐
  9. 生成对抗网络(GAN)研究年度进展评述 2017
  10. ABB 机器人二次开发另一种方式 socket通信