一直以来很想梳理下我在开发过程中使用异步编程的心得和体会,但是由于我是APM异步编程模式的死忠,当TAP模式和TPL模式出现的时候我并未真正的去接纳这两种模式,所以导致我一直没有花太多心思去整理这两部分异步编程模型。今天在CodeProject上面偶然间看到一篇关于异步编程的文章,概括总结的非常好,省去了我自己写的麻烦,索性翻译过来,以飨各位。

在阻塞和并行编程过程中,异步和多线程是非常重要的特性。异步编程可以涉及到多线程,也可以不用涉及。如果我们能够把二者放到一起来讲解的话,我们会理解的更快一些。所以今天在这里,我主要想讲解的内容是:

  1. 异步编程
  2. 是否需要多线程
  3. TAP编程模型
  4. 并行编程

首先来说下异步编程

所谓的异步编程就是指 当前的操作独立于主操作流程。在C#编程中,我们总是从进入Main方法开始 ,Main方法返回结束。在开始和结束之间的所有操作,都会挨个的执行,下一个操作必须等到上一个操作完毕才开始执行,我们看看如下代码:

static void Main(string[] args){DoTaskOne();DoTaskTwo();}

“DoTaskOne”方法必须在“DoTaskTwo”方法之前执行。也就是发生了阻塞现象。

但是在异步编程中,方法的调用并不会阻塞主线程。当方法被调用后,主线程仍会继续执行其他的任务。这种情形,一般都是使用Thread或者是Task来实现的。

在上面的场景中,如果我们让方法“DoTaskOne”异步执行,那么主线程将会立即返回,然后执行“DoTaskTwo”方法。

在.NET中,我们可以利用Thread类或者异步模式来创建我们自己的线程来实现异步编程。在.NET中有三种不同的异步模型:

1.APM模型

2.EAP模型

但是,遗憾的是,上面的这两种模型都不是微软所推荐的,所以我们将不对其做具体的讨论。如果你对这两种模型感兴趣,请移步:

https://msdn.microsoft.com/en-us/library/ms228963(v=vs.110).aspx

https://msdn.microsoft.com/en-us/library/ms228969(v=vs.110).aspx

3.TAP模型:这是微软所推荐的,我们稍后将会详细的进行讲解。

我们是否需要启用多线程

如果我们在.NET 4.5版本中使用异步编程模型,绝大部分情况下我们无需手动创建线程。因为编译器已经为我们做了足够多的工作了。

创建一个新线程不仅耗费资源而且花费时间,除非我们真的需要去控制一个多线程,否则是不需要去创建的。因为TAP模型和TPL模型已经为异步编程和并行编程提供了绝好的支持。TAP模型和TPL模型使用Task来进行操作,而Task类使用线程池中的线程来完成操作的。

Task可以在如下场景中运行:

  1. 当前线程
  2. 新线程
  3. 线程池中的线程
  4. 没有线程

在我们使用Task过程中,我们无需担心线程的创建或者是使用,因为.NET framework已经为我们处理好了各种细节。但是如果我们需要控制线程的话,比如说以下的场景:

  1. 为线程设置名称
  2. 为线程设置优先级
  3. 设置线程为后台线程

那么我们不得不使用Thread类来控制。

async和await关键字

.NET framework引入了两个新的关键字“async”和“await”来执行异步编程。当在方法上使用await关键字的时候,我们需要同时使用async关键字。await关键字是在调用异步方法之前被使用的。它主要是使方法的后续执行编程异步的,例如:

private asyncstatic void CallerWithAsync()// async modifier is used
{// await is used before a method call. It suspends //execution of CallerWithAsync() method and control returs to the calling thread that can//perform other task.string result = await GetSomethingAsync();// this line would not be executed before  GetSomethingAsync() //method completesConsole.WriteLine(result);
}

在这里,async关键字只能够被那些返回Task或者是void的方法所使用。它不能被用于Main入口方法上。

我们不能够在所有的方法上使用await关键字,因为一旦方法上有了await关键字,那么我们的返回类型就变成了“awaitable “类型。下面的几种类型是” awaitable “的:

  1. Task
  2. Task<T>

TAP模型

首先,我们需要一个能够返回Task或者Task<T>的异步方法。我们可以通过如下方式来创建Task:

  1. Task.Factory.StartNew方法:在.net 4.5版本之前(.net 4)中,这是默认的创建和组织task的方式。
  2. Task.Run或者Task.Run<T>方法:从.net 4.5版本开始,这个方法被引进来了。这个方法足以能够应付大多数的场景。
  3. Task.FromResult方法:如果方法已经执行完毕并返回结果,我们可以使用这个方法来创建一个task。

