最近写安卓手机客户端,要和后台通讯, python 写了个后台服务用于自测,感觉代码有通用性,发下吧。

设计:

分成三个部分, 报文设计,后台设计,后台测试用例。后台设计的比较挫,但是可以用。

细节:

报文部分

//包头和包体:sizeof(Pkg)=16+msgLen+extLen
struct Pkg{struct PkgHdr hdr;       //包头,固定长度sizeof(PkgHdr)uint8_t msg[msgLen];  //放置包体待解析消息,json,pb,tlv等,未使用则为空uint8_t ext[extLen]; //放置二进制扩展数据(文件或数据流),未使用则为空
};
//包头和包体:sizeof(Pkg)=16+msgLen+extLen
struct Pkg{struct PkgHdr hdr;       //包头,固定长度sizeof(PkgHdr)uint8_t msg[msgLen];  //放置包体待解析消息,json,pb,tlv等,未使用则为空uint8_t ext[extLen]; //放置二进制扩展数据(文件或数据流),未使用则为空
};


后台模块
# coding=utf-8
import argparse
import logging
import os
import time
import uuid
import json
import threading
import multiprocessing
import random
import select
import socket
import queue
import uuid
import structfrom enum import Enum, uniqueimport tornado.ioloopg_select_timeout = 10class Server(object):def __init__(self, host='192.168.100.41', port=33333, timeout=2, client_nums=10, speech_recognizer=None):self.__host = hostself.__port = portself.__timeout = timeoutself.__client_nums = client_numsself.__buffer_size = 1024self.__frame_length = 16self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.server.setblocking(0)self.server.settimeout(self.__timeout)self.server.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) #keepaliveself.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #端口复用server_host = (self.__host, self.__port)try:self.server.bind(server_host)self.server.listen(self.__client_nums)except:raise#网络相关处理self.inputs = [self.server]   self.outputs = []             self.message_queues = {}      self.client_info = {}#业务相关处理self.store_dir_path = ""self.filedesc_wavpath_dict = {}self.speech_recognizer = speech_recognizerdef store_data(self, file_desc, data):file_desc_list = self.filedesc_wavpath_dict.keys()if file_desc not in file_desc_list:random_str_wav = str(_get_ms_time_int_part()) + '-' + str(uuid.uuid4()) + "test.wav"wav_path = os.path.join(self.store_dir_path, random_str_wav)self.filedesc_wavpath_dict[file_desc] = wav_pathlogging.info("random_str:{}".format(random_str_wav))logging.info("file desc:{}".format(file_desc))logging.info("file path:{}".format(wav_path))with open(self.filedesc_wavpath_dict[file_desc], "ab+") as inputwav:inputwav.write(data)return self.filedesc_wavpath_dict[file_desc]def readMSGHead(self, filedesc):'''//数据包头:sizeof(PkgHdr)=16struct PkgHdr{uint16_t magic;     //magic number:0xfffeuint16_t bodyLen;   //包体长度,即msg和ext总长,不含包头uint16_t pkgType;   //0,请求包 1,返回包 ...后续可扩展uint16_t cmd;       //请求和返回命令字,目前默认填0uint16_t retCode;   //0成功, 其它失败,返回包填写uint16_t msgFmt;    //消息格式0:json 1:protobuf ...uint16_t msgLen;    //消息长度uint16_t extLen;    //扩展包长度};'''frame_length_bytes = filedesc.recv(self.__frame_length)magic, bodyLen, pkgType, cmd, retCode, msgFmt, msgLen, extLen = struct.unpack('>HHHHHHHH', frame_length_bytes)return magic, bodyLen, pkgType, cmd, retCode, msgFmt, msgLen, extLendef logMSG(self, msg ):msg_head = msg[0:16]magic, bodyLen, pkgType, cmd, retCode, msgFmt, msgLen, extLen = struct.unpack('>HHHHHHHH', msg_head)logging.info("========================================")logging.info("msg_head: {}".format(msg_head))logging.info("bodyLen:  {}".format(bodyLen))logging.info("pkgType:  {}".format(pkgType))logging.info("cmd:      {}".format(cmd))logging.info("retCode:  {}".format(retCode))logging.info("msgFmt:   {}".format(msgFmt))logging.info("msgLen:   {}".format(msgLen))logging.info("extLen:   {}".format(extLen))logging.info("msgjson:  {}".format(msg[16:16+msgLen]))logging.info("========================================")def readMSGMsg(self, filedesc, msgLen):msg = filedesc.recv(msgLen)msg = msg.decode()msg_json = json.loads(msg)return msg_jsondef readMSGExt(self, filedesc, extLen):ext = filedesc.recv(extLen)return extdef decodeMSG(self, filedesc):MSGHeadInfo = self.readMSGHead(filedesc)logging.info("decodeMSG MSGHeadInfo:{}".format(MSGHeadInfo))msgLen = MSGHeadInfo[6]extLen = MSGHeadInfo[7]logging.info("decodeMSG msgLen:{}".format(msgLen))logging.info("decodeMSG extLen:{}".format(extLen))msg_json = self.readMSGMsg(filedesc, msgLen)Ext = self.readMSGExt(filedesc, extLen)return msg_json, Extdef encodeMSG(self, pkgType_input, errCode, errMsg, refresh, text, ext):magic = 0xfffebodyLen = 0pkgType = pkgType_inputcmd = 0retCode = 0msgFmt = 0msgLen = 0extLen = 0msg_json_data = {"errCode": errCode, "errMsg": errMsg, "refresh": refresh, "text": text}msg = json.dumps(msg_json_data)if msg:msgLen = len(msg)if ext:extLen = len(ext)bodyLen = msgLen + extLenmsgHeadByteArray = bytearray(struct.pack('>HHHHHHHH', magic, bodyLen, pkgType, cmd, retCode, msgFmt, msgLen, extLen))msgMsgByteArray = bytearray(str(msg).encode())msg = msgHeadByteArray + msgMsgByteArrayif ext:msgExtByteArray = bytearray(ext)msg = msg + msgExtByteArrayreturn msgdef call_asr(self, wav_file_path ):decode_result = self.speech_recognizer.wav_to_txt(wav_file_path)return decode_resultdef run(self):while True:readable , writable , exceptional = select.select(self.inputs, self.outputs, self.inputs, g_select_timeout)if not (readable or writable or exceptional) :continuefor s in readable :if s is self.server:#是客户端连接connection, client_address = s.accept()#print "connection", connectionprint( "%s connect. " %str(client_address) )connection.setblocking(False) self.inputs.append(connection) #客户端添加到inputsself.client_info[connection] = str(client_address)self.message_queues[connection] = queue.Queue()  #每个客户端一个消息队列else:#是client, 数据发送过来receiveMSG = Nonetry:receiveMSG = self.decodeMSG(s)except Exception as e:err_msg = "Client Error!!!"logging.error(err_msg)logging.error(str(e))if receiveMSG :msg_json, Ext = receiveMSGlogging.info("receive msg_json:   {}".format(msg_json))logging.info("receiveExt length:  {}".format(len(Ext)))wav_file_path = self.store_data(s, Ext)dataoutput = "%s %s " % (time.strftime("%Y-%m-%d %H:%M:%S"), self.client_info[s])#dataoutput = "message from server"self.message_queues[s].put(dataoutput) if s not in self.outputs: self.outputs.append(s)else: #Interpret empty result as closed connectionprint ("Client:%s Close." % str( self.client_info[s]) )if s in self.outputs :self.outputs.remove(s)self.inputs.remove(s)s.close()del self.message_queues[s]del self.client_info[s]if s in self.filedesc_wavpath_dict.keys():del self.filedesc_wavpath_dict[s]for s in writable: #outputs 有消息就要发出去了try:next_msg = self.message_queues[s].get_nowait()  #非阻塞获取except queue.Empty:err_msg = "Output Queue is Empty!"#g_logFd.writeFormatMsg(g_logFd.LEVEL_INFO, err_msg)self.outputs.remove(s)except Exception as e:  #发送的时候客户端关闭了则会出现writable和readable同时有数据,会出现message_queues的keyerrorerr_msg = "Send Data Error! ErrMsg:%s" % str(e)logging.error(err_msg)if s in self.outputs:self.outputs.remove(s)else:try:cli = spkgType_input = 1errCode = 0errMsg = "OK"refresh = 1text = next_msgext = None'''logging.info("errCode   :{}".format(errCode)) logging.info("errMsg    :{}".format(errMsg))logging.info("refresh   :{}".format(refresh))logging.info("text      :{}".format(text))logging.info("ext       :{}".format(ext))'''msgresp = self.encodeMSG(pkgType_input, errCode, errMsg, refresh, text, ext) self.logMSG(msgresp)cli.send(msgresp)except Exception as e: #发送失败就关掉err_msg = "Send Data to %s  Error! ErrMsg:%s" % (str(self.client_info[cli]), str(e))logging.error(err_msg)print( "Client: %s Close Error." % str(self.client_info[cli]) )if cli in self.inputs:self.inputs.remove(cli)cli.close()if cli in self.outputs:self.outputs.remove(s)if cli in self.message_queues:del self.message_queues[s]del self.client_info[cli]del self.filedesc_wavpath_dict[s]for s in exceptional:logging.error("Client:%s Close Error." % str(self.client_info[cli]))if s in self.inputs:self.inputs.remove(s)s.close()if s in self.outputs:self.outputs.remove(s)if s in self.message_queues:del self.message_queues[s]del self.client_info[s]del self.filedesc_wavpath_dict[s]if "__main__" == __name__:logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s",filename='realtime_asr_server.log',level=logging.INFO)Server().run()

