ET服务器框架学习笔记(十)

文章目录

  • ET服务器框架学习笔记(十)
  • 前言
  • 一、ET之Service
    • 1.Service主要数据
    • 2.Service主要逻辑
    • 3.Service其他逻辑
  • 二、ET之NetworkComponent与Session
    • 1.Session
      • Session的主要数据
      • Session的主要逻辑
    • 2.Session的其他方法
      • Session之Send:
      • Session之Call:
      • Session之其他类的Call:
      • Session之Reply:
  • 总结

前言

上篇简单介绍了ET的Channel,这篇主要记录Service。

一、ET之Service

ET的Service,主要是用来管理各个Channel的,这些Channel包括通过service主动发送连接的,也包含Service作为服务器,与他建立连接的Channel。
ET内的Service应该是一个网络组件包含一个,对应了一个网络服务。

1.Service主要数据

  1. Dictionary<long, TChannel> idChannels:字典类,用于管理各个Channel
  2. SocketAsyncEventArgs innArgs:socekt事件的处理类
  3. Socket acceptor:主要用于作为服务器端监听连接使用的socket
  4. RecyclableMemoryStreamManager MemoryStreamManager:管理MemoryStream的类
  5. HashSet needStartSendChannel,一个保存有需要发送数据Channel的id。
  6. int PacketSizeLength:一个消息体大小的头部字节数

2.Service主要逻辑

  1. 构造TService,新建socket,设置socket状态,添加socket事件Completed的监听OnComplete。
  2. 绑定socket端口,设置监听队列最大数量
  3. 开始异步监听连接者,异步的处理放方式,和channel相同,最后都会进入到OnAcceptComplete
  4. OnAcceptComplete内部,通过SocketAsyncEventArgs.AcceptSocket,拿到与TService的socket连接的socket,并实例化一个TChannel,将这个socket传入进去,来建立一个新的TChannel。
  5. 通过OnAccept方法,通知acceptCallback的事件触发,回调到上层逻辑,上层逻辑通过回调获取到TChannel,进行进一步控制(具体需要看NetworkComponent,与session)
  6. 每当这个TService有一个新的连接进来,都会走入OnAcceptComplete进行上面的步骤。

3.Service其他逻辑

其他逻辑有:
1.通过id获取一个channel
2.通过IP地址,或者一个string地址,新建一个连接channel
3.接收Channel发过来的发送需求id,并计入needStartSendChannel中
4.在Update中对needStartSendChannel中的发送需求,找到对应的channel,调用StartSend()触发发送逻辑

二、ET之NetworkComponent与Session

1.Session

每个Session都是对一个连接的封装,他基于channel之上,增加消息发送的额外功能,比如消息的序列化,与反序列化,IResponse类消息如何在处理完消息后立刻发送回来等等。
它是底层channel与实际NetworkComponent之间沟通的桥梁。

Session的主要数据

  • static int RpcId:静态的一个RpcID,用于处理,当发送需要带有返回协议的数据通过Session发出时(实际调用的底层channel进行发送),将这个RpcId自增,并添加到发送的数据中去,当对方将数据发送回来时,通过解析到的RpcId,就能得到相应处理。
  • AChannel channel:实际负责发送接收的底层处理层
  • Dictionary<int, Action> requestCallback,用于存放一个RpcId,对应一个代理的字典
  • readonly byte[] opcodeBytes:专门用来将高层协议号转成字节数组存入MemoryStream中的结构,可以反复使用提升性能。
  • NetworkComponent Network:上层逻辑一个网络组件会有多个Session通道连接(客户端的话一般只有1个)。
  • IPEndPoint RemoteAddress:与他连接的远程地址
  • ChannelType ChannelType:获取的是channel的连接类型,一个是connect,一个是accept,代表一个是以客户端方式发起连接,一个是作为服务端等待连接方式
  • MemoryStream Stream:channel的MemoryStream ,用于发送时,添加自己的内容进去(这里应该是将协议号添加到流里面)

Session的主要逻辑

  • Awake(AChannel aChannel):创建一个Session时,必定会创建一个channel给他,同时在channel.ReadCallback上添加自己的回调方法。这样当底层channel收到数据后,就会发送到session进行处理。
public void Awake(AChannel aChannel){this.channel = aChannel;this.requestCallback.Clear();long id = this.Id;channel.ErrorCallback += (c, e) =>{this.Network.Remove(id); };channel.ReadCallback += this.OnRead;}
  • void Run(MemoryStream memoryStream):主要用来处于底层传过来的数据内容,包括:
