泛函编程(19)-泛函库设计-Parallelism In Action
上节我们讨论了并行运算组件库的基础设计,实现了并行运算最基本的功能:创建新的线程并提交一个任务异步执行。并行运算类型的基本表达形式如下:
1 import java.util.concurrent._2 object Par {3 type Par[A] = ExecutorService => Future[A]4 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)5 //> run: [A](es: java.util.concurrent.ExecutorService)(pa: ch71.Par.Par[A])java.6 //| util.concurrent.Future[A]7 def unit[A](a: A): Par[A] = {8 es => new Future[A] {9 def get = a
10 def get(t: Long, u: TimeUnit) = get
11 def isDone = true
12 def isCancelled = false
13 def cancel(evenIsRunning: Boolean) = false
14 }
15 } //> unit: [A](a: A)ch71.Par.Par[A]
16 def fork[A](pa: Par[A]): Par[A] = { //注意这里有个错误?
17 es => es.submit(new Callable[A] {
18 def call: A = run(es)(pa).get
19 })
20 }
21 def async[A](a: => A): Par[A] = fork(unit(a))
22
23 }
实际上我们已经实现了两项最基本的函数:
1、unit[A](a: A): Par[A] : 我们硬生生的按照Par的类型款式造了一个Future实例,这样我们才可以用Future.get的形式读取运算结果值。看看这个例子:unit(42+1),在调用函数unit时由于传入参数是即时计算的,所以在进入unit前已经完成了计算结果43。然后人为的把这个结果赋予Future.get,这样我们就可以和真正的由ExecutorService返回的Future一样用同样的方式读取结果。所以说unit纯粹是一个改变格式的升格函数,没有任何其它作用。
2、async[A](a: => A): Par[A]:这个async函数把表达式a提交到主线程之外的另一个线程。新的线程由ExecutorService提供,我们无须理会,这样可以实现线程管理和并行运算组件库的松散耦合。由于async的传人函数是延后计算类型,所以我们可以把表达式a提交给另一个线程去运算。
那么我们用例子来示范一下:
1 val es = Executors.newCachedThreadPool() //线程由jvm提供,我们无须理会2 //> es : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool3 //| Executor@19dfb72a[Running, pool size = 0, active threads = 0, queued tasks =4 //| 0, completed tasks = 0]5 val a = unit({println(Thread.currentThread.getName); 42+1})6 //> main7 //| a : ch71.Par.Par[Int] = <function1>8 val b = async({println(Thread.currentThread.getName); 42+1})9 //> main
10 //| b : ch71.Par.Par[Int] = <function1>
11 run(es)(a).get //> res0: Int = 43
12 run(es)(b).get //> res1: Int = 43
13 es.shutdown()
看到问题了吗?用run运算a,b时没有显示println,而这个println在申明val a, val b 时已经执行了。对unit这可以理解:参数是即时计算的,所以println和结果43都在进入函数之前运算了(然后放到Future.get)。但是async的参数不是延迟计算的吗?我们再看清楚:async(a: => A) >>> fork(unit(a)),到fork函数参数unit(a)就立即计算了。所以 fork(pa: => Par[A])才可以保证在提交任务前都不会计算表达式a。我们必须把fork的函数款式改一下:
1 def fork[A](pa: => Par[A]): Par[A] = {
2 es => es.submit(new Callable[A] {
3 def call: A = run(es)(pa).get
4 })
5 } //> fork: [A](pa: ch71.Par.Par[A])ch71.Par.Par[A]
再运行一下例子:
1 val es = Executors.newCachedThreadPool() //线程由jvm提供,我们无须理会2 //> es : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool3 //| Executor@19dfb72a[Running, pool size = 0, active threads = 0, queued tasks =4 //| 0, completed tasks = 0]5 val a = unit({println(Thread.currentThread.getName); 42+1})6 //> main7 //| a : ch71.Par.Par[Int] = <function1>8 val b = async({println(Thread.currentThread.getName); 42+1})9 //> b : ch71.Par.Par[Int] = <function1>
10 run(es)(a).get //> res0: Int = 43
11 run(es)(b).get //> pool-1-thread-1
12 //| res1: Int = 43
13 es.shutdown()
看看结果:unit在主线程main运行,而async则在pool-1-thread-1这个非主线程内运行。
实现异步运算才是并行运算的第一步。并行运算顾名思义就是把一个大任务分解成几个较小任务然后同时异步运算后再把结果结合起来。我们用伪代码描述一下并行运算思路:
1 //伪代码
2 val big10sencondJob = ??? //一个10秒运算
3 val small5sJob1 = split big10sencondJob in half //分解成两个 5秒运算
4 val small5sJob2 = split big10sencondJob in half //分解成两个 5秒运算
5 val fa = run small5sJob1 //立即返回future 但开始运算 5 秒
6 val fb = run small5sJob2 //立即返回future 但开始运算 5 秒
7 val sum = fa.get + fb.get //等待5秒后可以得出结果
看来用以上方式是可以得到并行运算的效果(10秒到5秒区别)。但我们采用了串指令(imperative)方式实现。当然我们必须考虑用泛函方式来实现并行运算的启动及结果抽取。
先用泛函方式启动并行运算。如果我们并行启动两个运算:
1 def map2[A,B,C](pa: Par[A], pb: Par[B])(f: (A,B) => C): Par[C]
map2并行启动pa,pb然后把它们的结果用函数f结合。看起来很美。那么我们先试着把它实现了:
1 def map2[A,B,C](pa: Par[A], pb: Par[B])(f: (A,B) => C): Par[C] = {2 import TimeUnit.NANOSECONDS3 es => new Future[C] {4 val fa = run(es)(pa) //在这里按pa的定义来确定在那个线程运行。如果pa是fork Par则在非主线程中运行5 val fb = run(es)(pb)6 def get = f(fa.get, fb.get)7 def get(timeOut: Long, timeUnit: TimeUnit) = {8 val start = System.nanoTime9 val a = fa.get
10 val end = System.nanoTime
11 //fa.get用去了一些时间。剩下给fb.get的timeout值要减去
12 val b = fb.get(timeOut - timeUnit.convert((end - start), NANOSECONDS) , timeUnit)
13 f(a,b)
14 }
15 def isDone = fa.isDone && fb.isDone
16 def isCancelled = fa.isCancelled && fb.isCancelled
17 def cancel(evenIsRunning: Boolean) = fa.cancel(evenIsRunning) || fb.cancel(evenIsRunning)
18 }
19 } //> map2: [A, B, C](pa: ch71.Par.Par[A], pb: ch71.Par.Par[B])(f: (A, B) => C)ch
20 //| 71.Par.Par[C]
在map2的实现里我们人为地建了个Future[C]。但在建的过程中我们运行了pa,pb的计算。如果我们对pa或pb有运算超时要求的话,就必须计算每次运算所使用的时间。所以Future[C]是符合pa,pb的运算要求的。
我们先试着同时运算41+2,33+4两个计算:
1 val es = Executors.newCachedThreadPool() //线程由jvm提供,我们无须理会2 //> es : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPoo3 //| lExecutor@19dfb72a[Running, pool size = 0, active threads = 0, queued tasks4 //| = 0, completed tasks = 0]5 map2(async({println(Thread.currentThread.getName); 41+2}),6 async({println(Thread.currentThread.getName); 33+4}))7 {(a,b) => {println(Thread.currentThread.getName); a+b}}(es).get8 //> pool-1-thread-19 //| pool-1-thread-2
10 //| main
11 //| res0: Int = 80
啊!pa,pb分别在不同的非主线程中运行了。但函数f的运行是在主线程main中运行的。我们试着把这个也放到非主线程中:
1 fork { map2(async({println(Thread.currentThread.getName); 41+2}),
2 async({println(Thread.currentThread.getName); 33+4}))
3 {(a,b) => {println(Thread.currentThread.getName); a+b}}}(es).get
4 //> pool-1-thread-2
5 //| pool-1-thread-3
6 //| pool-1-thread-1
7 //| res0: Int = 80
现在所有的计算都是在不同的非主线程中运算的了,清楚了吧。
两个以上并行运算可以通过map2来实现:
1 def map3[A,B,C,D](pa: Par[A], pb: Par[B], pc: Par[C])(f: (A,B,C) => D): Par[D] = {2 map2(pa,map2(pb,pc){(b,c) => (b,c)}){(a,bc) => {3 val (b,c) = bc4 f(a,b,c)5 }}6 }7 def map4[A,B,C,D,E](pa: Par[A], pb: Par[B], pc: Par[C], pd: Par[D])(f: (A,B,C,D) => E): Par[E] = { //| 71.Par.Par[C]8 map2(pa,map2(pb,map2(pc,pd){(c,d) => (c,d)}){(b,cd) => (b,cd)}){(a,bcd) => {9 val (b,(c,d)) = bcd
10 f(a,b,c,d)
11 }}
12 }
13 def map5[A,B,C,D,E,F](pa: Par[A], pb: Par[B], pc: Par[C], pd: Par[D], pe: Par[E])(f: (A,B,C,D,E) => F): Par[F] = { //| 71.Par.Par[C]
14 map2(pa,map2(pb,map2(pc,map2(pd,pe){(d,e) => (d,e)}){(c,de) => (c,de)}){(b,cde) => (b,cde)}){(a,bcde) => {
15 val (b,(c,(d,e))) = bcde
16 f(a,b,c,d,e)
17 }}
18 }
再看个例子:如果一个并行运算的表达式是个List[Int],即 Par[List[Int]]。 如何对内部的List[Int]进行排序?
1 //我们可以run pa, get list 后进行排序,然后再封装进Future[List[Int]]2 def sortPar(pa: Par[List[Int]]): Par[List[Int]] = {3 es => {4 val l = run(es)(pa).get5 new Future[List[Int]] {6 def get = l.sorted7 def isDone = true8 def isCancelled = false9 def get(t: Long, u: TimeUnit) = get
10 def cancel(e: Boolean) = false
11 }
12 }
13 }
14 //也可以用map2来实现。因为map2可以启动并行运算,也可以对par内元素进行操作。但操作只针对一个par,
15 //我们用unit(())替代第二个par。现在我们可以对一个par的元素进行操作了
16 def sortedPar(pa: Par[List[Int]]): Par[List[Int]] = {
17 map2(pa,unit(())){(a,_) => a.sorted}
18 }
19 //map是对一个par的元素进行变形操作,我们同样可以用map2实现了
20 def map[A,B](pa: Par[A])(f: A => B): Par[B] = {
21 map2(pa,unit(())){(a,_) => f(a) }
22 }
23 //然后用map去对Par[List[Int]]排序
24 def sortParByMap(pa: Par[List[Int]]): Par[List[Int]] = {
25 map(pa){_.sorted}
26 }
看看运行结果:
1 sortPar(async({println(Thread.currentThread.getName); List(4,1,2,3)}))(es).get
2 //> pool-1-thread-1
3 //| res3: List[Int] = List(1, 2, 3, 4)
4 sortParByMap(async({println(Thread.currentThread.getName); List(4,1,2,3)}))(es).get
5 //> pool-1-thread-1
6 //| res4: List[Int] = List(1, 2, 3, 4)
实际上map2做了两件事:启动了两个并行运算、对运算结果进行了处理。这样说map2是可以被分解成更基本的组件函数:
1 //启动两项并行运算2 def product[A,B](pa: Par[A], pb: Par[B]): Par[(A,B)] = {3 es => unit((run(es)(pa).get, run(es)(pb).get))(es)4 } //> product: [A, B](pa: ch71.Par.Par[A], pb: ch71.Par.Par[B])ch71.Par.Par[(A, B5 //| )]6 //处理运算结果7 def map[A,B](pa: Par[A])(f: A => B): Par[B] = {8 es => unit(f(run(es)(pa).get))(es)9 } //> map: [A, B](pa: ch71.Par.Par[A])(f: A => B)ch71.Par.Par[B]
10 //再组合map2
11 def map2_pm[A,B,C](pa: Par[A], pb: Par[B])(f: (A,B) => C): Par[C] = {
12 map(product(pa, pb)){a => f(a._1, a._2)}
13 } //> map2_pm: [A, B, C](pa: ch71.Par.Par[A], pb: ch71.Par.Par[B])(f: (A, B) => C
14 //| )ch71.Par.Par[C]
我们还可以把函数A => B转换成A => Par[B],意思是把对A的运算变成并行运算Par[B]:
1 def asyncF[A,B](f: A => B): A => Par[B] = a => async(f(a))
2 //> asyncF: [A, B](f: A => B)A => ch71.Par.Par[B]
用asyncF应该可以把对一个List的处理函数变成并行运算:
1 def parMap[A,B](as: List[A])(f: A => B): Par[List[B]]
用 map(as){asyncF(f)}可以得到List[Par[B]]。再想办法List[Par[B]] >>> Par[List[B]],这不就是我们经常遇到的那个sequence函数的类型款式吗。那我们就先实现了par的sequence函数吧:
1 //用递归法实现2 def sequence_r[A](lp: List[Par[A]]): Par[List[A]] = {3 lp match {4 case Nil => unit(List())5 case h::t => map2(h,fork(sequence_r(t))){_ :: _}6 }7 } //> sequence_r: [A](lp: List[ch71.Par.Par[A]])ch71.Par.Par[List[A]]8 //用foldLeft9 def sequenceByFoldLeft[A](lp: List[Par[A]]): Par[List[A]] = {
10 lp.foldLeft(unit[List[A]](Nil)){(t,h) => map2(h,t){_ :: _}}
11 } //> sequenceByFoldLeft: [A](lp: List[ch71.Par.Par[A]])ch71.Par.Par[List[A]]
12 //用foldRight
13 def sequenceByFoldRight[A](lp: List[Par[A]]): Par[List[A]] = {
14 lp.foldRight(unit[List[A]](Nil)){(h,t) => map2(h,t){_ :: _}}
15 } //> sequenceByFoldRight: [A](lp: List[ch71.Par.Par[A]])ch71.Par.Par[List[A]]
16 //用IndexedSeq切成两半来实现
17 def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = {
18 if (as.isEmpty) unit(Vector())
19 else if (as.length == 1) map(as.head){a => Vector(a)}
20 else {
21 val (l,r) = as.splitAt(as.length / 2)
22 map2(sequenceBalanced(l),sequenceBalanced(r)){_ ++ _}
23 }
24 } //> sequenceBalanced: [A](as: IndexedSeq[ch71.Par.Par[A]])ch71.Par.Par[IndexedS
25 def sequence[A](lp: List[Par[A]]): Par[List[A]] = { //| eq[A]]
26 map(sequenceBalanced(lp.toIndexedSeq)){_.toList}
27 }
有了sequence就可以从List[Par[A]]到Par[List[A]],实现parMap应该没问题了:
1 def parMap[A,B](as: List[A])(f: A => B): Par[List[B]] = fork {2 val lps = as.map{asyncF(f)}3 sequence(lps)4 } //> parMap: [A, B](as: List[A])(f: A => B)ch71.Par.Par[List[B]]5 fork(parMap(List(1,2,3,4,5)){ _ + 10 })(es).get //> pool-1-thread-16 //| pool-1-thread-27 //| pool-1-thread-38 //| pool-1-thread-49 //| pool-1-thread-5
10 //| pool-1-thread-6
11 //| pool-1-thread-8
12 //| pool-1-thread-7
13 //| pool-1-thread-9
14 //| pool-1-thread-10
15 //| pool-1-thread-14
16 //| pool-1-thread-12
17 //| pool-1-thread-15
18 //| pool-1-thread-11
19 //| pool-1-thread-13
20 //| res3: List[Int] = List(11, 12, 13, 14, 15)
现在我们的并行计算组件库已经能够提供一些基本的并行运算功能了。
泛函编程(19)-泛函库设计-Parallelism In Action相关推荐
- 泛函编程(4)-深入Scala函数类
既然是泛函编程,多了解一下函数自然是免不了的了: 方法(Method)不等于函数(Function) 方法不是函数但可以转化成函数:可以手工转换或者由编译器(compiler)在适当的情况下自动转换. ...
- C++ 泛型编程/模板 泛函编程/Lambda/λ演算
1.泛型编程(C++模板) 其中,Ada, Delpha, Java, C#, Swift 称之为 泛型/generics; ML, Scala和 Haskell 称之为 参数多态/parametri ...
- 「编程面试题库」,大佬开发的一款小程序~
这是一款好朋友zone7公众号号主开发的小程序--「编程面试题库」.涉及不同语言,不同方向,总有一个适合你!值得体验!(以下为作者的原文) 介绍一下这款小程序 咱们这款小程序记录了当前主流语言的一些常 ...
- 图片读取器和皮肤库设计
图片读取器和皮肤库设计 无论是图片读取器还是皮肤库设计,其实只是点到为止,没有深入做出很好的视觉效果,请见谅. 一.实验目的 1.结合实例,熟练分析并绘制UML类图: 2.熟练使用java实现两种常见 ...
- 高级shell脚本编程之函数库、信号与陷进、文件处理、数组、安全性
高级shell脚本编程之函数库.信号与陷进.文件处理.数组.安全性 1.函数库 把所有需要用到的函数都放到一个文件中,然后每个脚本的开头包含这个文件.例如: #!/bin/sh scope(){ lo ...
- 半导体MPW、PCM、WAT、单元库设计
1.什么是集成电路设计? 集成电路设计简单的说就是设计硬件电路.设计集成电路时,设计者首先根据对电路性能和功能的要求提出设计构思.然后将这样一个构思逐步细化,利用电子设计自动化软件实现具有这些性能和功 ...
- 第二章(1):Python入门:语法基础、面向对象编程和常用库介绍
第二章(1):Python入门:语法基础.面向对象编程和常用库介绍 目录 第二章(1):Python入门:语法基础.面向对象编程和常用库介绍 1. Python 简介 1.1 Python 是什么? ...
- 可编程器件电子产品设计与制作实训台QY-GY01X
一.设备概述 QY-GY01X可编程器件电子产品设计与制作实训考核设备参照<无线电调试工>高级(三级).技师(二级)及<电子设备装接工>高级(三级).技师(二级)等国家职业标准 ...
- muduo网络库设计与实现(二)
muduo网络库设计与实现(一) 文章目录 muduo网络库设计与实现(一) base InetAddress Socket 单线程网络库 Acceptor TcpServer TcpConnecti ...
- python基于c语言开发_C高级编程:基于模块化设计思想的C语言开发 PDF 超清版
给大家带来的一篇关于C语言相关的电子书资源,介绍了关于C高级编程.模块化.设计思想.C语言开发方面的内容,本书是由机械工业出版社出版,格式为PDF,资源大小80 MB,吉星编写,目前豆瓣.亚马逊.当当 ...
最新文章
- java中Class.forName与new
- [转] vuewebpack多页面配置
- 数据拟合matlab算法
- Ruby:Hash 排序
- java对接ldap_如何使用Java操作LDAP之LDAP连接(一)
- ReviewForJob——快速排序(基于插入排序)+快速选择(快速排序变体)
- 解决 idea 中 jsp 修改后页面不生效
- 团队组成五个基本要素_【记录】综合分部宁波分队团队拓展活动
- pyjion python3.6_pyjion python3.6
- 【ELK123】ElasticSearch+Kibana
- 概率图模型——贝叶斯网络
- 烽火携手中航信斩获“十佳上云”优秀案例大奖
- Detours库配置记录
- Linux中格式化(擦除)DVD + RW / DVD-RW磁盘
- 中富之命能有多少钱_算命中富 算命的说我是中富命,谁可以帮忙解释下
- 【LTspice】009 低通、高通、带通滤波器
- Python课堂笔记之判断一个数组中是否含有数字0
- java中的数组长度的计算
- Python: 异常处理
- 【计算机网络学习笔记08】ICMP
热门文章
- ObjectIOStream 对象流 ByteArrayIOStream 数组流 内存流 ZipOutputStream 压缩流
- 虚拟机实现二层交换机_局域网SDN技术硬核内幕 5 虚拟化网络的实现
- postgresql 子查询_PostgreSQL子事务及性能分析
- 判断某值是否属于枚举类中的值_编写高质量可维护的代码之优化逻辑判断
- 彩虹物语服务器维护,11.19《彩虹物语》服务器维护及数据互通公告
- 前后落差大用什么词语_夸迪是什么“鬼”——爱上夸迪之心路历程
- 2021年春季学期-信号与系统-第五次作业参考答案-第十一移小题—MATLAB
- DRV8834用于驱动双电机
- 腾讯视频会议使用测试
- php jquery ajax输出数组吗,jquery – 从PHP返回数组时的Ajax Parse错误