在学习的过程中,看一些一线的技术文档很吃力,而且考虑到国内那些技术牛人英语都不差的,要向他们看齐,所以每天下班都在疯狂地背单词,博客有些日子没有更新了,见谅见谅

什么是TPL?

Task Parallel Library (TPL), 在.NET Framework 4微软推出TPL,并把TPL作为编写多线程和并行代码的首选方式,但是,在国内,到目前为止好像用的人并不多。(TPL)是System.Threading和System.Threading.Tasks命名空间中的一组公共类型和API 。TPL的目的是通过简化向应用程序添加并行性和并发性的过程来提高开发人员的工作效率,TPL动态地扩展并发度,以最有效地使用所有可用的处理器。通过使用TPL,您可以最大限度地提高代码的性能,让我们专注于程序本身而不用去关注负责的多线程管理。

出自: https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/task-parallel-library-tpl

为什么使用TPL?

在上面介绍了什么是TPL,可能大家还是云里雾里,不知道TPL的好处到底是什么。

我在youtube上找到了一个优秀的视频,讲述的是TPL和Thread的区别,我觉得对比一下,TPL的优势很快就能体现出来,如果大家能打开的话建议大家一定要看看。

地址是:https://www.youtube.com/watch?v=No7QqSc5cl8

现如今,我们的电脑的CPU怎么也是2核以上,下面假设我的电脑是四核的,我们来做一个实验。

使用Thread

代码中,如果使用Thread来处理任务,如果不做特出的处理,只是thread.Start(),监测电脑的核心的使用情况是下面这样的。

每一条线代表CPU某个核心的使用情况,明显,随着代码Run起来,其实只有某一个核心的使用率迅速提升,其他核心并无明显波动,为什么会这样呢?

原来,默认情况下,操作系统并不会调用所有的核心来处理任务,即使我们使用多线程,其实也是在一个核心里面运行这些Thread,而且Thread之间涉及到线程同步等问题,其实,效率也不会明显提高。

使用TPL

在代码中,引入了TPL来处理相同的任务,再次监视各个核心的使用情况,效果就变得截然不同,如下。

可以看到各个核心的使用情况都同时有了明显的提高。

说明使用TPL后,不再是使用CPU的某个核心来处理任务了,而是TPL自动把任务分摊给每个核心来处理,处理效率可想而知,理论上会有明显提升的(为什么说理论上?和使用多线程一样,各个核心之间的同步管理也是要占用一定的效率的,所以对于并不复杂的任务,使用TPL可能适得其反)。

实验结果出自https://www.youtube.com/watch?v=No7QqSc5cl8

看了这个实验讲解,是不是理解了上面所说的这句。

TPL的目的是通过简化向应用程序添加并行性和并发性的过程来提高开发人员的工作效率,TPL动态地扩展并发度,以最有效地使用所有可用的处理器。

所以说,使用TPL 来处理多线程任务可以让你不必吧把精力放在如何提高多线程处理效率上,因为这一切,TPL 能自动地帮你完成。

TPL Dataflow?

TPL处理Dataflow是TPL强大功能中的一种,它提供一套完整的数据流组件,这些数据流组件统称为TPL Dataflow Library,那么,在什么场景下适合使用TPL Dataflow Library呢?

官方举的一个 栗子 再恰当不过:

例如,通过TPL Dataflow提供的功能来转换图像,执行光线校正或防红眼,可以创建管道数据流组件,管道中的每个功能可以并行执行,并且TPL能自动控制图像流在不同线程之间的同步,不再需要Thread 中的Lock。

TPL数据流库由Block组成,Block是缓冲和处理数据的单元,TPL定义了三种最基础的Block。

source blocks(System.Threading.Tasks.Dataflow.ISourceBlock <TOutput>),源块充当数据源并且可以从中读取。

target blocks(System.Threading.Tasks.Dataflow.ITargetBlock <TInput>),目标块充当数据接收器并可以写入。

propagator blocks(System.Threading.Tasks.Dataflow.IPropagatorBlock <TInput,TOutput>),传播器块充当源块和目标块,并且可以被读取和写入。它继承自ISourceBlock <TOutput>和ITargetBlock <TInput>。

