深入异步

Tokio教程之深入异步

https://tokio.rs/tokio/tutorial/async

在这一点上,我们已经完成了对异步 Rust 和 Tokio 的相当全面的考察。现在我们将深入挖掘Rust的异步运行时模型。在教程的一开始,我们就暗示过,异步Rust采取了一种独特的方法。现在,我们解释一下这意味着什么。

Futures

作为快速回顾,让我们采取一个非常基本的异步函数。与本教程到目前为止所涉及的内容相比,这并不新鲜。

use tokio::net::TcpStream;async fn my_async_fn() {println!("hello from async");let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();println!("async TCP operation complete");
}

Copy

我们调用这个函数,它返回一些值。我们在这个值上调用.await。

#[tokio::main]
async fn main() {let what_is_this = my_async_fn();// Nothing has been printed yet.what_is_this.await;// Text has been printed and socket has been// established and closed.
}

Copy

my_async_fn() 返回的值是一个future。future是一个实现了标准库所提供的 std::future::Future 特性的值。它们是包含正在进行的异步计算的值。

std::future::Future trait的定义是:

use std::pin::Pin;
use std::task::{Context, Poll};pub trait Future {type Output;fn poll(self: Pin<&mut Self>, cx: &mut Context)-> Poll<Self::Output>;
}

Copy

相关类型 Output 是 future 完成后产生的类型。Pin 类型是Rust能够支持异步函数中的借用的方式。更多细节请参见标准库文档。

与其他语言实现 future 的方式不同,Rust future 并不代表在后台发生的计算,相反,Rust future就是计算本身。Future的所有者负责通过轮询Future来推进计算。这可以通过调用 Future::poll 来实现。

实现future

让我们来实现一个非常简单的future。这个future将:

  • 等待到一个特定的时间点。
  • 输出一些文本到STDOUT。
  • 产生一个字符串。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};struct Delay {when: Instant,
}impl Future for Delay {type Output = &'static str;fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)-> Poll<&'static str>{if Instant::now() >= self.when {println!("Hello world");Poll::Ready("done")} else {// Ignore this line for now.cx.waker().wake_by_ref();Poll::Pending}}
}#[tokio::main]
async fn main() {let when = Instant::now() + Duration::from_millis(10);let future = Delay { when };let out = future.await;assert_eq!(out, "done");
}

Copy

作为Future的Async fn

在main函数中,我们实例化了future并对其调用 .await。从异步函数中,我们可以对任何实现Future的值调用 .await。反过来,调用一个异步函数会返回一个实现Future的匿名类型。在 async fn main() 的例子中,生成的future大致是这样的。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};enum MainFuture {// Initialized, never polledState0,// Waiting on `Delay`, i.e. the `future.await` line.State1(Delay),// The future has completed.Terminated,
}impl Future for MainFuture {type Output = ();fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)-> Poll<()>{use MainFuture::*;loop {match *self {State0 => {let when = Instant::now() +Duration::from_millis(10);let future = Delay { when };*self = State1(future);}State1(ref mut my_future) => {match Pin::new(my_future).poll(cx) {Poll::Ready(out) => {assert_eq!(out, "done");*self = Terminated;return Poll::Ready(());}Poll::Pending => {return Poll::Pending;}}}Terminated => {panic!("future polled after completion")}}}}
}

Copy

Rust futures是一种状态机。在这里,MainFuture 被表示为一个 future 的可能状态的枚举。future 在 State0 状态下开始。当 poll 被调用时,future 试图尽可能地推进其内部状态。如果 future 能够完成,Poll::Ready 将被返回,其中包含异步计算的输出。

如果future不能完成,通常是由于它所等待的资源没有准备好,那么就会返回 Poll::Pending。收到 Poll::Pending 是向调用者表明,future 将在稍后的时间完成,调用者应该在稍后再次调用poll。

我们还看到,future 是由其他 future 组成的。在外层 future 上调用 poll 的结果是调用内部 future 的 poll 函数。

executors

