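Recently a colleague stress-tested EnyimMemcachedCore, the .NET Core memcached client, under high concurrency. On Linux, reading cached data with the async methods failed in large numbers: in one test, out of 1,000,000 cache reads only 12 succeeded and 999,988 failed, which is alarming. With the synchronous methods there was not a single failure, 100% success. Strangely, the same stress-test program had no problem reading asynchronously on Windows, again 100% success.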

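Investigation traced the failures to locks used in two places: one is ManualResetEventSlim, the other is Semaphore. Both locks are used inside synchronous methods, but the async methods call those two synchronous methods. Let's look at each in turn.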

ManualResetEventSlim is used to enforce the connection timeout when creating a Socket connection:

    var args = new SocketAsyncEventArgs();
    using (var mres = new ManualResetEventSlim())
    {
        args.Completed += (s, e) => mres.Set();
        if (socket.ConnectAsync(args))
        {
            if (!mres.Wait(timeout))
            {
                throw new TimeoutException("Could not connect to " + endpoint);
            }
        }
    }

Semaphore is used when acquiring a Socket connection from the Socket connection pool that EnyimMemcachedCore implements itself:

    if (!this.semaphore.WaitOne(this.queueTimeout))
    {
        message = "Pool is full, timeouting. " + _endPoint;
        if (_isDebugEnabled) _logger.LogDebug(message);
        result.Fail(message, new TimeoutException());

        // everyone is so busy
        return result;
    }


1) For ManualResetEventSlim, following the implementation of SNITcpHandle in SqlClient in corefx, we switched to CancellationTokenSource to control the connection timeout:

    var cts = new CancellationTokenSource();
    cts.CancelAfter(timeout);
    void Cancel()
    {
        if (!socket.Connected)
        {
            socket.Dispose();
        }
    }
    cts.Token.Register(Cancel);

    socket.Connect(endpoint);
    if (socket.Connected)
    {
        connected = true;
    }
    else
    {
        socket.Dispose();
    }

2) For Semaphore, following the PR submitted by my colleague, we replaced Semaphore with SemaphoreSlim and wait for the semaphore with the SemaphoreSlim.WaitAsync method:

    if (!await this.semaphore.WaitAsync(this.queueTimeout))
    {
        message = "Pool is full, timeouting. " + _endPoint;
        if (_isDebugEnabled) _logger.LogDebug(message);
        result.Fail(message, new TimeoutException());

        // everyone is so busy
        return result;
    }
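
The excerpt only shows the acquiring side; a slot taken with WaitAsync also has to be handed back with Release once the connection returns to the pool. As a minimal sketch of the whole pattern (the class AsyncPoolGate and its members are hypothetical names made up for illustration, not EnyimMemcachedCore's actual pool), it might look like this:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not part of EnyimMemcachedCore.
public sealed class AsyncPoolGate : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _queueTimeout;

    public AsyncPoolGate(int maxItems, TimeSpan queueTimeout)
    {
        // All slots start out free; maxItems is also the upper bound.
        _semaphore = new SemaphoreSlim(maxItems, maxItems);
        _queueTimeout = queueTimeout;
    }

    // Runs 'work' while holding one slot.
    // Returns false if no slot became free within the queue timeout.
    public async Task<bool> TryRunAsync(Func<Task> work)
    {
        // WaitAsync returns a Task instead of blocking the calling thread.
        if (!await _semaphore.WaitAsync(_queueTimeout).ConfigureAwait(false))
        {
            return false; // pool is full, timed out
        }

        try
        {
            await work().ConfigureAwait(false);
            return true;
        }
        finally
        {
            // Always hand the slot back, even if 'work' throws.
            _semaphore.Release();
        }
    }

    public void Dispose() => _semaphore.Dispose();
}
```

A caller would wrap each cache operation in TryRunAsync and treat a false result the same way the original code treats "Pool is full".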

After these changes, the stress-test results immediately matched those of the synchronous methods: 100% success!


Let's go to the coreclr repository on GitHub (the .NET Core 2.2 version) and read the source of ManualResetEventSlim and Semaphore to see whether it offers any clues.


First, the implementation of the ManualResetEventSlim.Wait method (starting at line 523):

1) First it waits with SpinWait

    var spinner = new SpinWait();
    while (spinner.Count < spinCount)
    {
        spinner.SpinOnce(sleep1Threshold: -1);

        if (IsSet)
        {
            return true;
        }
    }

The SpinWait phase is short and does not block the thread for long.


2) Then it waits with Monitor.Wait

    try
    {
        // ** the actual wait **
        if (!Monitor.Wait(m_lock, realMillisecondsTimeout))
            return false; //return immediately if the timeout has expired.
    }
    finally
    {
        // Clean up: we're done waiting.
        Waiters = Waiters - 1;
    }

The corresponding implementation of Monitor.Wait:

    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, object obj);

    public static bool Wait(object obj, int millisecondsTimeout, bool exitContext)
    {
        if (obj == null)
            throw (new ArgumentNullException(nameof(obj)));
        return ObjWait(exitContext, millisecondsTimeout, obj);
    }

In the end it calls ObjWait, a method implemented in native code.

Here is what the documentation for Monitor.Wait says:

Releases the lock on an object and blocks the current thread until it reacquires the lock. If the specified time-out interval elapses, the thread enters the ready queue.

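Monitor.Wait does indeed block the current thread, and that causes trouble under highly concurrent async workloads. For details, see the post "一码阻塞,万码等待:ASP.NET Core 同步方法调用异步方法“死锁”的真相" (roughly: "One blocking call, ten thousand waiting: the truth behind the 'deadlock' when ASP.NET Core synchronous methods call async methods").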


Now the implementation of Semaphore. It inherits from WaitHandle, so Semaphore.WaitOne is actually WaitHandle.WaitOne, which in turn calls WaitOneNative, a method implemented in native code:

    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    private static extern int WaitOneNative(SafeHandle waitableSafeHandle, uint millisecondsTimeout, bool hasThreadAffinity, bool exitContext);

.NET Core 3.0 changes this a little; there the call goes to the WaitOneCore method:

    [MethodImpl(MethodImplOptions.InternalCall)]
    private static extern int WaitOneCore(IntPtr waitHandle, int millisecondsTimeout);

Here is what the documentation for WaitHandle.WaitOne says:

Blocks the current thread until the current WaitHandle receives a signal, using a 32-bit signed integer to specify the time interval in milliseconds.

WaitHandle.WaitOne also blocks the current thread.



Next, let's read the SemaphoreSlim source to learn how WaitAsync waits for the lock asynchronously.

    public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken)
    {
        //...

        lock (m_lockObj!)
        {
            // If there are counts available, allow this waiter to succeed.
            if (m_currentCount > 0)
            {
                --m_currentCount;
                if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset();
                return s_trueTask;
            }
            else if (millisecondsTimeout == 0)
            {
                // No counts, if timeout is zero fail fast
                return s_falseTask;
            }
            // If there aren't, create and return a task to the caller.
            // The task will be completed either when they've successfully acquired
            // the semaphore or when the timeout expired or cancellation was requested.
            else
            {
                Debug.Assert(m_currentCount == 0, "m_currentCount should never be negative");
                var asyncWaiter = CreateAndAddAsyncWaiter();
                return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ?
                    asyncWaiter :
                    WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken);
            }
        }
    }

The else branch is the important part. SemaphoreSlim.WaitAsync builds a Task dedicated to waiting for the lock, TaskNode, and CreateAndAddAsyncWaiter is what creates the TaskNode instance:

    private TaskNode CreateAndAddAsyncWaiter()
    {
        // Create the task
        var task = new TaskNode();

        // Add it to the linked list
        if (m_asyncHead == null)
        {
            m_asyncHead = task;
            m_asyncTail = task;
        }
        else
        {
            m_asyncTail.Next = task;
            task.Prev = m_asyncTail;
            m_asyncTail = task;
        }

        // Hand it back
        return task;
    }

The code above shows that TaskNode instances are kept in a linked list. So how is TaskNode, this magical Task made just for waiting on a lock, implemented?

    private sealed class TaskNode : Task<bool>
    {
        internal TaskNode? Prev, Next;
        internal TaskNode() : base((object?)null, TaskCreationOptions.RunContinuationsAsynchronously) { }
    }
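
TaskNode relies on a Task<bool> constructor that is not available to application code, so the same idea cannot be copied verbatim outside the runtime. As a rough analogue using only public APIs, the sketch below queues TaskCompletionSource<bool> instances instead of TaskNode objects. TinyAsyncSemaphore is a hypothetical name, and the class deliberately omits the timeout and cancellation handling that the real SemaphoreSlim has:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative only: a stripped-down async semaphore, not SemaphoreSlim's real code.
public sealed class TinyAsyncSemaphore
{
    private readonly object _lock = new object();
    private readonly Queue<TaskCompletionSource<bool>> _waiters =
        new Queue<TaskCompletionSource<bool>>();
    private int _count;

    public TinyAsyncSemaphore(int initialCount)
    {
        _count = initialCount;
    }

    public Task WaitAsync()
    {
        lock (_lock)
        {
            // A count is available: succeed immediately without allocating a waiter.
            if (_count > 0)
            {
                _count--;
                return Task.CompletedTask;
            }

            // No count available: hand the caller a task that Release completes later.
            // RunContinuationsAsynchronously mirrors the option TaskNode passes to its base.
            var waiter = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    public void Release()
    {
        TaskCompletionSource<bool> toWake = null;
        lock (_lock)
        {
            if (_waiters.Count > 0)
            {
                toWake = _waiters.Dequeue(); // pass the slot straight to the oldest waiter
            }
            else
            {
                _count++;
            }
        }

        // Complete the waiter outside the lock.
        toWake?.SetResult(true);
    }
}
```

No thread is parked while a caller waits; the pending waiter is just an object in a queue until Release completes it.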


So how does SemaphoreSlim.WaitAsync use TaskNode to implement a lock wait with a timeout?

Look at the source of the WaitUntilCountOrTimeoutAsync method:

    private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken)
    {
        // Wait until either the task is completed, timeout occurs, or cancellation is requested.
        // We need to ensure that the Task.Delay task is appropriately cleaned up if the await
        // completes due to the asyncWaiter completing, so we use our own token that we can explicitly
        // cancel, and we chain the caller's supplied token into it.
        using (var cts = cancellationToken.CanBeCanceled ?
            CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) :
            new CancellationTokenSource())
        {
            var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token));
            if (asyncWaiter == await waitCompleted.ConfigureAwait(false))
            {
                cts.Cancel(); // ensure that the Task.Delay task is cleaned up
                return true; // successfully acquired
            }
        }

        // If we get here, the wait has timed out or been canceled.

        // If the await completed synchronously, we still hold the lock.  If it didn't,
        // we no longer hold the lock.  As such, acquire it.
        lock (m_lockObj)
        {
            // Remove the task from the list.  If we're successful in doing so,
            // we know that no one else has tried to complete this waiter yet,
            // so we can safely cancel or timeout.
            if (RemoveAsyncWaiter(asyncWaiter))
            {
                cancellationToken.ThrowIfCancellationRequested(); // cancellation occurred
                return false; // timeout occurred
            }
        }

        // The waiter had already been removed, which means it's already completed or is about to
        // complete, so let it, and don't return until it does.
        return await asyncWaiter.ConfigureAwait(false);
    }

It uses Task.WhenAny to await both the TaskNode and a Task.Delay, and continues as soon as either one completes. It is almost frighteningly simple.
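
Stripped of the semaphore bookkeeping, the Task.WhenAny + Task.Delay pattern can be sketched as a small general-purpose helper. The names TimeoutHelper and CompletesWithinAsync are hypothetical and exist only for this illustration:

```csharp
using System.Threading;
using System.Threading.Tasks;

// Illustrative helper; the names are made up for this example.
public static class TimeoutHelper
{
    // Returns true if 'task' finishes within the timeout, false otherwise.
    // The wait itself never blocks a thread.
    public static async Task<bool> CompletesWithinAsync(Task task, int millisecondsTimeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            var delay = Task.Delay(millisecondsTimeout, cts.Token);
            var winner = await Task.WhenAny(task, delay).ConfigureAwait(false);
            if (winner == task)
            {
                cts.Cancel();                     // clean up the pending Task.Delay timer
                await task.ConfigureAwait(false); // rethrow if 'task' faulted or was canceled
                return true;
            }

            return false; // the delay finished first: timed out
        }
    }
}
```

Note that on a timeout the original task keeps running; the helper only stops waiting for it, which is why SemaphoreSlim additionally removes the waiter from its list.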

Once again, the .NET Core source offers a chance to admire how the experts handle Task.


Today I applied the Task.WhenAny + Task.Delay technique to timeout control for asynchronous Socket connections:

    var connTask = _socket.ConnectAsync(_endpoint);
    if (await Task.WhenAny(connTask, Task.Delay(_connectionTimeout)) == connTask)
    {
        await connTask;
    }
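
The snippet above only shows the success path. A fuller sketch, which also handles the timeout branch, could look like the following. SocketConnector and ConnectWithTimeoutAsync are hypothetical names, and disposing the socket and throwing TimeoutException on timeout is an assumption modelled on the earlier synchronous code, not necessarily what EnyimMemcachedCore does:

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

// Illustrative helper; not EnyimMemcachedCore's actual implementation.
public static class SocketConnector
{
    public static async Task ConnectWithTimeoutAsync(Socket socket, EndPoint endpoint, TimeSpan timeout)
    {
        var connTask = socket.ConnectAsync(endpoint);

        using (var cts = new CancellationTokenSource())
        {
            var winner = await Task.WhenAny(connTask, Task.Delay(timeout, cts.Token))
                .ConfigureAwait(false);
            if (winner == connTask)
            {
                cts.Cancel();                         // clean up the pending Task.Delay timer
                await connTask.ConfigureAwait(false); // surface any connection error
                return;
            }
        }

        // Timed out: closing the socket abandons the pending connect attempt.
        socket.Dispose();

        // The abandoned connect task will fault later; observe its exception
        // so it is not reported as unobserved.
        _ = connTask.ContinueWith(
            t => { var ignored = t.Exception; },
            TaskContinuationOptions.OnlyOnFaulted);

        throw new TimeoutException("Could not connect to " + endpoint);
    }
}
```

A caller would await SocketConnector.ConnectWithTimeoutAsync(socket, endpoint, timeout) and catch TimeoutException where the synchronous version did.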


