本节主要内容

  1. Scala并发编程简介
  2. Scala Actor并发编程模型
  3. react模型
  4. Actor的几种状态
  5. Actor深入使用解析

1. Scala并发编程简介

2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行业中最不可告人的一个小秘密,他明确论证了处理器在速度上的发展已经走到了尽头,并且将由全新的单芯片上的并行 “内核”(虚拟 CPU)所取代。这一发现对编程社区造成了不小的冲击,因为正确创建线程安全的代码,在理论而非实践中,始终会提高高性能开发人员的身价,而让各公司难以聘用他们。看上去,仅有少数人充分理解了 Java 的线程模型、并发 API 以及 “同步” 的含义,以便能够编写同时提供安全性和吞吐量的代码 —— 并且大多数人已经明白了它的困难所在(来源:http://www.ibm.com/developerworks/cn/java/j-scala02049.html)。

在Java中,要编写一个线程安全的程序并不是一件易事,例如:

class Account {  private int balance;  synchronized public int getBalance() {  return balance;  }  synchronized public void incrementBalance() {  balance++;  }
}  

上面这段java代码虽然方法前面加了synchronized ,但它仍然不是线程安全的,例如,在执行下面两个语句

account.incrementBalance();
account.getBalance();

时,有可能account.incrementBalance()执行完成后,其它线程可能会获取对象的锁,修改account的balance,从而造成得不到预期结果的问题。解决问题的方法是将两个功能结合起来形成一个方法:

synchronized public int incrementAndGetBalance() {  balance++;  return balance;
}  

但这可能并不是我们想要的,每次获取balance都要将balance增加, 这显然与实际不符。除此之外,java中的并发编程可能还会经常遇到死锁问题,而这个问题往往难调试,问题可能会随机性的出现。总体上来看,java的并发编程模型相对较复杂,难以驾驭。

Scala很好地解决了java并发编程的问题,要在scala中进行并发编程,有以下几种途径可以实现:
1 actor消息模型、akka actor并发模型。

2 Thread、Runnable

3 java.util.concurennt

4 第三方开源并发框架如Netty,Mina

在上述四种途径当中,利用 actor消息模型、akka actor并发模型是scala并发编程的首先,本节主要介绍actor消息模型,akka actor并发模型我们将放在后面的章节中介绍。
在scala中,通过不变对象来实现线程安全,涉及到修改对象状态时,则创建一个新的对象来实现,如:

//成员balance状态一旦被赋值,便不能更改
//因而它也是线程安全的
class Person(val age: Integer) {  def getAge() = age
}  object Person{  //创建新的对象来实现对象状态修改def increment(person: Person): Person{  new Person(Person.getAge() + 1)  }
}  

通过不变对象实现并发编程,可以简化编程模型,使并发程序更容易现实和控制。

2.Scala Actor并发编程模型

java中的并发主要是通过线程来实现,各线程采用共享资源的机制来实现程序的并发,这里面临竞争资源的问题,虽然采用锁机制可以避免竞争资源的问题,但会存在死锁问题,要开发一套健壮的并发应用程序具有一定的难度。而scala的并发模型相比于java它更简单,它采用消息传递而非资源共享来实现程序的并发,消息传递正是通过Actor来实现的。下面的代码给出了Actor使用示例

//混入Actor特质,然后实现act方法
//如同java中的Runnable接口一样
//各线程的run方法是并发执行的
//Actor中的act方法也是并发执行的
class ActorDemo extends Actor{//实现 act()方法def act(){while(true){//receive从邮箱中获取一条消息//然后传递给它的参数//该参数是一个偏函数receive{case "actorDemo" => println("receive....ActorDemo")}      }}
}
object ActorDemo extends App{val actor=new ActorDemo//启动创建的actor actor.start()//主线程发送消息给actoractor!"actorDemo"
}

下面给的是recieve方法的部分源代码

def receive[R](f: PartialFunction[Any, R]): R = {assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")synchronized {if (shouldExit) exit() // linksdrainSendBuffer(mailbox)}var done = falsewhile (!done) {val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {senders = replyTo :: sendersval matches = f.isDefinedAt(m)senders = senders.tailmatches})
................

从上述代码中不能看出,receive方法接受的参数是一个偏函数,并且是通过mailbox来实现消息的发送与接收。

在前述的class ActorDemo中,receive方法的参数为

{case "actorDemo" => println("receive....ActorDemo")
}      

该代码块在执行时被转换为一个PartialFunction[Any, R]的偏函数,其中R是偏函数的返回类型,对应case 语句=> 右边的部分,在本例子中R是Unit类型,而Any对应的则对应case语句的模式部分。

前面给的是通过extends Actor的方式来创建一个Actor类,其实scala.actors.Actor中提供了一个actor工具方法,可以非常方便地直接创建Actor对象如:

import scala.actors.Actor._object ActorFromMethod extends App{//通过工具方法actor直接创建Actor对象val methodActor = actor {for (i <- 1 to 5)println("That is the question.")Thread.sleep(1000)}
}

上述代码创建的actor对象无需调用start方法,对象创建完成后会立即执行。

scala中本地线程也可用作Actor,下面的代码演示了如何在REPL命令行中将本地线程当作Actor;

scala> import scala.actors.Actor._
import scala.actors.Actor._//self引用本地线程,并发送消息
scala> self ! "hello"
//接收消息
scala> self.receive { case x:String => x }
res1: String = hello

上述代码中,如果发送的消息不是String类型的,线程将被阻塞,为避免这个问题,可以采用receiveWithin方法,

scala> self ! 123scala> self.receiveWithin(1000) { case x => x }
res6: Any = 123scala> self.receiveWithin(1000) { case x => x }
res7: Any = TIMEOUT

3. react模型

scala中的Actor也是构建在java线程基础之上的,前面在使用Actor时都是通过创建Actor对象,然后再调用act方法来启动actor。我们知道,java中线程的创建、销毁及线程间的切换是比较耗时的,因此实际中尽量避免频繁的线程创建、销毁和销毁。Scala中提供React方法,在方法执行结束后,线程仍然被保留。下面的代码演示了react方法的使用:

package cn.scala.xtwy.concurrency
import scala.actors._object NameResolver extends Actor {import java.net.{ InetAddress, UnknownHostException }def act() {react {//匹配主线程发来的("www.scala-lang.org", NameResolver)case (name: String, actor: Actor) =>//向actor发送解析后的IP地址信息//由于本例中传进来的actor就是NameResolver自身//也即自己给自己发送消息,并存入将消息存入邮箱actor ! getIp(name)//再次调用act方法,试图从邮箱中提取信息//如果邮箱中信息为空,则进入等待模式act()case "EXIT" =>println("Name resolver exiting.")// quit//匹配邮箱中的单个信息,本例中会匹配邮箱中的IP地址信息case msg =>println("Unhandled message: " + msg)act()}}def getIp(name: String): Option[InetAddress] = {try {Some(InetAddress.getByName(name))} catch {case _: UnknownHostException => None}}
}
object Main extends App{NameResolver.start()//主线程向NameResolver发送消息("www.scala-lang.org", NameResolver)NameResolver ! ("www.scala-lang.org", NameResolver)NameResolver ! ("wwwwww.scala-lang.org", NameResolver)}

从上述代码中可以看到,通过在react方法执行结束时加入act方法,方法执行完成后没有被销毁,而是继续试图从邮箱中获取信息,获取不到则等待。

4. Actor的几种状态

Actor有下列几种状态:

  • 初始状态(New),Actor对象被创建,但还没有启动即没有执行start方法时的状态
  • 执行状态(Runnable),正在执行时的状态
  • 挂起状态(Suspended),在react方法中等待时的状态
  • 时间点挂起状态(TimedSuspended),挂起状态的一种特殊形式,reactWithin方法中的等待时的状态
  • 阻塞状态(Blocked),在receive方法中阻塞等待时的状态
  • 时间点阻塞状态(TimedBlocked),在receiveWithin方法中阻塞等待时的状态
  • 结束状态(Terminated),执行完成后被销毁

5. Actor深入使用解析

1 receive方法单次执行:


object Actor2{case class Speak(line : String)case class Gesture(bodyPart : String, action : String)case class NegotiateNewContract()def main(args : Array[String]) ={val badActor =actor{//这里receive方法只会匹配一次便结束receive{case NegotiateNewContract =>System.out.println("I won't do it for less than $1 million!")case Speak(line) =>System.out.println(line)case Gesture(bodyPart, action) =>System.out.println("(" + action + "s " + bodyPart + ")")case _ =>System.out.println("Huh? I'll be in my trailer.")}}//receive方法只处理下面这条语句发送的消息badActor ! NegotiateNewContract//下面其余的消息不会被处理badActor ! Speak("Do ya feel lucky, punk?")badActor ! Gesture("face", "grimaces")badActor ! Speak("Well, do ya?")}}

上述代码只会输出:
I won’t do it for less than $1 million!
即后面发送的消息如:
badActor ! Speak(“Do ya feel lucky, punk?”)
badActor ! Gesture(“face”, “grimaces”)
badActor ! Speak(“Well, do ya?”)
不会被处理。这是因为receive方法的单次执行问题。

2 能够处理多个消息的receive方法:

object Actor2{case class Speak(line : String);case class Gesture(bodyPart : String, action : String);case class NegotiateNewContract()//处理结束消息case class ThatsAWrap()def main(args : Array[String]) ={val badActor =actor{var done = false//while循环while (! done){receive{case NegotiateNewContract =>System.out.println("I won't do it for less than $1 million!")case Speak(line) =>System.out.println(line)case Gesture(bodyPart, action) =>System.out.println("(" + action + "s " + bodyPart + ")")case ThatsAWrap =>System.out.println("Great cast party, everybody! See ya!")done = truecase _ =>System.out.println("Huh? I'll be in my trailer.")}}}//下面所有的消息都能被处理badActor ! NegotiateNewContractbadActor ! Speak("Do ya feel lucky, punk?")badActor ! Gesture("face", "grimaces")badActor ! Speak("Well, do ya?")//消息发送后,receive方法执行完毕badActor ! ThatsAWrap}}

3 Actor后面实现原理仍然是线程的证据

object Actor3{case class Speak(line : String);case class Gesture(bodyPart : String, action : String);case class NegotiateNewContract;case class ThatsAWrap;def main(args : Array[String]) ={def ct ="Thread " + Thread.currentThread().getName() + ": "val badActor =actor{var done = falsewhile (! done){receive{case NegotiateNewContract =>System.out.println(ct + "I won't do it for less than $1 million!")case Speak(line) =>System.out.println(ct + line)case Gesture(bodyPart, action) =>System.out.println(ct + "(" + action + "s " + bodyPart + ")")case ThatsAWrap =>System.out.println(ct + "Great cast party, everybody! See ya!")done = truecase _ =>System.out.println(ct + "Huh? I'll be in my trailer.")}}}System.out.println(ct + "Negotiating...")badActor ! NegotiateNewContractSystem.out.println(ct + "Speaking...")badActor ! Speak("Do ya feel lucky, punk?")System.out.println(ct + "Gesturing...")badActor ! Gesture("face", "grimaces")System.out.println(ct + "Speaking again...")badActor ! Speak("Well, do ya?")System.out.println(ct + "Wrapping up")badActor ! ThatsAWrap}}

执行结果如下:

Thread main: Negotiating...
Thread main: Speaking...
Thread main: Gesturing...
Thread main: Speaking again...
Thread main: Wrapping up
Thread ForkJoinPool-1-worker-13: I won't do it for less than $1 million!
Thread ForkJoinPool-1-worker-13: Do ya feel lucky, punk?
Thread ForkJoinPool-1-worker-13: (grimacess face)
Thread ForkJoinPool-1-worker-13: Well, do ya?
Thread ForkJoinPool-1-worker-13: Great cast party, everybody! See ya!

从上述执行结果可以看到,Actor最终的实现仍然是线程,只不过它提供的编程模型与java中的编程模型不一样而已。

4 利用!?发送同步消息,等待返回值


import scala.actors._,Actor._object ProdConSample2{case class Message(msg : String)def main(args : Array[String]) : Unit ={val consumer =actor{var done = falsewhile (! done){receive{case msg =>System.out.println("Received message! -> " + msg)done = (msg == "DONE")reply("Already RECEIVED....."+msg)}}}System.out.println("Sending....")//获取响应值val r= consumer !? "Mares eat oats"println("replyed message"+r)System.out.println("Sending....")consumer !? "Does eat oats"System.out.println("Sending....")consumer !? "Little lambs eat ivy"System.out.println("Sending....")consumer !? "Kids eat ivy too"System.out.println("Sending....")consumer !? "DONE"      }}

代码执行结果:

Sending....
Received message! -> Mares eat oats
replyed messageAlready RECEIVED.....Mares eat oats
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Sending....
Received message! -> Kids eat ivy too
Sending....
Received message! -> DONE

通过上述代码执行结果可以看到,!?因为是同步消息,发送完返回结果后才会接着发送下一条消息。

5 Spawn方法发送消息

object ProdConSampleUsingSpawn{import concurrent.ops._def main(args : Array[String]) : Unit ={// Spawn Consumerval consumer =actor{var done = falsewhile (! done){receive{case msg =>System.out.println("MESSAGE RECEIVED: " + msg)done = (msg == "DONE")reply("RECEIVED")}}}// Spawn Producerspawn  //spawn是一个定义在current.ops中的方法{val importantInfo : Array[String] = Array("Mares eat oats","Does eat oats","Little lambs eat ivy","A kid will eat ivy too","DONE");importantInfo.foreach((msg) => consumer !? msg)}}}

6 !! 发送异步消息,返回值是 Future[Any]

object ProdConSample3{case class Message(msg : String)def main(args : Array[String]) : Unit ={val consumer =actor{var done = falsewhile (! done){receive{case msg =>System.out.println("Received message! -> " + msg)done = (msg == "DONE")reply("Already RECEIVED....."+msg)}}}System.out.println("Sending....")//发送异步消息,返回val replyFuture= consumer !! "Mares eat oats"val r=replyFuture()println("replyed message*****"+r)System.out.println("Sending....")consumer !! "Does eat oats"System.out.println("Sending....")consumer !! "Little lambs eat ivy"System.out.println("Sending....")consumer !! "Kids eat ivy too"System.out.println("Sending....")consumer !! "DONE"      }}

执行结果:

Sending....
Received message! -> Mares eat oats
replyed message*****Already RECEIVED.....Mares eat oats
Sending....
Sending....
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Received message! -> Kids eat ivy too
Received message! -> DONE

通过上述代码的执行结果可以看到,!!的消息发送是异步的,消息发送后无需等待结果返回便执行下一条语句,但如果要获取异步消息的返回值,如:

 val replyFuture= consumer !! "Mares eat oats"val r=replyFuture()

则执行到这两条语句的时候,程序先被阻塞,等获得结果之后再发送其它的异步消息。

7 loop方法实现react

object LoopReact extends App{val a1 = Actor.actor {//注意这里loop是一个方法,不是关键字//实现类型while循环的作用loop {react {//为整型时结束操作case x: Int=>println("a1 stop: " + x); exit()case msg: String => println("a1: " + msg)}}}a1!("我是摇摆少年梦")a1.!(23)}

Scala入门到精通——第二十六节 Scala并发编程基础相关推荐

  1. Scala入门到精通——第二十九节 Scala数据库编程

    本节主要内容 Scala Mavenproject的创建 Scala JDBC方式訪问MySQL Slick简单介绍 Slick数据库编程实战 SQL与Slick相互转换 本课程在多数内容是在官方教程 ...

  2. Scala入门到精通——第二十四节 高级类型 (三)

    本节主要内容 Type Specialization Manifest.TypeTag.ClassTag Scala类型系统总结 在Scala中,类(class)与类型(type)是两个不一样的概念. ...

  3. Scala入门到精通——第十六节 泛型与注解

    本节主要内容 泛型(Generic Type)简介 注解(Annotation)简介 注解常用场景 1. 泛型(Generic Type)简介 泛型用于指定方法或类可以接受任意类型参数,参数在实际使用 ...

  4. Scala入门到精通——第二十五节 提取器(Extractor)

    本节主要内容 apply与unapply方法 零变量或变量的模式匹配 提取器与序列模式 scala中的占位符使用总结 1. apply与unapply方法 apply方法我们已经非常熟悉了,它帮助我们 ...

  5. Scala入门到精通——第十四节 Case Class与模式匹配(一)

    本节主要内容 模式匹配入门 Case Class简介 Case Class进阶 1. 模式匹配入门 在Java语言中存在switch语句,例如: //下面的代码演示了java中switch语句的使用 ...

  6. Scala入门到精通——第二十八节 Scala与JAVA互操作

    本节主要内容 JAVA中调用Scala类 Scala中调用JAVA类 Scala类型参数与JAVA泛型互操作 Scala与Java间的异常处理互操作 1. JAVA中调用Scala类 Java可以直接 ...

  7. Scala入门到精通——第二十节 类型参数(二)

    本节主要内容 Ordering与Ordered特质 上下文界定(Context Bound) 多重界定 类型约束 1. Ordering与Ordered特质 在介绍上下文界定之前,我们对Scala中的 ...

  8. Scala入门到精通——第十九节 隐式转换与隐式参数(二)

    本节主要内容 隐式参数中的隐式转换 函数中隐式参数使用概要 隐式转换问题梳理 1. 隐式参数中的隐式转换 前一讲中,我们提到函数中如果存在隐式参数,在使用该函数的时候如果不给定对应的参数,则编译器会自 ...

  9. Scala入门到精通——第十五节 Case Class与模式匹配(二)

    本节主要内容 模式匹配的类型 for控制结构中的模式匹配 option类型模式匹配 1. 模式的类型 1 常量模式 object ConstantPattern{def main(args: Arra ...

最新文章

  1. 中国矿业大学计算机控制技术英语,中国矿业大学计算机控制系统参考试卷4及答案.pdf...
  2. 论初始值的重要性-仅仅是更改初始值loss差别就非常大
  3. 【Canal】互联网背景下有哪些数据同步需求和解决方案?看完我知道了!!
  4. mybatis简单案例源码详细【注释全面】——测试层(UserMapperTest.java)
  5. 爬虫之request
  6. Educational Codeforces Round 18
  7. UK Biobank专题
  8. 机器学习5-支持向量机
  9. 【leetcode_easy】590. N-ary Tree Postorder Traversal
  10. 什么时候都要记得:生活愈是往下,嘴角愈要上扬
  11. Friendster,linkedin,orkut,liring对SNS的求索
  12. 软件测试工具介绍 (静态测试工具和动态测试工具)
  13. App列表之下拉刷新
  14. MAC安装视频播放器MPV
  15. 数据库原理及应用(东南大学)笔记——第二章 数据模型
  16. CRMEB-知识付费系统程序配置之直播配置(方法二下)
  17. 光场相机重聚焦--焦点堆栈深度估计法
  18. MDK_EventRecorder
  19. *CF1216F. Wi-Fi (dp)
  20. LSH(Locality Sensitive Hashing)基本思想

热门文章

  1. python 字符串拼接_Python字符串拼接的6种方法(转)
  2. linux终端命令教程,Linux终端命令入坑技巧
  3. [java]常用类型转化
  4. 用JAVAMAIL发送邮件的一个简单例子
  5. java api接口报500_应用程序编程接口API,我们来聊一聊这个熟悉的名词
  6. python中字符串的制表符为_零基础学python_03_字符串(拼接+换行+制表符)
  7. 安卓编程用什么软件_震惊!安卓IOS都可以用的牛逼软件
  8. java自定义findbugs规则_静态代码扫描 (三)——FindBugs 自定义规则入门
  9. mysql将大表定时转储_mysql数据库数据定时封装转储
  10. php抢购排队是怎样做的,基于swoole的抢购排队通用中间件,适合抢购秒杀场景,跟具体业务解耦...