一直想了解rust中actor并发模式,Actix库是rust中知名的库。看看Actix库的说明,走进actor。
这个库的重要几个概念:
1、actor
任何实现Actor trait的类型,就是一个actor.actor有生命周期,几个状态:

(1)Started

(2) Running

(3)Stopping

(4)Stopped

我们来看一下Actor trait:
里面有start()、start_default()等不带参数的函数,大都返回的是Addr < Self >。我们看到了一个Addr类型。因为所有的Actors有一个邮箱Addr,而Actors之间的通信是通过messages来实现的,Actors之间只知道messages地址,而不能侵入到对方内部。

pub trait Actor: Sized + 'static {type Context: ActorContext;fn started(&mut self, ctx: &mut Self::Context) {}fn stopping(&mut self, ctx: &mut Self::Context) -> Running {Running::Stop}fn start(self) -> Addr<Self>whereSelf: Actor<Context = Context<Self>>,{Context::new().run(self)}fn start_default() -> Addr<Self>whereSelf: Actor<Context = Context<Self>> + Default,{Self::default().start()}/// Start new actor in arbiter's thread.fn start_in_arbiter<F>(arb: &Arbiter, f: F) -> Addr<Self>whereSelf: Actor<Context = Context<Self>>,F: FnOnce(&mut Context<Self>) -> Self + Send + 'static,{let (tx, rx) = channel::channel(DEFAULT_CAPACITY);// create actorarb.exec_fn(move || {let mut ctx = Context::with_receiver(rx);let act = f(&mut ctx);let fut = ctx.into_future(act);actix_rt::spawn(fut);});Addr::new(tx)}fn create<F>(f: F) -> Addr<Self>whereSelf: Actor<Context = Context<Self>>,F: FnOnce(&mut Context<Self>) -> Self + 'static,{let mut ctx = Context::new();let act = f(&mut ctx);ctx.run(act)}
}

自定义类,实现Actor:

use actix::prelude::*;struct MyActor {count: usize,
}impl Actor for MyActor {type Context = Context<Self>;// 启动的时侯,进行一些个性化设置,比如fn started(&mut self, ctx: &mut Self::Context) {ctx.set_mailbox_capacity(1);}
}
let addr = MyActor.start();

2、message 、handler、 address、Recipient

(1)任何实现Message trait类型,是一个message.

pub trait Message {/// The type of value that this message will resolved with if it is/// successful.type Result: 'static;
}

(2)所有的Actors之间的通信是通过messages来实现的。通信的message发往目标邮箱,Actors调用message handlers(句柄),执行上下文(context).

(3)handler是啥?

pub trait Handler<M>
whereSelf: Actor,M: Message,
{/// The type of value that this handler will return.type Result: MessageResponse<Self, M>;/// This method is called for every message received by this actor.fn handle(&mut self, msg: M, ctx: &mut Self::Context) -> Self::Result;
}

实现handler:

struct Ping(usize);
impl Handler<Ping> for MyActor {type Result = usize;fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result {self.count += msg.0;self.count}
}

(4) 自定义类,实现Message:

use actix::prelude::*;struct Ping(usize);impl Message for Ping {type Result = usize;
}

(5)如何发送message?
首先要用到Addr object。具体地说,有几种方法在actors之间发送message:

Addr::do_send(M) - 忽视返回任何错误的方式,因为邮箱可能满了,也可能关闭。
Addr::try_send(M) - 如果错误 ,会返回 SendError。
Addr::send(M) - 会返回future object ,带有处理过程的结果信息。

(6)Recipient -收件人
收件人是一个Actor发给另外一个不同类型Actor时的地址。比如,订阅者与发送者。

3、context (上下文)、Mailbox

(1)Context字面上是上下文。具体有什么东东?看看源码:

pub struct Context<A>
whereA: Actor<Context = Context<A>>,
{parts: ContextParts<A>,mb: Option<Mailbox<A>>,
}impl<A> Context<A>
whereA: Actor<Context = Self>,
{#[inline]pub(crate) fn new() -> Self {let mb = Mailbox::default();Self {parts: ContextParts::new(mb.sender_producer()),mb: Some(mb),}}#[inline]pub fn with_receiver(rx: AddressReceiver<A>) -> Self {let mb = Mailbox::new(rx);Self {parts: ContextParts::new(mb.sender_producer()),mb: Some(mb),}}#[inline]pub fn run(self, act: A) -> Addr<A> {let fut = self.into_future(act);let addr = fut.address();actix_rt::spawn(fut);addr}pub fn into_future(mut self, act: A) -> ContextFut<A, Self> {let mb = self.mb.take().unwrap();ContextFut::new(self, act, mb)}pub fn handle(&self) -> SpawnHandle {self.parts.curr_handle()}pub fn set_mailbox_capacity(&mut self, cap: usize) {self.parts.set_mailbox_capacity(cap)}
}impl<A> AsyncContextParts<A> for Context<A>
whereA: Actor<Context = Self>,
{fn parts(&mut self) -> &mut ContextParts<A> {&mut self.parts}
}
pub trait ContextFutureSpawner<A>
whereA: Actor,A::Context: AsyncContext<A>,
{fn spawn(self, ctx: &mut A::Context);fn wait(self, ctx: &mut A::Context);
}impl<A, T> ContextFutureSpawner<A> for T
whereA: Actor,A::Context: AsyncContext<A>,T: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
{#[inline]fn spawn(self, ctx: &mut A::Context) {let _ = ctx.spawn(self);}#[inline]fn wait(self, ctx: &mut A::Context) {ctx.wait(self);}
}

里面有 ContextParts和Option<Mailbox>两部分内容构成。
(2)找到ContextParts的源码,我们来看看:

pub struct ContextParts<A>
whereA: Actor,A::Context: AsyncContext<A>,
{addr: AddressSenderProducer<A>,flags: ContextFlags,wait: SmallVec<[ActorWaitItem<A>; 2]>,items: SmallVec<[Item<A>; 3]>,handles: SmallVec<[SpawnHandle; 2]>,
}

(3)我们来看一下Mailbox中的设计:

pub struct Mailbox<A>
whereA: Actor,A::Context: AsyncContext<A>,
{msgs: AddressReceiver<A>,
}impl<A> Mailbox<A>
whereA: Actor,A::Context: AsyncContext<A>,
{#[inline]pub fn new(msgs: AddressReceiver<A>) -> Self {Self { msgs }}pub fn capacity(&self) -> usize {self.msgs.capacity()}pub fn set_capacity(&mut self, cap: usize) {self.msgs.set_capacity(cap);}#[inline]pub fn connected(&self) -> bool {self.msgs.connected()}pub fn address(&self) -> Addr<A> {Addr::new(self.msgs.sender())}pub fn sender_producer(&self) -> AddressSenderProducer<A> {self.msgs.sender_producer()}pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context) {#[cfg(feature = "mailbox_assert")]let mut n_polls = 0u16;loop {let mut not_ready = true;// sync messagesloop {if ctx.waiting() {return;}match self.msgs.poll() {Ok(Async::Ready(Some(mut msg))) => {not_ready = false;msg.handle(act, ctx);}Ok(Async::Ready(None)) | Ok(Async::NotReady) | Err(_) => break,}#[cfg(feature = "mailbox_assert")]{n_polls += 1;assert!(n_polls < MAX_SYNC_POLLS, "Too many messages are being processed. Use Self::Context::notify() instead of direct use of address");}}if not_ready {return;}}}
}

4、Arbiter 、SyncArbiter

为Actors提供了异步执行的上下文环境,当一个actor运行时,Arbiters控制着actor包含特定执行状况的上下文环境。 Arbiters需要运行许多函数,包括起系统线程的函数、进行事件轮询、异步分发事件轮询任务、对异步任务进行支持。
当起一个actor时,是在一个系统的线程中运行的,这样效率比较高。也就是说,一个线程可能会针对N个actor.
事件轮询
在事件轮询时,对应的 Arbiter会控制事件轮询事件池的线程。 Arbiter会对任务队列进行排队,往往的情况是,你可以把Arbiter看成"single-threaded event loop".