Task.Factory.StartNew还有一些高级应用场景,请移步:http://blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx

下面的链接则展示了创建Task的几种方式: 
http://dotnetcodr.com/2014/01/01/5-ways-to-start-a-task-in-net-c/

Task的创建和等待

我们接下来将会利用Task.Run<T>方法来创建我们自己的Task。它会将某个特殊的方法放入位于ThreadPool的执行队列中,然后会为这些方法返回一个task句柄。下面的步骤将会为你展示我们如何为一个同步方法创建异步的Task的:

  1. 我们有一个同步的方法,需要耗费一些时间来执行完毕:
   static string Greeting(string name){Thread.Sleep(3000);return string.Format("Hello, {0}", name);}
  1. 为了让此方法一步执行,我们需要对其进行异步方法的包装。这个异步方法我们暂定为“GreetingAsync“。
static Task<string> GreetingAsync(string name)
{return Task.Run<string>(() =>{return Greeting(name);});
}

现在我们可以使用await关键字来调用GreetingAsync方法

private asyncstatic void CallWithAsync(){//some other tasksstring result = awaitGreetingAsync("Bulbul");//We can add  multiple ‘await’ in same ‘async’ method//string result1 = await GreetingAsync(“Ahmed”);//string result2 = await GreetingAsync(“Every Body”);Console.WriteLine(result);}

当“CallWithAsync”方法被调用的时候,它首先像正常的同步方法被调用,直到遇到await关键字。当它遇到await关键字的时候,它会暂定方法的执行,然后等待“GreetingAsync(“Bulbul”)”方法执行完毕。在此期间,这个控制流程会返回到“CallWithAsync”方法上,然后调用者就可以做其他的任务了。

当“GreetingAsync(“Bulbul”)”方法执行完毕以后,“CallWithAsync”方法就会唤醒在await关键字后面的其他的方法,所以在这里它会继续执行“Console.WriteLine(result)”方法。

  1. 任务的继续执行: “ContinueWith”方法则表明任务的持续执行。

private static void CallWithContinuationTask(){Task<string> t1 = GreetingAsync("Bulbul");t1.ContinueWith(t =>{string result = t.Result;Console.WriteLine(result);});}

当我们使用ContinueWith方法的时候,我们无需使用await关键字。因为编译器会自动的将await关键字放到正确的位置上。

等待多个异步方法

让我们先看下面的代码:

  private asyncstatic void CallWithAsync(){string result = awaitGreetingAsync("Bulbul");string result1 = awaitGreetingAsync("Ahmed");Console.WriteLine(result);Console.WriteLine(result1);}

在这里,我们在顺序的等待两个方法被执行。GreetingAsync("Ahmed")方法将会在GreetingAsync("Bulbul")执行后,再执行。但是,如果result和result1彼此不是独立的,那么await关键字这样用是不合适的。

在上面的场景中,我们其实是无需使用await关键字的。所以方法可以被改成如下的样子:

 private async static void MultipleAsyncMethodsWithCombinators(){Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await Task.WhenAll(t1, t2);Console.WriteLine("Finished both methods.\n " +"Result 1: {0}\n Result 2: {1}", t1.Result, t2.Result);}

在这里我们用了Task.WhenAll方法,它主要是等待所有的Task都完成工作后再触发。Task类还有另一个方法:WhenAny,它主要是等待任何一个Task完成就会触发。

异常处理

当进行错误处理的时候,我们不得不将await关键字放到try代码块中。

private asyncstatic void CallWithAsync()
{try{string result = awaitGreetingAsync("Bulbul");}catch (Exception ex){Console.WriteLine(&ldquo;handled {0}&rdquo;, ex.Message);}
}

但是,如果我们有多个await关键字存在于try代码快中,那么只有第一个错误被处理,因为第二个await是无法被执行的。如果我们想要所有的方法都被执行,甚至当其中一个抛出异常的时候,我们不能使用await关键字来调用他们,我们需要使用Task.WhenAll方法来等待所有的task执行。

private asyncstatic void CallWithAsync(){try{Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await Task.WhenAll(t1, t2);}catch (Exception ex){Console.WriteLine(&ldquo;handled {0}&rdquo;, ex.Message);}}

尽管所有的任务都会完成,但是我们可以从第一个task那里看到错误。虽然它不是第一个抛出错误的,但是它是列表中的第一个任务。

如果想要从所有的任务中获取错误,那么有一个方式就是将其在try代码块外部进行声明。然后检查Task方法的“IsFaulted”属性。如果有错误抛出,那么其“IsFaulted”属性为true。

示例如下:

 static async void ShowAggregatedException(){Task taskResult = null;try {Task<string> t1 = GreetingAsync("Bulbul");Task<string> t2 = GreetingAsync("Ahmed");await (taskResult = Task.WhenAll(t1, t2));}catch (Exception ex){Console.WriteLine("handled {0}", ex.Message);foreach (var innerEx in taskResult.Exception.InnerExceptions){Console.WriteLine("inner exception {0}", nnerEx.Message);}}}

Task的取消执行

如果直接使用ThreadPool中的Thread,我们是无法进行取消操作的。但是现在Task类提供了一种基于CancellationTokenSource类的方式来取消任务的执行,可以按照如下步骤来进行:

  1. 异步方法需要附带一个“CancellationToken”类型。
  2. 创建CancellationTokenSource类的实例:
  3. 将CancellationToken传递给异步方法:
  4. 对于长时间执行的方法,如果想取消的话,我们需要调用CancellationToken的ThrowIfCancellationRequested方法。
  5. 捕捉Task抛出的 OperationCanceledException。
  6. 现在,如果我们通过调用CancellationTokenSource的cancel方法来取消当前的操作的话,对于那些长时间运行的操作,将会抛出OperationCanceledException错误。我们也可以通过设置超时时间来取消任务。更多关于CancellationTokenSource类的信息,请移步:https://msdn.microsoft.com/en-us/library/system.threading.cancellationtokensource%28v=vs.110%29.aspx

var cts = new CancellationTokenSource();Task<string> t1 = GreetingAsync("Bulbul", cts.Token);
static string Greeting(string name, CancellationToken token)
{Thread.Sleep(3000);token.ThrowIfCancellationRequested();return string.Format("Hello, {0}", name);
}

下面让我们看看如何设置超时时间来取消任务的执行:

static void Main(string[] args){CallWithAsync();Console.ReadKey();           }async static void CallWithAsync(){try{CancellationTokenSource source = new CancellationTokenSource();source.CancelAfter(TimeSpan.FromSeconds(1));var t1 = await GreetingAsync("Bulbul", source.Token);}catch (OperationCanceledException ex){Console.WriteLine(ex.Message);}}static Task<string> GreetingAsync(string name, CancellationToken token){return Task.Run<string>(() =>{return Greeting(name, token);});}static string Greeting(string name, CancellationToken token){Thread.Sleep(3000);token.ThrowIfCancellationRequested();return string.Format("Hello, {0}", name);}

并行编程

在.net 4.5中,存在一个叫做“Parallel”的类,这个类可以进行并行操作。当然这种并行和那些充分利用cpu计算能力的Thread 是有差别的,简单说起来,它有两种表现方式:

1.数据并行。 如果我们有很多的数据需要计算,我们需要他们并行的进行,那么我们可以使用For或者ForEach方法来进行:

ParallelLoopResult result =Parallel.For(0, 100, async (int i) =>{Console.WriteLine("{0}, task: {1}, thread: {2}", i,Task.CurrentId, Thread.CurrentThread.ManagedThreadId);await Task.Delay(10);});

如果我们在计算的过程中,想要中断并行,我们可以把ParallelLoopState当做参数传递进去,我们就可以实现认为和中断这种循环:

ParallelLoopResult result =Parallel.For(0, 100, async (int i, ParallelLoopState pls) =>{Console.WriteLine("{0}, task: {1}, thread: {2}", i,Task.CurrentId, Thread.CurrentThread.ManagedThreadId);await Task.Delay(10);if (i > 5) pls.Break();});

需要注意的是,当我们需要中断循环的时候,由于其运行在诸多个线程之上,如果线程数多于我们设定的中断数时,上述的执行可能就不太准确。

  1. 任务并行。如果我们想要多个任务并行处理,那么我们可以使用Parallel.Invoke方法来接受Action委托数组。例如:
static void ParallelInvoke()
{Parallel.Invoke(MethodOne, MethodTwo);
}

参考文章:http://www.codeproject.com/Articles/996857/Asynchronous-programming-and-Threading-in-Csharp-N?bmkres=exist#_articleTop

引用:https://www.cnblogs.com/scy251147/p/4597615.html

异步编程总结

最近在为公司的分布式服务框架做支持异步调用的开发,这种新特性的上线需要进行各种严格的测试。在并发性能测试时,性能一直非常差,而且非常的不稳定。经过不断的分析调优,发现Socket通信和多线程异步回调存在较为严重的性能问题。经过多方优化,性能终于达标。下面是原版本、支持异步最初版本和优化后版本的性能比较。差异还是非常巨大的。另外说明一下,总耗时是指10000次请求累计执行时间。

从上图可以看到,支持异步的版本,在单线程模式下,性能的表现与老版本差异并不明显,但是10线程下差异就非常巨大,而100线程的测试结果反而有所好转。通过分析,两个版本的性能差异如此巨大,主要是因为:

  1. 同步模式会阻塞客户端请求,说白了,在线程内就是串行请求的。但是在异步模式中,线程内的请求不再阻塞,网络流量、后台计算压力瞬间暴涨,峰值是同步模式的100倍。网络传输变成瓶颈点。
  2. 在压力暴涨的情况下,CPU资源占用也会突变, 并且ThreadPool、Task、异步调用的执行都将变慢。

在网络通信方面,把原先半异步的模式调整为了SocketAsyncEventArgs 模式。下面是Socket通信的几种模型的介绍和示例,总结一下,与大家分享。下次再与大家分享,并发下异步调用的性能优化方案。

APM方式: Asynchronous Programming Model

    异步编程模型是一种模式,该模式允许用更少的线程去做更多的操作,.NET Framework很多类也实现了该模式,同时我们也可以自定义类来实现该模式。NET Framework中的APM也称为Begin/End模式。此种模式下,调用BeginXXX方法来启动异步操作,然后返回一个IAsyncResult 对象。当操作执行完成后,系统会触发IAsyncResult 对象的执行。 具体可参考: https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/asynchronous-programming-model-apm

.net中的Socket异步模式也支持APM,与同步模式或Blocking模式相比,可以更好的利用网络带宽和系统资源编写出具有更高性能的程序。参考具体代码如下:

服务端监听:

Socket serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

//本机预使用的IP和端口

IPEndPoint serverIP = new IPEndPoint(IPAddress.Any, 9050);

//绑定服务端设置的IP

serverSocket.Bind(serverIP);

//设置监听个数

serverSocket.Listen(1);

//异步接收连接请求

serverSocket.BeginAccept(ar =>

{

    base.communicateSocket = serverSocket.EndAccept(ar);

   AccessAciton();

 }, null);

客户端连接:

var communicateSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

   communicateSocket.Bind(new IPEndPoint(IPAddress.Any, 9051));

            

        //服务器的IP和端口

        IPEndPoint serverIP;

        try

        {

            serverIP = new IPEndPoint(IPAddress.Parse(IP), 9050);

        }

        catch

        {

            throw new Exception(String.Format("{0}不是一个有效的IP地址!", IP));

        }

            

        //客户端只用来向指定的服务器发送信息,不需要绑定本机的IP和端口,不需要监听

        try

        {

           communicateSocket.BeginConnect(serverIP, ar =>

            {

                AccessAciton();

            }, null);

        }

        catch

        {

            throw new Exception(string.Format("尝试连接{0}不成功!", IP));

        }

客户端请求:

if (communicateSocket.Connected == false)

        {

            throw new Exception("还没有建立连接, 不能发送消息");

        }

        Byte[] msg = Encoding.UTF8.GetBytes(message);

        communicateSocket.BeginSend(msg,0, msg.Length, SocketFlags.None,

            ar => {

                

            }, null);

服务端响应:

Byte[] msg = new byte[1024];

        //异步的接受消息

        communicateSocket.BeginReceive(msg, 0, msg.Length, SocketFlags.None,

            ar => {

                //对方断开连接时, 这里抛出Socket Exception              

                    communicateSocket.EndReceive(ar);

                ReceiveAction(Encoding.UTF8.GetString(msg).Trim('\0',' '));

                Receive(ReceiveAction);

            }, null);

注意:异步模式虽好,但是如果进行大量异步套接字操作,是要付出很高代价的。针对每次操作,都必须创建一个IAsyncResult对象,而且该对象不能被重复使用。由于大量使用对象分配和垃圾收集,这会影响系统性能。如需要更好的理解APM模式,最了解EAP模式:Event-based Asynchronous Pattern:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/event-based-asynchronous-pattern-eap 。

 

TAP 方式: Task-based Asynchronous Pattern

基于任务的异步模式,该模式主要使用System.Threading.Tasks.Task和Task<T>类来完成异步编程,相对于APM 模式来讲,TAP使异步编程模式更加简单(因为这里我们只需要关注Task这个类的使用),同时TAP也是微软推荐使用的异步编程模式。APM与TAP的本质区别,请参考我的一篇历史博客:http://www.cnblogs.com/vveiliang/p/7943003.html

TAP模式与APM模式是两种异步模式的实现,从性能上看没有本质的差别。TAP的资料可参考:https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap 。参考具体代码如下:

服务端:

publicclassStateContext

{

// Client socket.

publicSocketWorkSocket =null;

// Size of receive buffer.

publicconstintBufferSize = 1024;

// Receive buffer.

publicbyte[] buffer =newbyte[BufferSize];

// Received data string.

publicStringBuildersb =newStringBuilder(100);

}

publicclassAsynchronousSocketListener

{

// Thread signal.

publicstaticManualResetEventreSetEvent =newManualResetEvent(false);

publicAsynchronousSocketListener()

{

}

publicstaticvoidStartListening()

{

// Data buffer for incoming data.

byte[] bytes =newByte[1024];

// Establish the local endpoint for the socket.

IPAddressipAddress =IPAddress.Parse("127.0.0.1");

IPEndPointlocalEndPoint =newIPEndPoint(ipAddress, 11000);

// Create a TCP/IP socket.

Socketlistener =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);

// Bind the socket to the local

try

{

listener.Bind(localEndPoint);

listener.Listen(100);

while(true)

{

// Set the event to nonsignaled state.

reSetEvent.Reset();

// Start an asynchronous socket to listen for connections.

Console.WriteLine("Waiting for a connection...");

listener.BeginAccept(newAsyncCallback(AcceptCallback), listener);

// Wait until a connection is made before continuing.

reSetEvent.WaitOne();

}

}

catch(Exceptione)

{

Console.WriteLine(e.ToString());

}

Console.WriteLine("\nPress ENTER to continue...");

Console.Read();

}

publicstaticvoidAcceptCallback(IAsyncResultar)

{

// Signal the main thread to continue.

reSetEvent.Set();

// Get the socket that handles the client request.

Socketlistener = (Socket)ar.AsyncState;

Sockethandler = listener.EndAccept(ar);

// Create the state object.

StateContextstate =newStateContext();

state.WorkSocket = handler;

handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);

}

