2019独角兽企业重金招聘Python工程师标准>>>

最近用了Flask-SocketIO,因为要和一些性能比较差的机器通信,所以数据格式并没有采用传统的json,而是采用Google的跨平台序列化工具FlatBuffers,它的结构化数据都以二进制形式保存,所以需要用SocketIO传输二进制格式,这个Flask-SocketIO支持的。

但在写单元测试的的时候发现Flask-SocketIO的单元测试会异常,代码和异常如下:

  • test.py
from flask import Flask, render_template
from flask_socketio import SocketIO, emitapp = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socket_io = SocketIO(binary=True)
socket_io.init_app(app)@app.route('/')
def index():return 'hello'@socket_io.on('event')
def test_message(message):print("Success: %s", str(message))if __name__ == '__main__':socket_io.run(app)
  • test.py
import unittest
from test_app import app, socket_ioclass FlaskrTestCase(unittest.TestCase):def setUp(self):self.app = appself.app.config['TESTING'] = Trueself.socket_io_client = socket_io.test_client(self.app)def test_socket_io(self):self.socket_io_client.emit('event', bytes(b'1111111'), binary=True)def tearDown(self):passif __name__ == '__main__':unittest.main()
  • 异常信息
Testing started at 5:39 PM ...
decode: encoded_packet[['51-["event",{"_placeholder":true,"num":0}]', b'1111111']]Error
Traceback (most recent call last):File "/mnt/hgfs/Share/fogstor-server/test.py", line 15, in test_socket_ioself.socket_io_client.emit('event', bytes(b'1111111'), binary=True)File "/home/jeff/develop-env/fosstor_3.5/lib/python3.5/site-packages/flask_socketio/test_client.py", line 117, in emitself.socketio.server._handle_eio_message(self.sid, pkt.encode())File "/home/jeff/develop-env/fosstor_3.5/lib/python3.5/site-packages/socketio/server.py", line 522, in _handle_eio_messageraise ValueError('Unknown packet type.')
ValueError: Unknown packet type.Process finished with exit code 0

这是由于Flask-SocketIO在二进制传输应用不多,所以代码并没有很好的覆盖到,只需要将

# flask_socketio文件夹下的test_client.py
......def emit(self, event, *args, **kwargs):"""Emit an event to the server.:param event: The event name.:param *args: The event arguments.:param callback: ``True`` if the client requests a callback, ``False``if not. Note that client-side callbacks are notimplemented, a callback request will just tell theserver to provide the arguments to invoke thecallback, but no callback is invoked. Instead, thearguments that the server provided for the callbackare returned by this function.:param namespace: The namespace of the event. The global namespace isassumed if this argument is not provided."""namespace = kwargs.pop('namespace', None)callback = kwargs.pop('callback', False)id = Noneif callback:self.callback_counter += 1id = self.callback_counterpkt = packet.Packet(packet.EVENT, data=[event] + list(args),namespace=namespace, id=id, binary=False)self.ack = Nonewith self.app.app_context():self.socketio.server._handle_eio_message(self.sid, pkt.encode())if self.ack is not None:return self.ack['args'][0] if len(self.ack['args']) == 1 \else self.ack['args']......

修改为

# flask_socketio文件夹下的test_client.py
......def emit(self, event, *args, **kwargs):"""Emit an event to the server.:param event: The event name.:param *args: The event arguments.:param callback: ``True`` if the client requests a callback, ``False``if not. Note that client-side callbacks are notimplemented, a callback request will just tell theserver to provide the arguments to invoke thecallback, but no callback is invoked. Instead, thearguments that the server provided for the callbackare returned by this function.:param namespace: The namespace of the event. The global namespace isassumed if this argument is not provided."""namespace = kwargs.pop('namespace', None)callback = kwargs.pop('callback', False)binary = kwargs.pop('binary', False)id = Noneif callback:self.callback_counter += 1id = self.callback_counterpkt = packet.Packet(packet.EVENT, data=[event] + list(args),namespace=namespace, id=id, binary=binary)self.ack = Nonewith self.app.app_context():if binary:encoded_packet = pkt.encode()self.socketio.server._handle_eio_message(self.sid, encoded_packet[0])self.socketio.server._handle_eio_message(self.sid, encoded_packet[1])else:self.socketio.server._handle_eio_message(self.sid, pkt.encode())if self.ack is not None:return self.ack['args'][0] if len(self.ack['args']) == 1 \else self.ack['args']......

就可以单元测试了:

Testing started at 5:40 PM ...
decode: encoded_packet[51-["event",{"_placeholder":true,"num":0}]]
reconstruct_binary: data[['event', {'_placeholder': True, 'num': 0}]], attachments[[b'1111111']]
Success: %s b'1111111'Process finished with exit code 0

转载于:https://my.oschina.net/jianming/blog/1600716