还有其他一些个性化的Block,但其实他们都是对这三种Block进行一些扩充,可以结合下面的代码来理解这三种Block.

Code Show

1.source block 和 target block 合并成propagator block.

private IPropagatorBlock<string, Dictionary<int, string>> Process1(){var bufferBlock = new BufferBlock<Dictionary<int, string>>();var actionBlock = new ActionBlock<string>(x =>{Console.WriteLine($"Process1 处理中:{x}");Thread.Sleep(5000);var dic = new Dictionary<int, string> { { 0, x } };dic.Add(1, "Process1");bufferBlock.Post(dic);}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});actionBlock.Completion.ContinueWith(_ =>{Console.WriteLine($"Process1 Complete,State{_.Status}");bufferBlock.Complete();});return DataflowBlock.Encapsulate(actionBlock, bufferBlock);}

可以看到,我定义了BufferBlock和ActionBlock,它们分别继承于ISourceBlock 和 ITargetBlock ,所以说,他们其实就是源块和目标块,在new actionBlock()中传入了一个Action<String>,该Action就是该Block所执行的任务。 最后,DataflowBlock.Encapsulate(actionBlock, bufferBlock)把源块和目标块合并成了一个传递块。

2.TransformBlock

private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2(){var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic =>{Console.WriteLine($"Process2 处理中:{dic.First().Value}");Thread.Sleep(5000);dic.Add(2, "Process2");return dic;}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});block.Completion.ContinueWith(_ =>{Console.WriteLine($"Process2 Complete,State{_.Status}");});return block;}

TransfromBlock继承了IPropagatorBlock,所以它本身就是一个传递块,所以它除了要处理出入数据,还要返回数据,所以给new TransformBlock()中传入的是Func<TInput, TOutput>而不是Action<TInput>.

3.TargetBlock来收尾

private ITargetBlock<Dictionary<int, string>> Process3(){var actionBlock = new ActionBlock<Dictionary<int, string>>(dic =>{Console.WriteLine($"Process3 处理中:{dic.First().Value}");Thread.Sleep(5000);dic.Add(3, "Process3");Console.WriteLine("Dic中的内容如下:");foreach (var item in dic){Console.Write($"{item.Key}:{item.Value}||");}Console.WriteLine();}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});return actionBlock;}

TargetBlock只能写入并处理数据,不能读取,所以TargetBlock适合作为Pipeline的最后一个Block。

4.控制每个Block的并行度

在在构造TargetBlock(包括其子类)的时候,可以传入ExecutionDataflowBlockOptions参数,ExecutionDataflowBlockOptions对象里面有一个MaxDegreeOfParallelism属性,通过改制,可以控制该Block的同时处理任务的数量(可以理解成线程数)。

new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism}

5.构建Pipeline,连接Block

public Task Builder(){_startBlock = Process1();var process2Block = Process2();var process3Block = Process3();_startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true });process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true });process3Block.Completion.ContinueWith(_ =>{Console.WriteLine($"Process3 Complete,State{_.Status}");Console.WriteLine("所有任务处理完成");});return process3Block.Completion;}

通过

ISourceBlock<TOutput>.LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOption)

方法,可以把Block连接起来,即构建Pipeline,当DataflowLinkOptions对象的PropagateCompletion属性为true时,SorceBlock任务处理完成是,会把TargetBlock也标记为完成。

Block被标记为Complete 后,无法传入新的数据了,即不能再处理新的任务了。

6.Pipeline的运行

public void Process(string[] inputs){if (inputs == null)return;foreach (var input in inputs){_startBlock.Post(input);}_startBlock.Complete();}

Pipeline构建好后,我们只需要给第一个Block传入数据,该数据就会在管道内流动起来了,所有数据传入完成后,调用Block的Complete方法,把该Block标记为完成,就不可以再往里面Post数据了。

