ConcurrentBag可以理解为是一个线程安全无序集合,API比我们的list要弱一点,那我们来看看它的实现:

  public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>{// ThreadLocalList object that contains the data per threadThreadLocal<ThreadLocalList> m_locals;// This head and tail pointers points to the first and last local lists, to allow enumeration on the thread locals objectsvolatile ThreadLocalList m_headList, m_tailList;bool m_needSync;public ConcurrentBag() { Initialize(null);}public ConcurrentBag(IEnumerable<T> collection){if (collection == null){throw new ArgumentNullException("collection", SR.GetString(SR.ConcurrentBag_Ctor_ArgumentNullException));}Initialize(collection);}private void Initialize(IEnumerable<T> collection){m_locals = new ThreadLocal<ThreadLocalList>();// Copy the collection to the bagif (collection != null){ThreadLocalList list = GetThreadList(true);foreach (T item in collection){list.Add(item, false);}}}public void Add(T item){// Get the local list for that thread, create a new list if this thread doesn't exist //(first time to call add)ThreadLocalList list = GetThreadList(true);AddInternal(list, item);}private void AddInternal(ThreadLocalList list, T item){bool lockTaken = false;try{Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);//Synchronization cases:// if the list count is less than two to avoid conflict with any stealing thread// if m_needSync is set, this means there is a thread that needs to freeze the bagif (list.Count < 2 || m_needSync){// reset it back to zero to avoid deadlock with stealing threadlist.m_currentOp = (int)ListOperation.None;Monitor.Enter(list, ref lockTaken);}list.Add(item, lockTaken);}finally{list.m_currentOp = (int)ListOperation.None;if (lockTaken){Monitor.Exit(list);}}}private ThreadLocalList GetThreadList(bool forceCreate){ThreadLocalList list = m_locals.Value;if (list != null){return list;}else if (forceCreate){// Acquire the lock to update the m_tailList pointerlock (GlobalListsLock){if (m_headList == null){list = new ThreadLocalList(Thread.CurrentThread);m_headList = list;m_tailList = list;}else{list = GetUnownedList();if (list == null){list = new ThreadLocalList(Thread.CurrentThread);m_tailList.m_nextList = list;m_tailList = list;}}m_locals.Value = list;}}else{return null;}Debug.Assert(list != null);return list;}public bool TryTake(out T result){return TryTakeOrPeek(out result, true);}public bool TryPeek(out T result){return TryTakeOrPeek(out result, false);}private bool TryTakeOrPeek(out T result, bool take){// Get the local list for that thread, return null if the thread doesn't exit //(this thread never add before) ThreadLocalList list = GetThreadList(false);if (list == null || list.Count == 0){return Steal(out result, take);}bool lockTaken = false;try{if (take) // Take operation
                {Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Take);//Synchronization cases:// if the list count is less than or equal two to avoid conflict with any stealing thread// if m_needSync is set, this means there is a thread that needs to freeze the bagif (list.Count <= 2 || m_needSync){// reset it back to zero to avoid deadlock with stealing threadlist.m_currentOp = (int)ListOperation.None;Monitor.Enter(list, ref lockTaken);// Double check the count and steal if it became emptyif (list.Count == 0){// Release the lock before stealingif (lockTaken){try { }finally{lockTaken = false; // reset lockTaken to avoid calling Monitor.Exit again in the finally block
                                    Monitor.Exit(list);}}return Steal(out result, true);}}list.Remove(out result);}else{if (!list.Peek(out result)){return Steal(out result, false);}}}finally{list.m_currentOp = (int)ListOperation.None;if (lockTaken){Monitor.Exit(list);}}return true;}private bool Steal(out T result, bool take){bool loop;List<int> versionsList = new List<int>(); // save the lists versiondo{versionsList.Clear(); //clear the list from the previous iterationloop = false;ThreadLocalList currentList = m_headList;while (currentList != null){versionsList.Add(currentList.m_version);if (currentList.m_head != null && TrySteal(currentList, out result, take)){return true;}currentList = currentList.m_nextList;}// verify versioning, if other items are added to this list since we last visit it, we should retrycurrentList = m_headList;foreach (int version in versionsList){if (version != currentList.m_version) //oops state changed
                    {loop = true;if (currentList.m_head != null && TrySteal(currentList, out result, take))return true;}currentList = currentList.m_nextList;}} while (loop);result = default(T);return false;}private bool TrySteal(ThreadLocalList list, out T result, bool take){lock (list){if (CanSteal(list)){list.Steal(out result, take);return true;}result = default(T);return false;}}private bool CanSteal(ThreadLocalList list){if (list.Count <= 2 && list.m_currentOp != (int)ListOperation.None){SpinWait spinner = new SpinWait();while (list.m_currentOp != (int)ListOperation.None){spinner.SpinOnce();}}if (list.Count > 0){return true;}return false;}/// <summary>/// Try to reuse an unowned list if exist/// unowned lists are the lists that their owner threads are aborted or terminated/// this is workaround to avoid memory leaks./// </summary>/// <returns>The list object, null if all lists are owned</returns>private ThreadLocalList GetUnownedList(){//the global lock must be held at this point
            Contract.Assert(Monitor.IsEntered(GlobalListsLock));ThreadLocalList currentList = m_headList;while (currentList != null){if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped){currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safereturn currentList;}currentList = currentList.m_nextList;}return null;}internal class ThreadLocalList{internal volatile Node m_head;private volatile Node m_tail;internal volatile int m_currentOp;private int m_count;internal int m_stealCount;internal volatile ThreadLocalList m_nextList;internal bool m_lockTaken;internal Thread m_ownerThread;internal volatile int m_version;internal ThreadLocalList(Thread ownerThread){m_ownerThread = ownerThread;}internal void Add(T item, bool updateCount){checked{m_count++;}Node node = new Node(item);if (m_head == null){Debug.Assert(m_tail == null);m_head = node;m_tail = node;m_version++; // changing from empty state to non empty state
                }else{node.m_next = m_head;m_head.m_prev = node;m_head = node;}if (updateCount) // update the count to avoid overflow if this add is synchronized
                {m_count = m_count - m_stealCount;m_stealCount = 0;}}/// <summary>/// Remove an item from the head of the list/// </summary>/// <param name="result">The removed item</param>internal void Remove(out T result){Debug.Assert(m_head != null);Node head = m_head;m_head = m_head.m_next;if (m_head != null){m_head.m_prev = null;}else{m_tail = null;}m_count--;result = head.m_value;}/// <summary>/// Peek an item from the head of the list/// </summary>/// <param name="result">the peeked item</param>/// <returns>True if succeeded, false otherwise</returns>internal bool Peek(out T result){Node head = m_head;if (head != null){result = head.m_value;return true;}result = default(T);return false;}internal void Steal(out T result, bool remove){Node tail = m_tail;Debug.Assert(tail != null);if (remove) // Take operation
                {m_tail = m_tail.m_prev;if (m_tail != null){m_tail.m_next = null;}else{m_head = null;}// Increment the steal countm_stealCount++;}result = tail.m_value;}}internal class Node{public Node(T value){m_value = value;}public readonly T m_value;public Node m_next;public Node m_prev;}}

首先我们需要知道里面有2个内部类Node和ThreadLocalList都是链表结构,其中Node是双向链表,因为它有m_next和m_prev属性,但是ThreadLocalList确是单项链表只有m_nextList属性,ThreadLocalList是Node的集合,有m_head和m_tail属性指向Node实例。现在我们来看ConcurrentBag的几个变量,ThreadLocal<ThreadLocalList> m_locals表示当前线程的list,所以从这里我们可以猜测线程安全是采用ThreadLocal来实现的。 volatile ThreadLocalList m_headList, m_tailList;这2个变量应该是可以遍历所有线程的list。

无论是初始化Initialize方法还是添加元素的Add方法,我们首先要调用GetThreadList放来获取当前线程的list,GetThreadList方法 首先检查当前线程的m_locals.Value是否存在,有则直接返回;否者检查当前线程是否是程序第一个线程【m_headList == null】,如果是则创建新的ThreadLocalList,否者调用GetUnownedList放法检查是否有孤立ThreadLocalList使用【ThreadLocalList的逻辑线程已经停止,但是该ThreadLocalList实例确存在】,如果有则返回改ThreadLocalList,否则只有新建ThreadLocalList实例。

现在看看AddInternal方法的实现,首先修改ThreadLocalList的m_currentOp标记为添加元素【 Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add)】,然后在添加元素 list.Add(item, lockTaken);,如果该list需要lock的话,那么在添加元素前我们还需要加锁Monitor.Enter(list, ref lockTaken),添加后需要解锁 Monitor.Exit(list)。ThreadLocalList的Add方法非常简单,把新节点放到链表头部【 node.m_next = m_head;m_head.m_prev = node; m_head = node;】

添加元素时添加到各个线程的ThreadLocalList,那么读取就比较麻烦了,我们需要读取各各线程ThreadLocalList的数据,也就是说需要用到m_headList, m_tailList两个变量。如果当前线程存在ThreadLocalList实例,那么直接从ThreadLocalList里面拿去数据,如果需要加锁,那么我们就加锁【 Monitor.Enter(list, ref lockTaken)】和解锁【Monitor.Exit(list)】,都是当前线程的list,如果当前线程ThreadLocalList不存在,或者没有数据,我们需要从其他线程的ThreadLocalList获取数据,Steal方法 首先或从m_headList开始,依次遍历每一个ThreadLocalList,然后从它们里面获取数据,如果获取不到数据,那么就再次遍历一下所有的ThreadLocalList,检查哪些ThreadLocalList的版本m_version在这两次遍历过程中发生了变化。

  do{versionsList.Clear(); //clear the list from the previous iterationloop = false;ThreadLocalList currentList = m_headList;while (currentList != null){versionsList.Add(currentList.m_version);if (currentList.m_head != null && TrySteal(currentList, out result, take)){return true;}currentList = currentList.m_nextList;}// verify versioning, if other items are added to this list since we last visit it, we should retrycurrentList = m_headList;foreach (int version in versionsList){if (version != currentList.m_version) //oops state changed
                    {loop = true;if (currentList.m_head != null && TrySteal(currentList, out result, take))return true;}currentList = currentList.m_nextList;}} while (loop);