Flask-SocketIO传输二进制单元测试的Bug和修改相关推荐

  1. json传输二进制的方案(python版)

    json传输二进制的方案(python版) 1.json不能直接传二进制文件 json只能传递基本的数型(如:int,long,string等),但不能传递byte类型.但是有时候我们想在json中传 ...

  2. Nginx + uWSGI + flask + socketio 部署解决方案

    Nginx + uWSGI + flask + socketio 部署解决方案 参考文章: (1)Nginx + uWSGI + flask + socketio 部署解决方案 (2)https:// ...

  3. HTTP传输二进制初探

    [转]HTTP传输二进制初探 http://www.51testing.com/?uid-390472-action-viewspace-itemid-233993 [转]HTTP传输二进制初探 上一 ...

  4. c++传输二进制数据

    文章目录 一.前提 二.二进制和指针相关概念 1.二进制数据传输的本质 2.指针相关概念 (1)float* 和char*类型的指针有什么区别吗 (2)c语言关于指针和长度的理解 3.二进制传输步骤 ...

  5. micropython flask_在Python的Flask框架中实现单元测试的教程

    概要 在前面的章节里我们专注于在我们的小应用程序上一步步的添加功能上.到现在为止我们有了一个带有数据库的应用程序,可以注册用户,记录用户登陆退出日志以及查看修改配置文件. 在本节中,我们不为应用程序添 ...

  6. flask+socketio+echarts3 服务器监控程序(基于后端数据推送)

    本文地址:http://www.cnblogs.com/hhh5460/p/7397006.html 说明 以前的那个例子的思路是后端监控数据存入数据库:前端ajax定时查询数据库. 这几天在看web ...

  7. Flask视频流传输

    参考资料 在 Flask 里产生流式响应 使用 multipart/x-mixed-replace 实现 http 实时视频流 使用 Flask 进行视频流传输 重新审视 Flask 视频流 目录 流 ...

  8. json传输二进制的方案【转】

    本文转自:http://wiyi.org/binary-to-string.html json 是一种很简洁的协议,但可惜的是,它只能传递基本的数型(int,long,string等),但不能传递by ...

  9. flask json传输失败_GO小知识之实例演示 json 如何转化为 map 和 struct

    简单谈一些 JSON 数据处理的小知识.近期工作中,因为要把数据库数据实时更新到 elasticsearch,在实践过程中遇到了一些 JSON 数据处理的问题. 实时数据 实时数据获取是通过阿里开源的 ...

  10. 在计算机网络的s,在计算机网络中传输二进制信息时,经常使用的速率单位有“kb/s”、“Mb/s”等。其中,1Mb/s=1000kb/s...

    [判断题]定量负荷运动时,运动员的机能反应较小. [判断题]磁标势的引入和电标势的引入一样,是不需要附加条件的. ( ) [单选题]一人站在转动的转台上,在他伸出的两手中各握有一个重物,若此人向着胸部 ...

最新文章

  1. java二分法找数数_JavaSE语言基础之数组二分法查找
  2. linux网口驱动实现(待续)
  3. Web socket广播
  4. jdk 安全属性_系统属性的JDK 12 Javadoc标记
  5. 检查Red Hat JBoss BRMS部署架构的规则和事件(第二部分)
  6. VSCode 调试 Egg 完美版 - 进化史 #25
  7. 弥补Web开发缺陷 实战HTML 5中存储API
  8. C语言标准库stdlib.h
  9. 使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯
  10. php调用7天内容,如何使用JS取得最近7天与最近3天日期
  11. ubuntu 远程 搭建 Jupyter Notebook 服务器配置
  12. spring mvc 上传文件
  13. 北大青鸟汉字注释机内码_北大青鸟主机汉子机内码 汉字机内码在线转换
  14. iptable 简析
  15. 若程序员们的“反996协议”实行,会发生什么
  16. 从0开始的TypeScriptの八:类
  17. 【详细教程·本人亲测】解决win10家庭版系统C:\Users用户名中有中文,更改为英文的问题
  18. 电脑连接手机热点时,电脑能够搜到但是却连接不上的问题
  19. 安卓获取手机视频和图片
  20. ar 微信小程序_微信小程序开放AR功能,全面提升交互体验

热门文章

  1. Atitit 计算机系统结构 计算机系统结构 Cpu 存储 cache 指令系统 目录 Line 56: 第2章指令系统设计 指令格式 寻址方式 1 Line 64: 第3章CPU及其实现
  2. Atitit 手机号码规范 数字化管理 靓号 吉祥号码 洋马儿 规范 attilax总结
  3. Atitit.软件GUI按钮与仪表盘(01)--js区-----js格式化的使用
  4. paip.python 调用qt ui 总结
  5. GSM/GPRS MODEM 的上网设置
  6. Julia: PkgMirrors,提升库下载速度
  7. 白话关于API与SDK的区别
  8. 阿里云高级专家王林平:云数据库的运维体系构建
  9. 边缘计算对于基础架构和运营领导者意味着什么
  10. 【OFDM通信】基于matlab块状导频的信道估计算法仿真【含Matlab源码 1817期】