
随着CPU多核的普及,编程时充分利用这个特性越显重要。上篇首先用传统的嵌套循环进行数组填充,然后用.NET 4.0中的System.Threading.Tasks提供的Parallel Class来并行地进行填充,最后对比他们的性能。本文将深入分析Parallel Class并借机回答上篇9楼提出的问题,而System.Threading.Tasks分析,这个将推迟到.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(三)中介绍。内容如下:

  • 1、Parallel Class

    • 1.1、For方法
    • 1.2、ForEach方法
    • 1.3、Invoke方法
  • 2、并发控制疑问?
    • 2.1、使用Lock锁
    • 2.2、使用PLINQ——用AsParallel
    • 2.3、使用PLINQ——用ParallelEnumerable
    • 2.4、使用Interlocked操作
    • 2.5、使用Parallel.For的有Thread-Local变量重载函数
  • 性能比较

1、Parallel Class


01 using System.Threading.Tasks;   
02 class Test
03 {
04     static int N = 1000;
06     static void TestMethod()
07     {
08         // Using a named method.
09         Parallel.For(0, N, Method2);
11         // Using an anonymous method.
12         Parallel.For(0, N, delegate(int i)
13         {
14             // Do Work.
15         });
17         // Using a lambda expression.
18         Parallel.For(0, N, i =>
19         {
20             // Do Work.
21         });
22     }
24     static void Method2(int i)
25     {
26         // Do work.
27     }
28 }


  • For(Int32 fromInclusive, Int32 toExclusive, Action<Int32> body),该方法对区间(fromInclusive,toExclusive)之间的迭代调用body表示的委托。body委托有一个迭代数次的int32参数,如果fromInclusive>=toExclusive,则不会执行任何迭代。
  • For(Int32 fromInclusive, Int32 toExclusive, Action<Int32, ParallelLoopState>),该方法对区间(fromInclusive, toExclusive)之间的迭代调用body表示的委托。body委托有两个参数——表示迭代数次的int32参数、一个可用于过早地跳出循环的ParallelLoopState实例。如果fromInclusive>=toExclusive,则不会执行任何迭代。
  • For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32> body),跟第一个方法类似,但它的区间是[fromInclusive, toExclusive)。
  • For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32, ParallelLoopState> body),跟第二个方法类似,单的区间是[fromInclusive, toExclusive)。
  • For<TLocal>(Int32 fromInclusive, Int32 toExclusive, Func<TLocal> localInit, Func<Int32, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally),它的迭代区间是[fromInclusive, toExclusive)。另外body有两个local状态变量用于同一线程的迭代之间共享。localInit委托将在每个线程参与循环执行时调用,并返回这些线程初始的local状态。这些初始状态被传递给body,当它在每个线程上第一次调用时。然后,接下来body调用返回一个可能的修改状态值且传递给下一次body调用。最终,最后一次在每个线程上的body调用返回的一个状态值传递给localFinally委托。每个线程执行在自己的loacl 状态上执行最后一个动作时,localFinally委托将被调用。这个委托可能在多个线程上并发执行,因此,你必须同步访问任何共享变量。
  • For<TLocal>(Int32, Int32, ParallelOptions, Func<TLocal>, Func<Int32, ParallelLoopState, TLocal, TLocal>, Action<TLocal>),跟上面的方法类似。

下面代码演示了For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32> body)方法(来自MSDN):

