1.类

(1)socket IO操作内存管理类 BufferManager

    // This class creates a single large buffer which can be divided up // and assigned to SocketAsyncEventArgs objects for use with each // socket I/O operation.  // This enables bufffers to be easily reused and guards against // fragmenting heap memory.// // The operations exposed on the BufferManager class are not thread safe.public class BufferManager{//buffer缓冲区大小private int m_numBytes;//缓冲区private byte[] m_buffer;private Stack<int> m_freeIndexPool;private int m_currentIndex;private int m_bufferSize;public BufferManager(int totalBytes, int bufferSize){m_numBytes = totalBytes;m_currentIndex = 0;m_bufferSize = bufferSize;m_freeIndexPool = new Stack<int>();}/// <summary>/// 给buffer分配缓冲区/// </summary>public void InitBuffer(){m_buffer = new byte[m_numBytes];}/// <summary>///  将buffer添加到args的IO缓冲区中,并设置offset/// </summary>/// <param name="args"></param>/// <returns></returns>public bool SetBuffer(SocketAsyncEventArgs args){if (m_freeIndexPool.Count > 0){args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);}else{if ((m_numBytes - m_bufferSize) < m_currentIndex){return false;}args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);m_currentIndex += m_bufferSize;}return true;}/// <summary>/// 将buffer从args的IO缓冲区中释放/// </summary>/// <param name="args"></param>public void FreeBuffer(SocketAsyncEventArgs args){m_freeIndexPool.Push(args.Offset);args.SetBuffer(null, 0, 0);}/// <summary>/// 释放全部buffer缓存/// </summary>public void FreeAllBuffer(){m_freeIndexPool.Clear();m_currentIndex = 0;m_buffer = null;}}

  

(2)SocketAsyncEventArgsPool

    // Represents a collection of reusable SocketAsyncEventArgs objects.  public class SocketAsyncEventArgsPool{private Stack<SocketAsyncEventArgs> m_pool;// Initializes the object pool to the specified size//// The "capacity" parameter is the maximum number of // SocketAsyncEventArgs objects the pool can holdpublic SocketAsyncEventArgsPool(int capacity){m_pool = new Stack<SocketAsyncEventArgs>(capacity);}// Add a SocketAsyncEventArg instance to the pool////The "item" parameter is the SocketAsyncEventArgs instance // to add to the poolpublic void Push(SocketAsyncEventArgs item){if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }lock (m_pool){m_pool.Push(item);}}// Removes a SocketAsyncEventArgs instance from the pool// and returns the object removed from the poolpublic SocketAsyncEventArgs Pop(){lock (m_pool){return m_pool.Pop();}}/// <summary>/// 清空栈中元素/// </summary>public void Clear(){lock (m_pool){m_pool.Clear();}}// The number of SocketAsyncEventArgs instances in the poolpublic int Count{get { return m_pool.Count; }}}

  

(3)AsyncUserToken

    public class AsyncUserToken{private Socket socket = null;public Socket Socket { get => socket; set => socket = value; }}

  

