在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合。虽然task自带了两个方法:task.ContinueWith()和Task.Factory

.ContinueWhenAll()来实现任务串行化,但是这些简单的方法远远不能满足我们实际的开发需要,从.net 4.0开始,类库给我们提供了很多

的类来帮助我们简化并行计算中复杂的数据同步问题。

大体上分为二种:

①   并发集合类:           这个在先前的文章中也用到了,他们的出现不再让我们过多的关注同步细节。

②  轻量级同步机制:      相对于老版本中那些所谓的重量级同步机制而言,新的机制更加节省cpu的额外开销。

关于并发集合类没什么好讲的,如果大家熟悉非线程安全的集合,那么这些并发的集合对你来说小菜一碟,这一篇和下一篇我们仔细来玩玩这

些轻量级的同步机制。

一:Barrier(屏障同步)

1:基本概念

msdn对它的解释是:使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作。乍一看有点不懂,没关系,我们采取提干法。

”多个任务“,”多个阶段”,“协同”,仔细想想知道了,下一阶段的执行必须等待上一个阶段中多task全部执行完,那么我们实际中有这样

的需求吗?当然有的,比如我们数据库中有100w条数据需要导入excel,为了在数据库中加速load,我们需要开多个任务去跑,比如这

里的4个task,要想load产品表,必须等4个task都跑完用户表才行,那么你有什么办法可以让task为了你两肋插刀呢?它就是Barrier。

好,我们知道barrier叫做屏障,就像下图中的“红色线”,如果我们的屏障设为4个task就认为已经满了的话,那么执行中先到的task必须等待

后到的task,通知方式也就是barrier.SignalAndWait(),屏障中线程设置操作为new Barrier(4,(i)=>{})。

啰嗦了半天,还是上下代码说话:

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;class Program
{
//四个task执行
static Task[] tasks = new Task[4];static Barrier barrier = null;static void Main(string[] args)
{
barrier = new Barrier(tasks.Length, (i) =>
{
Console.WriteLine("**********************************************************");
Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
Console.WriteLine("**********************************************************");
});for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = Task.Factory.StartNew((obj) =>
{
var single = Convert.ToInt32(obj);LoadUser(single);
barrier.SignalAndWait();LoadProduct(single);
barrier.SignalAndWait();LoadOrder(single);
barrier.SignalAndWait();
}, j);
}Task.WaitAll(tasks);Console.WriteLine("指定数据库中所有数据已经加载完毕!");Console.Read();
}static void LoadUser(int num)
{
Console.WriteLine("当前任务:{0}正在加载User部分数据!", num);
}static void LoadProduct(int num)
{
Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
}static void LoadOrder(int num)
{
Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
}
}

2:死锁问题

先前的例子我们也知道,屏障必须等待4个task通过SignalAndWait()来告知自己已经到达,当4个task全部达到后,我们可以通过

barrier.ParticipantsRemaining来获取task到达状态,那么如果有一个task久久不能到达那会是怎样的情景呢?好,我举个例子。

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;class Program
{
//四个task执行
static Task[] tasks = new Task[4];static Barrier barrier = null;static void Main(string[] args)
{
barrier = new Barrier(tasks.Length, (i) =>
{
Console.WriteLine("**********************************************************");
Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
Console.WriteLine("**********************************************************");
});for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = Task.Factory.StartNew((obj) =>
{
var single = Convert.ToInt32(obj);LoadUser(single);
barrier.SignalAndWait();LoadProduct(single);
barrier.SignalAndWait();LoadOrder(single);
barrier.SignalAndWait();}, j);
}Task.WaitAll(tasks);barrier.Dispose();Console.WriteLine("指定数据库中所有数据已经加载完毕!");Console.Read();
}static void LoadUser(int num)
{
Console.WriteLine("\n当前任务:{0}正在加载User部分数据!", num);if (num == 0)
{
//num=0:表示0号任务
//barrier.ParticipantsRemaining == 0:表示所有task到达屏障才会退出
// SpinWait.SpinUntil: 自旋锁,相当于死循环
SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0);
}
}static void LoadProduct(int num)
{
Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
}static void LoadOrder(int num)
{
Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
}
}

我们发现程序在加载User表的时候卡住了,出现了类似死循环,这句SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0)中