完整代码如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;namespace Tpl.Dataflow
{public class Pipeline{IPropagatorBlock<string, Dictionary<int, string>> _startBlock;private int _maxDegreeOfParallelism;public Pipeline(int maxDegreeOfParallelism){_maxDegreeOfParallelism = maxDegreeOfParallelism;}public void Process(string[] inputs){if (inputs == null)return;foreach (var input in inputs){_startBlock.Post(input);}_startBlock.Complete();}public Task Builder(){_startBlock = Process1();var process2Block = Process2();var process3Block = Process3();_startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true });process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true });process3Block.Completion.ContinueWith(_ =>{Console.WriteLine($"Process3 Complete,State{_.Status}");Console.WriteLine("所有任务处理完成");});return process3Block.Completion;}private IPropagatorBlock<string, Dictionary<int, string>> Process1(){var bufferBlock = new BufferBlock<Dictionary<int, string>>();var actionBlock = new ActionBlock<string>(x =>{Console.WriteLine($"Process1 处理中:{x}");Thread.Sleep(5000);var dic = new Dictionary<int, string> { { 0, x } };dic.Add(1, "Process1");bufferBlock.Post(dic);}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});actionBlock.Completion.ContinueWith(_ =>{Console.WriteLine($"Process1 Complete,State{_.Status}");bufferBlock.Complete();});return DataflowBlock.Encapsulate(actionBlock, bufferBlock);}private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2(){var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic =>{Console.WriteLine($"Process2 处理中:{dic.First().Value}");Thread.Sleep(5000);dic.Add(2, "Process2");return dic;}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});block.Completion.ContinueWith(_ =>{Console.WriteLine($"Process2 Complete,State{_.Status}");});return block;}private ITargetBlock<Dictionary<int, string>> Process3(){var actionBlock = new ActionBlock<Dictionary<int, string>>(dic =>{Console.WriteLine($"Process3 处理中:{dic.First().Value}");Thread.Sleep(5000);dic.Add(3, "Process3");Console.WriteLine("Dic中的内容如下:");foreach (var item in dic){Console.Write($"{item.Key}:{item.Value}||");}Console.WriteLine();}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = _maxDegreeOfParallelism});return actionBlock;}}
}

Main方法如下:

static void Main(string[] args){Console.WriteLine("请输入管道并发数:");if (int.TryParse(Console.ReadLine(), out int max)){var pipeline = new Pipeline(max);var task = pipeline.Builder();pipeline.Process(new[] { "码", "农", "阿", "宇" });task.Wait();Console.ReadKey();}}

测试运行如图:

我来解释一下,为什么是这么运行的,因为把管道的并行度设置为2,所以每个Block可以同时处理两个任务,所以,如果给管道传入四个字符 ,每个字符作为一个任务,假设传入  “码农阿宇”四个任务,会时这样的一个过程…..

  1. 码   农  两个首先进入Process1,
  2. 处理完成后,码  农   两个任务流出,
  3. Process1位置空出来, 阿  宇 两个任务流入 Process1,
  4. 码  农 两个任务流向 Process2,
  5. 阿  宇 从 Process1 处理完成后流出,此时Process1任务完成
  6. 码  农 流出 Process2 ,同时 阿 宇  流入 Process2 ……
  7. 依此类推….

该项目Github地址: https://github.com/liuzhenyulive/Tpl-Dataflow-Demo

参考文献:https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library

码字不易,如果对您有用,欢迎推荐和关注,谢谢

