c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据
本文基于networkcomms2.3.1开源版本 gplv3协议
在networkcomms通信系统中,服务器端收到某连接上的数据后,数据会暂时存放在"数据包创建器"(PacketBuilder)中,PacketBuilder类似一个流动的容器,收到的数据被服务器处理完成后,相应在二进制数据,会从存储他的PacketBuilder中删除。
我们知道在networkcomms的消息体系中,传送的数据的第一个字节用来存储数据包包头长度,解析出数据包包头后,包头中包含数据包长度。所以在读入进入PacketBuilder中的数据,会根据第一个字节中存储的数据,解析出包头的长度,进一步解析出数据包的长度。如果PacketBuilder中的存储的字节大小,大于数据包的长度,那么主程序将会对PacketBuilder中的数据,进行提取,并进行处理。
此节,我们暂时不讨论PacketBuilder的细节,先看一下服务器如何对PacketBuilder中的数据进行处理
protected void IncomingPacketHandleHandOff(PacketBuilder packetBuilder) { try { if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... checking for completed packet with " + packetBuilder.TotalBytesCached.ToString() + " bytes read."); if (packetBuilder.TotalPartialPacketCount == 0) throw new Exception("Executing IncomingPacketHandleHandOff when no packets exist in packetbuilder."); //循环,直到我们完成对packetBuilder的处理 //Loop until we are finished with this packetBuilder int loopCounter = 0; while (true) { //If we have ended up with a null packet at the front, probably due to some form of concatentation we can pull it off here //It is possible we have concatenation of several null packets along with real data so we loop until the firstByte is greater than 0 //此处,可能是处理心跳检测时对方发来的0字节数据 if (packetBuilder.FirstByte() == 0) { if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... null packet removed in IncomingPacketHandleHandOff() from " + ConnectionInfo + ", loop index - " + loopCounter.ToString()); packetBuilder.ClearNTopBytes(1); //Reset the expected bytes to 0 so that the next check starts from scratch packetBuilder.TotalBytesExpected = 0; //If we have run out of data completely then we can return immediately if (packetBuilder.TotalBytesCached == 0) return; } else { //First determine the expected size of a header packet //获取数据包包头长度 int packetHeaderSize = packetBuilder.FirstByte() + 1; //Do we have enough data to build a header? //如果没有足够的二级制数据来还原出一个数据包包头 if (packetBuilder.TotalBytesCached < packetHeaderSize) { if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... ... more data required for complete packet header."); //Set the expected number of bytes and then return packetBuilder.TotalBytesExpected = packetHeaderSize; return; } //We have enough for a header //有足够的数据来还原出数据包包头 PacketHeader topPacketHeader; using(MemoryStream headerStream = packetBuilder.ReadDataSection(1, packetHeaderSize - 1)) //数据包包头 topPacketHeader = new PacketHeader(headerStream, NetworkComms.InternalFixedSendReceiveOptions); //Idiot test if (topPacketHeader.PacketType == null) throw new SerialisationException("packetType value in packetHeader should never be null"); //We can now use the header to establish if we have enough payload data //First case is when we have not yet received enough data //如果没有足够的数据来还原出数据包 if (packetBuilder.TotalBytesCached < packetHeaderSize + topPacketHeader.PayloadPacketSize) { if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... ... more data required for complete packet payload. Expecting " + (packetHeaderSize + topPacketHeader.PayloadPacketSize).ToString() + " total packet bytes."); //Set the expected number of bytes and then return packetBuilder.TotalBytesExpected = packetHeaderSize + topPacketHeader.PayloadPacketSize; return; } //Second case is we have enough data //有足够的数据还原出数据包 else if (packetBuilder.TotalBytesCached >= packetHeaderSize + topPacketHeader.PayloadPacketSize) { //We can either have exactly the right amount or even more than we were expecting //We may have too much data if we are sending high quantities and the packets have been concatenated //no problem!! SendReceiveOptions incomingPacketSendReceiveOptions = IncomingPacketSendReceiveOptions(topPacketHeader); if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Debug("Received packet of type '" + topPacketHeader.PacketType + "' from " + ConnectionInfo + ", containing " + packetHeaderSize.ToString() + " header bytes and " + topPacketHeader.PayloadPacketSize.ToString() + " payload bytes."); //If this is a reserved packetType we call the method inline so that it gets dealt with immediately //判断是否为保留类型,保留类型会优先处理 //保留类型包括心跳检测、连接建立等 bool isReservedType = false; foreach (var tName in NetworkComms.reservedPacketTypeNames) { //isReservedType |= topPacketHeader.PacketType == tName; if (topPacketHeader.PacketType == tName) { isReservedType = true; break; } } //Only reserved packet types get completed inline //如果数据包类型为保留类型 设定为高优先级 if (isReservedType) {#if WINDOWS_PHONE var priority = QueueItemPriority.Normal;#else var priority = (QueueItemPriority)Thread.CurrentThread.Priority;#endif //创建优先级队列项目 PriorityQueueItem item = new PriorityQueueItem(priority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPacketSize), incomingPacketSendReceiveOptions); if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... handling packet type '" + topPacketHeader.PacketType + "' inline. Loop index - " + loopCounter.ToString()); NetworkComms.CompleteIncomingItemTask(item); } else { //创建优先级队列项目 QueueItemPriority itemPriority = (incomingPacketSendReceiveOptions.Options.ContainsKey("ReceiveHandlePriority") ? (QueueItemPriority)Enum.Parse(typeof(QueueItemPriority), incomingPacketSendReceiveOptions.Options["ReceiveHandlePriority"]) : QueueItemPriority.Normal); //把数据包包头,和数据包数据 赋值给优先级队列项 PriorityQueueItem item = new PriorityQueueItem(itemPriority, this, topPacketHeader, packetBuilder.ReadDataSection(packetHeaderSize, topPacketHeader.PayloadPacketSize), incomingPacketSendReceiveOptions); //QueueItemPriority.Highest is the only priority that is executed inline#if !WINDOWS_PHONE //如果是最高优先级,当前线程处理 if (itemPriority == QueueItemPriority.Highest) { if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... handling packet type '" + topPacketHeader.PacketType + "' with priority HIGHEST inline. Loop index - " + loopCounter.ToString()); NetworkComms.CompleteIncomingItemTask(item); } else { //不是最高优先级 交给自定义线程池处理 此线程池支持优先级处理 int threadId = NetworkComms.CommsThreadPool.EnqueueItem(item.Priority, NetworkComms.CompleteIncomingItemTask, item); if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... added completed " + item.PacketHeader.PacketType + " packet to thread pool (Q:" + NetworkComms.CommsThreadPool.QueueCount.ToString() + ", T:" + NetworkComms.CommsThreadPool.CurrentNumTotalThreads.ToString() + ", I:" + NetworkComms.CommsThreadPool.CurrentNumIdleThreads.ToString() + ") with priority " + itemPriority.ToString() + (threadId > 0 ? ". Selected threadId=" + threadId.ToString() : "") + ". Loop index=" + loopCounter.ToString() + "."); }#else int threadId = NetworkComms.CommsThreadPool.EnqueueItem(item.Priority, NetworkComms.CompleteIncomingItemTask, item); if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace(" ... added completed " + item.PacketHeader.PacketType + " packet to thread pool (Q:" + NetworkComms.CommsThreadPool.QueueCount.ToString() + ", T:" + NetworkComms.CommsThreadPool.CurrentNumTotalThreads.ToString() + ", I:" + NetworkComms.CommsThreadPool.CurrentNumIdleThreads.ToString() + ") with priority " + itemPriority.ToString() + (threadId > 0 ? ". Selected threadId=" + threadId.ToString() : "") + ". Loop index=" + loopCounter.ToString() + ".");#endif } //从PacketBuilder中删除已经处理过的数据 //We clear the bytes we have just handed off if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Trace("Removing " + (packetHeaderSize + topPacketHeader.PayloadPacketSize).ToString() + " bytes from incoming packet buffer from connection with " + ConnectionInfo +"."); packetBuilder.ClearNTopBytes(packetHeaderSize + topPacketHeader.PayloadPacketSize); //Reset the expected bytes to 0 so that the next check starts from scratch packetBuilder.TotalBytesExpected = 0; //If we have run out of data completely then we can return immediately if (packetBuilder.TotalBytesCached == 0) return; } else throw new CommunicationException("This should be impossible!"); } loopCounter++; } } catch (Exception ex) { //Any error, throw an exception. if (NetworkComms.LoggingEnabled) NetworkComms.Logger.Fatal("A fatal exception occured in IncomingPacketHandleHandOff(), connection with " + ConnectionInfo + " be closed. See log file for more information."); NetworkComms.LogError(ex, "CommsError"); CloseConnection(true, 45); } }
在上面的数据处理过程中,不知道您是否注意到一个重要的概念,即“自定义线程池”,
CommsThreadPool此线程池,可以说是Networkcomms通信框架中一颗璀璨的明珠,由他负责处理接收到的所有数据,此线程池支持优先级,即高优先级的数据会被优先处理www.networkcomms.cnwww.cnblogs.com/networkcomms
转载于:https://www.cnblogs.com/Jeely/p/10972299.html
c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据相关推荐
- c#网络通信框架networkcomms内核解析之七 数据包创建器(PacketBuilder)
本文基于networkcomms2.3.1开源版本 gplv3协议 PacketBuilder 数据包创建器,用于辅助创建数据包. 程序把Tcp连接上收到的二进制数据暂时存储在 packetBuil ...
- Spark内核解析之六:Spark 内存管理
前言 在执行Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业(Job),并 ...
- 蚂蚁集团网络通信框架 SOFABolt 功能介绍及协议框架解析 | 开源
简介:开源网络通信框架 SOFABolt 首次线上直播文字回顾. ,有趣实用的分布式架构频道. 回顾视频以及 PPT 查看地址见文末.欢迎加入直播互动钉钉群 : 30315793,不错过每场直播. 大 ...
- ESFramework网络通信框架 4.0 性能测试
本实验用于测试ESFramework网络通信框架服务端引擎的性能,测试程序使用ESFramework 4.0版本. 一.准备工作 测试的机器总共有3台,都是普通的PC,一台作为服务器,两台作为客户端. ...
- ESFramework网络通信框架介绍之(2)――网络通信消息NetMessage
ESFramework网络通信框架与元数据 较之C++而言,.NET是一个更加"动态"的平台,其动态能力建立在反射机制之上,而反射的基础是"元数据". 上文已经 ...
- ESFramework网络通信框架介绍之(3)――消息处理器和处理器工厂
一.ESFramework网络通信框架与消息处理器 无论是服务端还是客户端,都需要对接收到的消息进行处理,在ESFramework网络通信框架中,处理消息的组件称为消息处理器,所有的消息处理器都实现了 ...
- HP-Socket v5.4.4 发布,高性能跨平台网络通信框架
百度智能云 云生态狂欢季 热门云产品1折起>>> HP-Socket提供以下几类组件,详细内容请参考<HP-Socket网络通信框架开发指南>: Server:基于I ...
- php 框架源码分析,Laravel框架源码解析之模型Model原理与用法解析
本文实例讲述了Laravel框架源码解析之模型Model原理与用法.分享给大家供大家参考,具体如下: 前言 提前预祝猿人们国庆快乐,吃好.喝好.玩好,我会在电视上看着你们. 根据单一责任开发原则来讲, ...
- 05_大数据技术之Spark内核解析(1.1)
1. Spark 内核概述 Spark内核泛指Spark的核心运行机制,包括Spark核心组件的运行机制.Spark任务调度机制.Spark内存管理机制.Spark核心功能的运行原理等,熟练掌握Spa ...
- 【Android源码】源码分析深度好文+精编内核解析分享
阅读Android源码的好处有很多,比如:可以加深我们对系统的了解:可以参考牛人优雅的代码实现:可以从根本上找出一些bug的原因-我们应该庆幸Android是开源的,所有的功能都可以看到实现,所有的b ...
最新文章
- 一个很不错的让ie6的js兼容 ie7,ie8的库
- 更换ubuntu软件源的方法
- 重读《从菜鸟到测试架构师》--黑色的盒子里有什么(中)
- 更新wordpress遇到prepare警告问题的解决
- 使用 session_destroy() 销毁session文件时 报 Trying to destroy uninitialized session 错误解决办法
- 代码投毒、删库跑路,开源生态链安全该如何保证?
- 抛弃扎克伯格!拦不住的 Facebook 离职潮
- 01-HTML基础与进阶-day3-录像236
- c语言推箱子代码_C语言烂大街的东西都学不会!C语言多关卡推箱子制作教程
- java轻量级和重量级_Java 偏向锁、轻量级锁和重量级锁
- 3.3 Zend_Db_Table
- 基于AMPL的tsp旅行商问题
- 微信dat文件用什么软件打开方式_dat文件用什么打开 微信电脑图片dat转为jpg
- 大学生安卓期末设计之本地音乐播放器
- spyder python教程_Spyder
- 什么是网络空间测绘?到底有什么作用?
- 使用Texmacs帮助您写格式规范统一的BLOG
- FPGA Verilog语言常用语法
- 自定义 iPhone 铃声
- 营销邮件这样写 客户打开率会更高
热门文章
- 使用C语言构造一个简单计算器
- 2022年各省高考查分时间、入口汇总
- mysql 富文本 字段,富文本引起MYSQL出错
- NLP入门(七)中文预处理之繁简体转换及获取拼音
- 基于[三星6818]I2C驱动开发的0.96寸oled屏
- 微型计算机常见的输入与输出设备,微型计算机的输入输出设备.doc
- android 指纹是否设置,检查Android是否支持指纹识别以及是否已经录入指纹
- Quartus18.1安装USB Blaster驱动蓝屏
- linux最大限度压缩文件,在Linux系统上使用最高的zip压缩级别
- 此生未完成 --- 于娟