1. LimitedConcurrencyLevelTaskScheduler 介绍

这个TaskScheduler用过的应该都知道,微软开源的一个任务调度器,它的代码很简单,
也很好懂,但是我没有明白的是他是如何实现限制并发数的
首先贴下它的代码,大家先熟悉一下。

public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{/// <summary>Whether the current thread is processing work items.</summary> [ThreadStatic]private static bool _currentThreadIsProcessingItems;/// <summary>The list of tasks to be executed.</summary> private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) /// <summary>The maximum concurrency level allowed by this scheduler.</summary> private readonly int _maxDegreeOfParallelism;/// <summary>Whether the scheduler is currently processing work items.</summary> private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) /// <summary> /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the /// specified degree of parallelism. /// </summary> /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param> public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism){if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");_maxDegreeOfParallelism = maxDegreeOfParallelism;}/// <summary>/// current executing number;/// </summary>public int CurrentCount { get; set; }/// <summary>Queues a task to the scheduler.</summary> /// <param name="task">The task to be queued.</param> protected sealed override void QueueTask(Task task){// Add the task to the list of tasks to be processed. If there aren't enough // delegates currently queued or running to process tasks, schedule another. lock (_tasks){Console.WriteLine("Task Count : {0} ", _tasks.Count);_tasks.AddLast(task);if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism){++_delegatesQueuedOrRunning;NotifyThreadPoolOfPendingWork();}}}int executingCount = 0;private static object executeLock = new object();/// <summary> /// Informs the ThreadPool that there's work to be executed for this scheduler. /// </summary> private void NotifyThreadPoolOfPendingWork(){ThreadPool.UnsafeQueueUserWorkItem(_ =>{// Note that the current thread is now processing work items. // This is necessary to enable inlining of tasks into this thread. _currentThreadIsProcessingItems = true;try{// Process all available items in the queue. while (true){Task item;lock (_tasks){// When there are no more items to be processed, // note that we're done processing, and get out. if (_tasks.Count == 0){--_delegatesQueuedOrRunning;break;}// Get the next item from the queue item = _tasks.First.Value;_tasks.RemoveFirst();}// Execute the task we pulled out of the queue base.TryExecuteTask(item);}}// We're done processing items on the current thread finally { _currentThreadIsProcessingItems = false; }}, null);}/// <summary>Attempts to execute the specified task on the current thread.</summary> /// <param name="task">The task to be executed.</param> /// <param name="taskWasPreviouslyQueued"></param> /// <returns>Whether the task could be executed on the current thread.</returns> protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){// If this thread isn't already processing a task, we don't support inlining if (!_currentThreadIsProcessingItems) return false;// If the task was previously queued, remove it from the queue if (taskWasPreviouslyQueued) TryDequeue(task);// Try to run the task. return base.TryExecuteTask(task);}/// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary> /// <param name="task">The task to be removed.</param> /// <returns>Whether the task could be found and removed.</returns> protected sealed override bool TryDequeue(Task task){lock (_tasks) return _tasks.Remove(task);}/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }/// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary> /// <returns>An enumerable of the tasks currently scheduled.</returns> protected sealed override IEnumerable<Task> GetScheduledTasks(){bool lockTaken = false;try{Monitor.TryEnter(_tasks, ref lockTaken);if (lockTaken) return _tasks.ToArray();else throw new NotSupportedException();}finally{if (lockTaken) Monitor.Exit(_tasks);}}
}

简单使用

下面是调用代码。

static void Main(string[] args)
{TaskFactory fac = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(5));//TaskFactory fac = new TaskFactory();for (int i = 0; i < 1000; i++){fac.StartNew(s => {Thread.Sleep(1000);Console.WriteLine("Current Index {0}, ThreadId {1}",s,Thread.CurrentThread.ManagedThreadId);}, i);}Console.ReadKey();
}

调用很简单
根据调试调用顺序可以知道。
使用 LimitedConcurrencyLevelTaskScheduler 创建好TaskFactory 后,
调用该TaskFacotry.StartNew 方法后。会进入 LimitedConcurrencyLevelTaskScheduler
QueueTask 方法。

/// <summary>Queues a task to the scheduler.</summary> /// <param name="task">The task to be queued.</param> protected sealed override void QueueTask(Task task){// Add the task to the list of tasks to be processed. If there aren't enough // delegates currently queued or running to process tasks, schedule another. lock (_tasks){Console.WriteLine("Task Count : {0} ", _tasks.Count);_tasks.AddLast(task);if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism){++_delegatesQueuedOrRunning;NotifyThreadPoolOfPendingWork();}}}

代码很简单,把刚创建的Task 添加到任务队列中去,然后判断当前正在执行的任务数量与设置的允许最大并发数进行比较, 如果小于该值,则开始通知正在挂起的任务开始执行。
我的疑问主要在 NotifyThreadPoolOfPendingWork 这个方法上。