publicstaticvoidReadCallback(IAsyncResultar)

{

Stringcontent =String.Empty;

StateContextstate = (StateContext)ar.AsyncState;

Sockethandler = state.WorkSocket;

// Read data from the client socket.

intbytesRead = handler.EndReceive(ar);

if(bytesRead > 0)

{

// There might be more data, so store the data received so far.

state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));

// Check for end-of-file tag. If it is not there, read

// more data.

content = state.sb.ToString();

if(content.IndexOf("<EOF>") > -1)

{

Console.WriteLine("读取 {0} bytes. \n 数据: {1}", content.Length, content);

Send(handler, content);

}

else

{

handler.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReadCallback), state);

}

}

}

privatestaticvoidSend(Sockethandler,Stringdata)

{

byte[] byteData =Encoding.ASCII.GetBytes(data);

handler.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), handler);

}

privatestaticvoidSendCallback(IAsyncResultar)

{

try

{

Sockethandler = (Socket)ar.AsyncState;

intbytesSent = handler.EndSend(ar);

Console.WriteLine("发送 {0} bytes.", bytesSent);

handler.Shutdown(SocketShutdown.Both);

handler.Close();

}

catch(Exceptione)

{

Console.WriteLine(e.ToString());

}

}

publicstaticintMain(String[] args)