的ParticipantsRemaining==0 永远也不能成立,导致task0永远都不能退出,然而barrier还在一直等待task0调用SignalAndWait来结束屏障。

结果就是造成了相互等待的尴尬局面,我们下个断点看看情况。

3:超时机制

当我们coding的时候遇到了这种问题还是很纠结的,所以我们必须引入一种“超时机制”,如果在指定的时候内所有的参与者(task)都

没有到达屏障的话,我们就需要取消这些参与者的后续执行,幸好SignalAndWait给我们提供了超时的重载,为了能够取消后续执行,我们

还要采用CancellationToken机制。

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;class Program
{
//四个task执行
static Task[] tasks = new Task[4];static Barrier barrier = null;static void Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();CancellationToken ct = cts.Token;barrier = new Barrier(tasks.Length, (i) =>
{
Console.WriteLine("**********************************************************");
Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
Console.WriteLine("**********************************************************");
});for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = Task.Factory.StartNew((obj) =>
{
var single = Convert.ToInt32(obj);LoadUser(single);if (!barrier.SignalAndWait(2000))
{
//抛出异常,取消后面加载的执行
throw new OperationCanceledException(string.Format("我是当前任务{0},我抛出异常了!", single), ct);
}LoadProduct(single);
barrier.SignalAndWait();LoadOrder(single);
barrier.SignalAndWait();}, j, ct);
}//等待所有tasks 4s
Task.WaitAll(tasks, 4000);try
{
for (int i = 0; i < tasks.Length; i++)
{
if (tasks[i].Status == TaskStatus.Faulted)
{
//获取task中的异常
foreach (var single in tasks[i].Exception.InnerExceptions)
{
Console.WriteLine(single.Message);
}
}
}barrier.Dispose();
}
catch (AggregateException e)
{
Console.WriteLine("我是总异常:{0}", e.Message);
}Console.Read();
}static void LoadUser(int num)
{
Console.WriteLine("\n当前任务:{0}正在加载User部分数据!", num);if (num == 0)
{
//自旋转5s
if (!SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0, 5000))
return;
}Console.WriteLine("当前任务:{0}正在加载User数据完毕!", num);
}static void LoadProduct(int num)
{
Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
}static void LoadOrder(int num)
{
Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
}
}

二:spinLock(自旋锁)

我们初识多线程或者多任务时,第一个想到的同步方法就是使用lock或者Monitor,然而在4.0 之后给我们提供了另一把武器spinLock,

如果你的任务持有锁的时间非常短,具体短到什么时候msdn也没有给我们具体的答案,但是有一点值得确定的时,如果持有锁的时候比较

短,那么它比那些重量级别的Monitor具有更小的性能开销,它的用法跟Monitor很相似,下面举个例子,Add2方法采用自旋锁。

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Threading;class Program
{
static SpinLock slock = new SpinLock(false);static int sum1 = 0;static int sum2 = 0;static void Main(string[] args)
{
Task[] tasks = new Task[100];for (int i = 1; i <= 100; i++)
{
tasks[i - 1] = Task.Factory.StartNew((num) =>
{
Add1((int)num);Add2((int)num);}, i);
}Task.WaitAll(tasks);Console.WriteLine("Add1数字总和:{0}", sum1);Console.WriteLine("Add2数字总和:{0}", sum2);Console.Read();
}//无锁
static void Add1(int num)
{
Thread.Sleep(100);sum1 += num;
}//自旋锁
static void Add2(int num)
{
bool lockTaken = false;Thread.Sleep(100);try
{
slock.Enter(ref lockTaken);
sum2 += num;
}
finally
{
if (lockTaken)
slock.Exit(false);
}
}
}

