import asyncio
import websockets
import queue
import time#服务端主逻辑,通过客户端发送过来的类型,返回出认证的类型,既代表客户端要服务端干的事情是啥类型,然后服务端跟类型去处理
#本服务拥有存储数据的功能,但是队列形式存储,会有容量上限,所以在跑大数据量分布式的脚本时候,最好是等send的数据先跑,在跑recv的数据
#防止send的数据<recv的数据,有做2s的等空判断#服务端发
data_queu=queue.Queue()
# 检测客户端权限,用户名密码通过才能退出循环
queu_dict={}#存储队列
async def check_permit(websocket):try:while True:recv_str = await websocket.recv()cred_dict = recv_str.split(":")#首先判断账号密码是否正确if cred_dict[0] == "root" and cred_dict[1] == "123456":#判断是用默认的队列还是客户端在服务端创建的队列,send表示接收数据,recv表示服务端向客户端发送数据if cred_dict[2]=="send":print("我是系统默认")response_str = "congratulation, you have connect with server\r\nnow, you can do something else"await websocket.send(response_str)return "send"elif cred_dict[2]=="recv":response_str = "congratulation, you have connect with server\r\nnow, you can do something else"await websocket.send(response_str)return "recv"#如果不是用默认的服务端队列,就用自定义的队列else:#send表示创建的队列,认证成功后会返回队列对象和方式if cred_dict[3]=="send":#判断这个队列是不是已经有人创建过了,如果没有创建就创建一个send队列if cred_dict[2] not  in queu_dict.keys():queu=queue.Queue()queu_dict.setdefault(cred_dict[2],queu)response_str = "congratulation, you have connect with server\r\nnow, you can do something else"await websocket.send(response_str)return queu,"send"else:#如果有人创建,直接拿取之前创建的队列进行返回queu=queu_dict[cred_dict[2]]response_str = "congratulation, you have connect with server\r\nnow, you can do something else"await websocket.send(response_str)return queu,"send"elif cred_dict[3]=="recv":if cred_dict[2] not  in queu_dict.keys():response_str = "sorry, No one created this queue!"await websocket.send(response_str)else:queu=queu_dict[cred_dict[2]]response_str = "congratulation, you have connect with server\r\nnow, you can do something else"await websocket.send(response_str)return queu,"recv"else:response_str = "sorry, Unable to understand the behavior!"await websocket.send(response_str)else:response_str = "sorry, the username or password is wrong, please submit again"await websocket.send(response_str)except Exception as error:print(1)print(error)# 接收客户端消息并处理
async def recv_msg(websocket,topic):try:#如果是默认的队列,就调sendif topic=="send":print("send")while True:recv_text = await websocket.recv()print("SERVER RECV CLIENT DATA:",recv_text)data_queu.put(recv_text)response_text = f"server is accepted"await websocket.send(response_text)print("SERVER  SEND CLIENT DATA:", recv_text)#如果是默认的队列,就调recvelif topic=="recv":while True:recv_text = await websocket.recv()print("客户端发送的数据:",recv_text)if data_queu.empty()!=True:response_text = f"{data_queu.get()}"await websocket.send(response_text)elif data_queu.empty()==True:#判断队列为空,如果队列为空了就尝试等待2s在中断连接print("等待二秒后获取队列的消息,防止服务端存储数据小于发送的数据速率")time.sleep(2)if data_queu.empty()==True:await websocket.send("exit")await websocket.close(reason="user exit")print("已断开连接!")returnelse:#用于执行客户端创建的队列,实现send发送和recv接收if topic[1]=="send":while True:recv_text = await websocket.recv()print("客户端发送的数据:", recv_text)topic[0].put(recv_text)response_text = f"server is accepted"await websocket.send(response_text)elif topic[1]=="recv":while True:recv_text = await websocket.recv()print("客户端发送的数据:", recv_text)if topic[0].empty() != True:response_text = f"your submit context: {topic[0].get()}"await websocket.send(response_text)elif topic[0].empty() == True:print("等待二秒后获取队列的消息,防止服务端存储数据小于发送的数据速率")time.sleep(2)if topic[0].empty() == True:await websocket.send("exit")await websocket.close(reason="user exit")print("已断开连接!")returnexcept Exception as error:print(error)# 服务器端主逻辑
# websocket和path是该函数被回调时自动传过来的,不需要自己传
async def main_logic(websocket, path):type=await check_permit(websocket)print("type",type)await recv_msg(websocket,type)# 把ip换成自己本地的ip
start_server = websockets.serve(main_logic, '127.0.0.1', 5678)
# 如果要给被回调的main_logic传递自定义参数,可使用以下形式
# 一、修改回调形式
# import functools
# start_server = websockets.serve(functools.partial(main_logic, other_param="test_value"), '10.10.6.91', 5678)
# 修改被回调函数定义,增加相应参数
# async def main_logic(websocket, path, other_param)asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()#该程序执行到进出结束,不会因为return而结束

上面是服务端的代码,实现的功能:在同个局域网内,可以接收客户端发送来的数据,进行存储,也可向Ng个客户端发送数据,因踩用的是queue,所以如果每个客户端拿的队列名称是一样的,只会把queue的数据分给各个客户端,不会出现重复的情况,对于测试人员需要处理大数据量的场景适合可适用!

