前面已经介绍了ITcp接口,而AgileTcp就是ESFramework给出的ITcp的参考实现。在之前,我曾经讲解过模拟完成端口的Tcp组件实现和异步Tcp组件实现,在它们的基础之上,我更改了处理策略,而形成了AgileTcp,目的是更清晰的结构、更高的效率。这个策略会在后面讲到。

Tcp组件主要控制着系统与终端用户的所有消息的进出,ITcp接口描述了这个组件的外貌,告诉外部如何使用Tcp组件、如何与Tcp组件交互。而从实现的角度来看,我们必须理清Tcp组件的职责:
(1) 管理所有已经建立的Tcp连接
(2) 管理与每个连接相对应接收缓冲区
(3) 管理所有的工作者线程
(4) 处理长度大于接收缓冲区的消息

我们来看看如何满足这些职责。
    由于每个连接都对应着一个接收缓冲区,所以可以将它们封装在一起形成ContextKey(连接上下文):
   

ContextKey
    public class ContextKey
    {        
        private byte[]  buffer ;          //封装接收缓冲区
        private ISafeNetworkStream netStream = null ;            
        private volatile bool      isDataManaging = false ;
        
        public ContextKey(ISafeNetworkStream net_Stream ,int buffSize)
        {
            this.netStream = net_Stream ;            
            this.buffer    = new byte[buffSize] ;            
        }

#region NetStream  
        public ISafeNetworkStream NetStream
        {
            get
            {
                return this.netStream ;
            }
        }

public byte[] Buffer
        {
            get
            {
                return this.buffer ;
            }            
        }

public bool IsDataManaging
        {
            get
            {
                return this.isDataManaging ;
            }
            set
            {
                this.isDataManaging = value ;
            }
        }

private bool firstMessageExist = false ;
        public  bool FirstMessageExist 
        {
            get
            {
                return this.firstMessageExist ;
            }
            set
            {
                this.firstMessageExist = value ;
            }
        }
        #endregion            
    }

ContextKey中封装的是ISafeNetworkStream而不是NetworkStream,原因可参见这里。
    IsDataManaging属性表明工作线程是否正在处理本连接对应的缓冲区中的数据,FirstMessageExist属性用于标志接收到的第一条消息,因为系统可能需要对接收到的第一条消息做一些特殊的处理。

任何时刻,可能都有成千上万个连接存活着;任何时刻,都可能有新的连接建立、或已有的连接被释放。所有这些ContextKey对象需要被管理起来,这就是上下文管理器IContextKeyManager:

    public interface IContextKeyManager
    {
        void InsertContextKey(ContextKey context_key) ;
        void DisposeAllContextKey() ;        
        void RemoveContextKey(int streamHashCode) ;
        ISafeNetworkStream GetNetStream(int streamHashCode) ;

int            ConnectionCount {get ;}
        ICollection ContextKeyList{get ;}

event CbSimpleInt StreamCountChanged ;        
    }

说到上下文管理器,先要讲讲如何标志每个不同的上下文了?使用连接字,连接字是Tcp连接的Hashcode,它们与连接一一对应,并且不会重复。所以在源码中你经常会看到名为“streamHashCode”的参数和变量。由于Tcp连接与streamHashCode一一对应,所以GetNetStream方法的实现就非常简单。不知道你是否记得,RoundedMessage类中有个ConnectID字段,它就是连接字,与streamHashCode意义一样。根据此字段,你可以清楚的知道这个RoundedMessage来自于哪个连接。

关于工作者线程,很幸运的是,我们可以直接使用.NET提供的后台线程池,而不必要再去手动管理,这可以省却一些麻烦。当然你也可以使用ThreadPool类,甚至你可以从头开始实现自己的线程池组件,这也是不困难的。
    
    我经常被问到,接收缓冲区应该开辟多大?这取决于你的应用,但是有一点是错不了的――缓冲区的大小至少要大于消息头Header的大小,否则麻烦就多了。根据我的经验,一般缓冲区的大小至少应该能容纳所有接收消息中的60%-80%。对于大于缓冲区大小的消息,ESFramework采用的策略是使用缓冲区池IBufferPool。

    public interface IBufferPool
    {
        byte[] RentBuffer(int minSize) ;
        void   GivebackBuffer(byte[] buffer) ;
    }