异步的Rust函数返回future。future必须被调用 poll 以推进其状态。future是由其他 future 组成的。那么,问题来了,是什么在最外层的 future 上调用poll?

回想一下前面的内容,要运行异步函数,它们必须被传递给 tokio::spawn 或者是被 #[tokio::main] 注释的主函数。这样做的结果是将生成的外层 future 提交给 Tokio执行器。执行器负责在外部 future 上调用 Future::poll,推动异步计算的完成。

mini Tokio

为了更好地理解这一切是如何结合在一起的,让我们实现我们自己的最小版本的Tokio! 完整的代码可以在这里找到。

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;fn main() {let mut mini_tokio = MiniTokio::new();mini_tokio.spawn(async {let when = Instant::now() + Duration::from_millis(10);let future = Delay { when };let out = future.await;assert_eq!(out, "done");});mini_tokio.run();
}struct MiniTokio {tasks: VecDeque<Task>,
}type Task = Pin<Box<dyn Future<Output = ()> + Send>>;impl MiniTokio {fn new() -> MiniTokio {MiniTokio {tasks: VecDeque::new(),}}/// Spawn a future onto the mini-tokio instance.fn spawn<F>(&mut self, future: F)whereF: Future<Output = ()> + Send + 'static,{self.tasks.push_back(Box::pin(future));}fn run(&mut self) {let waker = task::noop_waker();let mut cx = Context::from_waker(&waker);while let Some(mut task) = self.tasks.pop_front() {if task.as_mut().poll(&mut cx).is_pending() {self.tasks.push_back(task);}}}
}

Copy

这将运行异步块。一个具有所要求的延迟的 Delay 实例被创建并被等待。然而,到目前为止,我们的实现有一个重大缺陷。我们的执行器从未进入睡眠状态。执行器不断地循环所有被催生的 future,并对它们进行 poll 。大多数时候,这些 future 还没有准备好执行更多的工作,并会再次返回 Poll::Pending。这个过程会消耗CPU,一般来说效率不高。

理想情况下,我们希望 mini-tokio 只在 future 能够取得进展时 poll future。这发生在任务被阻塞的资源准备好执行请求的操作时。如果任务想从一个TCP套接字中读取数据,那么我们只想在TCP套接字收到数据时 poll 任务。在我们的例子中,任务在达到给定的瞬间被阻断。理想情况下,mini-tokio只会在那个瞬间过去后 poll 任务。

为了实现这一点,当一个资源被 poll 而该资源又还没有准备好时,一旦它过渡到 ready 的状态,该资源将发送一个通知。

Wakers

Waker 是缺失的那部分。这是一个系统,通过这个系统,资源能够通知等待的任务,资源已经准备好继续某些操作。

让我们再看一下Future::poll的定义:

fn poll(self: Pin<&mut Self>, cx: &mut Context)-> Poll<Self::Output>;

Copy

Poll 的 Context 参数有一个 waker() 方法。该方法返回一个与当前任务绑定的Waker。该Waker有一个wake()方法。调用该方法向执行器发出信号,相关任务应该被安排执行。当资源过渡到准备好的状态时调用wake(),通知执行者,poll 任务将能够取得进展。

更新 Delay