{

StartListening();

return0;

}

客户端:

publicclassAsynchronousClient

{

// The port number for the remote device.

privateconstintport = 11000;

// ManualResetEvent instances signal completion.

privatestaticManualResetEventconnectResetEvent =newManualResetEvent(false);

privatestaticManualResetEventsendResetEvent =newManualResetEvent(false);

privatestaticManualResetEventreceiveResetEvent =newManualResetEvent(false);

privatestaticStringresponse =String.Empty;

privatestaticvoidStartClient()

{

try

{

IPAddressipAddress =IPAddress.Parse("127.0.0.1");

IPEndPointremoteEP =newIPEndPoint(ipAddress, port);

// Create a TCP/IP socket.

Socketclient =newSocket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);

// Connect to the remote endpoint.

client.BeginConnect(remoteEP,newAsyncCallback(ConnectCallback), client);

connectResetEvent.WaitOne();

Send(client,"This is a test<EOF>");

sendResetEvent.WaitOne();

Receive(client);

receiveResetEvent.WaitOne();

Console.WriteLine("Response received : {0}", response);

// Release the socket.

client.Shutdown(SocketShutdown.Both);

client.Close();

Console.ReadLine();

}

catch(Exceptione)

{

Console.WriteLine(e.ToString());

}

}

privatestaticvoidConnectCallback(IAsyncResultar)