通过上面的介绍我们已经知道如何满足Tcp组件的职责,现在我们来看看更细的实现策略:
(1) 使用Checker线程。
    使用Checker线程是AgileTcp组件的区别于模拟完成端口的Tcp组件实现和异步Tcp组件的主要特色。当AgileTcp启动时,Checker线程也随之启动,这个线程的主要工作就是检查已经存在的每个连接上是否有数据要接收(还记得Select网络模型),这可以通过NetworkStream.DataAvailable属性知道。如果发现某个连接上有待接收的数据,就将其放到工作者线程中去处理,并设置前面提到的ContextKey.IsDataManaging属性,然后再判断下个连接,如此循环下去。

        private void TaskChecker()
        {
            while(! this.stop)
            {
                foreach(ContextKey key in this.contextKeyManager.ContextKeyList)
                {
                    if(this.stop)
                    {
                        break ;
                    }

if((! key.IsDataManaging) && key.NetStream.DataAvailable)
                    {                        
                        key.IsDataManaging = true ;    
                        CbContextKey cb = new CbContextKey(this.DataManaging) ;
                        cb.BeginInvoke(key ,null ,null ) ;
                    }                    
                }

System.Threading.Thread.Sleep(50) ;
            }
        }

(2) 将消息头的解析置于Tcp组件之中
    将消息头解析置于Tcp组件之中这个方案我层考虑了非常久,原因是,这会破坏Tcp组件的单纯性,使得Tcp组件与协议(Contract)有所关联。最终采用这个策略的第一个理由是清晰,第二个理由是效率。清晰在于简化了ContextKey结构,避免了使用消息分裂器这样复杂的算法组件(如果大家看过我以前关于通信方案的文章,一定会得到这样的答案)。效率在于,当在此解析了Header之后,后面所有的处理器都可以使用这个Header对象了,而不用在自己去解析。这也是NetMessage类中有个Header字段的原因。

(3) 针对于某个连接,只有当上一个消息处理完并将回复发送后(如果有回复的话),才会去接收下一个消息。
    这个策略会使很多事情变得简单,而且不会影响任何有用的特性。由于不会在处理消息的时候去接收下一个消息,所以可以直接处理接收缓冲区中的数据,而不需要将数据从接收缓冲区拷贝到另外的地方去处理。这又对效率提高有所帮助。

综上所述,我们可以总结工作者线程要做的事情:首先,从连接上接收MessageHeaderSize个字节,解析成Header,然后在接收Header. MessageBodyLength个字节,即是Body,接着构造成RoundedMessage对象交给消息分配器去处理,最后将得到的处理结果发送出去。代码如下所示:

