Scala入门到精通——第二十六节 Scala并发编程基础
本节主要内容
- Scala并发编程简介
- Scala Actor并发编程模型
- react模型
- Actor的几种状态
- Actor深入使用解析
1. Scala并发编程简介
2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行业中最不可告人的一个小秘密,他明确论证了处理器在速度上的发展已经走到了尽头,并且将由全新的单芯片上的并行 “内核”(虚拟 CPU)所取代。这一发现对编程社区造成了不小的冲击,因为正确创建线程安全的代码,在理论而非实践中,始终会提高高性能开发人员的身价,而让各公司难以聘用他们。看上去,仅有少数人充分理解了 Java 的线程模型、并发 API 以及 “同步” 的含义,以便能够编写同时提供安全性和吞吐量的代码 —— 并且大多数人已经明白了它的困难所在(来源:http://www.ibm.com/developerworks/cn/java/j-scala02049.html)。
在Java中,要编写一个线程安全的程序并不是一件易事,例如:
class Account { private int balance; synchronized public int getBalance() { return balance; } synchronized public void incrementBalance() { balance++; }
}
上面这段java代码虽然方法前面加了synchronized ,但它仍然不是线程安全的,例如,在执行下面两个语句
account.incrementBalance();
account.getBalance();
时,有可能account.incrementBalance()执行完成后,其它线程可能会获取对象的锁,修改account的balance,从而造成得不到预期结果的问题。解决问题的方法是将两个功能结合起来形成一个方法:
synchronized public int incrementAndGetBalance() { balance++; return balance;
}
但这可能并不是我们想要的,每次获取balance都要将balance增加, 这显然与实际不符。除此之外,java中的并发编程可能还会经常遇到死锁问题,而这个问题往往难调试,问题可能会随机性的出现。总体上来看,java的并发编程模型相对较复杂,难以驾驭。
Scala很好地解决了java并发编程的问题,要在scala中进行并发编程,有以下几种途径可以实现:
1 actor消息模型、akka actor并发模型。
2 Thread、Runnable
3 java.util.concurennt
4 第三方开源并发框架如Netty,Mina
在上述四种途径当中,利用 actor消息模型、akka actor并发模型是scala并发编程的首先,本节主要介绍actor消息模型,akka actor并发模型我们将放在后面的章节中介绍。
在scala中,通过不变对象来实现线程安全,涉及到修改对象状态时,则创建一个新的对象来实现,如:
//成员balance状态一旦被赋值,便不能更改
//因而它也是线程安全的
class Person(val age: Integer) { def getAge() = age
} object Person{ //创建新的对象来实现对象状态修改def increment(person: Person): Person{ new Person(Person.getAge() + 1) }
}
通过不变对象实现并发编程,可以简化编程模型,使并发程序更容易现实和控制。
2.Scala Actor并发编程模型
java中的并发主要是通过线程来实现,各线程采用共享资源的机制来实现程序的并发,这里面临竞争资源的问题,虽然采用锁机制可以避免竞争资源的问题,但会存在死锁问题,要开发一套健壮的并发应用程序具有一定的难度。而scala的并发模型相比于java它更简单,它采用消息传递而非资源共享来实现程序的并发,消息传递正是通过Actor来实现的。下面的代码给出了Actor使用示例
//混入Actor特质,然后实现act方法
//如同java中的Runnable接口一样
//各线程的run方法是并发执行的
//Actor中的act方法也是并发执行的
class ActorDemo extends Actor{//实现 act()方法def act(){while(true){//receive从邮箱中获取一条消息//然后传递给它的参数//该参数是一个偏函数receive{case "actorDemo" => println("receive....ActorDemo")} }}
}
object ActorDemo extends App{val actor=new ActorDemo//启动创建的actor actor.start()//主线程发送消息给actoractor!"actorDemo"
}
下面给的是recieve方法的部分源代码
def receive[R](f: PartialFunction[Any, R]): R = {assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")synchronized {if (shouldExit) exit() // linksdrainSendBuffer(mailbox)}var done = falsewhile (!done) {val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {senders = replyTo :: sendersval matches = f.isDefinedAt(m)senders = senders.tailmatches})
................
从上述代码中不能看出,receive方法接受的参数是一个偏函数,并且是通过mailbox来实现消息的发送与接收。
在前述的class ActorDemo中,receive方法的参数为
{case "actorDemo" => println("receive....ActorDemo")
}
该代码块在执行时被转换为一个PartialFunction[Any, R]的偏函数,其中R是偏函数的返回类型,对应case 语句=> 右边的部分,在本例子中R是Unit类型,而Any对应的则对应case语句的模式部分。
前面给的是通过extends Actor的方式来创建一个Actor类,其实scala.actors.Actor中提供了一个actor工具方法,可以非常方便地直接创建Actor对象如:
import scala.actors.Actor._object ActorFromMethod extends App{//通过工具方法actor直接创建Actor对象val methodActor = actor {for (i <- 1 to 5)println("That is the question.")Thread.sleep(1000)}
}
上述代码创建的actor对象无需调用start方法,对象创建完成后会立即执行。
scala中本地线程也可用作Actor,下面的代码演示了如何在REPL命令行中将本地线程当作Actor;
scala> import scala.actors.Actor._
import scala.actors.Actor._//self引用本地线程,并发送消息
scala> self ! "hello"
//接收消息
scala> self.receive { case x:String => x }
res1: String = hello
上述代码中,如果发送的消息不是String类型的,线程将被阻塞,为避免这个问题,可以采用receiveWithin方法,
scala> self ! 123scala> self.receiveWithin(1000) { case x => x }
res6: Any = 123scala> self.receiveWithin(1000) { case x => x }
res7: Any = TIMEOUT
3. react模型
scala中的Actor也是构建在java线程基础之上的,前面在使用Actor时都是通过创建Actor对象,然后再调用act方法来启动actor。我们知道,java中线程的创建、销毁及线程间的切换是比较耗时的,因此实际中尽量避免频繁的线程创建、销毁和销毁。Scala中提供React方法,在方法执行结束后,线程仍然被保留。下面的代码演示了react方法的使用:
package cn.scala.xtwy.concurrency
import scala.actors._object NameResolver extends Actor {import java.net.{ InetAddress, UnknownHostException }def act() {react {//匹配主线程发来的("www.scala-lang.org", NameResolver)case (name: String, actor: Actor) =>//向actor发送解析后的IP地址信息//由于本例中传进来的actor就是NameResolver自身//也即自己给自己发送消息,并存入将消息存入邮箱actor ! getIp(name)//再次调用act方法,试图从邮箱中提取信息//如果邮箱中信息为空,则进入等待模式act()case "EXIT" =>println("Name resolver exiting.")// quit//匹配邮箱中的单个信息,本例中会匹配邮箱中的IP地址信息case msg =>println("Unhandled message: " + msg)act()}}def getIp(name: String): Option[InetAddress] = {try {Some(InetAddress.getByName(name))} catch {case _: UnknownHostException => None}}
}
object Main extends App{NameResolver.start()//主线程向NameResolver发送消息("www.scala-lang.org", NameResolver)NameResolver ! ("www.scala-lang.org", NameResolver)NameResolver ! ("wwwwww.scala-lang.org", NameResolver)}
从上述代码中可以看到,通过在react方法执行结束时加入act方法,方法执行完成后没有被销毁,而是继续试图从邮箱中获取信息,获取不到则等待。
4. Actor的几种状态
Actor有下列几种状态:
- 初始状态(New),Actor对象被创建,但还没有启动即没有执行start方法时的状态
- 执行状态(Runnable),正在执行时的状态
- 挂起状态(Suspended),在react方法中等待时的状态
- 时间点挂起状态(TimedSuspended),挂起状态的一种特殊形式,reactWithin方法中的等待时的状态
- 阻塞状态(Blocked),在receive方法中阻塞等待时的状态
- 时间点阻塞状态(TimedBlocked),在receiveWithin方法中阻塞等待时的状态
- 结束状态(Terminated),执行完成后被销毁
5. Actor深入使用解析
1 receive方法单次执行:
object Actor2{case class Speak(line : String)case class Gesture(bodyPart : String, action : String)case class NegotiateNewContract()def main(args : Array[String]) ={val badActor =actor{//这里receive方法只会匹配一次便结束receive{case NegotiateNewContract =>System.out.println("I won't do it for less than $1 million!")case Speak(line) =>System.out.println(line)case Gesture(bodyPart, action) =>System.out.println("(" + action + "s " + bodyPart + ")")case _ =>System.out.println("Huh? I'll be in my trailer.")}}//receive方法只处理下面这条语句发送的消息badActor ! NegotiateNewContract//下面其余的消息不会被处理badActor ! Speak("Do ya feel lucky, punk?")badActor ! Gesture("face", "grimaces")badActor ! Speak("Well, do ya?")}}
上述代码只会输出:
I won’t do it for less than $1 million!
即后面发送的消息如:
badActor ! Speak(“Do ya feel lucky, punk?”)
badActor ! Gesture(“face”, “grimaces”)
badActor ! Speak(“Well, do ya?”)
不会被处理。这是因为receive方法的单次执行问题。
2 能够处理多个消息的receive方法:
object Actor2{case class Speak(line : String);case class Gesture(bodyPart : String, action : String);case class NegotiateNewContract()//处理结束消息case class ThatsAWrap()def main(args : Array[String]) ={val badActor =actor{var done = false//while循环while (! done){receive{case NegotiateNewContract =>System.out.println("I won't do it for less than $1 million!")case Speak(line) =>System.out.println(line)case Gesture(bodyPart, action) =>System.out.println("(" + action + "s " + bodyPart + ")")case ThatsAWrap =>System.out.println("Great cast party, everybody! See ya!")done = truecase _ =>System.out.println("Huh? I'll be in my trailer.")}}}//下面所有的消息都能被处理badActor ! NegotiateNewContractbadActor ! Speak("Do ya feel lucky, punk?")badActor ! Gesture("face", "grimaces")badActor ! Speak("Well, do ya?")//消息发送后,receive方法执行完毕badActor ! ThatsAWrap}}
3 Actor后面实现原理仍然是线程的证据
object Actor3{case class Speak(line : String);case class Gesture(bodyPart : String, action : String);case class NegotiateNewContract;case class ThatsAWrap;def main(args : Array[String]) ={def ct ="Thread " + Thread.currentThread().getName() + ": "val badActor =actor{var done = falsewhile (! done){receive{case NegotiateNewContract =>System.out.println(ct + "I won't do it for less than $1 million!")case Speak(line) =>System.out.println(ct + line)case Gesture(bodyPart, action) =>System.out.println(ct + "(" + action + "s " + bodyPart + ")")case ThatsAWrap =>System.out.println(ct + "Great cast party, everybody! See ya!")done = truecase _ =>System.out.println(ct + "Huh? I'll be in my trailer.")}}}System.out.println(ct + "Negotiating...")badActor ! NegotiateNewContractSystem.out.println(ct + "Speaking...")badActor ! Speak("Do ya feel lucky, punk?")System.out.println(ct + "Gesturing...")badActor ! Gesture("face", "grimaces")System.out.println(ct + "Speaking again...")badActor ! Speak("Well, do ya?")System.out.println(ct + "Wrapping up")badActor ! ThatsAWrap}}
执行结果如下:
Thread main: Negotiating...
Thread main: Speaking...
Thread main: Gesturing...
Thread main: Speaking again...
Thread main: Wrapping up
Thread ForkJoinPool-1-worker-13: I won't do it for less than $1 million!
Thread ForkJoinPool-1-worker-13: Do ya feel lucky, punk?
Thread ForkJoinPool-1-worker-13: (grimacess face)
Thread ForkJoinPool-1-worker-13: Well, do ya?
Thread ForkJoinPool-1-worker-13: Great cast party, everybody! See ya!
从上述执行结果可以看到,Actor最终的实现仍然是线程,只不过它提供的编程模型与java中的编程模型不一样而已。
4 利用!?发送同步消息,等待返回值
import scala.actors._,Actor._object ProdConSample2{case class Message(msg : String)def main(args : Array[String]) : Unit ={val consumer =actor{var done = falsewhile (! done){receive{case msg =>System.out.println("Received message! -> " + msg)done = (msg == "DONE")reply("Already RECEIVED....."+msg)}}}System.out.println("Sending....")//获取响应值val r= consumer !? "Mares eat oats"println("replyed message"+r)System.out.println("Sending....")consumer !? "Does eat oats"System.out.println("Sending....")consumer !? "Little lambs eat ivy"System.out.println("Sending....")consumer !? "Kids eat ivy too"System.out.println("Sending....")consumer !? "DONE" }}
代码执行结果:
Sending....
Received message! -> Mares eat oats
replyed messageAlready RECEIVED.....Mares eat oats
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Sending....
Received message! -> Kids eat ivy too
Sending....
Received message! -> DONE
通过上述代码执行结果可以看到,!?因为是同步消息,发送完返回结果后才会接着发送下一条消息。
5 Spawn方法发送消息
object ProdConSampleUsingSpawn{import concurrent.ops._def main(args : Array[String]) : Unit ={// Spawn Consumerval consumer =actor{var done = falsewhile (! done){receive{case msg =>System.out.println("MESSAGE RECEIVED: " + msg)done = (msg == "DONE")reply("RECEIVED")}}}// Spawn Producerspawn //spawn是一个定义在current.ops中的方法{val importantInfo : Array[String] = Array("Mares eat oats","Does eat oats","Little lambs eat ivy","A kid will eat ivy too","DONE");importantInfo.foreach((msg) => consumer !? msg)}}}
6 !! 发送异步消息,返回值是 Future[Any]
object ProdConSample3{case class Message(msg : String)def main(args : Array[String]) : Unit ={val consumer =actor{var done = falsewhile (! done){receive{case msg =>System.out.println("Received message! -> " + msg)done = (msg == "DONE")reply("Already RECEIVED....."+msg)}}}System.out.println("Sending....")//发送异步消息,返回val replyFuture= consumer !! "Mares eat oats"val r=replyFuture()println("replyed message*****"+r)System.out.println("Sending....")consumer !! "Does eat oats"System.out.println("Sending....")consumer !! "Little lambs eat ivy"System.out.println("Sending....")consumer !! "Kids eat ivy too"System.out.println("Sending....")consumer !! "DONE" }}
执行结果:
Sending....
Received message! -> Mares eat oats
replyed message*****Already RECEIVED.....Mares eat oats
Sending....
Sending....
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Received message! -> Kids eat ivy too
Received message! -> DONE
通过上述代码的执行结果可以看到,!!的消息发送是异步的,消息发送后无需等待结果返回便执行下一条语句,但如果要获取异步消息的返回值,如:
val replyFuture= consumer !! "Mares eat oats"val r=replyFuture()
则执行到这两条语句的时候,程序先被阻塞,等获得结果之后再发送其它的异步消息。
7 loop方法实现react
object LoopReact extends App{val a1 = Actor.actor {//注意这里loop是一个方法,不是关键字//实现类型while循环的作用loop {react {//为整型时结束操作case x: Int=>println("a1 stop: " + x); exit()case msg: String => println("a1: " + msg)}}}a1!("我是摇摆少年梦")a1.!(23)}
Scala入门到精通——第二十六节 Scala并发编程基础相关推荐
- Scala入门到精通——第二十九节 Scala数据库编程
本节主要内容 Scala Mavenproject的创建 Scala JDBC方式訪问MySQL Slick简单介绍 Slick数据库编程实战 SQL与Slick相互转换 本课程在多数内容是在官方教程 ...
- Scala入门到精通——第二十四节 高级类型 (三)
本节主要内容 Type Specialization Manifest.TypeTag.ClassTag Scala类型系统总结 在Scala中,类(class)与类型(type)是两个不一样的概念. ...
- Scala入门到精通——第十六节 泛型与注解
本节主要内容 泛型(Generic Type)简介 注解(Annotation)简介 注解常用场景 1. 泛型(Generic Type)简介 泛型用于指定方法或类可以接受任意类型参数,参数在实际使用 ...
- Scala入门到精通——第二十五节 提取器(Extractor)
本节主要内容 apply与unapply方法 零变量或变量的模式匹配 提取器与序列模式 scala中的占位符使用总结 1. apply与unapply方法 apply方法我们已经非常熟悉了,它帮助我们 ...
- Scala入门到精通——第十四节 Case Class与模式匹配(一)
本节主要内容 模式匹配入门 Case Class简介 Case Class进阶 1. 模式匹配入门 在Java语言中存在switch语句,例如: //下面的代码演示了java中switch语句的使用 ...
- Scala入门到精通——第二十八节 Scala与JAVA互操作
本节主要内容 JAVA中调用Scala类 Scala中调用JAVA类 Scala类型参数与JAVA泛型互操作 Scala与Java间的异常处理互操作 1. JAVA中调用Scala类 Java可以直接 ...
- Scala入门到精通——第二十节 类型参数(二)
本节主要内容 Ordering与Ordered特质 上下文界定(Context Bound) 多重界定 类型约束 1. Ordering与Ordered特质 在介绍上下文界定之前,我们对Scala中的 ...
- Scala入门到精通——第十九节 隐式转换与隐式参数(二)
本节主要内容 隐式参数中的隐式转换 函数中隐式参数使用概要 隐式转换问题梳理 1. 隐式参数中的隐式转换 前一讲中,我们提到函数中如果存在隐式参数,在使用该函数的时候如果不给定对应的参数,则编译器会自 ...
- Scala入门到精通——第十五节 Case Class与模式匹配(二)
本节主要内容 模式匹配的类型 for控制结构中的模式匹配 option类型模式匹配 1. 模式的类型 1 常量模式 object ConstantPattern{def main(args: Arra ...
最新文章
- 中国矿业大学计算机控制技术英语,中国矿业大学计算机控制系统参考试卷4及答案.pdf...
- 论初始值的重要性-仅仅是更改初始值loss差别就非常大
- 【Canal】互联网背景下有哪些数据同步需求和解决方案?看完我知道了!!
- mybatis简单案例源码详细【注释全面】——测试层(UserMapperTest.java)
- 爬虫之request
- Educational Codeforces Round 18
- UK Biobank专题
- 机器学习5-支持向量机
- 【leetcode_easy】590. N-ary Tree Postorder Traversal
- 什么时候都要记得:生活愈是往下,嘴角愈要上扬
- Friendster,linkedin,orkut,liring对SNS的求索
- 软件测试工具介绍 (静态测试工具和动态测试工具)
- App列表之下拉刷新
- MAC安装视频播放器MPV
- 数据库原理及应用(东南大学)笔记——第二章 数据模型
- CRMEB-知识付费系统程序配置之直播配置(方法二下)
- 光场相机重聚焦--焦点堆栈深度估计法
- MDK_EventRecorder
- *CF1216F. Wi-Fi (dp)
- LSH(Locality Sensitive Hashing)基本思想
热门文章
- python 字符串拼接_Python字符串拼接的6种方法(转)
- linux终端命令教程,Linux终端命令入坑技巧
- [java]常用类型转化
- 用JAVAMAIL发送邮件的一个简单例子
- java api接口报500_应用程序编程接口API,我们来聊一聊这个熟悉的名词
- python中字符串的制表符为_零基础学python_03_字符串(拼接+换行+制表符)
- 安卓编程用什么软件_震惊!安卓IOS都可以用的牛逼软件
- java自定义findbugs规则_静态代码扫描 (三)——FindBugs 自定义规则入门
- mysql将大表定时转储_mysql数据库数据定时封装转储
- php抢购排队是怎样做的,基于swoole的抢购排队通用中间件,适合抢购秒杀场景,跟具体业务解耦...