Scala进阶之路-并发编程模型Akka入门篇

                                  作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

一.Akka Actor介绍

1>.Akka介绍

  写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时。Akka 用 Scala 语言写成,同时提供了 Scala 和 JAVA 的开发接口。

2>.Akka 中 中 Actor  模型

  Akka 处理并发的方法基于 Actor 模型。在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是 Actor 模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor 与 Actor 之间只能通过消息通信。

3>.Akaka的特点

  第一:它是对并发模型进行了更高的抽象;
  第二:它是异步、非阻塞、高性能的事件驱动编程模型;
  第三:它是轻量级事件处理(1GB 内存可容纳百万级别个 Actor);

4>.为什么 Actor 模型是一种处理并发问题的解决方案?

  处理并发问题就是如何保证共享数据的一致性和正确性,为什么会有保持共享数据正确性这个问题呢?无非是我们的程序是多线程的,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。那么我们是不是可以转换一下思维,用单线程去处理相应的请求,但是又有人会问了,若是用单线程处理,那系统的性能又如何保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。

5>.Maven依赖

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6
 7     <groupId>cn.org.yinzhengjie</groupId>
 8     <artifactId>MyActor</artifactId>
 9     <version>1.0-SNAPSHOT</version>
10
11     <!-- 定义一下常量 -->
12     <properties>
13         <encoding>UTF-8</encoding>
14         <scala.version>2.11.8</scala.version>
15         <scala.compat.version>2.11</scala.compat.version>
16         <akka.version>2.4.17</akka.version>
17     </properties>
18
19     <dependencies>
20         <!-- 添加scala的依赖 -->
21         <dependency>
22             <groupId>org.scala-lang</groupId>
23             <artifactId>scala-library</artifactId>
24             <version>${scala.version}</version>
25         </dependency>
26
27         <!-- 添加akka的actor依赖 -->
28         <dependency>
29             <groupId>com.typesafe.akka</groupId>
30             <artifactId>akka-actor_${scala.compat.version}</artifactId>
31             <version>${akka.version}</version>
32         </dependency>
33
34         <!-- 多进程之间的Actor通信 -->
35         <dependency>
36             <groupId>com.typesafe.akka</groupId>
37             <artifactId>akka-remote_${scala.compat.version}</artifactId>
38             <version>${akka.version}</version>
39         </dependency>
40     </dependencies>
41
42     <!-- 指定插件-->
43     <build>
44         <!-- 指定源码包和测试包的位置 -->
45         <sourceDirectory>src/main/scala</sourceDirectory>
46         <testSourceDirectory>src/test/scala</testSourceDirectory>
47         <plugins>
48             <!-- 指定编译scala的插件 -->
49             <plugin>
50                 <groupId>net.alchim31.maven</groupId>
51                 <artifactId>scala-maven-plugin</artifactId>
52                 <version>3.2.2</version>
53                 <executions>
54                     <execution>
55                         <goals>
56                             <goal>compile</goal>
57                             <goal>testCompile</goal>
58                         </goals>
59                         <configuration>
60                             <args>
61                                 <arg>-dependencyfile</arg>
62                                 <arg>${project.build.directory}/.scala_dependencies</arg>
63                             </args>
64                         </configuration>
65                     </execution>
66                 </executions>
67             </plugin>
68
69
70         </plugins>
71     </build>
72 </project>

  自定义默认的源代码包和测试包的位置,需要手动穿件Source Root目录哟,如下图:

二.编写HelloActor

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.actor
 7
 8 import akka.actor.{Actor, ActorSystem, Props}
 9
