最近同事对  .net core memcached 缓存客户端 EnyimMemcachedCore 进行了高并发下的压力测试,发现在 linux 上高并发下使用 async 异步方法读取缓存数据会出现大量失败的情况,比如在一次测试中,100万次读取缓存,只有12次成功,999988次失败,好恐怖。如果改为同步方法,没有一次失败,100%成功。奇怪的是,同样的压力测试程序在 Windows 上异步读取却没问题,100%成功。

排查后发现是2个地方使用的锁引起的,一个是 ManualResetEventSlim ,一个是 Semaphore ,这2个锁是在同步方法中使用的,但 aync 异步方法中调用了这2个同步方法,我们来分别看一下。

使用 ManualResetEventSlim 是在创建 Socket 连接时用于控制连接超时

var args = new SocketAsyncEventArgs();using (var mres = new ManualResetEventSlim()){    args.Completed += (s, e) => mres.Set();if (socket.ConnectAsync(args))    {if (!mres.Wait(timeout))        {throw new TimeoutException("Could not connect to " + endpoint);        }    }}

使用 Semaphore 是在从 EnyimMemcachedCore 自己实现的 Socket 连接池获取 Socket 连接时

if (!this.semaphore.WaitOne(this.queueTimeout)){    message = "Pool is full, timeouting. " + _endPoint;if (_isDebugEnabled) _logger.LogDebug(message);    result.Fail(message, new TimeoutException());

// everyone is so busyreturn result;}

为了弃用这个2个锁造成的异步并发问题,采取了下面2个改进措施:

1)对于 ManualResetEventSlim ,参考 corefx 中 SqlClient 的 SNITcpHandle 的实现,改用 CancellationTokenSource 控制连接超时

var cts = new CancellationTokenSource();cts.CancelAfter(timeout);void Cancel(){if (!socket.Connected)    {        socket.Dispose();    }}cts.Token.Register(Cancel);

socket.Connect(endpoint);if (socket.Connected){    connected = true;}else{    socket.Dispose();}

2)对于 Semaphore ,根据同事提交的 PR ,将 Semaphore 换成 SemaphoreSlim ,用 SemaphoreSlim.WaitAsync 方法等待信号量锁

if (!await this.semaphore.WaitAsync(this.queueTimeout)){    message = "Pool is full, timeouting. " + _endPoint;if (_isDebugEnabled) _logger.LogDebug(message);    result.Fail(message, new TimeoutException());

// everyone is so busyreturn result;}

改进后,压力测试结果立马与同步方法一样,100% 成功!

为什么会这样?

我们到 github 的 coreclr 仓库(针对 .net core 2.2)中看看 ManualResetEventSlim 与 Semaphore 的实现源码,看能否找到一些线索。

(一)

先看看 ManualResetEventSlim.Wait 方法的实现代码(523开始):

1)先 SpinWait 等待

var spinner = new SpinWait();while (spinner.Count < spinCount){    spinner.SpinOnce(sleep1Threshold: -1);

if (IsSet)    {return true;    }}

SpinWait 等待时间比较短,不会造成长时间阻塞线程。

在高并发下大量线程在争抢锁,所以大量线程在这个阶段等不到锁。

2)然后 Monitor.Wait 等待

try{// ** the actual wait **if (!Monitor.Wait(m_lock, realMillisecondsTimeout))return false; //return immediately if the timeout has expired.}finally{// Clean up: we're done waiting.    Waiters = Waiters - 1;}

Monitor.Wait 对应的实现代码

[MethodImplAttribute(MethodImplOptions.InternalCall)]private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, object obj);

public static bool Wait(object obj, int millisecondsTimeout, bool exitContext){if (obj == null)throw (new ArgumentNullException(nameof(obj)));return ObjWait(exitContext, millisecondsTimeout, obj);}

最终调用的是一个本地库的 ObjWait 方法。

查阅一下 Monitor.Wait 方法的帮助文档:

Releases the lock on an object and blocks the current thread until it reacquires the lock. If the specified time-out interval elapses, the thread enters the ready queue.

Monitor.Wait 的确会阻塞当前线程,这在异步高并发下会带来问题,详见一码阻塞,万码等待:ASP.NET Core 同步方法调用异步方法“死锁”的真相。

(二)

再看看 Semaphore 的实现代码,它继承自 WaitHandle , Semaphore.Wait 实际调用的是 WaitHandle.Wait ,后者调用的是 WaitOneNative ,这是一个本地库的方法