后台测试代码
import sys
import time
import socket
import threading
import logging
import json
import structclass Client(object):def __init__(self, host, port=33333, timeout=1, reconnect=2):self.__host = hostself.__port = portself.__timeout = timeoutself.__buffer_size = 1024self.__flag = 1self.client = Noneself.__lock = threading.Lock()self.__frame_length = 16def readMSGHead(self, filedesc):'''//数据包头:sizeof(PkgHdr)=16struct PkgHdr{uint16_t magic;     //magic number:0xfffeuint16_t bodyLen;   //包体长度,即msg和ext总长,不含包头uint16_t pkgType;   //0,请求包 1,返回包 ...后续可扩展uint16_t cmd;       //请求和返回命令字,目前默认填0uint16_t retCode;   //0成功, 其它失败,返回包填写uint16_t msgFmt;    //消息格式0:json 1:protobuf ...uint16_t msgLen;    //消息长度uint16_t extLen;    //扩展包长度};'''frame_length_bytes = filedesc.recv(self.__frame_length)magic, bodyLen, pkgType, cmd, retCode, msgFmt, msgLen, extLen = struct.unpack('>HHHHHHHH', frame_length_bytes)return magic, bodyLen, pkgType, cmd, retCode, msgFmt, msgLen, extLendef readMSGMsg(self, filedesc, msgLen):msg = filedesc.recv(msgLen)msg = msg.decode()msg_json = json.loads(msg)return msg_jsondef readMSGExt(self, filedesc, extLen):ext = filedesc.recv(extLen)return extdef decodeMSG(self, filedesc):MSGHeadInfo = self.readMSGHead(filedesc)msgLen = MSGHeadInfo[6]extLen = MSGHeadInfo[7]logging.info("msgLen:{}".format(msgLen))logging.info("extLen:{}".format(extLen))msg_json = self.readMSGMsg(filedesc, msgLen)Ext = self.readMSGExt(filedesc, extLen)return msg_json, Extdef encodeMSG(self, pkgType_input, errCode, errMsg, refresh, text, ext):magic = 0xfffebodyLen = 0pkgType = pkgType_inputcmd = 0retCode = 0msgFmt = 0msgLen = 0extLen = 0msg_json_data = {"errCode": errCode, "errMsg": errMsg, "refresh": refresh, "text": text}msg = json.dumps(msg_json_data)if msg:msgLen = len(msg)if ext:extLen = len(ext)bodyLen = msgLen + extLenmsgHeadByteArray = bytearray(struct.pack('>HHHHHHHH', magic, bodyLen, pkgType, cmd, retCode, msgFmt, msgLen, extLen))msgMsgByteArray = bytearray(str(msg).encode())msg = msgHeadByteArray + msgMsgByteArrayif ext:msgExtByteArray = bytearray(ext)msg = msg + msgExtByteArrayreturn msg@propertydef flag(self):return self.__flag@flag.setterdef flag(self, new_num):self.__flag = new_numdef __connect(self):client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.bind(('0.0.0.0', 12345,))client.setblocking(True)client.settimeout(self.__timeout)client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 端口复用server_host = (self.__host, self.__port)try:client.connect(server_host)except:raisereturn clientdef send_msg(self):if not self.client:returnwhile True:time.sleep(0.1)#data = sys.stdin.readline().strip()#data = input("input string:")data = "hello from client"if "exit" == data.lower():with self.__lock:self.flag = 0breakdata = self.encodeMSG(0, 0, "OK", 0, data, b"xiaojiba")logging.info("client sendall")self.client.sendall(data)returndef recv_msg(self):if not self.client:returnwhile True:data = Nonewith self.__lock:if not self.flag:print('ByeBye~~')breaktry:logging.info("client recv")msg_json, Ext = self.decodeMSG(self.client)text = msg_json['text']isend = msg_json['refresh']logging.info("text:{}".format(text))logging.info("isend:{}".format(isend))except socket.timeout:continueexcept:raiseif data:print("%s\n" % text)time.sleep(0.1)returndef run(self):self.client = self.__connect()send_proc = threading.Thread(target=self.send_msg)recv_proc = threading.Thread(target=self.recv_msg)recv_proc.start()send_proc.start()recv_proc.join()send_proc.join()self.client.close()if "__main__" == __name__:logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s",#filename='realtime_asr_server.log',level=logging.INFO)Client('192.168.100.41').run()

												