5、future
从Future库可以看出:

    pub trait Future {/// The type of value produced on completion.#[stable(feature = "futures_api", since = "1.36.0")]type Output;#[stable(feature = "futures_api", since = "1.36.0")]fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;}

Rust : actor模式 与 Actix库相关推荐

  1. Rust actix aiohttp_如何看待 Rust Actix 库的维护者退出开源界?

    更新:actix-web已经找到了接手维护者. 新的维护者看上去是一个比较靠谱的开发者,看到他也参与bastion这个项目,该项目旨在用Rust实现一个类Erlang VM(BEAM)的东东.感觉ac ...

  2. 多进程-> 多线程 -> 异步 -> EventDriven -> Actor模式

    没有感悟的经历,似乎不是我的风格.格物致知的本质是列举和比较.经历了许多,似乎我们看透的本质还是不够本质. 作为一个老程序猿了.对于老调重弹的如何能够更好的利用机器资源这件至始至终的老话题再说一下.谈 ...

  3. Skynet基于Actor模式的开源框架

    使用多进程解决高并发带来的问题是进程安全锁,框架经常会因为部分代码的报错而导致死锁或内存占用不释放等诸多问题.而使用单进程的服务器框架,通过线程池来做消息轮询和任务执行,能够避开锁带来的诸多问题. 框 ...

  4. 如何使用OWASP Dependency Check的命令行(CLI)模式进行依赖库安全漏洞扫描

    OWASP Dependency Check是一款用于识别项目的依赖项是否有已知漏洞的工具,本文介绍一下如何使用Dependency Check工具的命令行模式进行依赖库漏洞扫描. [下载地址]:安装 ...

  5. Newbe.Claptrap-一套以“事件溯源”和“Actor模式”作为基本理论的服务端开发框架...

    本文是关于 Newbe.Claptrap 项目主体内容的介绍,读者可以通过这篇文章,大体了解项目内容. 轮子源于需求 随着互联网应用的蓬勃发展,相关的技术理论和实现手段也在被不断创造出来.诸如 &qu ...

  6. Newbe.Claptrap - 一套以 “事件溯源” 和“Actor 模式”作为基本理论的服务端开发框架...

    本文是关于 Newbe.Claptrap 项目主体内容的介绍,读者可以通过这篇文章,大体了解项目内容. 轮子源于需求 随着互联网应用的蓬勃发展,相关的技术理论和实现手段也在被不断创造出来.诸如 &qu ...

  7. Actor模式理解与使用

    最近学习ThingsBoard,其中大量使用了Actor设计模式,再这里做个Actor模式理解与使用的笔记 Actor模式是一种并发模型,与另一种模型共享内存完全相反,Actor模型share not ...

  8. java actor_十分钟理解Actor模式

    Actor模式是一种并发模型,与另一种模型共享内存完全相反,Actor模型share nothing.所有的线程(或进程)通过消息传递的方式进行合作,这些线程(或进程)称为Actor.共享内存更适合单 ...

  9. 【ProgrammingMicrosoftAzureServiceFabric】第四章: Actor模式

    第4章: Actor模式 过去几十年里我们从面向对象编程(OOP)中学习到许多问题可以被建模为一些拥有行为和状态的交互对象.Actor模式更进了一步,把问题建模为独立的并通过消息交互的Agent. A ...

  10. Hadoop SequnceFile.Writer 压缩模式及压缩库浅析

    2019独角兽企业重金招聘Python工程师标准>>> 先说明SequnceFile的压缩类型(Compression Type)分为三种NONE,RECORD,BLOCK,通过配置 ...

最新文章

  1. .gitignore和.gitkeep有什么区别?
  2. js继承的实现(转载)
  3. IDEA基于kotlin开发android程序配置小结
  4. 日计不足涓滴成河-自定义响应结果格式化器
  5. 33迭代器模式(Iterator Pattern)
  6. excel设置曲线图横坐标值
  7. arm平台linux移植ethtool工具
  8. android多个单选框超格,福昕PDF阅读器打印时提示“打印机被意外删除了”怎么处理?...
  9. 模拟CMOS集成电路设计 学习笔记(三)
  10. 计算机分屏功能吗,电脑分屏显示
  11. whois php,域名whois php
  12. ShopTalk第19集
  13. Mac 解决终端:-bash: /Users/xxx/.profile: No such file or directory
  14. javascript 打印错误信息 catch err
  15. linux下载ccle数据,对CCLE数据库可以做的分析--转载
  16. php 国家地区码,有没有一种简单的方法可以从PHP的国家/地区代码中获取语言代码 - php...
  17. Linux 批量修改密码
  18. visual Studio Code(VS code)软件中HTML超级好用的一个插件 liveserver,vs code浏览网页
  19. 十进制转化为二进制的几种方法
  20. 数字科技行业的“挖井人”:京东数科不做一锤子买卖

热门文章

  1. 面试题思考:try 代码块中含 return 语句时,代码执行顺序
  2. windows 下 新建 点开头的文件和文件夹
  3. SHELL 002 -- ps命令常用方法
  4. javascript中的弹框
  5. Android典型界面设计——ViewPage+Fragment实现区域顶部tab滑动切换
  6. linux下通过文件句柄恢复误删除的数据文件
  7. cisco路由器配置DHCP实例
  8. [Leetcode]设计链表
  9. 字符串中单词去多余空格
  10. 2020-10-19