{

try

{

Socketclient = (Socket)ar.AsyncState;

client.EndConnect(ar);

Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint.ToString());

connectResetEvent.Set();

}

catch(Exceptione)

{

Console.WriteLine(e.ToString());

}

}

privatestaticvoidReceive(Socketclient)

{

try

{

StateContextstate =newStateContext();

state.WorkSocket = client;

client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);

}

catch(Exceptione)

{

Console.WriteLine(e.ToString());

}

}

privatestaticvoidReceiveCallback(IAsyncResultar)

{

try

{

StateContextstate = (StateContext)ar.AsyncState;

Socketclient = state.WorkSocket;

intbytesRead = client.EndReceive(ar);

if(bytesRead > 0)

{

state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));

client.BeginReceive(state.buffer, 0,StateContext.BufferSize, 0,newAsyncCallback(ReceiveCallback), state);

}

else

{

if(state.sb.Length > 1)

{

response = state.sb.ToString();

}

receiveResetEvent.Set();

}

}

catch(Exceptione)

{

Console.WriteLine(e.ToString());

}

}

privatestaticvoidSend(Socketclient,Stringdata)

{

byte[] byteData =Encoding.ASCII.GetBytes(data);

client.BeginSend(byteData, 0, byteData.Length, 0,newAsyncCallback(SendCallback), client);

}