python 网络 select相关推荐

  1. python php 全双工,Python网络编程之使用select实现socket全双工异步通信功能

    这篇文章主要介绍了Python网络编程使用select实现socket全双工异步通信功能,在这里分享给大家,有需要的朋友可以参考下 本文实例讲述了Python网络编程使用select实现socket全 ...

  2. python网络开发框架_greenev首页、文档和下载 - Python网络服务框架 - OSCHINA - 中文开源技术交流社区...

    greenev是一个基于greenlet协程,事件驱动,非阻塞socket模型的Python网络服务框架,它使得可以编写同步的代码,却得到异步执行的优点.reactor模式采用基于epoll, kqu ...

  3. python网络编程证书_《Python网络编程基础》笔记

    python网络编程基础 ================== Author: lujun9972 Date: 2013-03-08 22:29:20 CST Table of Contents == ...

  4. python网络编程项目_python网络编程详解

    最近在看<UNIX网络编程 卷1>和<FREEBSD操作系统设计与实现>这两本书,我重点关注了TCP协议相关的内容,结合自己后台开发的经验,写下这篇文章,一方面是为了帮助有需要 ...

  5. 开源 Python网络爬虫框架 Scrapy

    开源 Python 网络爬虫框架 Scrapy:http://blog.csdn.net/zbyufei/article/details/7554322 介绍 所谓网络爬虫,就是一个在网上到处或定向抓 ...

  6. python中select模块_基于python select.select模块通信的实例讲解 如何用python写个串口通信的程序...

    python socket怎么利用select实现双工通信 方法: Before : 0000000000000000000000000000000000000000 After pack: 0100 ...

  7. python网络编程内容_图解Python网络编程

    Python Python开发 Python语言 图解Python网络编程 本篇索引 (1)基本原理 本篇指的网络编程,仅仅是指如何在两台或多台计算机之间,通过网络收发数据包:而不涉及具体的应用层功能 ...

  8. 读书笔记 - -《Python网络编程》重点

    文章目录 一.前言 二.客户/服务器网络编程简介 三.UDP 3.1 端口号 3.2 套接字 3.3 UDP分组 3.4 小结 四.TCP 4.1 TCP工作原理 4.2 绑定接口 4.3 死锁 4. ...

  9. Python网络数据爬取及分析-智联招聘

    python网络数据爬取及分析-智联招聘 一. 数据爬取 智联招聘是一家面向大型公司和快速发展的中小企业提供一站式专业人力资源的公司,可在智联招聘网站上根据不同城市.不同职位需求搜索得到相关招聘信息. ...

  10. Python 网络爬虫工具:httpx 和 parsel(对比测评)

    Python 网络爬虫领域两个最新的比较火的工具莫过于 httpx 和 parsel 了. httpx 号称下一代的新一代的网络请求库,不仅支持 requests 库的所有操作,还能发送异步请求,为编 ...

