一次 .NET Core 中玩锁的经历:ManualResetEventSlim, SemaphoreSlim
最近同事对 .net core memcached 缓存客户端 EnyimMemcachedCore 进行了高并发下的压力测试,发现在 linux 上高并发下使用 async 异步方法读取缓存数据会出现大量失败的情况,比如在一次测试中,100万次读取缓存,只有12次成功,999988次失败,好恐怖。如果改为同步方法,没有一次失败,100%成功。奇怪的是,同样的压力测试程序在 Windows 上异步读取却没问题,100%成功。
排查后发现是2个地方使用的锁引起的,一个是 ManualResetEventSlim ,一个是 Semaphore ,这2个锁是在同步方法中使用的,但 aync 异步方法中调用了这2个同步方法,我们来分别看一下。
使用 ManualResetEventSlim 是在创建 Socket 连接时用于控制连接超时
var args = new SocketAsyncEventArgs();using (var mres = new ManualResetEventSlim()){ args.Completed += (s, e) => mres.Set();if (socket.ConnectAsync(args)) {if (!mres.Wait(timeout)) {throw new TimeoutException("Could not connect to " + endpoint); } }}
使用 Semaphore 是在从 EnyimMemcachedCore 自己实现的 Socket 连接池获取 Socket 连接时
if (!this.semaphore.WaitOne(this.queueTimeout)){ message = "Pool is full, timeouting. " + _endPoint;if (_isDebugEnabled) _logger.LogDebug(message); result.Fail(message, new TimeoutException()); // everyone is so busyreturn result;}
为了弃用这个2个锁造成的异步并发问题,采取了下面2个改进措施:
1)对于 ManualResetEventSlim ,参考 corefx 中 SqlClient 的 SNITcpHandle 的实现,改用 CancellationTokenSource 控制连接超时
var cts = new CancellationTokenSource();cts.CancelAfter(timeout);void Cancel(){if (!socket.Connected) { socket.Dispose(); }}cts.Token.Register(Cancel); socket.Connect(endpoint);if (socket.Connected){ connected = true;}else{ socket.Dispose();}
2)对于 Semaphore ,根据同事提交的 PR ,将 Semaphore 换成 SemaphoreSlim ,用 SemaphoreSlim.WaitAsync 方法等待信号量锁
if (!await this.semaphore.WaitAsync(this.queueTimeout)){ message = "Pool is full, timeouting. " + _endPoint;if (_isDebugEnabled) _logger.LogDebug(message); result.Fail(message, new TimeoutException()); // everyone is so busyreturn result;}
改进后,压力测试结果立马与同步方法一样,100% 成功!
为什么会这样?
我们到 github 的 coreclr 仓库(针对 .net core 2.2)中看看 ManualResetEventSlim 与 Semaphore 的实现源码,看能否找到一些线索。
(一)
先看看 ManualResetEventSlim.Wait 方法的实现代码(523开始):
1)先 SpinWait 等待
var spinner = new SpinWait();while (spinner.Count < spinCount){ spinner.SpinOnce(sleep1Threshold: -1); if (IsSet) {return true; }}
SpinWait 等待时间比较短,不会造成长时间阻塞线程。
在高并发下大量线程在争抢锁,所以大量线程在这个阶段等不到锁。
2)然后 Monitor.Wait 等待
try{// ** the actual wait **if (!Monitor.Wait(m_lock, realMillisecondsTimeout))return false; //return immediately if the timeout has expired.}finally{// Clean up: we're done waiting. Waiters = Waiters - 1;}
Monitor.Wait 对应的实现代码
[MethodImplAttribute(MethodImplOptions.InternalCall)]private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, object obj); public static bool Wait(object obj, int millisecondsTimeout, bool exitContext){if (obj == null)throw (new ArgumentNullException(nameof(obj)));return ObjWait(exitContext, millisecondsTimeout, obj);}
最终调用的是一个本地库的 ObjWait 方法。
查阅一下 Monitor.Wait 方法的帮助文档:
Releases the lock on an object and blocks the current thread until it reacquires the lock. If the specified time-out interval elapses, the thread enters the ready queue.
Monitor.Wait 的确会阻塞当前线程,这在异步高并发下会带来问题,详见一码阻塞,万码等待:ASP.NET Core 同步方法调用异步方法“死锁”的真相。
(二)
再看看 Semaphore 的实现代码,它继承自 WaitHandle , Semaphore.Wait 实际调用的是 WaitHandle.Wait ,后者调用的是 WaitOneNative ,这是一个本地库的方法
[MethodImplAttribute(MethodImplOptions.InternalCall)]private static extern int WaitOneNative(SafeHandle waitableSafeHandle, uint millisecondsTimeout, bool hasThreadAffinity, bool exitContext);
.net core 3.0 中有些变化,这里调用的是 WaitOneCore 方法
[MethodImpl(MethodImplOptions.InternalCall)]private static extern int WaitOneCore(IntPtr waitHandle, int millisecondsTimeout);
查阅一下 WaitHandle.Wait 方法的帮助文档:
Blocks the current thread until the current WaitHandle receives a signal, using a 32-bit signed integer to specify the time interval in milliseconds.
WaitHandle.Wait 也会阻塞当前线程。
2个地方在等待锁时都会阻塞线程,难怪高并发下会出问题。
(三)
接着阅读 SemaphoreSlim 的源码学习它是如何在 WaitAsync 中实现异步等待锁的?
public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken){//... lock (m_lockObj!) {// If there are counts available, allow this waiter to succeed.if (m_currentCount > 0) {--m_currentCount;if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset();return s_trueTask; }else if (millisecondsTimeout == 0) {// No counts, if timeout is zero fail fastreturn s_falseTask; }// If there aren't, create and return a task to the caller.// The task will be completed either when they've successfully acquired// the semaphore or when the timeout expired or cancellation was requested.else { Debug.Assert(m_currentCount == 0, "m_currentCount should never be negative");var asyncWaiter = CreateAndAddAsyncWaiter();return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ? asyncWaiter : WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken); } }}
重点看 else 部分的代码,SemaphoreSlim.WaitAsync 造了一个专门用于等待锁的 Task —— TaskNode ,CreateAndAddAsyncWaiter 就用于创建 TaskNode 的实例
private TaskNode CreateAndAddAsyncWaiter(){// Create the taskvar task = new TaskNode(); // Add it to the linked listif (m_asyncHead == null) { m_asyncHead = task; m_asyncTail = task; }else { m_asyncTail.Next = task; task.Prev = m_asyncTail; m_asyncTail = task; } // Hand it backreturn task;}
从上面的代码看到 TaskNode 用到了链表,神奇的等锁专用 Task —— TaskNode 是如何实现的呢?
private sealed class TaskNode : Task<bool>{internal TaskNode? Prev, Next;internal TaskNode() : base((object?)null, TaskCreationOptions.RunContinuationsAsynchronously) { }}
好简单!
那 SemaphoreSlim.WaitAsync 如何用 TaskNode 实现指定了超时时间的锁等待?
看 WaitUntilCountOrTimeoutAsync 方法的实现源码:
private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken){// Wait until either the task is completed, timeout occurs, or cancellation is requested.// We need to ensure that the Task.Delay task is appropriately cleaned up if the await// completes due to the asyncWaiter completing, so we use our own token that we can explicitly// cancel, and we chain the caller's supplied token into it.using (var cts = cancellationToken.CanBeCanceled ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) :new CancellationTokenSource()) {var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token));if (asyncWaiter == await waitCompleted.ConfigureAwait(false)) { cts.Cancel(); // ensure that the Task.Delay task is cleaned upreturn true; // successfully acquired } } // If we get here, the wait has timed out or been canceled. // If the await completed synchronously, we still hold the lock. If it didn't,// we no longer hold the lock. As such, acquire it.lock (m_lockObj) {// Remove the task from the list. If we're successful in doing so,// we know that no one else has tried to complete this waiter yet,// so we can safely cancel or timeout.if (RemoveAsyncWaiter(asyncWaiter)) { cancellationToken.ThrowIfCancellationRequested(); // cancellation occurredreturn false; // timeout occurred } } // The waiter had already been removed, which means it's already completed or is about to// complete, so let it, and don't return until it does.return await asyncWaiter.ConfigureAwait(false);}
用 Task.WhenAny 等待 TaskNode 与 Task.Delay ,等其中任一者先完成,简单到可怕。
又一次通过 .net core 源码欣赏了高手是怎么玩转 Task 的。
【2019-5-6更新】
今天将 Task.WhenAny + Task.Delay 的招式用到了异步连接 Socket 的超时控制中
var connTask = _socket.ConnectAsync(_endpoint);if (await Task.WhenAny(connTask, Task.Delay(_connectionTimeout)) == connTask){await connTask;}
原文地址:https://www.cnblogs.com/dudu/p/10812139.html
.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com
一次 .NET Core 中玩锁的经历:ManualResetEventSlim, SemaphoreSlim相关推荐
- 一次 .NET Core 中玩锁的经历:ManualResetEventSlim, Semaphore 与 SemaphoreSlim
最近同事对 .net core memcached 缓存客户端 EnyimMemcachedCore 进行了高并发下的压力测试,发现在 linux 上高并发下使用 async 异步方法读取缓存数据会 ...
- .NET Core 中的并发编程
今天我们购买的每台电脑都有一个多核心的 CPU,允许它并行执行多个指令.操作系统通过将进程调度到不同的内核来发挥这个结构的优点. 然而,还可以通过异步 I/O 操作和并行处理来帮助我们提高单个应用程序 ...
- ASP.NET Core 中文文档 第三章 原理(13)管理应用程序状态
原文:Managing Application State 作者:Steve Smith 翻译:姚阿勇(Dr.Yao) 校对:高嵩 在 ASP.NET Core 中,有多种途径可以对应用程序的状态进行 ...
- 在 .NET Core 中如何让 Entity Framework Core 在日志中记录由 LINQ 生成的SQL语句
在开发中,我们想在调试中查看EF Core执行的sql语句,可以使用SQL Studio Manager Tools工具,另一种方式是使用EF Core提供的日志.在ASP.NET Core使用Ent ...
- 如何在 ASP.Net Core 中使用 Lamar
ASP.Net Core 自带了一个极简的 开箱即用 的依赖注入容器,实际上,你还可以使用第三方的 依赖注入容器 来替代它,依赖注入是一种设计模式,它能够有效的实现对象之间的解耦并有利于提高单元测试和 ...
- 在.NET Core中使用Channel(一)
我最近一直在熟悉.net Core中引入的新Channel<T>类型.我想在它第一次发布的时候我了解过它,但是有关文章非常非常少,我不能理解它们与其他队列有什么不同. 在使用了一段时间后, ...
- Windows新终端中玩转ASCII和Emoji游戏的正确姿势
前一段时间,我搬运了几个Windows Terminal中玩游戏的视频. Windows Terminal - 动图GIF作背景图 Windows Terminal - 母牛说Hi Windows T ...
- 中间件是什么?在.NET Core中的工作原理又是怎样的呢?
本文出自<从零开始学ASP.NET CORE MVC> 推荐文章:ASP.NET Core appsettings.json文件 ASP.NET Core 中的中间件(Middleware ...
- 在 .NET Core 中的并发编程
原文地址:http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core 今天我们购买的每台电脑都有一个多核心的 C ...
最新文章
- junit5_使用Junit测试名称
- HTML5本地存储 localStorage
- .Net开发3年,应聘大厂惨遭淘汰,如何翻身打脸面试官?
- 网络安全-使用HTTP动词篡改的认证旁路
- 程序员常用字体(vs2008字体修改方案)
- 预览文章: 猿们平常都喜欢听啥音乐?
- X86汇编语言从实模式到保护模式05:循环、批量传送和条件转移
- MathType 的使用
- 党媒发声IT圈里的35岁现象
- 实战项目 仿写小米商城 网页框架
- 单循环比赛赛程 java
- pascal编游戏攻略
- 惠普计算机如何用u盘引导启动不了系统安装系统,惠普笔记本进BIOS设置U盘启动教程...
- 排列组合数计算公式及性质
- iptables四表五链及基本使用
- 我也来谈谈《我不是药神》这部电影
- Android Activity 生命周期和重要的相关函数(基础一)
- web work 。。。
- vue使用echarts图表自适应的几种解决方案
- IT新人到底该不该去外包公司?
热门文章
- 华为堡垒机_安恒信息成为“华为云优秀严选合作伙伴”,携手保障“云上”资产安全访问...
- confd_confd + Nacos | 无代码侵入的配置变更管理
- 【20181026T2】**图【最小瓶颈路+非旋Treap+启发式合并】
- 上周面试回来后写的Java面试总结,想进BAT必看
- 《深入实践Spring Boot》下载
- curl liinux下http命令执行工具
- 20161114记录一件工作的事
- Maven搭建SpringMVC+Mybatis项目详解【转】
- 跳槽9招让你“空降”任何企业都能成功
- CSS text-indent 属性