我们可以更新 Delay 来使用 wakers。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;struct Delay {when: Instant,
}impl Future for Delay {type Output = &'static str;fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)-> Poll<&'static str>{if Instant::now() >= self.when {println!("Hello world");Poll::Ready("done")} else {// Get a handle to the waker for the current tasklet waker = cx.waker().clone();let when = self.when;// Spawn a timer thread.thread::spawn(move || {let now = Instant::now();if now < when {thread::sleep(when - now);}waker.wake();});Poll::Pending}}
}

Copy

现在,一旦请求的持续时间过了,调用的任务就会被通知,执行者可以确保任务被再次安排。下一步是更新mini-tokio以监听唤醒通知。

我们的 Delay 实现还有一些剩余的问题。我们将在以后修复它们。

当一个 future 返回 Poll::Pending 时,它必须确保在某个时间点对 waker 发出信号。忘记这样做会导致任务无限期地挂起。

在返回 Poll::Pending 后忘记唤醒一个任务是一个常见的错误来源。

回顾一下 “Delay"的第一次迭代。这里是 future 的实现。

impl Future for Delay {type Output = &'static str;fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)-> Poll<&'static str>{if Instant::now() >= self.when {println!("Hello world");Poll::Ready("done")} else {// Ignore this line for now.cx.waker().wake_by_ref();Poll::Pending}}
}

Copy

在返回 Poll::Pending 之前,我们调用 cx.waker().wake_by_ref()。这是为了满足 future 契约。通过返回 Poll::Pending,我们负责给唤醒者发信号。因为我们还没有实现定时器线程,所以我们在内联中给唤醒者发信号。这样做的结果是,future 将立即被重新安排,再次执行,而且可能还没有准备好完成。

请注意,允许对 waker 发出超过必要次数的信号。在这个特殊的例子中,即使我们根本没有准备好继续操作,我们还是向唤醒者发出信号。除了浪费一些CPU周期外,这样做并没有什么问题。然而,这种特殊的实现方式会导致一个繁忙的循环。

更新Mini Tokio

下一步是更新 Mini Tokio 以接收 waker 的通知。我们希望执行器只在被唤醒时运行任务,为了做到这一点,Mini Tokio将提供它自己的唤醒器。当唤醒者被调用时,其相关的任务将被排队执行。Mini Tokio在 poll future 时将这个 waker 传递给 future。

更新后的 Mini Tokio 将使用一个通道来存储预定任务。通道允许任务从任何线程被排队执行。Wakers 必须是 Send 和 Sync,所以我们使用来自crossbeam crate的通道,因为标准库的通道不是Sync。

Send和Sync特性是Rust提供的与并发性有关的标记特性。可以被发送到不同线程的类型是Send。大多数类型都是Send,但像Rc这样的类型则不是。可以通过不可变的引用并发访问的类型是Sync。一个类型可以是Send,但不是Sync–一个很好的例子是Cell,它可以通过不可变的引用被修改,因此并发访问是不安全的。

更多细节请参见Rust书中的相关章节。

然后,更新MiniTokio的结构。

use crossbeam::channel;
use std::sync::Arc;struct MiniTokio {scheduled: channel::Receiver<Arc<Task>>,sender: channel::Sender<Arc<Task>>,
}struct Task {// This will be filled in soon.
}

Copy

Wakers 是 sync,并且可以被克隆。当 wake 被调用时,任务必须被安排执行。为了实现这一点,我们有一个通道。当 wake() 被调用时,任务被推到通道的发送部分。我们的 task 结构将实现唤醒逻辑。要做到这一点,它需要同时包含催生的future 和通道的发送部分。

use std::sync::{Arc, Mutex};struct Task {// The `Mutex` is to make `Task` implement `Sync`. Only// one thread accesses `future` at any given time. The// `Mutex` is not required for correctness. Real Tokio// does not use a mutex here, but real Tokio has// more lines of code than can fit in a single tutorial// page.future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,executor: channel::Sender<Arc<Task>>,
}impl Task {fn schedule(self: &Arc<Self>) {self.executor.send(self.clone());}
}

Copy

为了安排任务,Arc被克隆并通过通道发送。现在,我们需要将我们的 schedule 函数与 std::task::Waker 挂钩。标准库提供了一个低级别的API,通过手动构建vtable来完成这个任务。这种策略为实现者提供了最大的灵活性,但需要一堆不安全的模板代码。我们不直接使用 RawWakerVTable,而是使用由futures crate提供的ArcWake工具。这使得我们可以实现一个简单的特质,将我们的任务结构暴露为一个waker。

在你的Cargo.toml中添加以下依赖,以拉入future。

futures = "0.3"

Copy

然后实现 futures::task::ArcWake

use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {fn wake_by_ref(arc_self: &Arc<Self>) {arc_self.schedule();}
}

Copy

当上面的定时器线程调用waker.wake()时,任务被推送到通道中。接下来,我们在MiniTokio::run()函数中实现接收和执行任务。

impl MiniTokio {fn run(&self) {while let Ok(task) = self.scheduled.recv() {task.poll();}}/// Initialize a new mini-tokio instance.fn new() -> MiniTokio {let (sender, scheduled) = channel::unbounded();MiniTokio { scheduled, sender }}/// Spawn a future onto the mini-tokio instance.////// The given future is wrapped with the `Task` harness and pushed into the/// `scheduled` queue. The future will be executed when `run` is called.fn spawn<F>(&self, future: F)whereF: Future<Output = ()> + Send + 'static,{Task::spawn(future, &self.sender);}
}impl Task {fn poll(self: Arc<Self>) {// Create a waker from the `Task` instance. This// uses the `ArcWake` impl from above.let waker = task::waker(self.clone());let mut cx = Context::from_waker(&waker);// No other thread ever tries to lock the futurelet mut future = self.future.try_lock().unwrap();// Poll the futurelet _ = future.as_mut().poll(&mut cx);}// Spawns a new taks with the given future.//// Initializes a new Task harness containing the given future and pushes it// onto `sender`. The receiver half of the channel will get the task and// execute it.fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)whereF: Future<Output = ()> + Send + 'static,{let task = Arc::new(Task {future: Mutex::new(Box::pin(future)),executor: sender.clone(),});let _ = sender.send(task);}}

Copy

这里发生了多件事情。首先,MiniTokio::run()被实现。该函数在一个循环中运行,接收来自通道的预定任务。由于任务在被唤醒时被推入通道,这些任务在执行时能够取得进展。

此外,MiniTokio::new() 和 MiniTokio::spwn() 函数被调整为使用通道而不是 VecDeque。当新的任务被催生时,它们会被赋予一个通道的发送者部分的克隆,任务可以用它来在运行时安排自己。

Task::poll() 函数使用来自 futures crate 的 ArcWake 工具创建waker。waker被用来创建一个 task::Context。该 task::Context 被传递给 poll。

摘要

我们现在已经看到了一个端到端的例子,说明异步Rust是如何工作的。Rust的 async/await 功能是由traits支持的。这允许第三方crate,如Tokio,提供执行细节。

  • Rust的异步操作是 lazy 的,需要调用者来 poll 它们。
  • Wakers被传递给futures,以将一个future与调用它的任务联系起来。
  • 当一个资源没有准备好完成一个操作时,Poll::Pending 被返回,任务的waker被记录。
  • 当资源准备好时,任务的 waker 会被通知。
  • 执行者收到通知并安排任务的执行。
  • 任务再次被 poll ,这次资源已经准备好了,任务取得了进展。

某些未尽事宜

记得我们在实现 Delay future 的时候,说过还有一些事情要解决。Rust的异步模型允许单个future在执行时跨任务迁移。考虑一下下面的情况。

use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;#[tokio::main]
async fn main() {let when = Instant::now() + Duration::from_millis(10);let mut delay = Some(Delay { when });poll_fn(move |cx| {let mut delay = delay.take().unwrap();let res = Pin::new(&mut delay).poll(cx);assert!(res.is_pending());tokio::spawn(async move {delay.await;});Poll::Ready(())}).await;
}

Copy

poll_fn 函数使用闭包创建Future实例。上面的片段创建了一个Delay实例,对其进行了一次轮询,然后将Delay实例发送到一个新的任务中等待。在这个例子中,Delay::poll在不同的Waker实例中被调用了不止一次。当这种情况发生时,你必须确保在最近一次调用 poll 时所传递的 Waker上调用 wake。

当实现 future 时,关键是要假设每一次对poll的调用都可能提供一个不同的Waker实例。poll 函数必须用新的唤醒者来更新任何先前记录的唤醒者。

我们早期实现的Delay在每次 poll 时都会产生一个新的线程。这很好,但是如果 poll 太频繁的话,效率就会很低(例如,如果你 select! 这个future和其他的future,只要其中一个有事件,这两个都会被poll)。一种方法是记住你是否已经产生了一个线程,如果你还没有产生一个线程,就只产生一个新的线程。然而,如果你这样做,你必须确保线程的Waker在以后调用 poll 时被更新,否则你就不能唤醒最近的Waker。

为了修复我们之前的实现,我们可以这样做。

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};struct Delay {when: Instant,// This Some when we have spawned a thread, and None otherwise.waker: Option<Arc<Mutex<Waker>>>,
}impl Future for Delay {type Output = ();fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {// First, if this is the first time the future is called, spawn the// timer thread. If the timer thread is already running, ensure the// stored `Waker` matches the current task's waker.if let Some(waker) = &self.waker {let mut waker = waker.lock().unwrap();// Check if the stored waker matches the current task's waker.// This is necessary as the `Delay` future instance may move to// a different task between calls to `poll`. If this happens, the// waker contained by the given `Context` will differ and we// must update our stored waker to reflect this change.if !waker.will_wake(cx.waker()) {*waker = cx.waker().clone();}} else {let when = self.when;let waker = Arc::new(Mutex::new(cx.waker().clone()));self.waker = Some(waker.clone());// This is the first time `poll` is called, spawn the timer thread.thread::spawn(move || {let now = Instant::now();if now < when {thread::sleep(when - now);}// The duration has elapsed. Notify the caller by invoking// the waker.let waker = waker.lock().unwrap();waker.wake_by_ref();});}// Once the waker is stored and the timer thread is started, it is// time to check if the delay has completed. This is done by// checking the current instant. If the duration has elapsed, then// the future has completed and `Poll::Ready` is returned.if Instant::now() >= self.when {Poll::Ready(())} else {// The duration has not elapsed, the future has not completed so// return `Poll::Pending`.//// The `Future` trait contract requires that when `Pending` is// returned, the future ensures that the given waker is signalled// once the future should be polled again. In our case, by// returning `Pending` here, we are promising that we will// invoke the given waker included in the `Context` argument// once the requested duration has elapsed. We ensure this by// spawning the timer thread above.//// If we forget to invoke the waker, the task will hang// indefinitely.Poll::Pending}}
}

Copy

这有点复杂,但想法是,在每次调用 poll 时,future 会检查所提供的 waker 是否与之前记录的 waker 相匹配。如果两个 waker 匹配,那么就没有其他事情要做。如果不匹配,则必须更新记录的 waker。

Notify 工具

我们演示了如何使用wakers手工实现一个 Delay future。Wakers是异步Rust工作方式的基础。通常情况下,没有必要深入到这个水平。例如,在Delay的情况下,我们可以通过使用 tokio::sync::Notify 工具,完全用 async/await 实现它。这个工具提供了一个基本的任务通知机制。它处理了waker的细节,包括确保记录的waker与当前任务相匹配。

使用Notify,我们可以像这样用 async/await 实现一个 Delay 函数。

use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;async fn delay(dur: Duration) {let when = Instant::now() + dur;let notify = Arc::new(Notify::new());let notify2 = notify.clone();thread::spawn(move || {let now = Instant::now();if now < when {thread::sleep(when - now);}notify2.notify_one();});notify.notified().await;
}

Copy

Tokio教程之深入异步相关推荐

  1. Tikz教程:一个异步FIFO设计步骤示意图的画法

    如上图(图片来源:异步FIFO),如何用Tikz来画出这个图形呢? 作图思路: 使用流程图的作图方法可以很容易作出这幅图形.使用node来画出节点,并用node输入图中不带框的文字.使用"- ...

  2. HTML5前端入门教程:Ajax 异步请求技术

    AJAX的全称是Asynchronous JavaScript and XML(异步的 JavaScript 和 XML). ✦ajax不是新的编程语言,而是一种使用现有标准的新方法.ajax是与服务 ...

  3. rust异步编程--理解并发/多线程/回调/异步/future/promise/async/await/tokio

    1. 异步编程简介 通常我们将消息通信分成同步和异步两种: 同步就是消息的发送方要等待消息返回才能继续处理其它事情 异步就是消息的发送方不需要等待消息返回就可以处理其它事情 很显然异步允许我们同时做更 ...

  4. 使用 Tokio 实现 Actor 系统

    本文将不使用任何 Actors 库(例如 Actix ) 而直接使用Tokio实现 Actors 系统.事实上这甚至是更容易的,但是还是有一些细节需要注意: tokio::spawn 的调用位置. 使 ...

  5. 异步编程之Promise(2):探究原理

    异步编程系列教程: (翻译)异步编程之Promise(1)--初见魅力 异步编程之Promise(2):探究原理 异步编程之Promise(3):拓展进阶 异步编程之Generator(1)--领略魅 ...

  6. Silverlight实例教程 - Validation数据验证开篇

    Silverlight 4 Validation验证实例系列 Silverlight实例教程 - Validation数据验证开篇 Silverlight实例教程 - Validation数据验证基础 ...

  7. Silverlight实例教程 - Validation验证系列汇总

    转自http://www.cnblogs.com/jv9/archive/2010/09/27/1836394.html Silverlight Validation验证系列教程,详细讲解Silver ...

  8. echarts异步加载柱状图遇到的错误- Error: Component series. not exists. Load it first.

    今天看了下echarts教程之中的异步加载柱状图,我按照教程中的代码敲出来之后再运行,就报了一个 Error: Component series. not exists. Load it first. ...

  9. Turbot4机器人入门教程-配置网络

     系列文章目录: Turbot4机器人入门教程-硬件清单 Turbot4机器人入门教程-软件清单 Turbot4机器人入门教程-NoMachine远程控制 Turbot4机器人入门教程-配置网络 Tu ...

  10. Turbot4机器人入门教程-使用统一建图入口

      系列文章目录: Turbot4机器人入门教程-硬件清单 Turbot4机器人入门教程-软件清单 Turbot4机器人入门教程-NoMachine远程控制 Turbot4机器人入门教程-配置网络 T ...

最新文章

  1. 大数据分析技术未来发展会如何
  2. python文本风格_Python的代码风格
  3. VMware 中软盘镜像文件 *.flp 使用方法
  4. HenCoder「仿写酷界面」征稿
  5. 软件工程标准与软件文档
  6. centos7下安装airflow
  7. 使用tkinter打造一个小说下载器,想看什么小说,就下什么
  8. android studio耗电量检测,[腾讯 TMQ] Android 场景化性能测试专栏之 CPU 耗电性能篇...
  9. android11下文件管理,华为文件管理器下载-华为文件管理器 安卓版v10.11.11.301-PC6安卓网...
  10. 视频图片 超分与动漫化+补帧
  11. 24点游戏java代码 中国开源社区_编程实现一个有GUI的24点游戏
  12. 一度智信:新开的电商店铺销量低?如何快速提升
  13. 【Python】python之subprocess模块详解
  14. 虚函数,虚指针和虚表详解
  15. ubuntu、win跨平台局域网文件传输工具
  16. 学习微信小程序开发框架之脚本语言WXS
  17. phpspreadsheet使用导出excel
  18. 什么是大端法和小端法?
  19. 2万5千字大厂面经 | 掘金技术征文
  20. 校招前端二面常考react面试题(边面边更)

热门文章

  1. IP协议——IP地址的基础知识
  2. Linux系统 应急响应自动化检测工具 GScan ——使用教程
  3. xp系统计算机怎么连接到网络打印机,xp打印机共享怎么设置(教你如何在电脑上连接XP系统的共享打印机)...
  4. 为什么有些微信群要不定期清理不活跃成员?
  5. Aspose.Barcode创建二维码应用代码示例
  6. FFmpeg无损转换ts为mp4
  7. 内网穿透到cs上线--部署到公网上!!
  8. 阿里云商标智能注册申请图文教程(亲踩坑)
  9. x265中的lookahead
  10. ego-planner论文阅读笔记