.Net Core中利用TPL(任务并行库)构建Pipeline处理Dataflow相关推荐

  1. Asp.Net Core中利用Seq组件展示结构化日志功能

    在一次.Net Core小项目的开发中,掌握的不够深入,对日志记录并没有好好利用,以至于一出现异常问题,都得跑动服务器上查看,那时一度怀疑自己肯定没学好,不然这一块日志不可能需要自己扒服务器日志来查看 ...

  2. 在ASP.NET Core中使用的ML.NET模型构建器入门

    目录 介绍 背景 先决条件 使用代码 第1步-创建ASP.NET Core应用程序 步骤2:使用ML.NET Model Builder 数据 训练 评估 代码 步骤3:将ML.NET添加到ASP.N ...

  3. 三分钟总览微软任务并行库TPL

    点击上方蓝字进行关注 有小伙伴问我每天忽悠的TPL是什么? ☹️ 这次站位高一点,严肃讲一讲. 引言 俗话说,不想开飞机的程序员不是一名好爸爸:作为微软技术栈的老鸟,一直将代码整洁之道奉为经典, 优秀 ...

  4. 微软并行库初体验之TPL

    前端时间因为要做个大数据量分析,所以用C#写了个脚本跑,不过由于算法复杂度问题,初步估计需要40小时才能跑完.为了加快运算,我一开始想到了 并行计算,利用MPICH或其他类似的分布式计算框架开发,不过 ...

  5. Py之matplotlib:在matplotlib库中利用legend函数创建自定义图例(代码实现)

    Py之matplotlib:在matplotlib库中利用legend函数创建自定义图例(代码实现) 目录 matplotlib库中利用legend函数创建自定义图例 原始图像 在原始图像上创建自定义 ...

  6. Delphi XE7中新并行库

    Delphi XE7中添加了新的并行库,和.NET的Task和Parellel相似度99%. 详细内容能够看以下的文章: http://www.delphifeeds.com/go/s/119574 ...

  7. .NET Core中的CSV解析库

    感谢 本篇首先特别感谢从此启程兄的<.NetCore外国一些高质量博客分享>, 发现很多国外的.NET Core技术博客资源, 我会不定期从中选择一些有意思的文章翻译总结一下. .NET ...

  8. matlab2018中变压器模块,利用MATLAB中Sim+Power+Systems模库时变压器模型的参数计算及其仿真结果比较...

    [实例简介] 变压器模型 matlab 仿真 参数计算 第21卷第1期向秋风,等:利用 MATLAB中 Sim Power System模库时变压器模型的参数计算及其仿真结果比较 17 其标幺值:R= ...

  9. .NET Core 中的并发编程

    今天我们购买的每台电脑都有一个多核心的 CPU,允许它并行执行多个指令.操作系统通过将进程调度到不同的内核来发挥这个结构的优点. 然而,还可以通过异步 I/O 操作和并行处理来帮助我们提高单个应用程序 ...

最新文章

  1. 为什么做小程序的时候要做定制开发小程序?
  2. 粒子群优化算法_每日论文19:粒子群优化算法综述
  3. Springboot整合shiro基于url身份认证和授权认证
  4. 中天数相减获得差_Power BI 了解DAX中LASTDATE和MAX之间的区别
  5. php header 404 nginx,ThinkPHP在nginx下怎么设置?路由统统404,疯了~
  6. Oracle用rowid删除同一张表的重复记录
  7. 集群的可扩展性及其分布式体系结构(2)-下
  8. javascript 的module 模块化
  9. Atitit code for biz lst idx项目分析法,包括模块分析,与模块位置idx数据库分析 数据表的分类 日志表不断增长(包括用户表,订单表等)。。元数据表表 基本不增长。。。
  10. windows 2003 server安装iis6,附下载文件
  11. linux 查看网卡以及开启网卡
  12. Excel表格撤销工作表保护
  13. Java文件操作、IO流
  14. 使用jquery生成随机二维码的方法
  15. 数仓工具—Hive源码之Beeline/HiveCli(4)
  16. 液晶面板里面有些什么配件_液晶模组和液晶面板有什么区别?
  17. C# 调用钉钉接口进行发送企业通知消息,适应于网页版
  18. 计算机输入法设计大赛,搜狗输入法皮肤设计大赛获奖作品
  19. 向大家推荐一部小说《鉴鬼实录》
  20. 探讨一下如何防止撞库。

热门文章

  1. LaTeX技巧006:使用pdfLaTeX时,添加PDF文件属性的方法
  2. 创建用于 ASP.NET 的分页程序控件
  3. 微信小游戏 Egret开发数据域官方Demo下载地址
  4. python hashlib模块
  5. OGG 跳过事务(转)
  6. [Java][Android] 多线程同步-主线程等待全部子线程完毕案例
  7. ZLMS教学管理平台系统V1.2.0最新版本发布,支持纯Web视频直播点播,还带运营在线支付功能!完全免费提供!...
  8. 无法安装或运行应用程序。该应用程序要求首先在“全局程序集缓存(gac)”中安装程序集system.data.entity...
  9. linux下搭建mrbs会议室预定管理系统
  10. 图像分析:投影曲线的波峰查找