private void Run(MemoryStream memoryStream){memoryStream.Seek(Packet.MessageIndex, SeekOrigin.Begin);ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.OpcodeIndex);#if !SERVERif (OpcodeHelper.IsClientHotfixMessage(opcode)){this.GetComponent<SessionCallbackComponent>().MessageCallback.Invoke(this, opcode, memoryStream);return;}
#endifobject message;try{OpcodeTypeComponent opcodeTypeComponent = this.Network.Entity.GetComponent<OpcodeTypeComponent>();object instance = opcodeTypeComponent.GetInstance(opcode);message = this.Network.MessagePacker.DeserializeFrom(instance, memoryStream);if (OpcodeHelper.IsNeedDebugLogMessage(opcode)){Log.Msg(message);}}catch (Exception e){// 出现任何消息解析异常都要断开Session,防止客户端伪造消息Log.Error($"opcode: {opcode} {this.Network.Count} {e}, ip: {this.RemoteAddress}");this.Error = ErrorCode.ERR_PacketParserError;this.Network.Remove(this.Id);return;}if (!(message is IResponse response)){this.Network.MessageDispatcher.Dispatch(this, opcode, message);return;}Action<IResponse> action;if (!this.requestCallback.TryGetValue(response.RpcId, out action)){throw new Exception($"not found rpc, response message: {StringHelper.MessageToStr(response)}");}this.requestCallback.Remove(response.RpcId);action(response);}
  • OnRead(MemoryStream memoryStream)接收来自底层Channel传来的MemoryStream数据流,调用
    Run进行处理。

  • void Run(MemoryStream memoryStream),真正处理流数据的方法。
    (1).首先将流的读取位置定位到预定的位置:memoryStream.Seek(Packet.MessageIndex, SeekOrigin.Begin)
    (2).从流中读取一个UInt16的值,这个值就是协议号(为什么这个值是协议号,因为在封装数据时也将协议号以UInt16的方式最后封装进去了)
    (3).取得协议号后,拿到OpcodeTypeComponent,调用GetInstance即可拿到对应协议数据类型信息。
    (4).调用Network.MessagePacker.DeserializeFrom,从挂载的类(session的为NetworkComponent组件,)获取MessagePacker类型(这里由于用得是网络的所以是protobuf类)即调用到ProtobufPacker类的DeserializeFrom方法。
    最终调用到ProtobufHelper类的FromBytes方法,该方法实例化一个协议数据类型,然后第三方库的protobuf库MergeFrom方法将流中的数据,解析到实例化的协议数据类型中,然后返回给message
    (5).现在有了已经填充好数据的协议类型数据实例后,通过类型分两步处理。
    (6)如果不是IResponse类型,即对方发送过来的数据,并不是我之前发送出去后,需要等待回复的协议,那么直接走MessageDispatcher的转发出去即可。
    (7),如果是IResponse类型,说明之前这个session有发送过带需要回复的协议,那么通过回来的协议里面的RpcId,在字典requestCallback找到之前发送数据时留下的代理方法,调用代理方法,然后从字典中remove掉即可。

public static object FromBytes(Type type, byte[] bytes, int index, int count){object message = Activator.CreateInstance(type);((Google.Protobuf.IMessage)message).MergeFrom(bytes, index, count);ISupportInitialize iSupportInitialize = message as ISupportInitialize;if (iSupportInitialize == null){return message;}iSupportInitialize.EndInit();return message;}

2.Session的其他方法

上面提到的都是session如何接收数据,session还有一个重要能力就是发送数据出去。下面看看session发送数据相关的方法。和接收数据类型,session发送数据也分为两种,一种是call发送,这种需要带返回数据的,一种是直接send的方式。

Session之Send:

public void Send(IMessage message){OpcodeTypeComponent opcodeTypeComponent = this.Network.Entity.GetComponent<OpcodeTypeComponent>();ushort opcode = opcodeTypeComponent.GetOpcode(message.GetType());Send(opcode, message);}

(1)获取协议OpcodeTypeComponent组件,通过传入的数据类型,获取对应的协议号,调用Send(ushort opcode, object message)