privatestaticvoidSendCallback(IAsyncResultar)

{

try

{

Socketclient = (Socket)ar.AsyncState;

intbytesSent = client.EndSend(ar);

Console.WriteLine("Sent {0} bytes to server.", bytesSent);

sendResetEvent.Set();

}

catch(Exceptione)

{

Console.WriteLine(e.ToString());

}

}

publicstaticintMain(String[] args)

{

StartClient();

return0;

}

}

SAEA方式: SocketAsyncEventArgs

APM模式、TAP模式虽然解决了Socket的并发问题,但是在大并发下还是有较大性能问题的。这主要是因为上述两种模式都会生产 IAsyncResult 等对象 ,而大量垃圾对象的回收会非常影响系统的性能。为此,微软推出了 SocketAsyncEventArgs 。SocketAsyncEventArgs 是 .NET Framework 3.5 开始支持的一种支持高性能 Socket 通信的实现。SocketAsyncEventArgs 相比于 APM 方式的主要优点可以描述如下,无需每次调用都生成 IAsyncResult 等对象,向原生 Socket 更靠近一些。这是官方的解释:

The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.

SocketAsyncEventArgs主要为高性能网络服务器应用程序而设计,避免了在异步套接字 I/O 量非常大时,大量垃圾对象创建与回收。使用此类执行异步套接字操作的模式包含以下步骤,具体说明可参考:https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx 。

  1. 分配一个新的 SocketAsyncEventArgs 上下文对象,或者从应用程序池中获取一个空闲的此类对象。
  2. 将该上下文对象的属性设置为要执行的操作(例如,完成回调方法、数据缓冲区、缓冲区偏移量以及要传输的最大数据量)。
  3. 调用适当的套接字方法 (xxxAsync) 以启动异步操作。
  4. 如果异步套接字方法 (xxxAsync) 返回 true,则在回调中查询上下文属性来获取完成状态。
  5. 如果异步套接字方法 (xxxAsync) 返回 false,则说明操作是同步完成的。 可以查询上下文属性来获取操作结果。
  6. 将该上下文重用于另一个操作,将它放回到应用程序池中,或者将它丢弃。

下面是封装的一个组件代码:

classBufferManager

{

intm_numBytes;                // the total number of bytes controlled by the buffer pool

byte[] m_buffer;               // the underlying byte array maintained by the Buffer Manager

Stack<int> m_freeIndexPool;    //

intm_currentIndex;

intm_bufferSize;

publicBufferManager(inttotalBytes,intbufferSize)

{

m_numBytes = totalBytes;

m_currentIndex = 0;

m_bufferSize = bufferSize;

m_freeIndexPool =newStack<int>();

}

// Allocates buffer space used by the buffer pool

publicvoidInitBuffer()

{

// create one big large buffer and divide that

// out to each SocketAsyncEventArg object

m_buffer =newbyte[m_numBytes];

}

// Assigns a buffer from the buffer pool to the

// specified SocketAsyncEventArgs object

//

// <returns>true if the buffer was successfully set, else false</returns>

publicboolSetBuffer(SocketAsyncEventArgsargs)

{

if(m_freeIndexPool.Count > 0)

{

args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);

}

else

{

if((m_numBytes - m_bufferSize) < m_currentIndex)

{

returnfalse;

}

args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);

m_currentIndex += m_bufferSize;

}