01 using System;
02 using System.Collections.Generic;
03 using System.Linq;
04 using System.Text;
05 using System.Threading;
06 using System.Threading.Tasks;
08 namespace ConsoleApplication2
09 {
10     class Program
11     {
12         // Demonstrated features:
13         //        CancellationTokenSource
14         //         Parallel.For()
15         //        ParallelOptions
16         //        ParallelLoopResult
17         // Expected results:
18         //         An iteration for each argument value (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) is executed.
19         //        The order of execution of the iterations is undefined.
20         //        The iteration when i=2 cancels the loop.
21         //        Some iterations may bail out or not start at all; because they are temporally executed in unpredictable order, 
22         //          it is impossible to say which will start/complete and which won't.
23         //        At the end, an OperationCancelledException is surfaced.
24         // Documentation:
25         //        http://msdn.microsoft.com/en-us/library/system.threading.cancellationtokensource(VS.100).aspx
27         static void Main(string[] args)
28         {
29             CancellationTokenSource cancellationSource = new CancellationTokenSource();
30             ParallelOptions options = new ParallelOptions();
31             options.CancellationToken = cancellationSource.Token;
32             try
33             {
34                 ParallelLoopResult loopResult = Parallel.For(
35                     0,
36                     10,
37                     options,
38                     (i, loopState) =>
39                     {
40                         Console.WriteLine("Start Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);
42                         // Simulate a cancellation of the loop when i=2
43                         if (i == 2)
44                         {
45                             cancellationSource.Cancel();
46                         }
48                         // Simulates a long execution
49                         for (int j = 0; j < 10; j++)
50                         {
51                             Thread.Sleep(1 * 200);
53                             // check to see whether or not to continue
54                             if (loopState.ShouldExitCurrentIteration) return;
55                         }
57                         Console.WriteLine("Finish Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);
58                     }
59                 );
60                 if (loopResult.IsCompleted)
61                 {
62                     Console.WriteLine("All iterations completed successfully. THIS WAS NOT EXPECTED.");
63                 }
64             }
65                 // No exception is expected in this example, but if one is still thrown from a task,
66                 // it will be wrapped in AggregateException and propagated to the main thread.
67             catch (AggregateException e)
68             {
69                 Console.WriteLine("Parallel.For has thrown an AggregateException. THIS WAS NOT EXPECTED.\n{0}", e);
70             }
71                 // Catching the cancellation exception
72             catch (OperationCanceledException e)
73             {
74                 Console.WriteLine("An iteration has triggered a cancellation. THIS WAS EXPECTED.\n{0}", e.ToString());
75             }
76         }
77     }
78 }





  • Invoke(params Action[] actions):actions是一个要执行的动作数组,这些动作可能并行地执行,但并不保证执行的顺序及一定并行执行。这个方法直到提供的所有操作完成时才返回,不管是否正常地完成或异常终止。
  • Invoke(ParallelOptions parallelOptions, params Action[] actions):跟上面的方法类似,只是增加了一个parallelOptions参数,可以用户调用者取消整个操作。


01 using System;
02 using System.Collections.Generic;
03 using System.Linq;
04 using System.Text;
05 using System.Threading;
06 using System.Threading.Tasks;
08 namespace ConsoleApplication2
09 {
10     class Program
11     {
12         static void Main()
13         {
14             try
15             {
16                 Parallel.Invoke(
17                     BasicAction,    // Param #0 - static method
18                     () =>            // Param #1 - lambda expression
19                     {
20                         Console.WriteLine("Method=beta, Thread={0}", Thread.CurrentThread.ManagedThreadId);
21                     },
22                     delegate()        // Param #2 - in-line delegate
23                     {
24                         Console.WriteLine("Method=gamma, Thread={0}", Thread.CurrentThread.ManagedThreadId);
25                     }
26                 );
27             }
28             // No exception is expected in this example, but if one is still thrown from a task,
29             // it will be wrapped in AggregateException and propagated to the main thread.
30             catch (AggregateException e)
31             {
32                 Console.WriteLine("An action has thrown an exception. THIS WAS UNEXPECTED.\n{0}", e.InnerException.ToString());
33             }
34         }
36         static void BasicAction()
37         {
38             Console.WriteLine("Method=alpha, Thread={0}", Thread.CurrentThread.ManagedThreadId);
39         }
40     }
41 }




01 using System;
02 using System.Collections.Generic;
03 using System.Linq;
04 using System.Text;
05 using System.Threading;
06 using System.Threading.Tasks;
08 namespace ConsoleApplication2
09 {
10     class Program
11     {        
12         static void Main(string[] args)
13         {
14             int loops=0;
15             while (loops <= 100)
16             {
17                 long sum = 0;                
18                 Parallel.For(1, 1001, delegate(long i)
19                 {
20                     sum += i;
21                 });
22                 System.Console.WriteLine(sum);
23                 loops++;
24             }
25         }
26     }
27 }


