1.类

(1)socket IO操作内存管理类 BufferManager

    // This class creates a single large buffer which can be divided up // and assigned to SocketAsyncEventArgs objects for use with each // socket I/O operation.  // This enables bufffers to be easily reused and guards against // fragmenting heap memory.// // The operations exposed on the BufferManager class are not thread safe.public class BufferManager{//buffer缓冲区大小private int m_numBytes;//缓冲区private byte[] m_buffer;private Stack<int> m_freeIndexPool;private int m_currentIndex;private int m_bufferSize;public BufferManager(int totalBytes, int bufferSize){m_numBytes = totalBytes;m_currentIndex = 0;m_bufferSize = bufferSize;m_freeIndexPool = new Stack<int>();}/// <summary>/// 给buffer分配缓冲区/// </summary>public void InitBuffer(){m_buffer = new byte[m_numBytes];}/// <summary>///  将buffer添加到args的IO缓冲区中,并设置offset/// </summary>/// <param name="args"></param>/// <returns></returns>public bool SetBuffer(SocketAsyncEventArgs args){if (m_freeIndexPool.Count > 0){args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);}else{if ((m_numBytes - m_bufferSize) < m_currentIndex){return false;}args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);m_currentIndex += m_bufferSize;}return true;}/// <summary>/// 将buffer从args的IO缓冲区中释放/// </summary>/// <param name="args"></param>public void FreeBuffer(SocketAsyncEventArgs args){m_freeIndexPool.Push(args.Offset);args.SetBuffer(null, 0, 0);}/// <summary>/// 释放全部buffer缓存/// </summary>public void FreeAllBuffer(){m_freeIndexPool.Clear();m_currentIndex = 0;m_buffer = null;}}

  

(2)SocketAsyncEventArgsPool

    // Represents a collection of reusable SocketAsyncEventArgs objects.  public class SocketAsyncEventArgsPool{private Stack<SocketAsyncEventArgs> m_pool;// Initializes the object pool to the specified size//// The "capacity" parameter is the maximum number of // SocketAsyncEventArgs objects the pool can holdpublic SocketAsyncEventArgsPool(int capacity){m_pool = new Stack<SocketAsyncEventArgs>(capacity);}// Add a SocketAsyncEventArg instance to the pool////The "item" parameter is the SocketAsyncEventArgs instance // to add to the poolpublic void Push(SocketAsyncEventArgs item){if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }lock (m_pool){m_pool.Push(item);}}// Removes a SocketAsyncEventArgs instance from the pool// and returns the object removed from the poolpublic SocketAsyncEventArgs Pop(){lock (m_pool){return m_pool.Pop();}}/// <summary>/// 清空栈中元素/// </summary>public void Clear(){lock (m_pool){m_pool.Clear();}}// The number of SocketAsyncEventArgs instances in the poolpublic int Count{get { return m_pool.Count; }}}

  

(3)AsyncUserToken

    public class AsyncUserToken{private Socket socket = null;public Socket Socket { get => socket; set => socket = value; }}

  