returntrue;

}

// Removes the buffer from a SocketAsyncEventArg object.

// This frees the buffer back to the buffer pool

publicvoidFreeBuffer(SocketAsyncEventArgsargs)

{

m_freeIndexPool.Push(args.Offset);

args.SetBuffer(null, 0, 0);

}

}

///<summary>

///This class is used to communicate with a remote application over TCP/IP protocol.

///</summary>

classTcpCommunicationChannel

{

#regionPrivate fields

///<summary>

///Size of the buffer that is used to receive bytes from TCP socket.

///</summary>

privateconstintReceiveBufferSize = 8 * 1024;//4KB

///<summary>

///This buffer is used to receive bytes

///</summary>

privatereadonlybyte[] _buffer;

///<summary>

///Socket object to send/reveice messages.

///</summary>

privatereadonlySocket_clientSocket;

///<summary>

///A flag to control thread's running

///</summary>

privatevolatilebool_running;

///<summary>

///This object is just used for thread synchronizing (locking).

///</summary>

privatereadonlyobject_syncLock;

privateBufferManagerreceiveBufferManager;

privateSocketAsyncEventArgsreceiveBuff =null;

#endregion

#regionConstructor

///<summary>

///Creates a new TcpCommunicationChannel object.

///</summary>

///<param name="clientSocket">A connected Socket object that is

///used to communicate over network</param>

publicTcpCommunicationChannel(SocketclientSocket)

{

_clientSocket = clientSocket;

_clientSocket.Blocking =false;

_buffer =newbyte[ReceiveBufferSize];

_syncLock =newobject();

Init();

}

privatevoidInit()

{

//初始化接收Socket缓存数据

receiveBufferManager =newBufferManager(ReceiveBufferSize*2, ReceiveBufferSize);

receiveBufferManager.InitBuffer();

receiveBuff =newSocketAsyncEventArgs();

receiveBuff.Completed += ReceiveIO_Completed;

receiveBufferManager.SetBuffer(receiveBuff);

//初始化发送Socket缓存数据

}

#endregion

#regionPublic methods

///<summary>

///Disconnects from remote application and closes channel.

///</summary>

publicvoidDisconnect()

{

_running =false;

receiveBuff.Completed -= ReceiveIO_Completed;

receiveBuff.Dispose();

if(_clientSocket.Connected)

{

_clientSocket.Close();

}

_clientSocket.Dispose();

}

#endregion

publicvoidStartReceive()

{

_running =true;

boolresult = _clientSocket.ReceiveAsync(receiveBuff);

}

privatevoidReceiveIO_Completed(objectsender,SocketAsyncEventArgse)

{

if(e.BytesTransferred > 0 && e.SocketError ==SocketError.Success && _clientSocket.Connected ==true&& e.LastOperation ==SocketAsyncOperation.Receive)

{

if(!_running)

{

return;

}

//Get received bytes count

DateTimereceiveTime =DateTime.Now;

//Copy received bytes to a new byte array

varreceivedBytes =newbyte[e.BytesTransferred];

Array.Copy(e.Buffer, 0, receivedBytes, 0, e.BytesTransferred);

//处理消息....

if(_running)

{

StartReceive();

}

}

}

///<summary>

///Sends a message to the remote application.

///</summary>

///<param name="message">Message to be sent</param>

publicvoidSendMessage(byte[] messageBytes)

{

//Send message

if(_clientSocket.Connected)

{

SocketAsyncEventArgsdata =newSocketAsyncEventArgs();

data.SocketFlags =SocketFlags.None;

data.Completed += (s, e) =>

{

e.Dispose();

};

data.SetBuffer(messageBytes, 0, messageBytes.Length);

//Console.WriteLine("发送:" + messageBytes.LongLength);

_clientSocket.SendAsync(data);

}

}

}