最新文章

  1. 用可组合的构建块丰富用户界面?谷歌提出「可解释性」的最新诠释
  2. 深入理解 Java 虚拟机 学习笔记
  3. de casteljau算法_泊松分布算法的应用:开一家4S店
  4. Spring Boot——SpringMVC带URL前缀的静态资源解决方案
  5. python内存回收垃圾有哪些_[Python之路] 内存管理垃圾回收
  6. 微软为一人收购一公司?破解索尼程序、写黑客小说,看他彪悍的程序人生!...
  7. Python中的枚举enumerate
  8. 捕鱼达人python游戏项目,少儿编程体验课程项目,源码免费分享,内置详细注释,可更改游戏参数;关注获取更多资源
  9. Turbo C 2.0
  10. C语言 修改JPEG图片属性
  11. 常见分布式任务调度工具分析
  12. 谢耳朵最萌最贱表情, 哈哈
  13. 华为防火墙配置(防火墙NAT)
  14. Android实现二维码扫描功能(三)-闪光灯控制
  15. 智能家居控制系统MECOOL KA1智能音响
  16. 推荐BMS锂电池管理使用KT6368A蓝牙模块芯片
  17. Mozilla 即谋智人
  18. Ubuntu 安装 eyeOS
  19. 如何配置数据库密码加密访问数据库
  20. JavaScript在线编程

热门文章

  1. 习题2_2、韩信点兵
  2. 修马达的php源码,无刷电机控制基本原理(示例代码)
  3. 基于Spring Boot企业微信点餐系统项目总结
  4. M3U8 Downloader的使用
  5. Awesome Crowd Counting
  6. 移动端产品设计(02)-移动APP产品结构
  7. 近期java面试总结
  8. opencv-python实际演练(二)军棋自动裁判(4)棋子图像提取算法的改进
  9. html box 竖线,【CSS】这种竖线效果如何实现呢
  10. R语言使用dplyr包进行数据聚合统计计算滑动窗口统计值(Window Statistics)、计算滑动分组四分位差(IQR、四分位距)并合并生成的统计数据到原数据集中