IOCP Thread Pool 在 C# 的Safe实现

IOCP是一种高性能的I/O模型,更多资料可以google下。
    在.Net Framework下,没有提供IOCP的类库,我们需要引入Win32 API来建立IOCP Thread Pool。

[DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern SafeFileHandle CreateIoCompletionPort(IntPtr hFile, IntPtr hExistingCompletionPort, IntPtr puiCompletionKey, UInt32 uiNumberOfConcurrentThreads); /// <summary> Win32Func: Closes an IO Completion Port Thread Pool </summary> [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern Boolean CloseHandle(SafeHandle hObject); /// <summary> Win32Func: Posts a context based event into an IO Completion Port Thread Pool </summary> [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern Boolean PostQueuedCompletionStatus(SafeFileHandle hCompletionPort, UInt32 uiSizeOfArgument, IntPtr dwCompletionKey, IntPtr pOverlapped); /// <summary> Win32Func: Waits on a context based event from an IO Completion Port Thread Pool. /// All threads in the pool wait in this Win32 Function </summary> [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern Boolean GetQueuedCompletionStatus(SafeFileHandle hCompletionPort, out UInt32 pSizeOfArgument, out IntPtr dwCompletionKey, out IntPtr ppOverlapped, UInt32 uiMilliseconds);

我们用CreateIOCompletionPort获得一个IOCP对象的句柄,用PostQueuedCompetionStatus把状态对象(Socket编程下一般系传socket)放进队列,开启一定量线程来运行GetQueuedCompletionStatus监听,GetQueuedCompletionStatus函数会阻塞调用线程。

由于与非托管代码打交道,要实现Safe的代码,有几点需要注意。

1.我们要传递一个状态对象地址给非托管代码,由于GC的关系,我们不能直接传递地址,因为GC在回收的过程中,会移动在堆上的对象,造成地址改变。一般来说,GC移动会修改托管代码里面地址指向,但我们现在把地址传递出托管代码范围,GC也无能为力了。情况如图。

针对这种情况,可以用GCHandle类解决,GCHandle类的Alloc方法为对象注册,Alloc方法有两个参数,第二个参数系GCHandleType类型枚举,默认情况系Normal。当我们要GC不移动对象的时候,例如有个byte[]的Buffer需要非托管代码填充,可以使用Pinned。

GCHandle gch = GCHandle.Alloc(obj);
PostQueuedCompletionStatus(GetHandle, (uint)Marshal.SizeOf(gch), IntPtr.Zero, (IntPtr)gch);

2.PostQueuedCompletionStatus函数的第二个参数是要传送数据的长度,直接使用sizeof系unsafe代码,这里使用Marshal.SizeOf方法。

3.还有需要注意的是,传递的对象必须实现[StructLayout(LayoutKind.Sequential)]标签,以保证该对象的成员在内存里连续分配,遵守C++方式。

4.利用SafeFileHandle引用内核对象更安全,这个类能实现引用计数。

至此,已经讲述完实现Safe代码的要点了。

IOCPThreadPool的实现代码:

using System; using System.Threading; using System.Runtime.InteropServices; using Microsoft.Win32.SafeHandles; namespace Continuum.SafeThreading { [StructLayout(LayoutKind.Sequential)] public class MyData { private int value; public int Value { get { return value; } set { this.value = value; } } } // Classes //============================================ /// <summary> This class provides the ability to create a thread pool to manage work. The /// class abstracts the Win32 IOCompletionPort API so it requires the use of /// unmanaged code. Unfortunately the .NET framework does not provide this functionality </summary> public sealed class SafeIOCPThreadPool { // Win32 Function Prototypes /// <summary> Win32Func: Create an IO Completion Port Thread Pool </summary> [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern SafeFileHandle CreateIoCompletionPort(IntPtr hFile, IntPtr hExistingCompletionPort, IntPtr puiCompletionKey, UInt32 uiNumberOfConcurrentThreads); /// <summary> Win32Func: Closes an IO Completion Port Thread Pool </summary> [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern Boolean CloseHandle(SafeHandle hObject); /// <summary> Win32Func: Posts a context based event into an IO Completion Port Thread Pool </summary> [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern Boolean PostQueuedCompletionStatus(SafeFileHandle hCompletionPort, UInt32 uiSizeOfArgument, IntPtr dwCompletionKey, IntPtr pOverlapped); /// <summary> Win32Func: Waits on a context based event from an IO Completion Port Thread Pool. /// All threads in the pool wait in this Win32 Function </summary> [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern Boolean GetQueuedCompletionStatus(SafeFileHandle hCompletionPort, out UInt32 pSizeOfArgument, out IntPtr dwCompletionKey, out IntPtr ppOverlapped, UInt32 uiMilliseconds); // Constants /// <summary> SimTypeConst: This represents the Win32 Invalid Handle Value Macro </summary> private readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1); /// <summary> SimTypeConst: This represents the Win32 INFINITE Macro </summary> private readonly UInt32 INIFINITE = 0xffffffff; /// <summary> SimTypeConst: This tells the IOCP Function to shutdown </summary> private readonly IntPtr SHUTDOWN_IOCPTHREAD = new IntPtr(0x7fffffff); // Delegate Function Types /// <summary> DelType: This is the type of user function to be supplied for the thread pool </summary> public delegate void USER_FUNCTION(MyData obj); // Private Properties private SafeFileHandle m_hHandle; /// <summary> SimType: Contains the IO Completion Port Thread Pool handle for this instance </summary> private SafeFileHandle GetHandle { get { return m_hHandle; } set { m_hHandle = value; } } private Int32 m_uiMaxConcurrency; /// <summary> SimType: The maximum number of threads that may be running at the same time </summary> private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } } private Int32 m_iMinThreadsInPool; /// <summary> SimType: The minimal number of threads the thread pool maintains </summary> private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } } private Int32 m_iMaxThreadsInPool; /// <summary> SimType: The maximum number of threads the thread pool maintains </summary> private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } } private Object m_pCriticalSection; /// <summary> RefType: A serialization object to protect the class state </summary> private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } } private USER_FUNCTION m_pfnUserFunction; /// <summary> DelType: A reference to a user specified function to be call by the thread pool </summary> private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } } private Boolean m_bDisposeFlag; /// <summary> SimType: Flag to indicate if the class is disposing </summary> private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } } // Public Properties private Int32 m_iCurThreadsInPool; /// <summary> SimType: The current number of threads in the thread pool </summary> public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } } /// <summary> SimType: Increment current number of threads in the thread pool </summary> private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); } /// <summary> SimType: Decrement current number of threads in the thread pool </summary> private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); } private Int32 m_iActThreadsInPool; /// <summary> SimType: The current number of active threads in the thread pool </summary> public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } } /// <summary> SimType: Increment current number of active threads in the thread pool </summary> private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); } /// <summary> SimType: Decrement current number of active threads in the thread pool </summary> private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); } private Int32 m_iCurWorkInPool; /// <summary> SimType: The current number of Work posted in the thread pool </summary> public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } } /// <summary> SimType: Increment current number of Work posted in the thread pool </summary> private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); } /// <summary> SimType: Decrement current number of Work posted in the thread pool </summary> private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); } // Constructor, Finalize, and Dispose //*********************************************** /// <summary> Constructor </summary> /// <param name = "iMaxConcurrency"> SimType: Max number of running threads allowed </param> /// <param name = "iMinThreadsInPool"> SimType: Min number of threads in the pool </param> /// <param name = "iMaxThreadsInPool"> SimType: Max number of threads in the pool </param> /// <param name = "pfnUserFunction"> DelType: Reference to a function to call to perform work </param> /// <exception cref = "Exception"> Unhandled Exception </exception> public SafeIOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction) { try { // Set initial class state GetMaxConcurrency = iMaxConcurrency; GetMinThreadsInPool = iMinThreadsInPool; GetMaxThreadsInPool = iMaxThreadsInPool; GetUserFunction = pfnUserFunction; // Init the thread counters GetCurThreadsInPool = 0; GetActThreadsInPool = 0; GetCurWorkInPool = 0; // Initialize the Monitor Object GetCriticalSection = new Object(); // Set the disposing flag to false IsDisposed = false; // Create an IO Completion Port for Thread Pool use GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, IntPtr.Zero, IntPtr.Zero, (UInt32)GetMaxConcurrency); // Test to make sure the IO Completion Port was created if (GetHandle.IsInvalid) throw new Exception("Unable To Create IO Completion Port"); // Allocate and start the Minimum number of threads specified Int32 iStartingCount = GetCurThreadsInPool; ThreadStart tsThread = new ThreadStart(IOCPFunction); for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread) { // Create a thread and start it Thread thThread = new Thread(tsThread); thThread.Name = "IOCP " + thThread.GetHashCode(); thThread.Start(); // Increment the thread pool count IncCurThreadsInPool(); Console.WriteLine(thThread.Name); } } catch (Exception) { throw; } } //*********************************************** /// <summary> Finalize called by the GC </summary> ~SafeIOCPThreadPool() { if (!IsDisposed) Dispose(); } //********************************************** /// <summary> Called when the object will be shutdown. This /// function will wait for all of the work to be completed /// inside the queue before completing </summary> public void Dispose() { try { // Flag that we are disposing this object IsDisposed = true; // Get the current number of threads in the pool Int32 iCurThreadsInPool = GetCurThreadsInPool; // Shutdown all thread in the pool for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread) { bool bret = PostQueuedCompletionStatus(GetHandle, 4, SHUTDOWN_IOCPTHREAD, IntPtr.Zero); } // Wait here until all the threads are gone while (GetCurThreadsInPool != 0) Thread.Sleep(100); // Close the IOCP Handle CloseHandle(GetHandle); } catch { } } // Private Methods //******************************************* /// <summary> IOCP Worker Function that calls the specified user function </summary> private void IOCPFunction() { UInt32 uiNumberOfBytes; IntPtr dwCompletionKey; IntPtr lpOverlapped; try { while (true) { // Wait for an event GetQueuedCompletionStatus(GetHandle, out uiNumberOfBytes, out dwCompletionKey, out lpOverlapped, INIFINITE); if(uiNumberOfBytes <= 0) { continue; } // Decrement the number of events in queue DecCurWorkInPool(); // Was this thread told to shutdown if (dwCompletionKey == SHUTDOWN_IOCPTHREAD) break; // Increment the number of active threads IncActThreadsInPool(); try { // Call the user function GCHandle gch = GCHandle.FromIntPtr(lpOverlapped); MyData obj = (MyData) gch.Target; GetUserFunction(obj); } catch { throw; } // Get a lock Monitor.Enter(GetCriticalSection); try { // If we have less than max threads currently in the pool if (GetCurThreadsInPool < GetMaxThreadsInPool) { // Should we add a new thread to the pool if (GetActThreadsInPool == GetCurThreadsInPool) { if (IsDisposed == false) { // Create a thread and start it ThreadStart tsThread = new ThreadStart(IOCPFunction); Thread thThread = new Thread(tsThread); thThread.Name = "IOCP " + thThread.GetHashCode(); thThread.Start(); // Increment the thread pool count IncCurThreadsInPool(); } } } } catch { } // Relase the lock Monitor.Exit(GetCriticalSection); // Increment the number of active threads DecActThreadsInPool(); } } catch { } // Decrement the thread pool count DecCurThreadsInPool(); } // Public Methods //****************************************** /// <summary> IOCP Worker Function that calls the specified user function </summary> /// <param name="obj"> SimType: A value to be passed with the event </param> /// <exception cref = "Exception"> Unhandled Exception </exception> public void PostEvent(MyData obj) { try { // Only add work if we are not disposing if (IsDisposed == false) { // Post an event into the IOCP Thread Pool GCHandle gch = GCHandle.Alloc(obj); PostQueuedCompletionStatus(GetHandle, (uint)Marshal.SizeOf(gch), IntPtr.Zero, (IntPtr)gch); // Increment the number of item of work IncCurWorkInPool(); // Get a lock Monitor.Enter(GetCriticalSection); try { // If we have less than max threads currently in the pool if (GetCurThreadsInPool < GetMaxThreadsInPool) { // Should we add a new thread to the pool if (GetActThreadsInPool == GetCurThreadsInPool) { if (IsDisposed == false) { // Create a thread and start it ThreadStart tsThread = new ThreadStart(IOCPFunction); Thread thThread = new Thread(tsThread); thThread.Name = "IOCP " + thThread.GetHashCode(); thThread.Start(); // Increment the thread pool count IncCurThreadsInPool(); } } } } catch { } // Release the lock Monitor.Exit(GetCriticalSection); } } catch (Exception e) { throw e; } catch { throw new Exception("Unhandled Exception"); } } //***************************************** /// <summary> IOCP Worker Function that calls the specified user function </summary> /// <exception cref = "Exception"> Unhandled Exception </exception> public void PostEvent() { try { // Only add work if we are not disposing if (IsDisposed == false) { // Post an event into the IOCP Thread Pool PostQueuedCompletionStatus(GetHandle, 0, IntPtr.Zero, IntPtr.Zero); // Increment the number of item of work IncCurWorkInPool(); // Get a lock Monitor.Enter(GetCriticalSection); try { // If we have less than max threads currently in the pool if (GetCurThreadsInPool < GetMaxThreadsInPool) { // Should we add a new thread to the pool if (GetActThreadsInPool == GetCurThreadsInPool) { if (IsDisposed == false) { // Create a thread and start it ThreadStart tsThread = new ThreadStart(IOCPFunction); Thread thThread = new Thread(tsThread); thThread.Name = "IOCP " + thThread.GetHashCode(); thThread.Start(); // Increment the thread pool count IncCurThreadsInPool(); } } } } catch { } // Release the lock Monitor.Exit(GetCriticalSection); } } catch (Exception e) { throw e; } catch { throw new Exception("Unhandled Exception"); } } } }
测试代码:

using System; using System.Threading; // Included for the Thread.Sleep call using Continuum.SafeThreading; namespace SafeSample { //============================================ /// <summary> Sample class for the threading class </summary> public class UtilThreadingSample { //******************************************* /// <summary> Test Method </summary> static void Main() { // Create the MSSQL IOCP Thread Pool SafeIOCPThreadPool pThreadPool = new SafeIOCPThreadPool(0, 5, 10, new SafeIOCPThreadPool.USER_FUNCTION(IOCPThreadFunction)); for (int i = 0; i < 100; i++) { pThreadPool.PostEvent(new MyData(){Value = i}); } pThreadPool.Dispose(); Console.WriteLine("Disposed"); Console.ReadLine(); } private static object syncRoot = new object(); //***************************************** /// <summary> Function to be called by the IOCP thread pool. Called when /// a command is posted for processing by the SocketManager </summary> /// <param name="obj"> The value provided by the thread posting the event </param> static public void IOCPThreadFunction(MyData obj) { try { Console.WriteLine("Value: {0},Thread:{1}", obj.Value, Thread.CurrentThread.Name); } catch (Exception pException) { Console.WriteLine(pException.Message); } } } }
参考资料:

IOCP Thread Pooling in C# - Part I
http://www.devarticles.com/c/a/C-Sharp/IOCP-Thread-Pooling-in-C-sharp-Part-I/

IOCP Thread Pooling in C# - Part II
http://www.devarticles.com/c/a/C-Sharp/IOCP-Thread-Pooling-in-C-sharp-Part-II/

蛙蛙推荐:在c#使用IOCP(完成端口)的简单示例
http://www.cnblogs.com/onlytiancai/archive/2009/01/05/1241571.html

IOCP Thread Pool 在 C# 的Safe实现相关推荐

  1. mysql5.6 thread pool_mysql5.6 thread pool

    从percona 的压测来看,确实很牛笔啊.提升很大. http://www.mysqlperformanceblog.com/2014/01/29/percona-server-thread-poo ...

  2. Reporting Service 告警w WARN: Thread pool pressure. Using current thread for a work item

    如果Reporting Service偶尔出现不可访问或访问出错情况,这种情况一般没有做监控的话,很难捕捉到.出现这种问题,最好检查Reporting Service的日志文件. 今天早上就遇到这样一 ...

  3. 【案例】常驻查询引发的thread pool 性能问题之二

    一 现象     某业务单机4个实例中的一个实例出现连接数远高于其他三个实例(正常是4K,问题实例是8K+),但是这4个实例的配置完全相同.业务开发反馈为部分连接失败.     执行show proc ...

  4. 白话Elasticsearch67-不随意调节jvm和thread pool的原因jvm和服务器内存分配的最佳实践

    文章目录 概述 不随意调节jvm和thread pool的原因 jvm gc threadpool jvm和服务器内存分配的最佳实践 jvm heap分配 将机器上少于一半的内存分配给es 为什么不要 ...

  5. Thread pool引起的程序连接数据库响应慢

    数据库版本:percona-mysql 5.6.16 ​在很长一段时间,都会出现程序连接数据库,出现响应慢的情况,正常在几到几十毫秒之间,但是偶尔会出现上百毫秒的情况: 开始由于开发重新设置并调整过程 ...

  6. 自定义parallelStream的thread pool

    文章目录 简介 通常操作 使用自定义ForkJoinPool 总结 自定义parallelStream的thread pool 简介 之前我们讲到parallelStream的底层使用到了ForkJo ...

  7. worksteal thread pool

    worksteal的场景 对于一个线程池,每个线程有一个队列,想象这种场景,有的线程队列中有大量的比较耗时的任务堆积,而有的线程队列却是空的,现象就是有的线程处于饥饿状态,而有的线程处于消化不良的状态 ...

  8. MySQL Thread pool 操作过程

    Thread pool 操作过程:      thread pool 包含一定数量的 thread groups,每个groups 管理一定量的client connections,当mysql建立 ...

  9. Dubbo 线上 Thread pool is EXHAUSTED 问题排查

    本文来自作者投稿,原创作者:Tom 前景提要 早上9点第一个到公司泡了一包枸杞,准备打开极客时间看两篇文章提提神.突然客服部反馈用户发送短信收取不到验证码还一通在有大领导的群里@所有人(负责这块的同事 ...

最新文章

  1. YC陆奇发起知乎第一问:怎样的环境才能让更多AI创业公司成功?
  2. Nat. Mach. Intell. | 集成深度学习在生物信息学中的发展与展望
  3. 马斯克刚失了一枚大火箭!
  4. 写Java程序要体现面向对象
  5. python统计字符串中数字个数 socket_Python中socket中的listen()里参数(数字)到底代表什么?...
  6. CSS深入理解学习笔记之z-index
  7. webpack入门进阶调优第一章
  8. 如何查看光驱硬盘托架的尺寸_如何确定光驱位的硬盘托架的大小尺寸和接口
  9. 流行编程语言_编程语言的流行度排名
  10. LeetCode7.反转整数
  11. networkx怎么显示图_如何将标签添加到networkx图形中的节点?
  12. Linux下ip route、ip rule、iptables的关系(转
  13. rsync通过服务同步、linux日志、screen工具
  14. HDFVIEW3.1.2下载
  15. 功能安全标准-ISO26262-6---硬件集成测试
  16. ISO/IEC 27701:2019(隐私信息安全管理扩展要求和指南)解读(二)
  17. 1寸到36寸照片的尺寸规格
  18. OMNet++ Tic Toc例程的解析1
  19. 华为交换机导入配置_华为交换机配置导入和导出
  20. SQL异常:exist: integer = character varying

热门文章

  1. 计算数组和以及平均值
  2. 微信小程序API之setInterval
  3. Unity3D-VR_Gevr VR射线+tag的转换+物体展示
  4. 计算机二级access上机题,2017历年全国计算机二级access上机试题及答案
  5. 如何在Ubuntu 13.04, 13.10上安装Sublime Text 3
  6. 添加Maven(mvn)、sbt的国内仓库
  7. 【操作系统/OS笔记20】打开文件、文件数据块分配、空闲空间管理、多磁盘管理(RAID)、磁盘调度算法概述
  8. 【操作系统/OS笔记13】信号量、PV操作、管程、条件变量、生产者消费者问题
  9. 威学一百_涨价通知丨威学一百VIP全年畅学卡价格即将上调!
  10. 基于MATLAB的夜间车牌识别处理