private void NotifyThreadPoolOfPendingWork(){ThreadPool.UnsafeQueueUserWorkItem(_ =>{// Note that the current thread is now processing work items. // This is necessary to enable inlining of tasks into this thread. _currentThreadIsProcessingItems = true;try{// Process all available items in the queue. while (true){Task item;lock (_tasks){// When there are no more items to be processed, // note that we're done processing, and get out. if (_tasks.Count == 0){--_delegatesQueuedOrRunning;break;}// Get the next item from the queue item = _tasks.First.Value;_tasks.RemoveFirst();}// Execute the task we pulled out of the queue base.TryExecuteTask(item);}}// We're done processing items on the current thread finally { _currentThreadIsProcessingItems = false; }}, null);}

从代码中看到的意思是一直跑一个死循环, 不断从_tasks 中取出Task执行,
直到_task为空为止,然后退出循环。从这里并没有看到限制并发数的限制,只有在QueueTask中调用的时候有个简单的限制,然而好像并没有什么卵用,
因为只要 NotifyThreadPoolOfPendingWork 方法启动了, 就会一直跑,直到所有的Task执行完成。那他的并发数是如何限制的呢?

一直很迷惑,是不是我哪里理解错了, 还请知道的大神解惑一下。

~~简直醉了。markdown 不太会用,显示有些问题。。
如果看着不舒服可以看这里 https://www.zybuluo.com/kevinsforever/note/115066
使用这里的编辑器写的,但是复制过来不能正常显示。。
3Q .

转载于:https://www.cnblogs.com/DiscoverPalace/p/4567222.html

关于 LimitedConcurrencyLevelTaskScheduler 的疑惑相关推荐

  1. configure_file路径疑惑

    configure_file(<input> <output>[COPYONLY] [ESCAPE_QUOTES] [@ONLY][NEWLINE_STYLE [UNIX|DO ...

  2. 性能测试分析之带宽瓶颈的疑惑

    第一部分, 测试执行 先看一图,再看下文 这个当然就是压力过程中带宽的使用率了,我们的带宽是1Gbps的,合计传输速率为128MB/s,也正因为这个就让我越来越疑惑了,不过通过压力过程中的各项数据我又 ...

  3. RedLock: 看完这篇文章后请不要有任何疑惑了

    图片来源:无证之罪 公众号后台经常会有小伙伴咨询RedLock相关问题,笔者在此再来一篇文章剖析一下RedLock,希望此文能解决你对它所有的疑惑和误解. 为什么需要RedLock 这一点很好理解,因 ...

  4. ConcurrentDictionary线程不安全么,你难道没疑惑,你难道弄懂了么?

    前言 事情不太多时,会时不时去看项目中同事写的代码可以作个参考或者学习,个人觉得只有这样才能走的更远,抱着一副老子天下第一的态度最终只能是井底之蛙.前两篇写到关于断点传续的文章,还有一篇还未写出,后续 ...

  5. java当前路径和相对路径相关的疑惑

    java当前路径和相对路径相关的疑惑_克豪的博客-CSDN博客 java当前路径和相对路径相关的疑惑_克豪的博客-CSDN博客

  6. iOS单例创建的一点疑惑

    线程安全的单例常用写法, +(AccountManager *)sharedManager{static AccountManager *defaultManager = nil;disptch_on ...

  7. unicode环境下用CFile读取txt的若干疑惑,该如何处理

    unicode环境下用CFile读取txt的若干疑惑 在vs2010下对一些文件的操作总是有好多疑问 unicade环境下以前的许多资料发现会出现异常结果,下面看看我的问题: 主要是用CFile读取t ...

  8. feignclient对象找不到_为什么我找不到对象呢,一个33岁大龄剩女的疑惑

    小木是我朋友,属兔今年33了.不仅我觉的她很优秀,周围的人也觉得她很优秀,但就是现在还单身.下面是她的疑惑. 我各方面条件都还行,为什么找不到对象呢? 性格:温和,善良,阳光,有主见.(对待事物有自己 ...

  9. 关于面向对象的总结和疑惑(转载可乐冰

    在这节,老师讲了面向对象的三大特性:1.全局变量:2.封装:3.继承: 现在我就我自己的理解总结一下这节课的内容,并提出相应的疑惑,望老师解答 其一:全局变量 声明变量的方法有三:1,在全局对象中va ...

  10. 汇编实验1遇到的问题及解决之记录(以及尚未解决的疑惑,大神可以帮帮看看吗)

    比较有用的参考资料的网址: https://www.doc88.com/p-6601373721664.html http://www.rsdown.cn/down/168010.html (masm ...

最新文章

  1. ASP.net:Regex.Match 方法 中应该注意的几个问题
  2. 代谢组学的相关分析数据库,MetaboAnalyst 5.0 使用指南
  3. 数据库修复Part1:创建自己的测试corrupt数据库
  4. Hadoop掀起大数据革命 三巨头齐发力
  5. react循环setstate_[React] 8 - React 自身或工程性能优化点?
  6. Java下载文件的几种方式
  7. Spring Boot场景启动器(Starter)
  8. oracle 对象定义被修改,oracle 数据对象_xspaces
  9. Codeforces Round #447 Div. 2 A.B.C
  10. PHP获取IP所在地地址
  11. SpringBoot 轻量级英文版个人博客 flame
  12. python人口普查数据显示_如何使用FCC的API在Python中查找人口普查数据块并遍历dict列表?...
  13. python struct pack unpack
  14. ESP32-Ardunio 心知天气 hhtps 获取数据 +NTP对时
  15. 数字信号处理中,系统函数零极点图的绘制
  16. Securing Wireless LANs with PEAP and Passwords
  17. Windows7 U盘安装Ubuntu14.04双系统教程
  18. 算法设计与分析复习--回溯法
  19. 程序员追求技术夯实基础学习路线建议
  20. 熊过留印---爱心篇

热门文章

  1. stm32 某个io引脚不能拉高_【stm32f407】IO引脚复用和映射
  2. pythonATM,购物车项目实战_补充6-lib模块
  3. CS224N笔记——RNN和语言模型
  4. Go by Example练习
  5. tomcat体系结构
  6. HttpURLConnection模拟form表单提交文件
  7. EventBus (一) 使用详解——初步使用EventBus
  8. C++对象模型——关于对象(第一章)
  9. Hanoi Tower问题分析
  10. android之App widget实际应用Demo