10 import scala.io.StdIn
11
12 class HelloActor extends Actor{
13     // 重写接受消息的偏函数,其功能是接受消息并处理
14     override def receive: Receive = {
15         case "你好帅" => println("竟说实话,我喜欢你这种人!")
16         case "丑八怪" => println("滚犊子 !")
17         case "stop" => {
18             context.stop(self) // 停止自己的actorRef
19             context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService)
20         }
21     }
22 }
23
24 object HelloActor {
25     /**
26       * 创建线程池对象MyFactory,用来创建actor的对象的
27       */
28     private val MyFactory = ActorSystem("myFactory")    //里面的"myFactory"参数为线程池的名称
29     /**
30       *     通过MyFactory.actorOf方法来创建一个actor,注意,Props方法的第一个参数需要传递我们自定义的HelloActor类,
31       * 第二个参数是给actor起个名字
32       */
33     private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor")
34
35     def main(args: Array[String]): Unit = {
36         var flag = true
37         while(flag){
38             /**
39               * 接受用户输入的字符串
40               */
41             print("请输入您想发送的消息:")
42             val consoleLine:String = StdIn.readLine()
43             /**
44               * 使用helloActorRef来给自己发送消息,helloActorRef有一个叫做感叹号("!")的方法来发送消息
45               */
46             helloActorRef ! consoleLine
47             if (consoleLine.equals("stop")){
48                 flag = false
49                 println("程序即将结束!")
50             }
51             /**
52               * 为了不让while的运行速度在receive方法之上,我们可以让他休眠0.1秒
53               */
54             Thread.sleep(100)
55         }
56     }
57 }

  以上代码执行结果如下:

三.两个actor通信案例-模拟下棋对话

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.actor
 7
 8 import akka.actor.{ActorSystem, Props}
 9
10 import akka.actor.{Actor, ActorRef}
11
12 /**
13   * 定义玩家1
14   */
15 class player1Actor(val p2: ActorRef) extends Actor{
16     // receive方法是负责处理消息的
17     override def receive: Receive = {
18         case "start" => {
19             println("棋圣:I'm OK !")
20             p2 ! "该你了"
21         }
22         case "将军" => {
23             println("棋圣:你真猛!")
24             Thread.sleep(1000)
25             p2 ! "该你了"
26         }
27     }
28 }
29
30
31 /**
32   * 定义玩家2
33   */
34 class player2Actor extends Actor{
35
36     override def receive: Receive = {
37         case "start" => println("棋仙说:I'm OK !")
38         case "该你了" => {
39             println("棋仙:那必须滴!")
40             Thread.sleep(1000)
41             /**
42               * 注意,这个“sender()”,其实就是对ActorRef的一个引用。它指的是给发送"该你了"的这个对象本身!
43               */
44             sender() ! "将军"
45         }
46     }
47 }
48
49
50 object ChineseChess extends App{
51     // 创建 actorSystem的工厂,用来生产ActorRef对象!
52     private val ChineseChessActorSystem = ActorSystem("Chinese-chess")
53     /**
54       * 通过actorSystem创建ActorRef
55       */
56     private val p2 = ChineseChessActorSystem.actorOf(Props[player2Actor], "player2")            //创建player2Actor对象
57     private val p1 = ChineseChessActorSystem.actorOf(Props(new player1Actor(p2)), "player1")   //创建player1Actor对象
58
59     p2 ! "start"
60     p1 ! "start"
61 }

  运行以上代码输出结果如下:

四.服务端和客户端交互的小程序

1>.服务端代码

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.robot
 7
 8 import akka.actor.{Actor, ActorSystem, Props}
 9 import com.typesafe.config.ConfigFactory
10
11 class ServerActor extends Actor{
12     /**
13       * receive方法是用来处理客户端发送过来的问题的
14       */
15     override def receive: Receive = {
16         case "start" => println("天猫系统已启动...")
17
18         case ClientMessage(msg) => {
19             println(s"收到客户端消息:$msg")
20             msg match {
21                 /**
22                   * sender()发送端的代理对象, 发送到客户端的mailbox中 -> 客户端的receive
23                    */
24                 case "你叫啥" =>
25                     sender() ! ServerMessage("本宝宝是天猫精灵")
26                 case "你是男是女" =>
27                     sender() ! ServerMessage("本宝宝非男非女")
28                 case "你有男票吗" =>
29                     sender() ! ServerMessage("本宝宝还小哟")
30                 case "stop" =>
31                     context.stop(self) // 停止自己的actorRef
32                     context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService)
33                     println("天猫系统已停止...")
34                 case _ =>
35                     sender() ! ServerMessage("对不起,主人,我不知道你在说什么.......")
36             }
37         }
38     }
39 }
40
41 object ServerActor {
42     def main(args: Array[String]): Unit = {
43         //定义服务端的ip和端口
44         val host = "127.0.0.1"
45         val port = 8088
46         /**
47           * 使用ConfigFactory的parseString方法解析字符串,指定服务端IP和端口
48           */
49         val config = ConfigFactory.parseString(
50             s"""
51                |akka.actor.provider="akka.remote.RemoteActorRefProvider"
52                |akka.remote.netty.tcp.hostname=$host
53                |akka.remote.netty.tcp.port=$port
54         """.stripMargin)
55         /**
56           * 将config对象传递给ActorSystem并起名为"Server",为了是创建服务端工厂对象(ServerActorSystem)。
57           */
58         val ServerActorSystem = ActorSystem("Server", config)
59         /**
60           * 通过工厂对象创建服务端的ActorRef
61           */
62         val serverActorRef = ServerActorSystem.actorOf(Props[ServerActor], "Miao~miao")
63         /**
64           * 到自己的mailbox -》 receive方法
65           */
66         serverActorRef ! "start"
67     }
68 }