【转】.Net中的异步编程总结相关推荐

  1. 一文说通C#中的异步编程补遗

    前文写了关于C#中的异步编程.后台有无数人在讨论,很多人把异步和多线程混了. 文章在这儿:一文说通C#中的异步编程 所以,本文从体系的角度,再写一下这个异步编程.   一.C#中的异步编程演变 1. ...

  2. 一文说通C#中的异步编程

    天天写,不一定就明白. 又及,前两天看了一个关于同步方法中调用异步方法的文章,里面有些概念不太正确,所以整理了这个文章.   一.同步和异步. 先说同步. 同步概念大家都很熟悉.在异步概念出来之前,我 ...

  3. 了解和使用DotNetCore和Blazor中的异步编程

    目录 介绍 您对异步编程了解什么? 那么,什么是异步编程? 我们什么时候应该使用它? 任务.线程.计划.上下文 到底是怎么回事? Asnyc编码模式 命名约定 异步任务模式 任务模式 事件模式 阻塞与 ...

  4. .NET中的异步编程——常见的错误和最佳实践

    目录 背景 async void 没有线程 Foreach和属性 始终异步 在这篇文章中,我们将通过使用异步编程的一些最常见的错误来给你们一些参考. 背景 在之前的文章中,我们开始分析.NET世界中的 ...

  5. C#中的异步编程(Async)

    文章目录 C#中的异步编程(Async) 前言 示例代码 C#中的异步编程(Async) 前言 所谓的异步,就是指代码在运行的过程中,不会发生阻塞,例如我们玩游戏的时候,游戏在下载资源或者在加载本地资 ...

  6. .N“.NET研究”ET中的异步编程(二)- 传统的异步编程

    在上一篇文章中,我们从构建响应灵敏的界面以及构建高可伸缩性的服务应用来讨论我们为什么需要异步编程,异步编程能给我们带来哪些好处.那么知道了好处,我们就开始吧,但是在异步编程上海徐汇企业网站制作这个方面 ...

  7. NET中的异步编程(二)- 传统的异步编程

    转自:http://www.cnblogs.com/yuyijq/archive/2011/02/22/1960273.html 在上一篇文章中,我们从构建响应灵敏的界面以及构建高可伸缩性的服务应用来 ...

  8. 浅谈C#中的异步编程

    实现异步编程有4种方法可供选择,这4种访求实际上也对应着4种异步调用的模式,分为"等待"和"回调"两大类. Title一.使用EndInvoke: 二.使用Wa ...

  9. .NET中的异步编程(四)- IO完成端口以及FileStream.BeginRead

    本文首发在IT168 写这个系列原本的想法是讨论一下.NET中异步编程风格的变化,特别是F#中的异步工作流以及未来的.NET 5.0中的基于任务的异步编程模型.但经过三篇文章后很多人对IO异步背后实现 ...

最新文章

  1. heapq 对有序的数组列表进行整体排序
  2. layui上传报错会有哪些原因_数据丢失如何恢复?哪些原因会导致数据丢失
  3. CSS3菜单栏透明兼容问题
  4. 启明云端分享| sigmstar SSD201/SSD202D/SSD210/SSD212开机动画启动到底能做到多少
  5. String 转化 list
  6. 发布软件之前,怎样告诉用户怎么用
  7. 键盘录入一个字符串,判断是否是对称字符串
  8. python中垃圾回收机制_Python中的变量和垃圾回收机制
  9. 力扣116. 填充每个节点的下一个右侧节点指针(C++,附思路)
  10. JS If...Else
  11. 敏感性分析算法 程序_计算机程序设计艺术(TAOCP)精读笔记1 - 算法分析真正应该有的样子 Part 1...
  12. mac上 sublime的配置,支持c++11且支持输入
  13. uniapp开发的多端APP源码
  14. Eclipse Spring Tool Suite常用配置
  15. 蓝桥杯五4史丰收速算
  16. 计算机丢失文件无法打开ae,AE打开aep工程文件提示文件丢失的图文解决教程
  17. 小程序webview组件实践
  18. 1C.小a与星际探索(C++)
  19. 移动硬盘如何分区?分区软件推荐:
  20. 苏州新闻网V2.0 新版上线

热门文章

  1. kafkaspot在ack机制下如何保证内存不溢
  2. Storm的ack机制在项目应用中的坑
  3. SQL批量提交修改业务
  4. 构造函数和clone以及在继承中
  5. Dart基础学习02--变量及内置类型
  6. 2015.5.21 Core Java Volume 1
  7. [算法][算法复杂度]常用算法复杂度速查表
  8. [Leetcode][第1025题][JAVA][除数博弈][数学][递推]
  9. [Leedcode][JAVA][第139题][单词拆分][递归][记忆优化][动态规划]
  10. java三年,Java开发三年,你不得不了解的JVM(一)