[MethodImplAttribute(MethodImplOptions.InternalCall)]private static extern int WaitOneNative(SafeHandle waitableSafeHandle, uint millisecondsTimeout, bool hasThreadAffinity, bool exitContext);

.net core 3.0 中有些变化,这里调用的是 WaitOneCore 方法

[MethodImpl(MethodImplOptions.InternalCall)]private static extern int WaitOneCore(IntPtr waitHandle, int millisecondsTimeout);

查阅一下 WaitHandle.Wait 方法的帮助文档:

Blocks the current thread until the current WaitHandle receives a signal, using a 32-bit signed integer to specify the time interval in milliseconds.

WaitHandle.Wait 也会阻塞当前线程。

2个地方在等待锁时都会阻塞线程,难怪高并发下会出问题。

(三)

接着阅读 SemaphoreSlim 的源码学习它是如何在 WaitAsync 中实现异步等待锁的?

public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken){//...

lock (m_lockObj!)    {// If there are counts available, allow this waiter to succeed.if (m_currentCount > 0)        {--m_currentCount;if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset();return s_trueTask;        }else if (millisecondsTimeout == 0)        {// No counts, if timeout is zero fail fastreturn s_falseTask;        }// If there aren't, create and return a task to the caller.// The task will be completed either when they've successfully acquired// the semaphore or when the timeout expired or cancellation was requested.else        {            Debug.Assert(m_currentCount == 0, "m_currentCount should never be negative");var asyncWaiter = CreateAndAddAsyncWaiter();return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ?                asyncWaiter :                WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken);        }    }}

重点看 else 部分的代码,SemaphoreSlim.WaitAsync 造了一个专门用于等待锁的 Task —— TaskNode ,CreateAndAddAsyncWaiter 就用于创建 TaskNode 的实例

private TaskNode CreateAndAddAsyncWaiter(){// Create the taskvar task = new TaskNode();

// Add it to the linked listif (m_asyncHead == null)    {        m_asyncHead = task;        m_asyncTail = task;    }else    {        m_asyncTail.Next = task;        task.Prev = m_asyncTail;        m_asyncTail = task;    }

// Hand it backreturn task;}

从上面的代码看到 TaskNode 用到了链表,神奇的等锁专用 Task —— TaskNode 是如何实现的呢?

private sealed class TaskNode : Task<bool>{internal TaskNode? Prev, Next;internal TaskNode() : base((object?)null, TaskCreationOptions.RunContinuationsAsynchronously) { }}

好简单!

那 SemaphoreSlim.WaitAsync 如何用 TaskNode 实现指定了超时时间的锁等待?

看 WaitUntilCountOrTimeoutAsync 方法的实现源码:

private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken){// Wait until either the task is completed, timeout occurs, or cancellation is requested.// We need to ensure that the Task.Delay task is appropriately cleaned up if the await// completes due to the asyncWaiter completing, so we use our own token that we can explicitly// cancel, and we chain the caller's supplied token into it.using (var cts = cancellationToken.CanBeCanceled ?        CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) :new CancellationTokenSource())    {var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token));if (asyncWaiter == await waitCompleted.ConfigureAwait(false))        {            cts.Cancel(); // ensure that the Task.Delay task is cleaned upreturn true; // successfully acquired        }    }

// If we get here, the wait has timed out or been canceled.

// If the await completed synchronously, we still hold the lock.  If it didn't,// we no longer hold the lock.  As such, acquire it.lock (m_lockObj)    {// Remove the task from the list.  If we're successful in doing so,// we know that no one else has tried to complete this waiter yet,// so we can safely cancel or timeout.if (RemoveAsyncWaiter(asyncWaiter))        {            cancellationToken.ThrowIfCancellationRequested(); // cancellation occurredreturn false; // timeout occurred        }    }

// The waiter had already been removed, which means it's already completed or is about to// complete, so let it, and don't return until it does.return await asyncWaiter.ConfigureAwait(false);}

用 Task.WhenAny 等待 TaskNode 与 Task.Delay ,等其中任一者先完成,简单到可怕。

又一次通过 .net core 源码欣赏了高手是怎么玩转 Task 的。

【2019-5-6更新】

今天将 Task.WhenAny + Task.Delay 的招式用到了异步连接 Socket 的超时控制中

var connTask = _socket.ConnectAsync(_endpoint);if (await Task.WhenAny(connTask, Task.Delay(_connectionTimeout)) == connTask){await connTask;}

原文地址:https://www.cnblogs.com/dudu/p/10812139.html

.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com 

一次 .NET Core 中玩锁的经历:ManualResetEventSlim, SemaphoreSlim相关推荐

  1. 一次 .NET Core 中玩锁的经历:ManualResetEventSlim, Semaphore 与 SemaphoreSlim

    最近同事对  .net core memcached 缓存客户端 EnyimMemcachedCore 进行了高并发下的压力测试,发现在 linux 上高并发下使用 async 异步方法读取缓存数据会 ...

  2. .NET Core 中的并发编程

    今天我们购买的每台电脑都有一个多核心的 CPU,允许它并行执行多个指令.操作系统通过将进程调度到不同的内核来发挥这个结构的优点. 然而,还可以通过异步 I/O 操作和并行处理来帮助我们提高单个应用程序 ...

  3. ASP.NET Core 中文文档 第三章 原理(13)管理应用程序状态

    原文:Managing Application State 作者:Steve Smith 翻译:姚阿勇(Dr.Yao) 校对:高嵩 在 ASP.NET Core 中,有多种途径可以对应用程序的状态进行 ...

  4. 在 .NET Core 中如何让 Entity Framework Core 在日志中记录由 LINQ 生成的SQL语句

    在开发中,我们想在调试中查看EF Core执行的sql语句,可以使用SQL Studio Manager Tools工具,另一种方式是使用EF Core提供的日志.在ASP.NET Core使用Ent ...

  5. 如何在 ASP.Net Core 中使用 Lamar

    ASP.Net Core 自带了一个极简的 开箱即用 的依赖注入容器,实际上,你还可以使用第三方的 依赖注入容器 来替代它,依赖注入是一种设计模式,它能够有效的实现对象之间的解耦并有利于提高单元测试和 ...

  6. 在.NET Core中使用Channel(一)

    我最近一直在熟悉.net Core中引入的新Channel<T>类型.我想在它第一次发布的时候我了解过它,但是有关文章非常非常少,我不能理解它们与其他队列有什么不同. 在使用了一段时间后, ...

  7. Windows新终端中玩转ASCII和Emoji游戏的正确姿势

    前一段时间,我搬运了几个Windows Terminal中玩游戏的视频. Windows Terminal - 动图GIF作背景图 Windows Terminal - 母牛说Hi Windows T ...

  8. 中间件是什么?在.NET Core中的工作原理又是怎样的呢?

    本文出自<从零开始学ASP.NET CORE MVC> 推荐文章:ASP.NET Core appsettings.json文件 ASP.NET Core 中的中间件(Middleware ...

  9. 在 .NET Core 中的并发编程

    原文地址:http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core 今天我们购买的每台电脑都有一个多核心的 C ...

最新文章

  1. junit5_使用Junit测试名称
  2. HTML5本地存储 localStorage
  3. .Net开发3年,应聘大厂惨遭淘汰,如何翻身打脸面试官?
  4. 网络安全-使用HTTP动词篡改的认证旁路
  5. 程序员常用字体(vs2008字体修改方案)
  6. 预览文章: 猿们平常都喜欢听啥音乐?
  7. X86汇编语言从实模式到保护模式05:循环、批量传送和条件转移
  8. MathType 的使用
  9. 党媒发声IT圈里的35岁现象
  10. 实战项目 仿写小米商城 网页框架
  11. 单循环比赛赛程 java
  12. pascal编游戏攻略
  13. 惠普计算机如何用u盘引导启动不了系统安装系统,惠普笔记本进BIOS设置U盘启动教程...
  14. 排列组合数计算公式及性质
  15. iptables四表五链及基本使用
  16. 我也来谈谈《我不是药神》这部电影
  17. Android Activity 生命周期和重要的相关函数(基础一)
  18. web work 。。。
  19. vue使用echarts图表自适应的几种解决方案
  20. IT新人到底该不该去外包公司?

热门文章

  1. 华为堡垒机_安恒信息成为“华为云优秀严选合作伙伴”,携手保障“云上”资产安全访问...
  2. confd_confd + Nacos | 无代码侵入的配置变更管理
  3. 【20181026T2】**图【最小瓶颈路+非旋Treap+启发式合并】
  4. 上周面试回来后写的Java面试总结,想进BAT必看
  5. 《深入实践Spring Boot》下载
  6. curl   liinux下http命令执行工具
  7. 20161114记录一件工作的事
  8. Maven搭建SpringMVC+Mybatis项目详解【转】
  9. 跳槽9招让你“空降”任何企业都能成功
  10. CSS text-indent 属性