2>.客户端代码

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.robot
 7
 8 import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
 9 import com.typesafe.config.ConfigFactory
10
11 import scala.io.StdIn
12
13 class ClientActor(host: String, port: Int) extends Actor{
14
15     var serverActorRef: ActorSelection = _ // 服务端的代理对象
16
17     // 在receive方法之前调用
18     override def preStart(): Unit = {
19         // akka.tcp://Server@127.0.0.1:8088
20         serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/Miao~miao")
21     }
22     // mailbox ->receive
23     override def receive: Receive = { // shit
24         case "start" => println("2018天猫精灵为您服务!")
25         case msg: String => { // shit
26             serverActorRef ! ClientMessage(msg) // 把客户端输入的内容发送给 服务端(actorRef)--》服务端的mailbox中 -> 服务端的receive
27         }
28         case ServerMessage(msg) => println(s"收到服务端消息:$msg")
29     }
30 }
31
32 object ClientActor  {
33     def main(args: Array[String]): Unit = {
34
35         //指定客户端的IP和端口
36         val host = "127.0.0.1"
37         val port  = 8089
38
39         //指定服务端的IP和端口
40         val serverHost = "127.0.0.1"
41         val serverPort = 8088
42
43         /**
44           * 使用ConfigFactory的parseString方法解析字符串,指定客户端IP和端口
45           */
46         val config = ConfigFactory.parseString(
47             s"""
48                |akka.actor.provider="akka.remote.RemoteActorRefProvider"
49                |akka.remote.netty.tcp.hostname=$host
50                |akka.remote.netty.tcp.port=$port
51         """.stripMargin)
52
53         /**
54           * 将config对象传递给ActorSystem并起名为"Server",为了是创建客户端工厂对象(clientActorSystem)。
55           */
56         val clientActorSystem = ActorSystem("client", config)
57
58         // 创建dispatch | mailbox
59         val clientActorRef = clientActorSystem.actorOf(Props(new ClientActor(serverHost, serverPort.toInt)), "Client")
60
61         clientActorRef ! "start" // 自己给自己发送了一条消息 到自己的mailbox => receive
62
63         /**
64           * 接受用户的输入信息并传送给服务端
65           */
66         while (true) {
67             Thread.sleep(500)
68             /**
69               * StdIn.readLine方法是同步阻塞的
70               */
71             val question = StdIn.readLine("请问有什么我可以帮你的吗?>>>")
72             clientActorRef ! question
73             if (question.equals("stop")){
74                 Thread.sleep(500)
75                 println("程序即将结束")
76                 System.exit(0)
77             }
78         }
79     }
80 }

3>.先执行服务端再执行客户端并输入相应信息测试结果如下:

  客户端运行结果如下:

  服务端运行结果如下:

转载于:https://www.cnblogs.com/yinzhengjie/p/9376296.html

