Scala进阶之路-并发编程模型Akka入门篇
Scala进阶之路-并发编程模型Akka入门篇
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.Akka Actor介绍
1>.Akka介绍
写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时。Akka 用 Scala 语言写成,同时提供了 Scala 和 JAVA 的开发接口。
2>.Akka 中 中 Actor 模型
Akka 处理并发的方法基于 Actor 模型。在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是 Actor 模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor 与 Actor 之间只能通过消息通信。
3>.Akaka的特点
第一:它是对并发模型进行了更高的抽象;
第二:它是异步、非阻塞、高性能的事件驱动编程模型;
第三:它是轻量级事件处理(1GB 内存可容纳百万级别个 Actor);
4>.为什么 Actor 模型是一种处理并发问题的解决方案?
处理并发问题就是如何保证共享数据的一致性和正确性,为什么会有保持共享数据正确性这个问题呢?无非是我们的程序是多线程的,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。那么我们是不是可以转换一下思维,用单线程去处理相应的请求,但是又有人会问了,若是用单线程处理,那系统的性能又如何保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。
5>.Maven依赖
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>cn.org.yinzhengjie</groupId> 8 <artifactId>MyActor</artifactId> 9 <version>1.0-SNAPSHOT</version> 10 11 <!-- 定义一下常量 --> 12 <properties> 13 <encoding>UTF-8</encoding> 14 <scala.version>2.11.8</scala.version> 15 <scala.compat.version>2.11</scala.compat.version> 16 <akka.version>2.4.17</akka.version> 17 </properties> 18 19 <dependencies> 20 <!-- 添加scala的依赖 --> 21 <dependency> 22 <groupId>org.scala-lang</groupId> 23 <artifactId>scala-library</artifactId> 24 <version>${scala.version}</version> 25 </dependency> 26 27 <!-- 添加akka的actor依赖 --> 28 <dependency> 29 <groupId>com.typesafe.akka</groupId> 30 <artifactId>akka-actor_${scala.compat.version}</artifactId> 31 <version>${akka.version}</version> 32 </dependency> 33 34 <!-- 多进程之间的Actor通信 --> 35 <dependency> 36 <groupId>com.typesafe.akka</groupId> 37 <artifactId>akka-remote_${scala.compat.version}</artifactId> 38 <version>${akka.version}</version> 39 </dependency> 40 </dependencies> 41 42 <!-- 指定插件--> 43 <build> 44 <!-- 指定源码包和测试包的位置 --> 45 <sourceDirectory>src/main/scala</sourceDirectory> 46 <testSourceDirectory>src/test/scala</testSourceDirectory> 47 <plugins> 48 <!-- 指定编译scala的插件 --> 49 <plugin> 50 <groupId>net.alchim31.maven</groupId> 51 <artifactId>scala-maven-plugin</artifactId> 52 <version>3.2.2</version> 53 <executions> 54 <execution> 55 <goals> 56 <goal>compile</goal> 57 <goal>testCompile</goal> 58 </goals> 59 <configuration> 60 <args> 61 <arg>-dependencyfile</arg> 62 <arg>${project.build.directory}/.scala_dependencies</arg> 63 </args> 64 </configuration> 65 </execution> 66 </executions> 67 </plugin> 68 69 70 </plugins> 71 </build> 72 </project>
自定义默认的源代码包和测试包的位置,需要手动穿件Source Root目录哟,如下图:
二.编写HelloActor
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/ 4 EMAIL:y1053419035@qq.com 5 */ 6 package cn.org.yinzhengjie.actor 7 8 import akka.actor.{Actor, ActorSystem, Props} 9 10 import scala.io.StdIn 11 12 class HelloActor extends Actor{ 13 // 重写接受消息的偏函数,其功能是接受消息并处理 14 override def receive: Receive = { 15 case "你好帅" => println("竟说实话,我喜欢你这种人!") 16 case "丑八怪" => println("滚犊子 !") 17 case "stop" => { 18 context.stop(self) // 停止自己的actorRef 19 context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService) 20 } 21 } 22 } 23 24 object HelloActor { 25 /** 26 * 创建线程池对象MyFactory,用来创建actor的对象的 27 */ 28 private val MyFactory = ActorSystem("myFactory") //里面的"myFactory"参数为线程池的名称 29 /** 30 * 通过MyFactory.actorOf方法来创建一个actor,注意,Props方法的第一个参数需要传递我们自定义的HelloActor类, 31 * 第二个参数是给actor起个名字 32 */ 33 private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor") 34 35 def main(args: Array[String]): Unit = { 36 var flag = true 37 while(flag){ 38 /** 39 * 接受用户输入的字符串 40 */ 41 print("请输入您想发送的消息:") 42 val consoleLine:String = StdIn.readLine() 43 /** 44 * 使用helloActorRef来给自己发送消息,helloActorRef有一个叫做感叹号("!")的方法来发送消息 45 */ 46 helloActorRef ! consoleLine 47 if (consoleLine.equals("stop")){ 48 flag = false 49 println("程序即将结束!") 50 } 51 /** 52 * 为了不让while的运行速度在receive方法之上,我们可以让他休眠0.1秒 53 */ 54 Thread.sleep(100) 55 } 56 } 57 }
以上代码执行结果如下:
三.两个actor通信案例-模拟下棋对话
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/ 4 EMAIL:y1053419035@qq.com 5 */ 6 package cn.org.yinzhengjie.actor 7 8 import akka.actor.{ActorSystem, Props} 9 10 import akka.actor.{Actor, ActorRef} 11 12 /** 13 * 定义玩家1 14 */ 15 class player1Actor(val p2: ActorRef) extends Actor{ 16 // receive方法是负责处理消息的 17 override def receive: Receive = { 18 case "start" => { 19 println("棋圣:I'm OK !") 20 p2 ! "该你了" 21 } 22 case "将军" => { 23 println("棋圣:你真猛!") 24 Thread.sleep(1000) 25 p2 ! "该你了" 26 } 27 } 28 } 29 30 31 /** 32 * 定义玩家2 33 */ 34 class player2Actor extends Actor{ 35 36 override def receive: Receive = { 37 case "start" => println("棋仙说:I'm OK !") 38 case "该你了" => { 39 println("棋仙:那必须滴!") 40 Thread.sleep(1000) 41 /** 42 * 注意,这个“sender()”,其实就是对ActorRef的一个引用。它指的是给发送"该你了"的这个对象本身! 43 */ 44 sender() ! "将军" 45 } 46 } 47 } 48 49 50 object ChineseChess extends App{ 51 // 创建 actorSystem的工厂,用来生产ActorRef对象! 52 private val ChineseChessActorSystem = ActorSystem("Chinese-chess") 53 /** 54 * 通过actorSystem创建ActorRef 55 */ 56 private val p2 = ChineseChessActorSystem.actorOf(Props[player2Actor], "player2") //创建player2Actor对象 57 private val p1 = ChineseChessActorSystem.actorOf(Props(new player1Actor(p2)), "player1") //创建player1Actor对象 58 59 p2 ! "start" 60 p1 ! "start" 61 }
运行以上代码输出结果如下:
四.服务端和客户端交互的小程序
1>.服务端代码
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/ 4 EMAIL:y1053419035@qq.com 5 */ 6 package cn.org.yinzhengjie.robot 7 8 import akka.actor.{Actor, ActorSystem, Props} 9 import com.typesafe.config.ConfigFactory 10 11 class ServerActor extends Actor{ 12 /** 13 * receive方法是用来处理客户端发送过来的问题的 14 */ 15 override def receive: Receive = { 16 case "start" => println("天猫系统已启动...") 17 18 case ClientMessage(msg) => { 19 println(s"收到客户端消息:$msg") 20 msg match { 21 /** 22 * sender()发送端的代理对象, 发送到客户端的mailbox中 -> 客户端的receive 23 */ 24 case "你叫啥" => 25 sender() ! ServerMessage("本宝宝是天猫精灵") 26 case "你是男是女" => 27 sender() ! ServerMessage("本宝宝非男非女") 28 case "你有男票吗" => 29 sender() ! ServerMessage("本宝宝还小哟") 30 case "stop" => 31 context.stop(self) // 停止自己的actorRef 32 context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService) 33 println("天猫系统已停止...") 34 case _ => 35 sender() ! ServerMessage("对不起,主人,我不知道你在说什么.......") 36 } 37 } 38 } 39 } 40 41 object ServerActor { 42 def main(args: Array[String]): Unit = { 43 //定义服务端的ip和端口 44 val host = "127.0.0.1" 45 val port = 8088 46 /** 47 * 使用ConfigFactory的parseString方法解析字符串,指定服务端IP和端口 48 */ 49 val config = ConfigFactory.parseString( 50 s""" 51 |akka.actor.provider="akka.remote.RemoteActorRefProvider" 52 |akka.remote.netty.tcp.hostname=$host 53 |akka.remote.netty.tcp.port=$port 54 """.stripMargin) 55 /** 56 * 将config对象传递给ActorSystem并起名为"Server",为了是创建服务端工厂对象(ServerActorSystem)。 57 */ 58 val ServerActorSystem = ActorSystem("Server", config) 59 /** 60 * 通过工厂对象创建服务端的ActorRef 61 */ 62 val serverActorRef = ServerActorSystem.actorOf(Props[ServerActor], "Miao~miao") 63 /** 64 * 到自己的mailbox -》 receive方法 65 */ 66 serverActorRef ! "start" 67 } 68 }
2>.客户端代码
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/ 4 EMAIL:y1053419035@qq.com 5 */ 6 package cn.org.yinzhengjie.robot 7 8 import akka.actor.{Actor, ActorSelection, ActorSystem, Props} 9 import com.typesafe.config.ConfigFactory 10 11 import scala.io.StdIn 12 13 class ClientActor(host: String, port: Int) extends Actor{ 14 15 var serverActorRef: ActorSelection = _ // 服务端的代理对象 16 17 // 在receive方法之前调用 18 override def preStart(): Unit = { 19 // akka.tcp://Server@127.0.0.1:8088 20 serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/Miao~miao") 21 } 22 // mailbox ->receive 23 override def receive: Receive = { // shit 24 case "start" => println("2018天猫精灵为您服务!") 25 case msg: String => { // shit 26 serverActorRef ! ClientMessage(msg) // 把客户端输入的内容发送给 服务端(actorRef)--》服务端的mailbox中 -> 服务端的receive 27 } 28 case ServerMessage(msg) => println(s"收到服务端消息:$msg") 29 } 30 } 31 32 object ClientActor { 33 def main(args: Array[String]): Unit = { 34 35 //指定客户端的IP和端口 36 val host = "127.0.0.1" 37 val port = 8089 38 39 //指定服务端的IP和端口 40 val serverHost = "127.0.0.1" 41 val serverPort = 8088 42 43 /** 44 * 使用ConfigFactory的parseString方法解析字符串,指定客户端IP和端口 45 */ 46 val config = ConfigFactory.parseString( 47 s""" 48 |akka.actor.provider="akka.remote.RemoteActorRefProvider" 49 |akka.remote.netty.tcp.hostname=$host 50 |akka.remote.netty.tcp.port=$port 51 """.stripMargin) 52 53 /** 54 * 将config对象传递给ActorSystem并起名为"Server",为了是创建客户端工厂对象(clientActorSystem)。 55 */ 56 val clientActorSystem = ActorSystem("client", config) 57 58 // 创建dispatch | mailbox 59 val clientActorRef = clientActorSystem.actorOf(Props(new ClientActor(serverHost, serverPort.toInt)), "Client") 60 61 clientActorRef ! "start" // 自己给自己发送了一条消息 到自己的mailbox => receive 62 63 /** 64 * 接受用户的输入信息并传送给服务端 65 */ 66 while (true) { 67 Thread.sleep(500) 68 /** 69 * StdIn.readLine方法是同步阻塞的 70 */ 71 val question = StdIn.readLine("请问有什么我可以帮你的吗?>>>") 72 clientActorRef ! question 73 if (question.equals("stop")){ 74 Thread.sleep(500) 75 println("程序即将结束") 76 System.exit(0) 77 } 78 } 79 } 80 }
3>.先执行服务端再执行客户端并输入相应信息测试结果如下:
客户端运行结果如下:
服务端运行结果如下:
转载于:https://www.cnblogs.com/yinzhengjie/p/9376296.html
Scala进阶之路-并发编程模型Akka入门篇相关推荐
- Scala进阶之路-面向对象编程之类的成员详解
Scala进阶之路-面向对象编程之类的成员详解 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.Scala中的object对象及apply方法 1>.scala 单例对象 ...
- 15. Scala并发编程模型Akka
15.1 Akka介绍 1) Akka是Java虚拟机JVM平台上构建高并发.分布式和容错应用的工具包和运行时,可以理解成Akka是编写并发程序的框架 2) Akka用Scala语言写成,同时提供了S ...
- 并发编程模型Akka
1)Akka是java虚拟机JVM平台上构建高并发.分布式和容错应用的工具包和运行时,可以理解为Akka是编写并发程序的框架 2)akka是用scala语言写成,同时提供了scala和java的开发接 ...
- java akka 教程_Akka与并发编程模型
Akka是一种并发编程模型的框架,其官网为http://akka.io.提供Java版本和Scala版本的API. 从学习或者使用的角度来说,我们首先要说明的是,Akka的并发编程模型(流水线模式)与 ...
- 四种并发编程模型简介
概述 并发往往和并行一起被提及,但是我们应该明确的是"并发"不等同于"并行" • 并发 :同一时间 对待 多件事情 (逻辑层面) • 并 ...
- [ ECUG 专题回顾]《再谈 CERL:详论 GO 与 ERLANG 的并发编程模型差异》-许式伟(七牛云存储 CEO)...
许式伟:我们开始,先介绍一下ECUG,从07年开始,最早在珠三角珠海广州深圳,在珠三角兴起,最早是Erlang的社区.大概到10年的时候更名为实时效云计算的群组,最早的时候也不局限于Erlang,而是 ...
- 多线程进阶=》JUC并发编程
多线程进阶=>JUC并发编程 1.什么是JUC JUC是java.util.concurrent的简写. 用中文概括一下,JUC的意思就是java并发编程工具包. 并发编程的本质就是 ...
- 【专家坐堂】四种并发编程模型简介
本文来自网易云社区 概述 并发往往和并行一起被提及,但是我们应该明确的是"并发"不等同于"并行" • 并发 :同一时间 对待 多件事情 (逻辑层面) ...
- Scala进阶之路-正则表达式案例
Scala进阶之路-正则表达式案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 废话不多说,正则大家都很清楚,那在Scala如何使用正则了?我们直接上个案例,如下: 1 /* 2 ...
最新文章
- C++11如何减少内存拷贝次数
- Java Maximum Subarray debug
- 我的播客开通的第一天
- python3.7.2安装与pycharm_Python3和PyCharm安装与环境配置【图文教程】
- 用SandCastle为注释生成chm文档
- Unity3D 调用Java,Java调用Unity3D
- 苹果官网html简单代码,苹果官网CSS3应用案例分析
- 记账时对收支、借还款进行分类记录
- npm ETIMEDOUT 问题
- 电脑怎么显示文件后缀名?3个步骤
- Python +大数据-知行教育(四)-意向用户主题看板_全量流程
- Tengine全面支持寒武纪思元芯片平台,共同推进AI芯片加速之路
- 潜在语义分析(LSA)
- 【机器学习原理实战01】Ridge回归模型
- 分组密码体制【密码学笔记】
- 主成分分析(PCA)与K-L变换
- c语言编程曹冲称象,《曹冲称象》教学设计
- 用python3实现MD5withRSA数字签名的验证
- 搬书 hnust校赛
- 微信支付回调通知实现
热门文章
- 预防“断种”风险,专家:基因编辑能提高我国种子竞争力
- python3各个版本是通用的吗-Python各个版本间的差异
- 2011年度100个移动开发精品资料荟萃【珍藏级不容错过!】
- 最近一百年,全球涌现过哪些最顶尖的、最赚钱的公司?它们的共性是什么?
- 2022广东最新食品安全管理员模拟考试试题及答案
- 【职场进阶】怎么能够不加班或者少加班?
- 【Spring-Cloud】使用教程 基础篇 Eureka Ribbon Feign Hystrix Zuul Dashboard Sleuth Zipkin Config Bus
- python矩形碰撞检测算法_矩形的碰撞检测(模仿俄罗斯方块)
- pyecharts在数据可视化中的应用详解
- Gate Level Simulation (前仿及后仿总结)