(4)服务器端操作类TcpServiceSocketAsync

    public class TcpServiceSocketAsync{//接收数据事件public Action<string> recvMessageEvent = null;//发送结果事件public Action<int> sendResultEvent = null;//监听socketprivate Socket listenSocket = null;//允许连接到tcp服务器的tcp客户端数量private int numConnections = 0;//用于socket发送和接受的缓存区大小private int bufferSize = 0;//socket缓冲区管理对象private BufferManager bufferManager = null;//SocketAsyncEventArgs池private SocketAsyncEventArgsPool socketAsyncEventArgsPool = null;//当前连接的tcp客户端数量private int numberAcceptedClients = 0;//控制tcp客户端连接数量的信号量private Semaphore maxNumberAcceptedClients = null;//用于socket发送数据的SocketAsyncEventArgs集合private List<SocketAsyncEventArgs> sendAsyncEventArgs = null;//tcp服务器ipprivate string ip = "";//tcp服务器端口private int port = 0;/// <summary>/// 构造函数/// </summary>/// <param name="numConnections">允许连接到tcp服务器的tcp客户端数量</param>/// <param name="bufferSize">用于socket发送和接受的缓存区大小</param>public TcpServiceSocketAsync(int numConnections, int bufferSize){if (numConnections <= 0 || numConnections > int.MaxValue)throw new ArgumentOutOfRangeException("_numConnections is out of range");if (bufferSize <= 0 || bufferSize > int.MaxValue)throw new ArgumentOutOfRangeException("_receiveBufferSize is out of range");this.numConnections = numConnections;this.bufferSize = bufferSize;bufferManager = new BufferManager(numConnections * bufferSize * 2, bufferSize);socketAsyncEventArgsPool = new SocketAsyncEventArgsPool(numConnections);maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);sendAsyncEventArgs = new List<SocketAsyncEventArgs>();}/// <summary>/// 初始化数据(bufferManager,socketAsyncEventArgsPool)/// </summary>public void Init(){numberAcceptedClients = 0;bufferManager.InitBuffer();SocketAsyncEventArgs readWriteEventArg;for (int i = 0; i < numConnections * 2; i++){readWriteEventArg = new SocketAsyncEventArgs();readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);readWriteEventArg.UserToken = new AsyncUserToken();bufferManager.SetBuffer(readWriteEventArg);socketAsyncEventArgsPool.Push(readWriteEventArg);}}/// <summary>///  开启tcp服务器,等待tcp客户端连接/// </summary>/// <param name="ip"></param>/// <param name="port"></param>public void Start(string ip, int port){if (string.IsNullOrEmpty(ip))throw new ArgumentNullException("ip cannot be null");if (port < 1 || port > 65535)throw new ArgumentOutOfRangeException("port is out of range");this.ip = ip;this.port = port;try{listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);IPAddress address = IPAddress.Parse(ip);IPEndPoint endpoint = new IPEndPoint(address, port);listenSocket.Bind(endpoint);//绑定地址listenSocket.Listen(int.MaxValue);//开始监听StartAccept(null);}catch (Exception ex){throw ex;}}/// <summary>/// 关闭tcp服务器/// </summary>public void CloseSocket(){if (listenSocket == null)return;try{foreach (var e in sendAsyncEventArgs){((AsyncUserToken)e.UserToken).Socket.Shutdown(SocketShutdown.Both);}listenSocket.Shutdown(SocketShutdown.Both);}catch { }try{foreach (var e in sendAsyncEventArgs){((AsyncUserToken)e.UserToken).Socket.Close();}listenSocket.Close();}catch { }try{foreach (var e in sendAsyncEventArgs){e.Dispose();}}catch { }sendAsyncEventArgs.Clear();socketAsyncEventArgsPool.Clear();bufferManager.FreeAllBuffer();maxNumberAcceptedClients.Release(numberAcceptedClients);}/// <summary>/// 重新启动tcp服务器/// </summary>public void Restart(){CloseSocket();Init();Start(ip, port);}/// <summary>/// 开始等待tcp客户端连接/// </summary>/// <param name="acceptEventArg"></param>private void StartAccept(SocketAsyncEventArgs acceptEventArg){if (acceptEventArg == null){acceptEventArg = new SocketAsyncEventArgs();acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);}else{// socket must be cleared since the context object is being reusedacceptEventArg.AcceptSocket = null;}maxNumberAcceptedClients.WaitOne();bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);if (!willRaiseEvent){ProcessAccept(acceptEventArg);}}/// <summary>/// Socket.AcceptAsync完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e){ProcessAccept(e);}/// <summary>/// 接受到tcp客户端连接,进行处理/// </summary>/// <param name="e"></param>private void ProcessAccept(SocketAsyncEventArgs e){Interlocked.Increment(ref numberAcceptedClients);//设置用于接收的SocketAsyncEventArgs的socket,可以接受数据SocketAsyncEventArgs recvEventArgs = socketAsyncEventArgsPool.Pop();((AsyncUserToken)recvEventArgs.UserToken).Socket = e.AcceptSocket;//设置用于发送的SocketAsyncEventArgs的socket,可以发送数据SocketAsyncEventArgs sendEventArgs = socketAsyncEventArgsPool.Pop();((AsyncUserToken)sendEventArgs.UserToken).Socket = e.AcceptSocket;sendAsyncEventArgs.Add(sendEventArgs);StartRecv(recvEventArgs);StartAccept(e);}/// <summary>/// tcp服务器开始接受tcp客户端发送的数据/// </summary>private void StartRecv(SocketAsyncEventArgs e){bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.ReceiveAsync(e);if (!willRaiseEvent){ProcessReceive(e);}}/// <summary>/// socket.sendAsync和socket.recvAsync的完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void IO_Completed(object sender, SocketAsyncEventArgs e){switch (e.LastOperation){case SocketAsyncOperation.Receive:ProcessReceive(e);break;case SocketAsyncOperation.Send:ProcessSend(e);break;default:throw new ArgumentException("The last operation completed on the socket was not a receive or send");}}/// <summary>/// 处理接受到的tcp客户端数据/// </summary>/// <param name="e"></param>private void ProcessReceive(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success){if (recvMessageEvent != null)//一定要指定GetString的长度recvMessageEvent(Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred));StartRecv(e);}else{CloseClientSocket(e);}}/// <summary>/// 处理tcp服务器发送的结果/// </summary>/// <param name="e"></param>private void ProcessSend(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.SocketError == SocketError.Success){if (sendResultEvent != null)sendResultEvent(e.BytesTransferred);}else{if (sendResultEvent != null)sendResultEvent(e.BytesTransferred);CloseClientSocket(e);}}/// <summary>/// 关闭一个与tcp客户端连接的socket/// </summary>/// <param name="e"></param>private void CloseClientSocket(SocketAsyncEventArgs e){AsyncUserToken token = e.UserToken as AsyncUserToken;try{//关闭socket时,单独使用socket.close()通常会造成资源提前被释放,应该在关闭socket之前,先使用shutdown进行接受或者发送的禁用,再使用socket进行释放token.Socket.Shutdown(SocketShutdown.Both);}catch { }token.Socket.Close();Interlocked.Decrement(ref numberAcceptedClients);socketAsyncEventArgsPool.Push(e);maxNumberAcceptedClients.Release();if (e.LastOperation == SocketAsyncOperation.Send)sendAsyncEventArgs.Remove(e);}/// <summary>/// 给全部tcp客户端发送数据/// </summary>/// <param name="message"></param>public void SendMessageToAllClients(string message){if (string.IsNullOrEmpty(message))throw new ArgumentNullException("message cannot be null");foreach (var e in sendAsyncEventArgs){byte[] buff = Encoding.UTF8.GetBytes(message);if (buff.Length > bufferSize)throw new ArgumentOutOfRangeException("message is out off range");buff.CopyTo(e.Buffer, e.Offset);e.SetBuffer(e.Offset, buff.Length);bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.SendAsync(e);if (!willRaiseEvent){ProcessSend(e);}}}}

    

(5)客户端操作类TcpClientSocketAsync

    public class TcpClientSocketAsync{//接收数据事件public Action<string> recvMessageEvent = null;//发送结果事件public Action<int> sendResultEvent = null;//接受缓存数组private byte[] recvBuff = null;//发送缓存数组private byte[] sendBuff = null;//连接socketprivate Socket connectSocket = null;//用于发送数据的SocketAsyncEventArgsprivate SocketAsyncEventArgs sendEventArg = null;//用于接收数据的SocketAsyncEventArgsprivate SocketAsyncEventArgs recvEventArg = null;//tcp服务器ipprivate string ip = "";//tcp服务器端口private int port = 0;/// <summary>/// 构造函数/// </summary>/// <param name="bufferSize">用于socket发送和接受的缓存区大小</param>public TcpClientSocketAsync(int bufferSize){//设置用于发送数据的SocketAsyncEventArgssendBuff = new byte[bufferSize];sendEventArg = new SocketAsyncEventArgs();sendEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);sendEventArg.SetBuffer(sendBuff, 0, bufferSize);//设置用于接受数据的SocketAsyncEventArgsrecvBuff = new byte[bufferSize];recvEventArg = new SocketAsyncEventArgs();recvEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);recvEventArg.SetBuffer(recvBuff, 0, bufferSize);}/// <summary>/// 开启tcp客户端,连接tcp服务器/// </summary>/// <param name="ip"></param>/// <param name="port"></param>public void Start(string ip, int port){if (string.IsNullOrEmpty(ip))throw new ArgumentNullException("ip cannot be null");if (port < 1 || port > 65535)throw new ArgumentOutOfRangeException("port is out of range");this.ip = ip;this.port = port;try{connectSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);IPAddress address = IPAddress.Parse(ip);IPEndPoint endpoint = new IPEndPoint(address, port);//连接tcp服务器SocketAsyncEventArgs connectEventArg = new SocketAsyncEventArgs();connectEventArg.Completed += ConnectEventArgs_Completed;connectEventArg.RemoteEndPoint = endpoint;//设置要连接的tcp服务器地址bool willRaiseEvent = connectSocket.ConnectAsync(connectEventArg);if (!willRaiseEvent)ProcessConnect(connectEventArg);}catch (Exception ex){throw ex;}}/// <summary>/// 发送数据到tcp服务器/// </summary>/// <param name="message">要发送的数据</param>public void SendMessage(string message){if (string.IsNullOrEmpty(message))throw new ArgumentNullException("message cannot be null");if (connectSocket == null)throw new Exception("socket cannot be null");byte[] buff = Encoding.UTF8.GetBytes(message);buff.CopyTo(sendBuff, 0);sendEventArg.SetBuffer(0, buff.Length);bool willRaiseEvent = connectSocket.SendAsync(sendEventArg);if (!willRaiseEvent){ProcessSend(sendEventArg);}}/// <summary>/// 关闭tcp客户端/// </summary>public void CloseSocket(){if (connectSocket == null)return;try{//关闭socket时,单独使用socket.close()通常会造成资源提前被释放,应该在关闭socket之前,先使用shutdown进行接受或者发送的禁用,再使用socket进行释放connectSocket.Shutdown(SocketShutdown.Both);}catch { }try{connectSocket.Close();}catch { }}/// <summary>/// 重启tcp客户端,重新连接tcp服务器/// </summary>public void Restart(){CloseSocket();Start(ip, port);}/// <summary>/// Socket.ConnectAsync完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void ConnectEventArgs_Completed(object sender, SocketAsyncEventArgs e){ProcessConnect(e);}private void ProcessConnect(SocketAsyncEventArgs e){StartRecv();}/// <summary>/// tcp客户端开始接受tcp服务器发送的数据/// </summary>private void StartRecv(){bool willRaiseEvent = connectSocket.ReceiveAsync(recvEventArg);if (!willRaiseEvent){ProcessReceive(recvEventArg);}}/// <summary>/// socket.sendAsync和socket.recvAsync的完成回调函数/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void IO_Completed(object sender, SocketAsyncEventArgs e){switch (e.LastOperation){case SocketAsyncOperation.Receive:ProcessReceive(e);break;case SocketAsyncOperation.Send:ProcessSend(e);break;default:throw new ArgumentException("The last operation completed on the socket was not a receive or send");}}/// <summary>/// 处理接受到的tcp服务器数据/// </summary>/// <param name="e"></param>private void ProcessReceive(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success){if (recvMessageEvent != null)//一定要指定GetString的长度,否则整个bugger都要转成stringrecvMessageEvent(Encoding.UTF8.GetString(e.Buffer, 0, e.BytesTransferred));StartRecv();}else{Restart();}}/// <summary>/// 处理tcp客户端发送的结果/// </summary>/// <param name="e"></param>private void ProcessSend(SocketAsyncEventArgs e){AsyncUserToken token = (AsyncUserToken)e.UserToken;if (e.SocketError == SocketError.Success){if (sendResultEvent != null)sendResultEvent(e.BytesTransferred);}else{if (sendResultEvent != null)sendResultEvent(-1);Restart();}}}

   

2.使用

(1)服务器端

    public partial class Form1 : Form{TcpServiceSocketAsync tcpServiceSocket = null;private readonly string ip = "192.168.172.142";private readonly int port = 8090;public Form1(){InitializeComponent();tcpServiceSocket = new TcpServiceSocketAsync(10, 1024);tcpServiceSocket.recvMessageEvent += new Action<string>(Recv);tcpServiceSocket.Init();}private void Recv(string message){this.BeginInvoke(new Action(() =>{tbRecv.Text += message + "\r\n";}));}private void btnStart_Click(object sender, EventArgs e){tcpServiceSocket.Start(ip, port);}private void btnSend_Click(object sender, EventArgs e){string message = tbSend.Text.Trim();if (string.IsNullOrEmpty(message))return;tcpServiceSocket.SendMessageToAllClients(message);tbSend.Text = "";}}

  

(2)客户端

    public partial class Form1 : Form{private TcpClientSocketAsync tcpClientSocket = null;private readonly string ip = "192.168.172.142";private readonly int port = 8090;private readonly int buffSize = 1024;public Form1(){InitializeComponent();tcpClientSocket = new TcpClientSocketAsync(buffSize);tcpClientSocket.recvMessageEvent += new Action<string>(Recv);}private void Recv(string message){this.BeginInvoke(new Action(() =>{tbRecv.Text += message + "\r\n";}));}private void btnStart_Click(object sender, EventArgs e){tcpClientSocket.Start(ip, port);}private void btnSend_Click(object sender, EventArgs e){string message = tbSend.Text.Trim();if (string.IsNullOrEmpty(message))return;tcpClientSocket.SendMessage(message);tbSend.Text = "";}}

   

3.演示

参考:

https://docs.microsoft.com/zh-cn/dotnet/api/system.net.sockets.socketasynceventargs?view=netframework-4.8

转载于:https://www.cnblogs.com/yaosj/p/11170244.html

C#实现异步阻塞TCP(SocketAsyncEventArgs,SendAsync,ReceiveAsync,AcceptAsync,ConnectAsync)...相关推荐

  1. 一文读懂并发与并行,同步与异步阻塞

    并发与并行 并发:指的是任务数多余cpu核数,通过操作系统的各种任务调度算法, 实现用多个任务"一起"执行(实际上总有一些任务不在执行,因为切换任务的速度相当快, 看上去一起执行而 ...

  2. 【gev】 Golang 实现轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库

    gev 轻量.快速的 Golang 网络库 https://github.com/Allenxuxu/gev gev 是一个轻量.快速的基于 Reactor 模式的非阻塞 TCP 网络库,底层并不使用 ...

  3. golang mysql 非阻塞_Golang 实现轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库...

    gev 轻量.快速的 Golang 网络库 gev 是一个轻量.快速的基于 Reactor 模式的非阻塞 TCP 网络库,底层并不使用 golang net 库,而是使用 epoll 和 kqueue ...

  4. Nginx讲解(一)Nginx介绍以及同步异步阻塞非阻塞介绍

    一.Nginx概念讲解 Nginx是一个高性能的HTTP和反向代理服务,也是一个IMAP/POP3/SMTP服务 Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3) ...

  5. Java网络编程------IO模型的同步/异步/阻塞/非阻塞(1)

    IO模型的同步/异步/阻塞/非阻塞 一.同步/异步/阻塞/非阻塞 1.同步和异步 2.阻塞和非阻塞 3.同步.异步和阻塞.非阻塞组合 二.IO 1.I/O 2.阻塞IO和非阻塞IO 3.同步IO和同步 ...

  6. 老张喝茶 教你同步异步 阻塞与非阻塞(转)

    原文 老张爱喝茶,废话不说,煮开水. 出场人物:老张,水壶两把(普通水壶,简称水壶:会响的水壶,简称响水壶). 1 老张把水壶放到火上,立等水开.(同步阻塞) 老张觉得自己有点傻 2 老张把水壶放到火 ...

  7. pythontcp服务器如何关闭阻塞_python实现单线程多任务非阻塞TCP服务端

    本文实例为大家分享了python实现单线程多任务非阻塞TCP服务端的具体代码,供大家参考,具体内容如下 # coding:utf-8 from socket import * # 1.创建服务器soc ...

  8. 迄今为止把同步/异步/阻塞/非阻塞/BIO/NIO/AIO讲的这么清楚的好文章(快快珍藏)...

    常规的误区 假设有一个展示用户详情的需求,分两步,先调用一个HTTP接口拿到详情数据,然后使用适合的视图展示详情数据. 如果网速很慢,代码发起一个HTTP请求后,就卡住不动了,直到十几秒后才拿到HTT ...

  9. 同步 异步 阻塞 非阻塞概念区分

    老张爱喝茶,废话不说,煮开水. 提前剧透一下:同步和非同步主要用来形容被调用线程,阻塞非阻塞用来形容主线程的. 出场人物:老张(主线程),水壶(被调用线程)两把(普通水壶,简称水壶:会响的水壶,简称响 ...

最新文章

  1. 天体摇摆仪的工作原理-测量电路中的部分电路波形
  2. es6 name属性
  3. Google 2020游戏开发者峰会回顾、比尔•盖茨的夏日书单、Libaom 2.0.0发布等|Decode the Week...
  4. 在MAC下安装一些软件时提示来自身份不明开发者
  5. ActiveMQ运行失败 not running
  6. PHP面向对象处理请求,PHP面向对象之命令模式
  7. hbase常用命令及操作
  8. 国内外智慧医疗云平台调研
  9. 第一篇博客--大学成长指南
  10. vue 图表组件_基于Vue.js中可用的JUI图表的Vue组件
  11. 2015年阿里巴巴校招面试经验汇总
  12. 一个新进前端小白实习僧的初次探索
  13. 液化气瓶监管技术如何做到正确选型
  14. python:迭代器对象,迭代器和迭代
  15. Unexpected token u in JSON at position 0报错如何解决?
  16. 2019年双十一倒计时JS代码
  17. rk3568can设置异常
  18. Python HTTP代理的优缺点?芝麻代理豌豆代理熊猫代理讯代理?
  19. 命令行方式生成BOOT.BIN文件
  20. EasyPoi横向遍历的用法(不懂就问)

热门文章

  1. Linux下文件内容更新了,文件夹时间戳却没变?
  2. 深度学习之GPU编程知识总结
  3. 用EasyGBD做国标GB28181协议级联
  4. 备课大师控件开发流程
  5. 微信小程序使用图片标签出现白底
  6. leslie模型matlab代码6,leslie模型人口预测程序,请求大家!
  7. ios开发之app内启动用户评价
  8. 直播风口,是什么在支撑教育、电商、泛娱乐等场景?
  9. Windows CMD 黑客常用命令
  10. 小学生用计算机来计算可以吗,餐桌上关于小学生是否可以使用计算器的争论