分布式处理数据:server相关推荐

  1. ZipKin原理学习(分布式实时数据追踪系统)+window部署zipkin+k8s部署zipkin

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 ZipKin入门介绍 Zipkin是一款开源的分布式实时数据追踪系统(Distributed Tracking System),基于 Google D ...

  2. 分布式处理数据:client

    基于上篇:分布式处理数据:server,提供client调用代码 import asyncio import websockets#async 为异步调用,可以理解为线程,但不等于线程,一公有10个活 ...

  3. 分布式大数据多维分析引擎:Kylin 在百度地图的实践

    2019独角兽企业重金招聘Python工程师标准>>> 1. 前言 百度地图开放平台业务部数据智能组主要负责百度地图内部相关业务的大数据计算分析,处理日常百亿级规模数据,为不同业务提 ...

  4. db2 分布式_DB2 Connect统一了对分布式异构数据的访问

    在本系列的第1部分中,我们介绍了DB2 Connect提供的不同编程接口以及实现这些接口的驱动程序. 在最后几节中,我们介绍了DB2 Connect提供的通信基础结构的表面,并了解了该基础结构如何极大 ...

  5. PB级分布式大数据的处理和分析应用

    文章讲的是PB级分布式大数据的处理和分析应用,对于大数据,串行的处理方式难以满足人们的要求,现在主要采用并行计算方式.现有的并行计算可以分为两种: ·细粒度的并行计算.这里细粒度主要是指指令或进程级别 ...

  6. 偏移出来的数据不准_独家解读!京东高可用分布式流数据存储的架构设计

    作者 | 李玥 编辑 | Vincent AI 前线导读:每天,超过千亿交易相关的数据在京东数千个系统中高速流转,确保数据的高可靠.高可用.一致性对京东的消息中间件系统是一项艰巨的技术挑战.为高性能. ...

  7. 王家林 云计算分布式大数据Hadoop实战高手之路第七讲Hadoop图文训练课程:通过HDFS的心跳来测试replication具体的工作机制和流程...

    这一讲主要深入使用HDFS命令行工具操作Hadoop分布式集群,主要是通过实验的配置hdfs-site.xml文件的心跳来测试replication具体的工作和流程. 通过HDFS的心跳来测试repl ...

  8. 分布式大数据多维分析(OLAP)引擎Apache Kylin安装配置及使用示例【转】

    Kylin 麒麟官网:http://kylin.apache.org/cn/download/ 关键字:olap.Kylin Apache Kylin是一个开源的分布式分析引擎,提供Hadoop之上的 ...

  9. Apache Gobblin 分布式大数据集成框架

    Apache Gobblin 是一个分布式大数据集成框架,用于流式和批处理数据生态系统.该项目 2014 年起源于 LinkedIn,2015 年开源,并于 2017 年 2 月进入 Apache 孵 ...

  10. 分布式内存数据技术为查询提速

    背景和需求 中国铁路客户服务中心网站(www.12306.cn)是世界规模最大的实时交易系统之一,媲美Amazon.com,节假日尤其是春节的访问高峰,网站压力巨大.据统计, 在2012年初的春运高峰 ...

最新文章

  1. 【视频】SQL Server 2008 R2 StreamInsight - 多源复杂事件处理
  2. jmh气象传真图网站_在冬奥滑雪场,他凭着气象信息,念好一本生意经
  3. Android下/data/data/package_name/files读写权限
  4. Vue.js 服务端渲染
  5. 数据中心柴油发电机组功率
  6. Mysql中文乱码问题解决
  7. 问题 1044: [编程入门]三个字符串的排序
  8. Spring自学日志03(作用域,自动装配)
  9. ie支持html5代码,使用 HTML5 Shiv 让 IE 支持 HTML5
  10. KOYO 光洋PLC
  11. 两阶段最小二乘法原理_两阶段最小二乘法第一阶段为什么加入原模型外生变量...
  12. ROS- 激光雷达测距原理及主要参数
  13. 1569 B.Chess Tournament
  14. echarts 堆叠柱状图label显示总和
  15. OBS无法捕捉显示屏
  16. Q1 不同企业的竞争优势(待完成)
  17. c java sha1加密解密_java HMACSHA1加密算法
  18. Vue和微信小程序绑定样式的区别
  19. Bandizip将大文件分卷压缩为多个小文件
  20. python飞机大战子弹不显示_飞机大战游戏 飞机打出一发子弹后就不出现子弹了...

热门文章

  1. 2020年起重机司机(限门式起重机)多少分及格及起重机司机(限门式起重机)考试内容
  2. 什么是mysql的慢查询,看完你就懂了
  3. java断言--assertThat用法
  4. 姚期智亲任主编,正规军的高中AI教材来了
  5. 图片的降噪处理 java_OpenCV去除图像中的噪声
  6. 【程序人生】虚拟现实(VR)版霍兰德职业兴趣岛测试
  7. 一加7t人脸识别_10月换新推荐:一加7T/荣耀20青春版领衔好手机
  8. Django创建数据库(Django数据库字段类型)
  9. 统计学(二)之一般线性模型(一)
  10. 回归和分类的线性模型