TrySteal方法的实现就非常简单了,检查list是否可以查询数据【CanSteal(list)】,CanSteal里面也用了自旋来实现【if (list.Count <= 2 && list.m_currentOp != (int)ListOperation.None){ SpinWait spinner = new SpinWait(); while (list.m_currentOp != (int)ListOperation.None) {spinner.SpinOnce(); } }】,真正Steal实现是由ThreadLocalList来做的,比较简单。

转载于:https://www.cnblogs.com/majiang/p/7884556.html

C# ConcurrentBag实现相关推荐

  1. C# ConcurrentBag的实现原理

    一.前言 笔者最近在做一个项目,项目中为了提升吞吐量,使用了消息队列,中间实现了生产消费模式,在生产消费者模式中需要有一个集合,来存储生产者所生产的物品,笔者使用了最常见的List<T>集 ...

  2. C#多线程编程系列(五)- C# ConcurrentBag的实现原理

    目录 一.前言 二.ConcurrentBag类 三. ConcurrentBag线程安全实现原理 1. ConcurrentBag的私有字段 2. 用于数据存储的ThreadLocalList类 3 ...

  3. 任务并行库(Task Parellel Library)parallel.for parallel.foreach、List、ConcurrentBag 并行集合、线程安全结合

    普通的for .foreach 都是顺序依次执行的. C#当中我们一般使用for和foreach执行循环,有时候我们呢的循环结构每一次的迭代需要依赖以前一次的计算或者行为.但是有时候则不需要.如果迭代 ...

  4. C#线程安全集合类说明(2): ConcurrentBag<T>

    线程安全的集合所在的命名空间 using System.Collections.Concurrent; Concurrent意思是并发的,并行的.反义是sequential(顺序的),线程安全的意思就 ...

  5. concurrentbag 删除指定元素_Python 列表,for循环,元组的使用(修改、添加、删除、排序、切片)

    Python 列表(List) Python的基本数据类型有整数,浮点数,布尔,字符串,它们是最基本的数据.在实际编程中,我们要经常组织由很多基本数据组成的集合,这些集合的不同组织方式就是:数据结构, ...

  6. concurrentbag 删除指定元素_Python实现列表索引批量删除的5种方法_python

    这篇文章主要介绍了Python实现列表索引批量删除的5种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 最近用Java做项目 ...

  7. concurrentbag 删除_你知道吗?这样删除iPhone中的APP腾出的空间会更大

    iPhone上要删除一个APP是非常方便的,长按要删除APP的图标直至其抖动且图标左上角出现一个小叉,然后点击这个叉就删除了,99%的人几乎都是这么删除的. iPhone上如何删除APP 大家可以打开 ...

  8. 改善C#程序的建议3:在C#中选择正确的集合进行编码

    原文:改善C#程序的建议3:在C#中选择正确的集合进行编码 要选择正确的集合,我们首先要了解一些数据结构的知识.所谓数据结构,就是相互之间存在一种或多种特定关系的数据元素的集合.结合下图,我们看一下对 ...

  9. Spring Boot 青睐的数据库连接池HikariCP为什么是史上最快的?

    前言 现在已经有很多公司在使用HikariCP了,HikariCP还成为了SpringBoot默认的连接池,伴随着SpringBoot和微服务,HikariCP 必将迎来广泛的普及. 下面陈某带大家从 ...