DataManaging
        private void DataManaging(ContextKey key)
        {    
            int streamHashCode = key.NetStream.GetHashCode() ;    
            int headerLen = this.contractHelper.MessageHeaderLength ;
            
            while((key.NetStream.DataAvailable) && (! this.stop))
            {
                byte[] rentBuff = null ;//每次分派的消息中,最多有一个rentBuff

try
                {
                    #region 构造 RoundedMessage
                    NetHelper.RecieveData(key.NetStream ,key.Buffer ,0 ,headerLen) ;
                    IMessageHeader header = this.contractHelper.ParseMessageHeader(key.Buffer ,0) ;    
                    if(! this.contractHelper.ValidateMessageToken(header))
                    {
                        this.DisposeOneConnection(streamHashCode ,DisconnectedCause.MessageTokenInvalid) ;
                        return ;
                    }

RoundedMessage requestMsg = new RoundedMessage() ;
                    requestMsg.ConnectID      = streamHashCode ;
                    requestMsg.Header         = header ;
                    
                    if(! key.FirstMessageExist)
                    {
                        requestMsg.IsFirstMessage = true ;
                        key.FirstMessageExist     = true ;
                    }

if((headerLen + header.MessageBodyLength) > this.maxMessageSize)
                    {
                        this.DisposeOneConnection(streamHashCode ,DisconnectedCause.MessageSizeOverflow) ;
                        return ;
                    }
                
                    if(header.MessageBodyLength >0 )
                    {
                        if((header.MessageBodyLength + headerLen) <= this.recieveBuffSize)
                        {
                            NetHelper.RecieveData(key.NetStream ,key.Buffer ,0 ,header.MessageBodyLength) ;
                            requestMsg.Body = key.Buffer ;                            
                        }
                        else
                        {                        
                            rentBuff = this.bufferPool.RentBuffer(header.MessageBodyLength) ;

NetHelper.RecieveData(key.NetStream ,rentBuff ,0 ,header.MessageBodyLength) ;
                            requestMsg.Body = rentBuff ;                            
                        }
                    }
                    #endregion                    
                
                    bool closeConnection = false ;
                    NetMessage resMsg = this.tcpStreamDispatcher.DealRequestData(requestMsg ,ref closeConnection) ;

if(rentBuff != null)
                    {
                        this.bufferPool.GivebackBuffer(rentBuff) ;
                    }

if(closeConnection)
                    {
                        this.DisposeOneConnection(streamHashCode ,DisconnectedCause.OtherCause) ;
                        return ;
                    }

if((resMsg != null) &&(! this.stop))
                    {                    
                        byte[] bRes = resMsg.ToStream() ;
                        key.NetStream.Write(bRes ,0 ,bRes.Length) ;

if(this.ServiceCommitted != null)
                        {                                
                            this.ServiceCommitted(streamHashCode ,resMsg) ;
                        }
                    }
                }
                catch(Exception ee)
                {
                    if(ee is System.IO.IOException) //正在读写流的时候,连接断开
                    {
                        this.DisposeOneConnection(streamHashCode ,DisconnectedCause.NetworkError) ;
                        break ;
                    }
                    else
                    {
                        this.esbLogger.Log(ee.Message ,"ESFramework.Network.Tcp.AgileTcp" ,ErrorLevel.Standard) ;
                    }

ee = ee ;                    
                }

}

key.IsDataManaging = false ;
        }

AgileTcp组件的主要原理差不多就这些了,这种实现有个缺点,不知大家发现没有。那就是当客户端主动断开连接或掉线时,AgileTcp组件可能感受不到(除非对应的连接上正在发送或接收数据,此时会抛出异常),因为当连接断开时,key.NetStream.DataAvailable不会抛出异常,而是仍然返回false。这是个问题,幸好有补救的办法,一是要求客户端下线的时候给服务器发送Logoff消息,二是使用定时掉线检查器(IUserOnLineChecker)。当服务器检查或发现某用户下线时,即可调用ITcpClientsController.DisposeOneConnection方法来释放对应的连接和Context。(你应该还记得ITcp接口是从ITcpClientsController继承的)。关于这个问题,你有更好的解决办法吗?
    感谢关注!

上一篇文章:ESFramework介绍之(21)-- Tcp组件接口ITcp介绍

转到  :ESFramework 可复用的通信框架(序)

转载于:https://www.cnblogs.com/zhuweisky/archive/2006/04/13/374025.html