Scala进阶之路-并发编程模型Akka入门篇相关推荐

  1. Scala进阶之路-面向对象编程之类的成员详解

    Scala进阶之路-面向对象编程之类的成员详解 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.Scala中的object对象及apply方法 1>.scala 单例对象 ...

  2. 15. Scala并发编程模型Akka

    15.1 Akka介绍 1) Akka是Java虚拟机JVM平台上构建高并发.分布式和容错应用的工具包和运行时,可以理解成Akka是编写并发程序的框架 2) Akka用Scala语言写成,同时提供了S ...

  3. 并发编程模型Akka

    1)Akka是java虚拟机JVM平台上构建高并发.分布式和容错应用的工具包和运行时,可以理解为Akka是编写并发程序的框架 2)akka是用scala语言写成,同时提供了scala和java的开发接 ...

  4. java akka 教程_Akka与并发编程模型

    Akka是一种并发编程模型的框架,其官网为http://akka.io.提供Java版本和Scala版本的API. 从学习或者使用的角度来说,我们首先要说明的是,Akka的并发编程模型(流水线模式)与 ...

  5. 四种并发编程模型简介

    概述 并发往往和并行一起被提及,但是我们应该明确的是"并发"不等同于"并行" •       并发 :同一时间 对待 多件事情 (逻辑层面) •       并 ...

  6. [ ECUG 专题回顾]《再谈 CERL:详论 GO 与 ERLANG 的并发编程模型差异》-许式伟(七牛云存储 CEO)...

    许式伟:我们开始,先介绍一下ECUG,从07年开始,最早在珠三角珠海广州深圳,在珠三角兴起,最早是Erlang的社区.大概到10年的时候更名为实时效云计算的群组,最早的时候也不局限于Erlang,而是 ...

  7. 多线程进阶=》JUC并发编程

    多线程进阶=>JUC并发编程 1.什么是JUC ​ JUC是java.util.concurrent的简写. ​ 用中文概括一下,JUC的意思就是java并发编程工具包. ​ 并发编程的本质就是 ...

  8. 【专家坐堂】四种并发编程模型简介

    本文来自网易云社区 概述 并发往往和并行一起被提及,但是我们应该明确的是"并发"不等同于"并行" •       并发 :同一时间 对待 多件事情 (逻辑层面) ...

  9. Scala进阶之路-正则表达式案例

    Scala进阶之路-正则表达式案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 废话不多说,正则大家都很清楚,那在Scala如何使用正则了?我们直接上个案例,如下: 1 /* 2 ...

最新文章

  1. C++11如何减少内存拷贝次数
  2. Java Maximum Subarray debug
  3. 我的播客开通的第一天
  4. python3.7.2安装与pycharm_Python3和PyCharm安装与环境配置【图文教程】
  5. 用SandCastle为注释生成chm文档
  6. Unity3D 调用Java,Java调用Unity3D
  7. 苹果官网html简单代码,苹果官网CSS3应用案例分析
  8. 记账时对收支、借还款进行分类记录
  9. npm ETIMEDOUT 问题
  10. 电脑怎么显示文件后缀名?3个步骤
  11. Python +大数据-知行教育(四)-意向用户主题看板_全量流程
  12. Tengine全面支持寒武纪思元芯片平台,共同推进AI芯片加速之路
  13. 潜在语义分析(LSA)
  14. 【机器学习原理实战01】Ridge回归模型
  15. 分组密码体制【密码学笔记】
  16. 主成分分析(PCA)与K-L变换
  17. c语言编程曹冲称象,《曹冲称象》教学设计
  18. 用python3实现MD5withRSA数字签名的验证
  19. 搬书 hnust校赛
  20. 微信支付回调通知实现

热门文章

  1. 预防“断种”风险,专家:基因编辑能提高我国种子竞争力
  2. python3各个版本是通用的吗-Python各个版本间的差异
  3. 2011年度100个移动开发精品资料荟萃【珍藏级不容错过!】
  4. 最近一百年,全球涌现过哪些最顶尖的、最赚钱的公司?它们的共性是什么?
  5. 2022广东最新食品安全管理员模拟考试试题及答案
  6. 【职场进阶】怎么能够不加班或者少加班?
  7. 【Spring-Cloud】使用教程 基础篇 Eureka Ribbon Feign Hystrix Zuul Dashboard Sleuth Zipkin Config Bus
  8. python矩形碰撞检测算法_矩形的碰撞检测(模仿俄罗斯方块)
  9. pyecharts在数据可视化中的应用详解
  10. Gate Level Simulation (前仿及后仿总结)