最新文章

  1. 扫盲文章:AMF,RTMP,RTMPT,RTMPS
  2. spring restful遇到的问题
  3. c语言错误指导,c语言编程指导.pdf
  4. android 关于2.2版本之前的流量统计
  5. mysql orm c语言_【译】Simple MySQL ORM for C
  6. 如何对CAD进行区域覆盖
  7. 3分钟搞懂MySQL事务隔离级别及SET TRANSACTION影响事务
  8. android json字符串转成json对象_C++ 两行代码实现json与类对象互转
  9. 问题-Fastreport4 Memo打印时中文显示不全
  10. 【Mybatis笔记】mybatis实现mysql增删改查
  11. Redis简介、安装、配置、启用学习笔记
  12. UE4.26官方文档网页浏览录屏打包下载版
  13. docker和k8s的常见命令
  14. 怎样设计访谈提纲_如何设计调查问卷与访谈提纲要点分析.ppt
  15. python 对两列互补的数据合并
  16. Python 开发个人微信号在运维开发中的使用
  17. 我看技术人的成长路径
  18. View 5应用之五:iPad与Android携带虚拟桌面
  19. SMTP, POP3, IMAP,Exchange ActiveSync区别
  20. 网上舆情分析报告写作框架及六大技巧

热门文章

  1. 《途客圈创业记:不疯魔,不成活》一一2.1 创新工场初印象
  2. iOS NSNotificationCenter 使用姿势详解
  3. Unity插件之NGUI学习(8)—— Table和NGUI尺寸转换为世界坐标系尺寸
  4. 轻松学Linux之使用转义字符
  5. JAVA基础知识(1)
  6. courses to choose
  7. semester 2 deadline from the computer science and electronic engineering
  8. formal method lecture 9
  9. 【转】内存耗用:VSS/RSS/PSS/USS
  10. 远程桌面Web连接访问及端口更改方法