8天玩转并行开发——第四天 同步机制(上)相关推荐

  1. 8天玩转并行开发——第五天 同步机制(下)

    承接上一篇,我们继续说下.net4.0中的同步机制,是的,当出现了并行计算的时候,轻量级别的同步机制应运而生,在信号量这一块 出现了一系列的轻量级,今天继续介绍下面的3个信号量 CountdownEv ...

  2. 8天玩转并行开发——第八天 用VS性能向导解剖你的程序

    8天玩转并行开发--第八天 用VS性能向导解剖你的程序 原文 8天玩转并行开发--第八天 用VS性能向导解剖你的程序 最后一篇,我们来说说vs的"性能向导",通常我们调试程序的性能 ...

  3. 8天玩转并行开发——第三天 plinq的使用

    8天玩转并行开发--第三天 plinq的使用 原文 8天玩转并行开发--第三天 plinq的使用 相信在.net平台下,我们都玩过linq,是的,linq让我们的程序简洁优美,简直玩的是爱不释手,但是 ...

  4. 8天玩转并行开发——第六天 异步编程模型

    原文:8天玩转并行开发--第六天 异步编程模型 在.net里面异步编程模型由来已久,相信大家也知道Begin/End异步模式和事件异步模式,在task出现以后,这些东西都可以被task包装 起来,可能 ...

  5. 并行开发 4.同步机制(上)

    原文:8天玩转并行开发--第四天 同步机制(上) 在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合.虽然task自带了两个方法:task.ContinueWith()和Task.Factor ...

  6. C#并行开发_Thread/ThreadPool, Task/TaskFactory, Parallel

    大家好,本次讨论的是C#中的并行开发,给力吧,随着并行的概念深入,哥也赶上这个潮流了,其实之前讨论C#的异步调用或者C#中BeginInvoke或者Invoke都已经涉及了部分本篇的内容. 参考书目: ...

  7. iOS开发系列--并行开发其实很容易

    --多线程开发 概览 大家都知道,在开发过程中应该尽可能减少用户等待时间,让程序尽可能快的完成运算.可是无论是哪种语言开发的程序最终往往转换成汇编语言进而解释成机器码来执行.但是机器码是按顺序执行的, ...

  8. bucket sort sample sort 并行_IBM布局AI硬件大杀器:硬软件并行开发、开源模拟AI工具包...

    原标题:IBM布局AI硬件大杀器:硬软件并行开发.开源模拟AI工具包 智东西(公众号:zhidxcom) 编 | 子佩 智东西11月4日消息,为了解决AI对数据.能源和内存资源的巨大需求,IBM一直致 ...

  9. 并行开发的基本概念及两个重要的定律

    并行开发的基本概念: 同步(Synchronous)和异步(Asynchronous) 同步和异步通常用来形容一次方法的调用.同步方法调用一旦开始,调用者必须等到方法调用返回后,才能继续后续的行为.异 ...

最新文章

  1. 关闭ES动态创建type
  2. OpenCascade Primitives BRep-Cylinder
  3. [你必须知道的.NET]第二十七回:interface到底继承于object吗?
  4. poi excel 隐藏标识_纳尼?Excel竟然自带项目管理模板
  5. 博物馆自动灭火系统应如何选择
  6. Hibernate 一对一注释
  7. [安全攻防进阶篇] 四.逆向分析之条件语句和循环语句源码还原及流程控制逆向
  8. LC-BLSTM结构快速解读
  9. ios13 无法传参_iOS13个人热点功能频遭投诉
  10. 升级glibc库到glibc-2.14.1
  11. vscode大讲堂——代码力max的编辑器
  12. 微信小程序抖音实战-小视频弹幕
  13. 计算机基础知识总结(一)
  14. JavaScript合并两个有序数组
  15. xyz坐标转换ybc_GNSS仰角和方位角的计算及代码,XYZ转BLH坐标的代码及原理
  16. python与财务报表分析_《财务报表分析》第八章 企业财务综合分析与业绩评价课后练习...
  17. MySQL - 大量 sending data 状态进程,让数据库性能急剧下降。
  18. linux下给文件夹创建链接
  19. excel与access结合运用_access和excel结合应用
  20. Collection类和泛型

热门文章

  1. 在IE中测试调用Web Service
  2. oracle parallel 并行 设置 理解
  3. Python3 流程控制语句
  4. JVM笔记6-垃圾回收器
  5. Cisco 2950 忘记密码如何重设
  6. 查看终端进程是否死掉技巧
  7. Libgdx学习笔记:分享自己写的异步加载
  8. oracle技术之使用rman找回被误删除表空间
  9. 列表导航栏实例(01)
  10. 2019年,比特币现金爱好者线下见面会发展至6大洲30个国家