(4)服务器端操作类TcpServiceSocketAsync

    public class TcpServiceSocketAsync{//接收数据事件public Action<string> recvMessageEvent = null;//发送结果事件public Action<int> sendResultEvent = null;//监听socketprivate Socket listenSocket = null;//允许连接到tcp服务器的tcp客户端数量private int numConnections = 0;//用于socket发送和接受的缓存区大小private int bufferSize = 0;//socket缓冲区管理对象private BufferManager bufferManager = null;//SocketAsyncEventArgs池private SocketAsyncEventArgsPool socketAsyncEventArgsPool = null;//当前连接的tcp客户端数量private int numberAcceptedClients = 0;//控制tcp客户端连接数量的信号量private Semaphore maxNumberAcceptedClients = null;//用于socket发送数据的SocketAsyncEventArgs集合private List<SocketAsyncEventArgs> sendAsyncEventArgs = null;//tcp服务器ipprivate string ip = "";//tcp服务器端口private int port = 0;/// <summary>/// 构造函数/// </summary>/// <param name="numConnections">允许连接到tcp服务器的tcp客户端数量</param>/// <param name="bufferSize">用于socket发送和接受的缓存区大小</param>public TcpServiceSocketAsync(int numConnections, int bufferSize){if (numConnections <= 0 || numConnections > int.MaxValue)throw new ArgumentOutOfRangeException("_numConnections is out of range");if (bufferSize <= 0 || bufferSize > int.MaxValue)throw new ArgumentOutOfRangeException("_receiveBufferSize is out of range");this.numConnections = numConnections;this.bufferSize = bufferSize;bufferManager = new BufferManager(numConnections * bufferSize * 2, bufferSize);socketAsyncEventArgsPool = new SocketAsyncEventArgsPool(numConnections);maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);sendAsyncEventArgs = new List<SocketAsyncEventArgs>();}/// <summary>/// 初始化数据(bufferManager,socketAsyncEventArgsPool)/// </summary>public void Init(){numberAcceptedClients = 0;bufferManager.InitBuffer();SocketAsyncEventArgs readWriteEventArg;for (int i = 0; i < numConnections * 2; i++){readWriteEventArg = new SocketAsyncEventArgs();readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);readWriteEventArg.UserToken = new AsyncUserToken();bufferManager.SetBuffer(readWriteEventArg);socketAsyncEventArgsPool.Push(readWriteEventArg);}}/// <summary>///  开启tcp服务器,等待tcp客户端连接/// </summary>/// <param name="ip"></param>/// <param name="port"></param>public void Start(string ip, int port){if (string.IsNullOrEmpty(ip))throw new ArgumentNullException("ip cannot be null");if (port < 1 || port > 65535)throw new ArgumentOutOfRangeException("port is out of range");this.ip = ip;this.port = port;try{listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);IPAddress address = IPAddress.Parse(ip);IPEndPoint endpoint = new IPEndPoint(address, port);listenSocket.Bind(endpoint);//绑定地址listenSocket.Listen(int.MaxValue);//开始监听StartAccept(null);}catch (Exception ex){throw ex;}}/// <summary>/// 关闭tcp服务器/// </summary>public void CloseSocket(){if (listenSocket == null)return;try{foreach (var e in sendAsyncEventArgs){((AsyncUserToken)e.UserToken).Socket.Shutdown(SocketShutdown.Both);}listenSocket.Shutdown(SocketShutdown.Both);}catch { }try{foreach (var e in sendAsyncEventArgs){((AsyncUserToken)e.UserToken).Socket.Close();}listenSocket.Close();}catch { }try{foreach (var e in sendAsyncEventArgs){e.Dispose();}}catch { }sendAsyncEventArgs.Clear();socketAsyncEventArgsPool.Clear();bufferManager.FreeAllBuffer();maxNumberAcceptedClients.Release(numberAcceptedClients);}/// <summary>/// 重新启动tcp服务器/// </summary>public void Restart(){CloseSocket();Init();Start(ip, port);}/// <summary>/// 开始等待tcp客户端连接/// </summary>/// <param name="acceptEventArg"></param>private void StartAccept(SocketAsyncEventArgs acceptEventArg){if (acceptEventArg == null){acceptEventArg = new SocketAsyncEventArgs();acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);}else{// socket must be cleared since the context object is being reusedacceptEventArg.AcceptSocket = null;}maxNumberAcceptedClients.WaitOne();bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);if (!willRaiseEvent){ProcessAccept(acceptEventArg);}}/// <summary>/// Socket.AcceptAsync完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e){ProcessAccept(e);}/// <summary>/// 接受到tcp客户端连接,进行处理/// </summary>/// <param name="e"></param>private void ProcessAccept(SocketAsyncEventArgs e){Interlocked.Increment(ref numberAcceptedClients);//设置用于接收的SocketAsyncEventArgs的socket,可以接受数据SocketAsyncEventArgs recvEventArgs = socketAsyncEventArgsPool.Pop();((AsyncUserToken)recvEventArgs.UserToken).Socket = e.AcceptSocket;//设置用于发送的SocketAsyncEventArgs的socket,可以发送数据SocketAsyncEventArgs sendEventArgs = socketAsyncEventArgsPool.Pop();((AsyncUserToken)sendEventArgs.UserToken).Socket = e.AcceptSocket;sendAsyncEventArgs.Add(sendEventArgs);StartRecv(recvEventArgs);StartAccept(e);}/// <summary>/// tcp服务器开始接受tcp客户端发送的数据/// </summary>private void StartRecv(SocketAsyncEventArgs e){bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.ReceiveAsync(e);if (!willRaiseEvent){ProcessReceive(e);}}/// <summary>/// socket.sendAsync和socket.recvAsync的完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void IO_Completed(object sender, SocketAsyncEventArgs e){switch (e.LastOperation){case SocketAsyncOperation.Receive:ProcessReceive(e);break;case SocketAsyncOperation.Send:ProcessSend(e);break;default:throw new ArgumentException("The last operation completed on the socket was not a receive or send");}}/// <summary>/// 处理接受到的tcp客户端数据/// </summary>/// <param name="e"></param>private void ProcessReceive(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success){if (recvMessageEvent != null)//一定要指定GetString的长度recvMessageEvent(Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred));StartRecv(e);}else{CloseClientSocket(e);}}/// <summary>/// 处理tcp服务器发送的结果/// </summary>/// <param name="e"></param>private void ProcessSend(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.SocketError == SocketError.Success){if (sendResultEvent != null)sendResultEvent(e.BytesTransferred);}else{if (sendResultEvent != null)sendResultEvent(e.BytesTransferred);CloseClientSocket(e);}}/// <summary>/// 关闭一个与tcp客户端连接的socket/// </summary>/// <param name="e"></param>private void CloseClientSocket(SocketAsyncEventArgs e){AsyncUserToken token = e.UserToken as AsyncUserToken;try{//关闭socket时,单独使用socket.close()通常会造成资源提前被释放,应该在关闭socket之前,先使用shutdown进行接受或者发送的禁用,再使用socket进行释放token.Socket.Shutdown(SocketShutdown.Both);}catch { }token.Socket.Close();Interlocked.Decrement(ref numberAcceptedClients);socketAsyncEventArgsPool.Push(e);maxNumberAcceptedClients.Release();if (e.LastOperation == SocketAsyncOperation.Send)sendAsyncEventArgs.Remove(e);}/// <summary>/// 给全部tcp客户端发送数据/// </summary>/// <param name="message"></param>public void SendMessageToAllClients(string message){if (string.IsNullOrEmpty(message))throw new ArgumentNullException("message cannot be null");foreach (var e in sendAsyncEventArgs){byte[] buff = Encoding.UTF8.GetBytes(message);if (buff.Length > bufferSize)throw new ArgumentOutOfRangeException("message is out off range");buff.CopyTo(e.Buffer, e.Offset);e.SetBuffer(e.Offset, buff.Length);bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.SendAsync(e);if (!willRaiseEvent){ProcessSend(e);}}}}

    

(5)客户端操作类TcpClientSocketAsync

    public class TcpClientSocketAsync{//接收数据事件public Action<string> recvMessageEvent = null;//发送结果事件public Action<int> sendResultEvent = null;//接受缓存数组private byte[] recvBuff = null;//发送缓存数组private byte[] sendBuff = null;//连接socketprivate Socket connectSocket = null;//用于发送数据的SocketAsyncEventArgsprivate SocketAsyncEventArgs sendEventArg = null;//用于接收数据的SocketAsyncEventArgsprivate SocketAsyncEventArgs recvEventArg = null;//tcp服务器ipprivate string ip = "";//tcp服务器端口private int port = 0;/// <summary>/// 构造函数/// </summary>/// <param name="bufferSize">用于socket发送和接受的缓存区大小</param>public TcpClientSocketAsync(int bufferSize){//设置用于发送数据的SocketAsyncEventArgssendBuff = new byte[bufferSize];sendEventArg = new SocketAsyncEventArgs();sendEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);sendEventArg.SetBuffer(sendBuff, 0, bufferSize);//设置用于接受数据的SocketAsyncEventArgsrecvBuff = new byte[bufferSize];recvEventArg = new SocketAsyncEventArgs();recvEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);recvEventArg.SetBuffer(recvBuff, 0, bufferSize);}/// <summary>/// 开启tcp客户端,连接tcp服务器/// </summary>/// <param name="ip"></param>/// <param name="port"></param>public void Start(string ip, int port){if (string.IsNullOrEmpty(ip))throw new ArgumentNullException("ip cannot be null");if (port < 1 || port > 65535)throw new ArgumentOutOfRangeException("port is out of range");this.ip = ip;this.port = port;try{connectSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);IPAddress address = IPAddress.Parse(ip);IPEndPoint endpoint = new IPEndPoint(address, port);//连接tcp服务器SocketAsyncEventArgs connectEventArg = new SocketAsyncEventArgs();connectEventArg.Completed += ConnectEventArgs_Completed;connectEventArg.RemoteEndPoint = endpoint;//设置要连接的tcp服务器地址bool willRaiseEvent = connectSocket.ConnectAsync(connectEventArg);if (!willRaiseEvent)ProcessConnect(connectEventArg);}catch (Exception ex){throw ex;}}/// <summary>/// 发送数据到tcp服务器/// </summary>/// <param name="message">要发送的数据</param>public void SendMessage(string message){if (string.IsNullOrEmpty(message))throw new ArgumentNullException("message cannot be null");if (connectSocket == null)throw new Exception("socket cannot be null");byte[] buff = Encoding.UTF8.GetBytes(message);buff.CopyTo(sendBuff, 0);sendEventArg.SetBuffer(0, buff.Length);bool willRaiseEvent = connectSocket.SendAsync(sendEventArg);if (!willRaiseEvent){ProcessSend(sendEventArg);}}/// <summary>/// 关闭tcp客户端/// </summary>public void CloseSocket(){if (connectSocket == null)return;try{//关闭socket时,单独使用socket.close()通常会造成资源提前被释放,应该在关闭socket之前,先使用shutdown进行接受或者发送的禁用,再使用socket进行释放connectSocket.Shutdown(SocketShutdown.Both);}catch { }try{connectSocket.Close();}catch { }}/// <summary>/// 重启tcp客户端,重新连接tcp服务器/// </summary>public void Restart(){CloseSocket();Start(ip, port);}/// <summary>/// Socket.ConnectAsync完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void ConnectEventArgs_Completed(object sender, SocketAsyncEventArgs e){ProcessConnect(e);}private void ProcessConnect(SocketAsyncEventArgs e){StartRecv();}/// <summary>/// tcp客户端开始接受tcp服务器发送的数据/// </summary>private void StartRecv(){bool willRaiseEvent = connectSocket.ReceiveAsync(recvEventArg);if (!willRaiseEvent){ProcessReceive(recvEventArg);}}/// <summary>/// socket.sendAsync和socket.recvAsync的完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void IO_Completed(object sender, SocketAsyncEventArgs e){switch (e.LastOperation){case SocketAsyncOperation.Receive:ProcessReceive(e);break;case SocketAsyncOperation.Send:ProcessSend(e);break;default:throw new ArgumentException("The last operation completed on the socket was not a receive or send");}}/// <summary>/// 处理接受到的tcp服务器数据/// </summary>/// <param name="e"></param>private void ProcessReceive(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success){if (recvMessageEvent != null)//一定要指定GetString的长度,否则整个bugger都要转成stringrecvMessageEvent(Encoding.UTF8.GetString(e.Buffer, 0, e.BytesTransferred));StartRecv();}else{Restart();}}/// <summary>/// 处理tcp客户端发送的结果/// </summary>/// <param name="e"></param>private void ProcessSend(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.SocketError == SocketError.Success){if (sendResultEvent != null)sendResultEvent(e.BytesTransferred);}else{if (sendResultEvent != null)sendResultEvent(-1);Restart();}}}

   

2.使用

(1)服务器端

    public partial class Form1 : Form{TcpServiceSocketAsync tcpServiceSocket = null;private readonly string ip = "192.168.172.142";private readonly int port = 8090;public Form1(){InitializeComponent();tcpServiceSocket = new TcpServiceSocketAsync(10, 1024);tcpServiceSocket.recvMessageEvent += new Action<string>(Recv);tcpServiceSocket.Init();}private void Recv(string message){this.BeginInvoke(new Action(() =>{tbRecv.Text += message + "\r\n";}));}private void btnStart_Click(object sender, EventArgs e){tcpServiceSocket.Start(ip, port);}private void btnSend_Click(object sender, EventArgs e){string message = tbSend.Text.Trim();if (string.IsNullOrEmpty(message))return;tcpServiceSocket.SendMessageToAllClients(message);tbSend.Text = "";}}

  

(2)客户端

    public partial class Form1 : Form{private TcpClientSocketAsync tcpClientSocket = null;private readonly string ip = "192.168.172.142";private readonly int port = 8090;private readonly int buffSize = 1024;public Form1(){InitializeComponent();tcpClientSocket = new TcpClientSocketAsync(buffSize);tcpClientSocket.recvMessageEvent += new Action<string>(Recv);}private void Recv(string message){this.BeginInvoke(new Action(() =>{tbRecv.Text += message + "\r\n";}));}private void btnStart_Click(object sender, EventArgs e){tcpClientSocket.Start(ip, port);}private void btnSend_Click(object sender, EventArgs e){string message = tbSend.Text.Trim();if (string.IsNullOrEmpty(message))return;tcpClientSocket.SendMessage(message);tbSend.Text = "";}}

   

3.演示

参考:

https://docs.microsoft.com/zh-cn/dotnet/api/system.net.sockets.socketasynceventargs?view=netframework-4.8

转载于:https://www.cnblogs.com/yaosj/p/11170244.html

C#实现异步阻塞TCP(SocketAsyncEventArgs,SendAsync,ReceiveAsync,AcceptAsync,ConnectAsync)...相关推荐

  1. 一文读懂并发与并行,同步与异步阻塞

    并发与并行 并发:指的是任务数多余cpu核数,通过操作系统的各种任务调度算法, 实现用多个任务"一起"执行(实际上总有一些任务不在执行,因为切换任务的速度相当快, 看上去一起执行而 ...

  2. 【gev】 Golang 实现轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库

    gev 轻量.快速的 Golang 网络库 https://github.com/Allenxuxu/gev gev 是一个轻量.快速的基于 Reactor 模式的非阻塞 TCP 网络库,底层并不使用 ...

  3. golang mysql 非阻塞_Golang 实现轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库...

    gev 轻量.快速的 Golang 网络库 gev 是一个轻量.快速的基于 Reactor 模式的非阻塞 TCP 网络库,底层并不使用 golang net 库,而是使用 epoll 和 kqueue ...

  4. Nginx讲解(一)Nginx介绍以及同步异步阻塞非阻塞介绍

    一.Nginx概念讲解 Nginx是一个高性能的HTTP和反向代理服务,也是一个IMAP/POP3/SMTP服务 Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3) ...

  5. Java网络编程------IO模型的同步/异步/阻塞/非阻塞(1)

    IO模型的同步/异步/阻塞/非阻塞 一.同步/异步/阻塞/非阻塞 1.同步和异步 2.阻塞和非阻塞 3.同步.异步和阻塞.非阻塞组合 二.IO 1.I/O 2.阻塞IO和非阻塞IO 3.同步IO和同步 ...

  6. 老张喝茶 教你同步异步 阻塞与非阻塞(转)

    原文 老张爱喝茶,废话不说,煮开水. 出场人物:老张,水壶两把(普通水壶,简称水壶:会响的水壶,简称响水壶). 1 老张把水壶放到火上,立等水开.(同步阻塞) 老张觉得自己有点傻 2 老张把水壶放到火 ...

  7. pythontcp服务器如何关闭阻塞_python实现单线程多任务非阻塞TCP服务端

    本文实例为大家分享了python实现单线程多任务非阻塞TCP服务端的具体代码,供大家参考,具体内容如下 # coding:utf-8 from socket import * # 1.创建服务器soc ...

  8. 迄今为止把同步/异步/阻塞/非阻塞/BIO/NIO/AIO讲的这么清楚的好文章(快快珍藏)...

    常规的误区 假设有一个展示用户详情的需求,分两步,先调用一个HTTP接口拿到详情数据,然后使用适合的视图展示详情数据. 如果网速很慢,代码发起一个HTTP请求后,就卡住不动了,直到十几秒后才拿到HTT ...

  9. 同步 异步 阻塞 非阻塞概念区分

    老张爱喝茶,废话不说,煮开水. 提前剧透一下:同步和非同步主要用来形容被调用线程,阻塞非阻塞用来形容主线程的. 出场人物:老张(主线程),水壶(被调用线程)两把(普通水壶,简称水壶:会响的水壶,简称响 ...

最新文章

  1. Hadoop SequnceFile.Writer 压缩模式及压缩库浅析
  2. 设计模式C++实现(7)——观察者模式
  3. 宿舍管理系统设计分析图
  4. asp.net(C#)页面事件顺序
  5. A few thoughts about Open Source Software
  6. python制作表情,使用Python制作滑稽表情
  7. 洛谷p5369[PKUSC2018]最大前缀和
  8. 百度智能api接口汇总
  9. Blob对象判断是不是图片类型以及Blob数据下载
  10. 今年的第几天?(p16)模拟
  11. 腾讯2018秋招笔试真题-小Q的歌单
  12. [心情]享受堕落的乐趣
  13. Android开发的各个领域发展前景?路线?规划未来看这篇
  14. 三个蛤蜊三块肉才能称得上“老板”
  15. 连锁不平衡的计算以及LDSC分析多基因遗传
  16. 镜头跑焦测试软件,简易的测试镜头跑焦和矫正AF微调办法
  17. Leetcode典型题解答和分析、归纳和汇总——T155(最小栈)
  18. iOS-直播中粒子效果
  19. android 飞机动画,Android实现纸飞机的简单操作
  20. 为什么用恒流电源驱动LED灯具

热门文章

  1. 高中体测数据可视化(体测分数_男生,体测分数-女生)
  2. 基于SSM框架实现文件上传并插入数据库
  3. 数据库学习笔记(一) | 数据(Data)的定义
  4. TRIE_End-to-End Text Reading and Information Extraction for Document Understand 稿
  5. 项城户口迁移联系电话
  6. 如何在海思 Hi3519AV100上移植YOLOV3 (3)
  7. Python渗透测试之ARP毒化和协议应用
  8. (转) 实时SLAM的未来及与深度学习的比较
  9. 如何成为优秀的软件人才(转贴)
  10. GCP认证考试之Storage专题