ESFramework介绍之(23)―― AgileTcp相关推荐

  1. ESFramework介绍之(28)―― Udp组件

        ESFramework对Tcp和Udp协议都提供了完整的支持,在ESFramework介绍之(21)-- Tcp组件接口ITcp介绍 和 ESFramework介绍之(23)―― AgileT ...

  2. ESFramework介绍之(16)―― Tcp数据自动发送器ITcpAutoSender

    我们已经积累了这样的经验:如果有一个大块的数据需要通过Tcp发送,我们会采用异步的方式以避免当前工作线程阻塞.那么,如果我们有多个线程需要同时发送大块的数据了?每个线程都在NetworkStream或 ...

  3. ESFramework介绍之(31)―― 消息分类及对应的处理器

        这是一篇迟到了很久的文章,要不是今天看到Mediar朋友写的"基于ESFramewok的 客户端和客户端通迅"的文章,我也许还不会想起写这篇应该很早就发表的Blog,它可以 ...

  4. ESFramework介绍之(30)―― 消息侦察者 INetMessageSpy

        (本文适用于ESFramework V0.2+) 现在我们回想一下,当网络组件(Tcp/Udp组件)接收到一个消息后,这个消息会流经哪些组件,然后再通过网络组件发送出去了.如果你研究过ESFr ...

  5. ESFramework介绍之(14)-- AS与FS通信方案

    前面我们已经多次提到,每个AS都有一组FS为之服务(回顾),AS将接收到的功能请求通过Tcp连接池 或Remoting转发给某个FS处理.下面我们将深入讨论AS和FS之间的通信机制. 首先要解决第一个 ...

  6. ESFramework介绍之(34)―― ITcpServerAgent和IUdpServerAgent组件关系图

        (本文适用于ESFramework V0.3+) 在ESFramework介绍之(7)-- 服务器代理IServerAgent(2006.06.06修正) 的介绍中,我们已经认识了IServe ...

  7. ESFramework介绍之(21)-- Tcp组件接口ITcp介绍

            写了这么多篇介绍ESFramework的文章才想起来还有一些很基础的内容没有介绍,前面介绍的一些组件.框架基本上是与协议无关的(比如无论是Tcp还是Udp甚至是Remoting.Web ...

  8. 麒麟Linux强制修改只读文件,中标麒麟Linux系统文本和文件处理介绍(23页)-原创力文档...

    中标麒麟Linux服务器操作系统培训系列 中标麒麟Linux系统文本和文件处理介绍 技术创新,变革未来 本章目标 •使用工具来抽取.分析和处理文本数据 •使用文件查询命令 查看文件内容 •cat ∶ ...

  9. ESFramework网络通信框架介绍之(3)――消息处理器和处理器工厂

    一.ESFramework网络通信框架与消息处理器 无论是服务端还是客户端,都需要对接收到的消息进行处理,在ESFramework网络通信框架中,处理消息的组件称为消息处理器,所有的消息处理器都实现了 ...

最新文章

  1. LeetCode-位运算-36. 只出现一次的数字
  2. 模块隐藏(LDR_MODULE链 与 PE特征)
  3. wordpress 主机伪静态404.php seo,wordpress开启伪静态之后,出现404是什么原因?
  4. Spring中调用远程EJB的配置
  5. 如何使用Java keytool命令行创建一个自签名证书
  6. 14.JAVA整型变量
  7. 人生几张纸,看透一辈子
  8. 计算机二级考vb试题,2017计算机二级考试VB练习题及答案
  9. rose服务器系统,ROSE HA 服务器集群软件
  10. 计算机科学与技术与微电子科学与工程,微电子科学与工程专业就业前景如何 有前途吗...
  11. 新网域名转出及阿里云域名转入
  12. 如何让语音芯片与功放芯片之间更好地配合让音效更好
  13. Foreign Language_english_补语
  14. 【读书笔记】之【瞬变】
  15. 思科模拟器启用CHAP协议
  16. 邮箱客户端 gmail支持_如何联系Gmail支持
  17. win10网络计算机打不开,win10网络和internet设置打不开怎么办_win10网络设置打不开无法打开的解决方法...
  18. 三菱fx2n64mr说明书_FX2N-64MR-001手册三菱FX2N-64MR-001使用说明书 - 广州凌控
  19. 1.第一节课,从头开始学C语言
  20. 25套五彩缤纷的矢量网页背景图片素材【免费下载】

热门文章

  1. C语言代码规范(十)花里胡哨代码鉴赏
  2. 数字图像处理小练习存档1
  3. Python字符串格式:%vs.format
  4. Ubuntu的中文是哪种字体?python的词云分析和 三国演义人物出场统计
  5. C语言随机数生成超详解
  6. visual studio 2015 配置好qt5后, 第一次运行出现 无法打开源文件“QtWidgets/QApplication”和无法运行rc.exe的解决方案
  7. 【c语言】棋盘游戏--三子棋
  8. 解决iex -S mix报错
  9. C++ STL之Set
  10. 【C++基础】常见面试问题(二)