一 定义Actor

import akka.actor.{Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import akka.event.Logging

/** 通过扩展Actor并实现receive方法来定义Actor*/
class MyActor extends Actor{
    //获取LoggingAdapter,用于日志输出
   
val log = Logging(context.system,this)
    def receive = {
        case "test" => log.info("Received Test")
        case _ => log.info("Received Unknown Message")
    }
}

object MyActor extends App {
    /*ActorSystem 用于创建维护Actor*/
   
val system = ActorSystem("MyActorSystem")
    /*返回ActorSystem的LoggingAdpater*/
   
val systemLog= system.log
    /*通过ActorSystem#actorOf创建MyActor指定名称为myactor*/
   
val myactor = system.actorOf(Props[MyActor],name="myactor")
    systemLog.info("准备向myactor发送消息")
    /*向myactor发送消息*/
    myactor
! "test"
    myactor ! "123"
    /*关闭ActorSystem,停止程序的运行*/
    system
.shutdown()
}

二 ActorSystem

ActorSystem的只要职责:

# 创建Actor 和 ActorRef

# 返回Actor Ref

# 拦截消息,并把它发到邮箱(临时存储消息的地方)

创建ActorRef

system.actorOf(Props[MyActor],name="myactor")

我们可以在Props[Actor]放入一个Actor类型,也可以放入一一个Actor引用,但是这种是不推荐的

使用ActorRef对象可以进行消息的发送等操作

三 2种创建Actor的方式

3.1 ActorSystem 创建Actor

val system = ActorSystem("mySystem")

val myActor = system.actorOf(Props[MyActor],"myactor2")

通过ActorSystem创建的Actor是顶级Actor,在Akka框架中,每一个Akka应用程序都会有一个守卫Actor(guardian)user,所有通过system.actorOf工厂方法创建的Actor都是user的子Actor,也是整Akka程序的顶级Actor

3.2 通过context创建Actor

我们可以通过Actor上下文创建子Actor,使得父Actor拥有管理子Actor的能力

import akka.actor.Actor.Receive
import akka.actor.{ActorLogging,Actor,Props}class ContextActor extends Actor with ActorLogging{val childActor = context.actorOf(Props[MyActor], name = "ChildActor")def receive: Receive = {case msg =>childActor !msg;log.info("Received "+msg)}
}
object MyActor extends App {/*ActorSystem 用于创建维护Actor*/val system = ActorSystem("MyActorSystem")/*返回ActorSystem的LoggingAdpater*/val systemLog = system.log/*通过ActorSystem#actorOf创建MyActor指定名称为myactor的顶级Actor*/val myactor = system.actorOf(Props[MyActor],name="topActor")systemLog.info("准备向Top Actor--MyActor发送消息")/*向myactor发送消息*/myactor ! "test"myactor ! "123"/*通过ActorSystem#actorOf创建ContextActor,然后在其内部创建一个子的MyActor*/val contextActor = system.actorOf(Props[ContextActor],name="childActor")systemLog.info("准备向Child Actor--MyActor发送消息")contextActor ! "hadoop"contextActor ! "hive"/*关闭ActorSystem,停止程序的运行*/system.shutdown()
}

得到的结果如下:

准备向TopActor--MyActor发送消息

[akka://MyActorSystem/user/topActor] Received Test

[akka://MyActorSystem/user/topActor] ReceivedUnknown Message

准备向ChildActor--MyActor发送消息

[akka://MyActorSystem/user/childActor/ChildActor]Received Unknown Message

[akka://MyActorSystem/user/childActor] Receivedhadoop

[akka://MyActorSystem/user/childActor] Receivedhive

[akka://MyActorSystem/user/childActor/ChildActor]Received Unknown Message

结论:通过context创建子的Actor这种方式,可以支持链式调用

四 Actor的path

通过以上运行的结论:

akka://MyActorSystem/user/topActor

akka://MyActorSystem/user/childActor/ChildActor

五 Actor API的使用

5.1 hook方法

preStart,postStop方法

import akka.actor._
import akka.actor.Actor.Receiveclass HookActor extends Actor with ActorLogging{var child:ActorRef = _/*preStart(),Actor启动之前调用,用于完成初始化工作*/@throws[Exception](classOf[Exception])override def preStart(): Unit = {log.info("[preStrat method invoked]")child = context.actorOf(Props[MyActor],name="myChild")}def receive: Receive = {case msg => child ! msg;log.info("Received "+msg)}/*postStop(),Actor停止之后调用*/@throws[Exception](classOf[Exception])override def postStop(): Unit = {log.info("[postStop method invoked]")context.stop(child)}
}object HookActor extends App {val system = ActorSystem("HookActorSystem")val hook = system.actorOf(Props[HookActor],name="hook")hook ! "Welcome You"hook ! "Nice to meet you"system.terminate()
}

5.2成员变量self及成员方法sender方法的使用

self ! 消息:指的是自己向自己发送一条消息

sender ! 或者 sender() !消息 指定的是向给自己发送消息的Actor发送一条消息,如果没有给自己发送消息的Actor,则不会投递消息

class MyActor extends Actor{self ! "MSG FROM MYSELF"//获取LoggingAdapter,用于日志输出val log = Logging(context.system,this)def receive = {case "test" =>log.info("[MyActor]=> test")sender()!"MSG FROM MyActor"case "MSG FROM MYSELF" =>log.info("[MyActor]=> MSG FROM MYSELF")case "MSG FROM MyActor" =>log.info("[MyActor]=>MSG FROM MyActor")case _ => log.info("[MyActor]=> Unkown Message");}
}object MyActor extends App {/*ActorSystem 用于创建维护Actor*/val system = ActorSystem("MyActorSystem")/*返回ActorSystem的LoggingAdpater*/val systemLog = system.log/*通过ActorSystem#actorOf创建MyActor指定名称为myactor的顶级Actor*/
//    val myactor = system.actorOf(Props[MyActor],name="topActor")
//    systemLog.info("准备向Top Actor--MyActor发送消息")
//    /*向myactor发送消息*/
//    myactor ! "test"/*通过ActorSystem#actorOf创建ContextActor,然后在其内部创建一个子的MyActor*/val contextActor = system.actorOf(Props[ContextActor],name="childActor")systemLog.info("准备向Child Actor--MyActor发送消息")contextActor ! "test"/*关闭ActorSystem,停止程序的运行*/system.shutdown()
}
class ContextActor extends Actor with ActorLogging{val childActor = context.actorOf(Props[MyActor], name = "ChildActor")def receive: Receive = {case msg =>childActor ! msg;log.info("[ContextActor]=> "+msg)}
}

[akka://MyActorSystem/user/childActor][ContextActor]=> test

[akka://MyActorSystem/user/childActor/ChildActor][MyActor]=> MSG FROM MYSELF

[akka://MyActorSystem/user/childActor/ChildActor][MyActor]=> test

[akka://MyActorSystem/user/childActor/ChildActor][MyActor]=>MSG FROM MyActor

[akka://MyActorSystem/user/childActor][ContextActor]=> MSG FROM MyActor

5.3 unhandle方法的使用

unhandled方法用于处理没有被receive方法处理的消息,比如case 中并没有针对没有匹配的消息的处理case _ 语句,在实际开发过程中,可能会对不能被处理的消息增加一些应对逻辑,此时可以重写unhandled方法

class UnhandleActor extends Actor with ActorLogging{def receive = {case "hello" => println("hello, nice to meet you!")}/*重写unhandled方法*/override def unhandled(message: Any): Unit = {log.info("UNHANDLED MSG {}",message.toString.toUpperCase())}
}object UnhandleActor {def main(args: Array[String]) {val system = ActorSystem("UnhandleActor")val unhandle = system.actorOf(Props[UnhandleActor],name="unhandle")unhandle ! "hello"unhandle ! "business dias"}
}

六 Actor的停止

6.1 通过ActorSystem#shutdown(过时) 或者ActorSystem#terminate

停止所有Actor运行

object UnhandleActor {def main(args: Array[String]) {val system = ActorSystem("UnhandleActor")val unhandle = system.actorOf(Props[UnhandleActor],name="unhandle")unhandle ! "hello"unhandle ! "business dias"//system.shutdown()system.terminate()}
}

6.2 通过ActorContext停止某一个Actor

class HookActor extends Actor with ActorLogging{var child:ActorRef = _/*preStart(),Actor启动之前调用,用于完成初始化工作*/@throws[Exception](classOf[Exception])override def preStart(): Unit = {log.info("[preStrat method invoked]")child = context.actorOf(Props[MyActor],name="myChild")}def receive: Receive = {case "self" => sender() ! "test"case msg => child ! msg;log.info("Received "+msg)}/*postStop(),Actor停止之后调用*/@throws[Exception](classOf[Exception])override def postStop(): Unit = {log.info("[postStop method invoked]")context.stop(child)}
}

6.3 通过发送消息的形式通知Actor关闭

语法格式 actor ! PoisonPill

class MyActor extends Actor{//获取LoggingAdapter,用于日志输出val log = Logging(context.system,this)def receive = {case "test" =>log.info("[MyActor]=> test")sender()!"MSG FROM MyActor"case _ =>log.info("[MyActor]=> Unkown Message");}@throws[Exception](classOf[Exception])override def postStop(): Unit = {log.info("[MyActor][postStop method invoked]")}
}
class HookActor extends Actor with ActorLogging{var child:ActorRef = context.actorOf(Props[MyActor],name="myChild")def receive: Receive = {case "stop" => child ! PoisonPillcase msg => child ! msg;log.info("[HookActor]=> "+msg)}
}object HookActor extends App {val system = ActorSystem("HookActorSystem")val hook = system.actorOf(Props[HookActor],name="hook")hook ! "Nice to meet you"hook ! "stop"
}

七 Actor 消息模型

Akka中,消息可以以2种模型来传递给Actor, 如下:

7.1 Fire and Forget

消息生产者不期望从消费者得到答复,这种消息是异步方式发送,然后立即返回,Akka actors使用tell()方法指示该消息是Fire and Forget模式

import akka.actor.Actor.Receive
import akka.actor.{ActorSystem, Props, ActorLogging, Actor}case class Animal(msg:String)
case class Human(msg:String)
case class Plant(msg:String)
case class UFO(msg:String)class FireAndForgetActor extends Actor with ActorLogging{def receive: Receive = {case Animal(msg) => log.info("[Animal] => {}", msg)case Human(msg) => log.info("[Human] => {}", msg)case Plant(msg) => log.info("[Plant] => {}", msg)case _ => log.info("Hahah")}
}class SendActor extends Actor with ActorLogging{val child = context.actorOf(Props[FireAndForgetActor],name="tell")def receive: Receive = {case msg => child.tell(msg,sender)}override def unhandled(message: Any): Unit = {log.info("[Unhandled Message] => {}",message)}
}object SendActor extends App {val system = ActorSystem("SendActorSystem")val send = system.actorOf(Props[SendActor],name="send")val animal = new Animal("我是老虎")val human = new Human("我是隔壁那啥")val plant = new Plant("我是小草")val ufo = new UFO("我是来自xxx星球的")send ! animalsend ! humansend ! plantsend ! ufosystem.terminate()
}

7.2 Send and Receive

这种模式,生产者期望从消费者得到一个答复,而且需要等这个答复,这种模式也是异步发送的,返回一个Future, kka actors 使用ask()方法发送消息然后等待答复的future

import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.Actor.Receive
import akka.actor._
import akka.pattern.ask
import akka.pattern.pipe
import akka.util.Timeout
import scala.concurrent.Future
case class Animal(msg:String)
case class Human(msg:String)
case class Plant(msg:String)
case class UFO(msg:String)class FireAndForgetActor extends Actor with ActorLogging{def receive: Receive = {case Animal(msg) => log.info("[Animal] => {}", msg)case Human(msg) => log.info("[Human] => {}", msg)case Plant(msg) => log.info("[Plant] => {}", msg)case _ => log.info("Hahah")}
}class SendAndReceiveActor extends Actor with ActorLogging{def receive: Receive = {case "动物" =>sender ! new Animal("老虎")case "人类" =>sender ! new Human("小明")case "植物" =>sender ! new Plant("杜鹃花")case _ =>sender ! new UFO("UFO")}
}class RunActor extends Actor with ActorLogging{implicit val timeout = Timeout(5,TimeUnit.MILLISECONDS)val askActor = context.actorOf(Props[SendAndReceiveActor],name="ask")val tellActor = context.actorOf(Props[FireAndForgetActor],name="tell")def receive: Receive = {case "动物" =>val result:Future[Animal] =for {obj <- ask(askActor,"动物").mapTo[Animal]} yield objpipe(result).to(tellActor)case "人类" =>val result:Future[Human] =for {obj <- ask(askActor,"人类").mapTo[Human]} yield objpipe(result).to(tellActor)case "植物" =>val result:Future[Plant] =for {obj <- ask(askActor,"植物").mapTo[Plant]} yield objpipe(result).to(tellActor)case "不明飞行物" =>val result:Future[UFO] =for {obj <- ask(askActor,"不明飞行物").mapTo[UFO]} yield objpipe(result).to(tellActor)}
}class SendActor extends Actor with ActorLogging{val child = context.actorOf(Props[FireAndForgetActor],name="tell")def receive: Receive = {case msg => child.tell(msg,sender)}override def unhandled(message: Any): Unit = {log.info("[Unhandled Message] => {}",message)}
}object SendActor extends App {val system = ActorSystem("SendActorSystem")val send = system.actorOf(Props[RunActor],name="send")send ! "动物"send ! "人类"send ! "植物"send ! "不明飞行物"Thread.sleep(2000)system.terminate()
}

注意点:

必须声明一个隐式timeout参数:

implicit val timeout = Timeout(5,TimeUnit.
MILLISECONDS)
必须引入import scala.concurrent.ExecutionContext.
Implicits.global
ask 方法不是Actor自己,需要引入import akka.pattern.ask
pipe也不是Actor自己的也需要引入:import
akka.pattern.pipe

八  Typed Actor

8.1 什么是Typed Actor

Typed Actor是Active Objects设计模式的实现. Active Objects模式将方法的执行和方法的调用进行解耦合,从而为程序引入并发性。Typed Actor由公用的接口和对应实现两部分构成

我们来看看Active Object模式是怎么实现解耦的:

# 客户端调用代理对象的方法

# 代理对象不断传递方法调用作为Method Request到一个调度器或者Invocation Handler去拦截这些请求

# 调度器Scheduler或者InvocationHandler将Method Request放入队列

# 调度器连续不断的监控队列,然后决定哪一个Method Request是可以运行的,如果被调用了,则从队列中移除

# Scheduler或者Invocation Handler分发Method Request 到 这个方法实现类(implementationobject)

# 这个实现类和运行Scheduler处于相同的线程,处理请求然后与返回客户端Future

8.2 创建Type Actor,并发送消息

第一步: 先定义接口或者特质

trait CalculatorInt {
    def add(first:Int, second:Int):Future[Int]
    def subtract(first:Int, second:Int):Future[Int]
    def incrementCount():Unit
}

第二步:继承Trait,并实现方法

import scala.concurrent.{Promise, Future}
import akka.actor.TypedActor.dispatcher
/**
 * Future:表示一个可能还没有实际完成的异步任务的结果
 * 为了使Future完全非阻塞,注册Callback到Future中,一旦future完成,Callback会被异步执行
 * Promise:是一个可写的,只能赋一次值的容器,Promise存储计算结果,从Promise中可以得到Future,
 * 由Promise来完成Future. 也可以从字面上来理解,Promise也就是一个承诺,就好比去买一杯咖啡,
 * 付账过后,服务员承诺会给你一杯咖啡,但需要过几分钟才能领取这杯咖啡.服务员制作咖啡的过程就是
 * 一个Future,而付账过后,就得到服务员的一个Promise。
 */
class Calculator extends CalculatorInt{
    var counter  = 0;

def add(first: Int, second: Int): Future[Int] = {
        Promise.successful[Int](first + second).future
    }

def subtract(first: Int, second: Int): Future[Int] = {
        Promise.successful[Int](first - second).future
    }

def incrementCount(): Unit = {
        counter += 1
        Some(counter)
    }
}

第三步 创建TypeActor

# 方式一:直接通过默认的构造函数创建Typed Actor

val _system = ActorSystem("TypedActorsExample")
val c:CalculatorInt = TypedActor(_system).typedActorOf(TypedProps[Calculator]())

# 方式二:直接通过默认的构造函数创建Typed Actor并指定Typed Actor名称


val c1:CalculatorInt = TypedActor(_system).typedActorOf(TypedProps[Calculator](),"myCalculator")

方式三:通过非默认的构造函数创建Typed Actor并指定Typed Actor名称
val c2:CalculatorInt = TypedActor(_system).typedActorOf(TypedProps(classOf[CalculatorInt],new Calculator), "myCalculator")

第四步 : 发送消息

第一种:Fire and Forget

c.incrementCount()

第二种:Send and Receive

val future = c.add(14,6)
val durations = Duration.apply(5,TimeUnit.SECONDS);
val result = Await.result(future,durations)

8.3 停止Type Actor

有两种方式停止Type Actor:

TypedActor(_system).stop(c)
TypedActor(_system).poisonPill(c)

8.4 Type Actor 声明周期监控

通过实现TypeActor.PreStart和TypeActor.PostStop接口,在actor开始和结束的时候,我们可以添加一些功能

class Calculator extends CalculatorInt with PreStart with PostStop{var counter  = 0;override def preStart(): Unit = {println("[preStart] method invoked")}def add(first: Int, second: Int): Future[Int] = {Promise.successful[Int](first + second).future}def subtract(first: Int, second: Int): Future[Int] = {Promise.successful[Int](first - second).future}def incrementCount(): Unit = {counter += 1Some(counter)}override def postStop(): Unit = {println("[postStop] method invoked")}
}

Akka之actor模型相关推荐

  1. Akka入门(二)Akka的Actor模型如何满足现代分布式系统需求

    Actor模型允许开发者: 在不诉诸锁定的情况下实施封装. 使用协作实体的模型对信号做出反应,改变状态,并相互发送信号以推动整个应用程序向前发展. 不要担心与我们的世界观不匹配的执行机制. (一) 消 ...

  2. spark 如何用netty实现akka的actor模型

    Spark的中,通过netty实现了类似akka的actor机制. 在spark中,一个EndPointData就类似一个akka中的actor. private class EndpointData ...

  3. scala之Akka的Actor模型(上)

    原文地址:http://my.oschina.net/jingxing05/blog/287213 明确并行和并发 看两张图 并行parallelism 并发concurrency 关键点在于 多个任 ...

  4. 【Akka】Actor模型探索

    Akka是什么 Akka就是为了改变编写高容错性和强可扩展性的并发程序而生的.通过使用Actor模型我们提升了抽象级别,为构建正确的可扩展并发应用提供了一个更好的平台.在容错性方面我们采取了" ...

  5. 【Flink】FLink 通讯组件 Akka与Actor 模型

    1.概述 本博客是 视频的笔记,这个是讲解 flink 1.12 源码的,入门非常的好. 尚硅谷2021最新Flink内核源码解析课程(从入门到精通) 2.介绍 Flink内部节点之间的通信是用Akk ...

  6. Scala Akka的Actor模型

  7. Actor模型与Akka

    Actor模型与Akka 一. Actor模型 Actor模型概念 一个概念模型,用于处理并发计算 Actor模型内部的状态由自己的行为维护,外部线程不能直接调用对象的行为,必须通过消息才能激发行为, ...

  8. 95-848-020-源码-AKKA-Akka与Actor 模型

    1.概述 Akka是一个用来开发支持并发.容错.扩展性的应用程序框架.它是actor model的实现,因此跟Erlang的并发模型很像.在actor模型的上下文中,所有的活动实体都被认为是互不依赖的 ...

  9. actor 模型原理 (二)

    现在开始研究一下akka的actor模型是怎么实现的: 老外写了一个程序说明actor的工作机制,下图就是学生给老师发邮件的具体示意图,那么1-6一共6个步骤 1.学生创建actor system , ...

最新文章

  1. 2w字 + 40张图带你参透并发编程!
  2. LeetCode 309. Best Time to Buy and Sell Stock with Cooldown--Java解法-卖股票系列题目
  3. C# 语句中的各种单例模式代码
  4. ●BZOJ 1934 [Shoi2007]Vote 善意的投票
  5. oracle10g密钥,Oracle10G透明数据加密技术如何使用呢?
  6. 链接数据库增删改通用
  7. git出现红字说明什么_怀孕的第一个月会出现什么变化?若有7种表现,说明可能怀上了...
  8. 或许是比力扣 leetcode 更好的选择?推荐两个编程算法宝藏网站
  9. 错误RuntimeError: Invalid DISPLAY variable
  10. 关于在自己的程序中使用其它窗口的菜单
  11. BZOJ 4415 洛谷 3988 [Shoi2013]发牌
  12. 心有多高 未来就有多远
  13. java实验报告_java实验报告完整版.pdf
  14. 如何将php改成mp4,如何将swf转换成mp4
  15. eviews求相关系数
  16. 树莓派安装中文输入法(使用的谷歌提供的树莓派系统google voice kit)
  17. js 判断系统类型和手机型号(厂商)
  18. AR/VR/MR三者之间的区别和联系
  19. CodeForces 869A The Artful Expedient
  20. 用python画象棋棋盘_Python turtle绘画象棋棋盘

热门文章

  1. java服务器修改缓存数据,java监控服务器数据存入缓存
  2. easyexcel 工具类_问了个在阿里的同学,他们常用的15款开发者工具!
  3. shell编写mysql全备和增备脚本_基于mysqldump编写自动全备增备的shell脚本
  4. hdfs复制文件夹_Hadoop框架:HDFS简介与Shell管理命令
  5. 商淘多b2b2c商城系统怎么在个人电脑上安装_企业怎么做好b2b2c商城网站建设?...
  6. 语言 标签倾斜 绘图_一文搞懂ggplot2:老板再也不用担心我的科研绘图
  7. c++ 服务 以当前用户拉起进程_渗透技巧——通过CredSSP导出用户的明文口令
  8. cron表达式在线测试
  9. java实现多表增加_java多表插入数据
  10. centos7升级gcc,并安装redis