public void Send(ushort opcode, object message){if (this.IsDisposed){throw new Exception("session已经被Dispose了");}if (OpcodeHelper.IsNeedDebugLogMessage(opcode) ){#if !SERVERif (OpcodeHelper.IsClientHotfixMessage(opcode)){}else
#endif{Log.Msg(message);}}MemoryStream stream = this.Stream;stream.Seek(Packet.MessageIndex, SeekOrigin.Begin);stream.SetLength(Packet.MessageIndex);this.Network.MessagePacker.SerializeTo(message, stream);stream.Seek(0, SeekOrigin.Begin);opcodeBytes.WriteTo(0, opcode);Array.Copy(opcodeBytes, 0, stream.GetBuffer(), 0, opcodeBytes.Length);#if SERVER// 如果是allserver,内部消息不走网络,直接转给session,方便调试时看到整体堆栈if (this.Network.AppType == AppType.AllServer){Session session = this.Network.Entity.GetComponent<NetInnerComponent>().Get(this.RemoteAddress);session.Run(stream);return;}
#endifthis.Send(stream);}

(2)验证本session是否已经释放,如果释放了就不发送,
(3)查看是否需要打印协议,如果是服务器则打印协议内容,方便定位问题
(4)获取session内置的MemoryStream,设置操作的的位置,留出两个字节的位置,用于存放协议号,设置流的长度。
(5)通过MessagePacker.SerializeTo方法将message写入流,实际还是会调用到ProtobufHelper的ToStream方法,这里就不详细列举出来了。
(6)设置流操作位置为0,然后将协议号以二进制方式写入流。首先通过opcodeBytes类写入session的opcodeBytes中,使用Array.Copy方法,将opcodeBytes数据写入到真正的流中。
(7)最终通过session关联的底层channel,调用Send将其数据发送出去。
(8)这里有个优化,即当服务器走的是单一全局服务器,然后又是内网消息的话,所有session都在NetInnerComponent组件进行管理,所以可以直接通过远程地址获取一个session,然后调用他的Run就可以了,而不用通过网络方式,即己方session->己方channel->己方socket->网卡->对方socket->对方channel->对方session的方式。

Session之Call:

Session的Call写了几个重载的方法,大体流程差不多,之类选public ETTask Call(IRequest request)方法来讲解。

public ETTask<IResponse> Call(IRequest request){int rpcId = ++RpcId;var tcs = new ETTaskCompletionSource<IResponse>();this.requestCallback[rpcId] = (response) =>{if (ErrorCode.IsRpcNeedThrowException(response.Error)){tcs.SetException(new Exception($"Rpc Error: {request.GetType().FullName} {response.Error}"));return;}tcs.SetResult(response);};request.RpcId = rpcId;this.Send(request);return tcs.Task;}

(1)首先应该看到的是这个方法是一个异步方法,即调用他的地方,可以用await方式以同步方式编写代码,内部返回一个TTaskCompletionSource()的Task.
(2)设置rpcId值,通过Session的静态属性RpcId++获得,这样所有通过session发送的协议都有一个唯一的标识了。
(3)由于是期望带返回的协议,所以使用lambda表达式向requestCallback字典类里面存放一个回调函数。这样当收到对方发来的数据时,设置tcs.SetResult(response),即可唤醒在调用这个call的异步方法重新继续执行下去,通过rpcId标识这个回调方法。
(4)将rpcId赋值给发送的协议RpcId,这样发送的协议内部包含了这个唯一RpcId,接收数据的session在回复协议时,也会将这个RpcId赋值到IResponse类数据中,这样受到IResponse后session能够通道这个RpcId找到回调函数,调用tcs.SetResult通知之前的调用方法。
(5)最终也是通过session的Send,将数据发送出去

Session之其他类的Call:

  • public ETTask CallWithoutException(IRequest request, CancellationToken cancellationToken):带取消回调功能的Call,即当调用这个方法发送数据后,再未接受到返回的协议时(接收到再取消没啥意义),可以通过cancellationToken.cancel函数,取消掉异步回调:cancellationToken.Register(() => this.requestCallback.Remove(rpcId));貌似这个函数本身的意图是不捕捉异常的,没看到调用的地方,可能用处不大。
  • public ETTask Call(IRequest request, CancellationToken cancellationToken):貌似与上面那个是一样的,可能处于某种原因改了上面的函数。

Session之Reply:

可以通过这个方法,快速发送回复协议出去,内部就是判断一下session是否释放,然后调用send。这个方法的封装,用来处理数据的时候,直接reply比较方便,同时对代码阅读方面也有好处。

总结

本篇主要记录了Session处理发送与接收数据的方法,核心要注意的就是一个session对应一个连接,他起到对下层channel的调用,对上层NetworkComponent提供接口方法,处于一个通信中间层的关系。
而sevice则是对channel的具体管理,一个service管理多个channel,同时自身是一个以监听方式作为socket服务端使用的。
真正对sevice以及session使用的类是NetworkComponent,NetworkComponent是一个上层业务逻辑,针对NetworkComponent准备下篇再进行记录。

ET服务器框架学习笔记(十)相关推荐

  1. ET6.0服务器框架学习笔记(二、一条登录协议)

    ET6.0服务器框架学习笔记(二.一条登录协议) 上一篇主要记录ET6.0的服务器启动功能,本篇主要记录ET6.0完整的一条协议,从配置到生成协议数据,到从客户端发送给服务端,再发送回客户端的流程 文 ...

  2. ET服务器框架学习笔记

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一.ET服务器怎么看? 二.APP部分 1.program代码 2.game 3.回到program 总结 前言 今天 ...

  3. Python语言入门这一篇就够了-学习笔记(十二万字)

    Python语言入门这一篇就够了-学习笔记(十二万字) 友情提示:先关注收藏,再查看,12万字保姆级 Python语言从入门到精通教程. 文章目录 Python语言入门这一篇就够了-学习笔记(十二万字 ...

  4. python表单提交的两种方式_Flask框架学习笔记之表单基础介绍与表单提交方式

    本文实例讲述了Flask框架学习笔记之表单基础介绍与表单提交方式.分享给大家供大家参考,具体如下: 表单介绍 表单是HTML页面中负责数据采集功能的部件.由表单标签,表单域和表单按钮组成.通过表单,将 ...

  5. PyTorch框架学习二十——模型微调(Finetune)

    PyTorch框架学习二十--模型微调(Finetune) 一.Transfer Learning:迁移学习 二.Model Finetune:模型的迁移学习 三.看个例子:用ResNet18预训练模 ...

  6. mysql 临时表 事务_MySQL学习笔记十:游标/动态SQL/临时表/事务

    逆天十三少 发表于:2020-11-12 08:12 阅读: 90次 这篇教程主要讲解了MySQL学习笔记十:游标/动态SQL/临时表/事务,并附有相关的代码样列,我觉得非常有帮助,现在分享出来大家一 ...

  7. Samza框架-----学习笔记

    Samza框架-----学习笔记 基本概念: 作业:是对一组输入流进行处理转化成输出流的程序. 分区: Samza的流数据单位既不是Storm中的元组,也不是Spark Streaming中的DStr ...

  8. Spring框架学习笔记(三)(AOP,事务管理)

    Spring框架学习笔记(三) 九.AOP 9.1 AOP的注解配置 (1) 新建计算器核心功能(模拟:不能在改动核心代码) (2) 建立一个普通的Java类写增强代码(面向切面编程),使用Sprin ...

  9. php 里的cl框架手册,CI框架学习笔记(一)

    本文是CI框架学习笔记的第一篇,主要介绍了CI框架的环境安装,基本术语以及框架流程,非常的详细,有需要的朋友可以参考下 最开始使用CI框架的时候,就打算写一个CI源码阅读的笔记系列,可惜虎头蛇尾,一直 ...

最新文章

  1. 得到windows系统图标的解决方案(转)
  2. SFB 项目经验-57-Skype for business-录音系统-你拥有吗(模拟线路)
  3. Linux三大主流网站构建平台,Linux快速构建LAMP网站平台
  4. Zigbee中添加用户任务
  5. MyBaties异常之 ORA-00918: 未明确定义列
  6. 怎么样才能写出出色的代码
  7. 2017-2018-1 20155222 201552228 实验五 通讯协议设计
  8. 51 SD配置-定价配置-维护定价过程
  9. 一张图带你了解python
  10. 使用XStream注解处理复杂xml的属性及数据集合(xml转对象)
  11. SparkStreaming之transform
  12. php读取大文件某行内容,PHP读取和修改大文件的某行内容_PHP教程
  13. Python——装饰器的学习笔记(legacy)
  14. .NET Mail : 注意Win 7 不再包含SMTP服务
  15. UTM投影带号计算以及投影具体操作
  16. apk java反编译_【Apk反编译】如何反编译Apk得到Java源代码
  17. 雷电4.0 Fiddler https抓包详解(绝对可行)
  18. ENVI5.3.1使用Landsat 8影像进行监督分类实例操作
  19. 计算机造句英语,电子计算机造句_造句大全
  20. 电脑小技巧:关于修复只能看无法拖…

热门文章

  1. redis如何使用命令清空所有key
  2. DPDK 中断机制 eal_intr_handle_interrupts
  3. Paper阅读:VOLDOR: Visual Odometry from Log-logistic Dense Optical flow Residuals
  4. keil编译栏只有仿真才会出现 Keil工具栏不见了 Keil工具栏消失
  5. 机房以太网温湿度传感器和485信号温湿度传感器的区别分析
  6. ORA-32036: 不支持 WITH 子句中串联式查询名的形式 后台报错问题
  7. android开发脚本之几个常用脚本sh
  8. android 破碎酷炫动画,Android特效——玻璃破碎效果
  9. 关于粒子群惯性权重的描述
  10. Exynos 4412的启动过程分析[2]