7 Implementing Futures: the main example




If you want to follow along, create a new folder and run cargo init inside it to initialize a new cargo project. Everything we write here goes into main.rs.



use std::{
    collections::HashMap,
    future::Future,
    mem,
    pin::Pin,
    sync::{
        mpsc::{channel, Sender},
        Arc, Mutex,
    },
    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
    thread::{self, JoinHandle},
    time::{Duration, Instant},
};





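  1. The Future returns Ready, and whatever operations are chained after it can be scheduled.

  2. The Future has never been polled before, so we pass it a Waker and suspend it.

  3. The Future has been polled before, but it is not ready yet and returns Pending.

Rust gives the reactor and the executor a way to communicate through the Waker. The reactor stores the Waker and calls Waker::wake() once the event the Future is waiting for has completed, so that the Future gets polled again.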
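To make that hand-off concrete, here is a minimal sketch, separate from the chapter's own code. It assumes the standard library's std::task::Wake trait instead of a hand-built vtable, and the names CountingWaker, TinyReactor and run_demo are made up for this illustration. The "reactor" stores one Waker and another thread calls it when the simulated event completes.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};
use std::thread;

/// Hypothetical waker that only counts how many times it was woken.
struct CountingWaker {
    wakeups: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakeups.fetch_add(1, Ordering::SeqCst);
    }
}

/// Stand-in for a reactor: it stores one `Waker` and calls it when the
/// (simulated) event has completed.
struct TinyReactor {
    stored: Mutex<Option<Waker>>,
}

impl TinyReactor {
    fn register(&self, waker: Waker) {
        *self.stored.lock().unwrap() = Some(waker);
    }

    fn event_completed(&self) {
        // Take the waker out so it is used at most once per registration.
        let waker = self.stored.lock().unwrap().take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

/// Registers a waker, lets another thread signal the event, and returns
/// the number of wakeups that were observed.
fn run_demo() -> usize {
    let state = Arc::new(CountingWaker { wakeups: AtomicUsize::new(0) });
    let reactor = Arc::new(TinyReactor { stored: Mutex::new(None) });

    reactor.register(Waker::from(state.clone()));

    let reactor_for_thread = reactor.clone();
    thread::spawn(move || reactor_for_thread.event_completed())
        .join()
        .unwrap();

    state.wakeups.load(Ordering::SeqCst)
}

fn main() {
    println!("wakeups: {}", run_demo());
}
```

A real executor would react to the wakeup by polling the Future again; here the counter only records that the call arrived.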


// Our executor takes any object which implements the `Future` trait
fn block_on<F: Future>(mut future: F) -> F::Output {
    // The first thing we do is construct a `Waker` which we'll pass on to
    // the `reactor` so it can wake us up when an event is ready.
    let mywaker = Arc::new(MyWaker { thread: thread::current() });
    let waker = waker_into_waker(Arc::into_raw(mywaker));

    // The context struct is just a wrapper for a `Waker` object. Maybe in the
    // future this will do more, but right now it's just a wrapper.
    let mut cx = Context::from_waker(&waker);

    // Since we run this on one thread and run one future to completion,
    // we can pin the `Future` to the stack. This is unsafe, but saves an
    // allocation. We could `Box::pin` it too if we wanted. It is sound here
    // because we shadow `future`, so it can't be accessed again and will
    // not move until it's dropped.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };

    // We poll in a loop, but it's not a busy loop. It will only run when
    // an event occurs, or when the thread has a "spurious wakeup" (an
    // unexpected wakeup that can happen for no good reason).
    let val = loop {
        match Future::poll(future.as_mut(), &mut cx) {
            // When the Future is ready we're finished
            Poll::Ready(val) => break val,
            // If we get a `Pending` future we just go to sleep...
            Poll::Pending => thread::park(),
        };
    };
    val
}

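In all the examples in this chapter I have chosen to comment the code extensively. I find it easier to follow along that way, so I will not repeat myself here and will only focus on the important aspects that may need further explanation.

Now that you have read so much about generators and Pin, this should be fairly easy to understand. A Future is a state machine, and every await point is also a yield point. We can borrow across await points, and we run into exactly the same problems as when borrowing across yield points.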
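The claim that every await point is a yield point can be checked by counting polls. The following is a small sketch under assumptions of my own: YieldOnce and NoopWaker are hypothetical helpers, not part of the chapter's code, and the future is polled by hand in a plain loop. The async block also keeps a borrow alive across both await points.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough when we poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical leaf future: `Pending` on the first poll, `Ready` on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Ask to be polled again right away.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Polls an async block with two `.await` points until it completes and
/// returns how many calls to `poll` that took.
fn count_polls() -> usize {
    let fut = async {
        let data = vec![40, 2];
        let first = &data[0]; // this borrow lives across both awaits
        YieldOnce { yielded: false }.await; // first yield point
        YieldOnce { yielded: false }.await; // second yield point
        *first + data[1]
    };
    // The state machine is self-referential, so it has to be pinned.
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            assert_eq!(value, 42);
            return polls;
        }
    }
}

fn main() {
    println!("polls needed: {}", count_polls());
}
```

The state machine suspends once at each await, so it takes three polls to run the block to completion.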

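Context is just a wrapper around a Waker, at least at the time of writing. In the future the Context object may do more than just wrap a Waker (translator's note: the original text says Future here, which appears to be a mistake), so this extra abstraction gives some flexibility.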






// This is the definition of our `Waker`. We use a regular thread-handle here.
// It works but it's not a good solution. It's easy to fix though, I'll explain
// after this code snippet.
struct MyWaker {
    thread: thread::Thread,
}

// This is the definition of our `Future`. It keeps all the information we
// need. This one holds a reference to our `reactor`, that's just to make
// this example as easy as possible. It doesn't need to hold a reference to
// the whole reactor, but it needs to be able to register itself with the
// reactor.
pub struct Task {
    id: usize,
    reactor: Arc<Mutex<Box<Reactor>>>,
    data: u64,
}

// These are function definitions we'll use for our waker. Remember the
// "Trait Objects" chapter earlier.
// Note that `wake` consumes one reference count of the `Arc`.
fn mywaker_wake(s: &MyWaker) {
    let waker_ptr: *const MyWaker = s;
    let waker_arc = unsafe { Arc::from_raw(waker_ptr) };
    waker_arc.thread.unpark();
}

// Since we use an `Arc`, cloning is just increasing the refcount on the smart
// pointer.
fn mywaker_clone(s: &MyWaker) -> RawWaker {
    let arc = unsafe { Arc::from_raw(s) };
    std::mem::forget(arc.clone()); // increase ref count
    RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
}

// This is actually a "helper function" to create a `Waker` vtable. In contrast
// to when we created a `Trait Object` from scratch we don't need to concern
// ourselves with the actual layout of the `vtable` and only provide a fixed
// set of functions
const VTABLE: RawWakerVTable = unsafe {
    RawWakerVTable::new(
        |s| mywaker_clone(&*(s as *const MyWaker)), // clone
        |s| mywaker_wake(&*(s as *const MyWaker)),  // wake
        |s| {
            // wake by ref: the caller keeps its reference, so we first take
            // an extra one and let `mywaker_wake` consume that instead.
            let waker = &*(s as *const MyWaker);
            let _ = mywaker_clone(waker);
            mywaker_wake(waker);
        },
        |s| drop(Arc::from_raw(s as *const MyWaker)), // decrease refcount
    )
};

// Instead of implementing this on the `MyWaker` object in `impl MyWaker...` we
// just use this pattern instead since it saves us some lines of code.
fn waker_into_waker(s: *const MyWaker) -> Waker {
    let raw_waker = RawWaker::new(s as *const (), &VTABLE);
    unsafe { Waker::from_raw(raw_waker) }
}

impl Task {
    fn new(reactor: Arc<Mutex<Box<Reactor>>>, data: u64, id: usize) -> Self {
        Task { id, reactor, data }
    }
}

// This is our `Future` implementation
impl Future for Task {
    type Output = usize;

    // Poll is what drives the state machine forward and it's the only
    // method we'll need to call to drive futures to completion.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // We need to get access to the reactor in our `poll` method so we
        // acquire a lock on that.
        let mut r = self.reactor.lock().unwrap();

        // First we check if the task is marked as ready
        if r.is_ready(self.id) {
            // If it's ready we set its state to `Finished`
            *r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
            Poll::Ready(self.id)

        // If it isn't finished we check the map we have stored in our Reactor
        // over id's we have registered and see if it's there
        } else if r.tasks.contains_key(&self.id) {
            // This is important. The docs say that on multiple calls to poll,
            // only the Waker from the Context passed to the most recent call
            // should be scheduled to receive a wakeup. That's why we insert
            // this waker into the map (which will return the old one which will
            // get dropped) before we return `Pending`.
            r.tasks.insert(self.id, TaskState::NotReady(cx.waker().clone()));
            Poll::Pending
        } else {
            // If it's not ready, and not in the map, it's a new task so we
            // register that with the Reactor and return `Pending`
            r.register(self.data, cx.waker().clone(), self.id);
            Poll::Pending
        }

        // Note that we're holding a lock on the `Mutex` which protects the
        // Reactor all the way until the end of this scope. This means that
        // even if our task were to complete immediately, it will not be
        // able to call `wake` while we're in our `poll` method.

        // Since we can make this guarantee, it's now the Executor's job to
        // handle this possible race condition where `wake` is called after
        // `poll` but before our thread goes to sleep.
    }
}

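Most of this is straightforward. The confusing part is the strange way we need to construct the Waker, but since we have already created our own trait objects from raw parts, this looks familiar. In fact, it is even simpler.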


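In special cases one could choose not to use an Arc, and this lower-level way of implementing a Waker is what makes that possible.

Indeed, if we only ever use an Arc, there is no reason to go through all the trouble of creating our own vtable and RawWaker. We could just implement a normal trait.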
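As a point of comparison, here is a sketch of what "just implement a normal trait" can look like. It assumes the std::task::Wake trait that newer standard library versions provide for Arc-based wakers; ThreadWaker, block_on_simple and WakeLater are hypothetical names for this illustration and not the chapter's implementation. It still relies on thread::park, so the caveats about park/unpark discussed in this chapter apply to it as well.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical Arc-based waker: no vtable and no unsafe code needed.
struct ThreadWaker {
    thread: Thread,
}

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.thread.unpark();
    }
}

/// Minimal single-future executor built on the trait-based waker.
fn block_on_simple<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker { thread: thread::current() }));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Demo leaf future: the first poll hands the waker to another thread and
/// returns `Pending`; any later poll returns `Ready`. A real leaf future
/// would also check that its event actually happened.
struct WakeLater {
    started: bool,
}

impl Future for WakeLater {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.started {
            return Poll::Ready("woken");
        }
        self.started = true;
        let waker = cx.waker().clone();
        thread::spawn(move || waker.wake());
        Poll::Pending
    }
}

fn main() {
    println!("{}", block_on_simple(WakeLater { started: false }));
}
```

Waker::from takes care of the reference counting that the vtable functions above do by hand.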


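We chose to pass in a reference to the whole reactor here, which is not typical. A reactor is usually a global resource that lets us register interest in events without passing a reference around.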
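For illustration only, a global registration point could be sketched like this. REGISTRY, register, wake, CountingWaker and demo are assumed names for this sketch; a real reactor would track I/O interests rather than a plain list of wakers.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical process-wide registry of (task id, waker) pairs.
static REGISTRY: Mutex<Vec<(usize, Waker)>> = Mutex::new(Vec::new());

/// Tasks register interest without holding a reference to any reactor object.
fn register(id: usize, waker: Waker) {
    REGISTRY.lock().unwrap().push((id, waker));
}

/// Wakes and removes the task with the given id. Returns whether it was found.
fn wake(id: usize) -> bool {
    let mut registry = REGISTRY.lock().unwrap();
    let index = registry.iter().position(|(task_id, _)| *task_id == id);
    let entry = index.map(|i| registry.swap_remove(i));
    drop(registry); // release the lock before running the waker
    match entry {
        Some((_, waker)) => {
            waker.wake();
            true
        }
        None => false,
    }
}

/// Waker used only to observe that `wake` was called.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Registers task 7, wakes it twice, and reports what happened.
fn demo() -> (bool, bool, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    register(7, Waker::from(counter.clone()));
    let first = wake(7);
    let second = wake(7); // already removed, so nothing to wake
    (first, second, counter.0.load(Ordering::SeqCst))
}

fn main() {
    println!("{:?}", demo());
}
```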



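  1. A future could unpark the executor thread from a different thread.

  2. Our executor thinks the data is ready, wakes up, and polls the Future.

  3. When polled, the Future is not ready yet, but at that very moment the Reactor receives an event and calls wake() to unpark our thread.

  4. This can happen before we go back to sleep, since these operations run fully in parallel.

  5. Our reactor has already called wake, but our thread is still sleeping, because the thread was awake at the moment wake was called.

  6. We are deadlocked and our program stops working.

There is also the case where our thread has a so-called spurious wakeup (which can happen unexpectedly), and if we are unlucky this can cause the same deadlock.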


  • std::sync::Condvar

  • crossbeam::sync::Parker
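What both of these tools have in common is that the thread re-checks a condition after every wakeup instead of trusting the wakeup itself. The same idea can be sketched with a plain flag around thread::park. This is only an illustration with made-up names (wait_for_flag, ready), not the fix this chapter ends up using.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Parks until another thread has set the `ready` flag, tolerating stray
/// unparks and spurious wakeups. Returns the final value of the flag.
fn wait_for_flag() -> bool {
    let ready = Arc::new(AtomicBool::new(false));
    let waiter = thread::current();

    let ready_for_notifier = ready.clone();
    let notifier = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        // Set the flag first, then unpark, so the waiter can never miss it.
        ready_for_notifier.store(true, Ordering::Release);
        waiter.unpark();
    });

    // Simulate an unrelated `unpark` arriving before the real event.
    thread::current().unpark();

    // A wakeup alone proves nothing; only the flag does. If the flag is
    // still false after waking up, we simply park again.
    while !ready.load(Ordering::Acquire) {
        thread::park();
    }

    notifier.join().unwrap();
    ready.load(Ordering::Acquire)
}

fn main() {
    println!("event observed: {}", wait_for_flag());
}
```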




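This is the reactor's job. Most of the reactors you will come across are built on the Mio library, which provides non-blocking APIs and event notification across multiple platforms.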





// This is a "fake" reactor. It does no real I/O, but that also makes our
// code possible to run in the book and in the playground

// The different states a task can have in this Reactor
enum TaskState {
    Ready,
    NotReady(Waker),
    Finished,
}

struct Reactor {
    // We need some way of registering a Task with the reactor. Normally this
    // would be an "interest" in an I/O event
    dispatcher: Sender<Event>,
    handle: Option<JoinHandle<()>>,

    // This is a list of tasks
    tasks: HashMap<usize, TaskState>,
}

// This represents the Events we can send to our reactor thread. In this
// example it's only a Timeout or a Close event.
#[derive(Debug)]
enum Event {
    Close,
    Timeout(u64, usize),
}

impl Reactor {
    // We choose to return an atomic reference counted, mutex protected, heap
    // allocated `Reactor`. Just to make it easy to explain... No, the reason
    // we do this is:
    //
    // 1. We know that only thread-safe reactors will be created.
    // 2. By heap allocating it we can obtain a reference to a stable address
    //    that's not dependent on the stack frame of the function that called `new`
    fn new() -> Arc<Mutex<Box<Self>>> {
        let (tx, rx) = channel::<Event>();
        let reactor = Arc::new(Mutex::new(Box::new(Reactor {
            dispatcher: tx,
            handle: None,
            tasks: HashMap::new(),
        })));

        // Notice that we'll need to use a `weak` reference here. If we don't,
        // our `Reactor` will not get `dropped` when our main thread is finished
        // since we're holding internal references to it.

        // Since we're collecting all `JoinHandles` from the threads we spawn
        // and make sure to join them we know that `Reactor` will be alive
        // longer than any reference held by the threads we spawn here.
        let reactor_clone = Arc::downgrade(&reactor);

        // This will be our Reactor-thread. The Reactor-thread will in our case
        // just spawn new threads which will serve as timers for us.
        let handle = thread::spawn(move || {
            let mut handles = vec![];

            // This simulates some I/O resource
            for event in rx {
                println!("REACTOR: {:?}", event);
                let reactor = reactor_clone.clone();
                match event {
                    Event::Close => break,
                    Event::Timeout(duration, id) => {
                        // We spawn a new thread that will serve as a timer
                        // and will call `wake` on the correct `Waker` once
                        // it's done.
                        let event_handle = thread::spawn(move || {
                            thread::sleep(Duration::from_secs(duration));
                            let reactor = reactor.upgrade().unwrap();
                            reactor.lock().map(|mut r| r.wake(id)).unwrap();
                        });
                        handles.push(event_handle);
                    }
                }
            }

            // This is important for us since we need to know that these
            // threads don't live longer than our Reactor-thread. Our
            // Reactor-thread will be joined when `Reactor` gets dropped.
            handles.into_iter().for_each(|handle| handle.join().unwrap());
        });
        reactor.lock().map(|mut r| r.handle = Some(handle)).unwrap();
        reactor
    }

    // The wake function will call wake on the waker for the task with the
    // corresponding id.
    fn wake(&mut self, id: usize) {
        self.tasks.get_mut(&id).map(|state| {
            // No matter what state the task was in we can safely set it
            // to ready at this point. This lets us get ownership over the
            // data that was there before we replaced it.
            match mem::replace(state, TaskState::Ready) {
                TaskState::NotReady(waker) => waker.wake(),
                TaskState::Finished => panic!("Called 'wake' twice on task: {}", id),
                _ => unreachable!(),
            }
        }).unwrap();
    }

    // Register a new task with the reactor. In this particular example
    // we panic if a task with the same id gets registered twice
    fn register(&mut self, duration: u64, waker: Waker, id: usize) {
        if self.tasks.insert(id, TaskState::NotReady(waker)).is_some() {
            panic!("Tried to insert a task with id: '{}', twice!", id);
        }
        self.dispatcher.send(Event::Timeout(duration, id)).unwrap();
    }

    // We send a close event to the reactor so it closes down our reactor-thread
    fn close(&mut self) {
        self.dispatcher.send(Event::Close).unwrap();
    }

    // We simply check if a task with this id is in the state `TaskState::Ready`
    fn is_ready(&self, id: usize) -> bool {
        self.tasks.get(&id).map(|state| match state {
            TaskState::Ready => true,
            _ => false,
        }).unwrap_or(false)
    }
}

impl Drop for Reactor {
    fn drop(&mut self) {
        self.handle.take().map(|h| h.join().unwrap()).unwrap();
    }
}



fn main() {
    // This is just to make it easier for us to see when our Future was resolved
    let start = Instant::now();

    // Many runtimes create a global `reactor`; we pass ours as an argument.
    // `Reactor::new` already returns it wrapped in an atomically
    // reference-counted mutex, so it can be shared between threads as it is.
    let reactor = Reactor::new();

    // We create two tasks:
    // - first parameter is the `reactor`
    // - the second is a timeout in seconds
    // - the third is an `id` to identify the task
    let future1 = Task::new(reactor.clone(), 1, 1);
    let future2 = Task::new(reactor.clone(), 2, 2);

    // an `async` block works the same way as an `async fn` in that it compiles
    // our code into a state machine, `yielding` at every `await` point.
    let fut1 = async {
        let val = future1.await;
        let dur = (Instant::now() - start).as_secs_f32();
        println!("Future got {} at time: {:.2}.", val, dur);
    };

    let fut2 = async {
        let val = future2.await;
        let dur = (Instant::now() - start).as_secs_f32();
        println!("Future got {} at time: {:.2}.", val, dur);
    };

    // Our executor can only run one future at a time, this is pretty normal
    // though. You have a set of operations containing many futures that
    // ends up as a single future that drives them all to completion.
    let mainfut = async {
        fut1.await;
        fut2.await;
    };

    // This executor will block the main thread until the futures are resolved
    block_on(mainfut);

    // When we're done, we want to shut down our reactor thread so our program
    // ends nicely.
    reactor.lock().map(|mut r| r.close()).unwrap();
}


  1. How the Waker object looks just like the trait object we discussed earlier

  2. In what order the events register interest with the reactor


The async keyword can be used on functions, as in async fn (...), or on blocks, as in async { ... }. Both turn a function or a block of code into a Future.
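A short sketch of both forms follows. add_one, run_ready and NoopWaker are hypothetical helpers for this illustration; run_ready polls exactly once, which is enough here because neither future ever has to wait.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough when we poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// An `async fn`: calling it runs nothing yet, it only returns a `Future`.
async fn add_one(x: u32) -> u32 {
    x + 1
}

/// Polls a future once and returns its output if it was ready immediately.
fn run_ready<F: Future>(future: F) -> Option<F::Output> {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match future.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn demo() -> (u32, u32) {
    // A future created by calling an `async fn`.
    let from_fn = add_one(1);
    // A future created from an `async` block, awaiting another future inside.
    let from_block = async { add_one(2).await * 10 };
    (run_ready(from_fn).unwrap(), run_ready(from_block).unwrap())
}

fn main() {
    println!("{:?}", demo());
}
```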



Our mainfut contains two non-leaf futures which it polls. A non-leaf future has a poll method that simply polls its own inner futures, and these are polled in turn until a leaf future returns either Ready or Pending.
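Written by hand, such a non-leaf future is little more than a forwarding poll method. The sketch below uses made-up types (Leaf, Forward, NoopWaker) and polls in a plain loop instead of sleeping between polls; it only illustrates how a call to poll travels from the outer future down to the leaf.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough when we poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical leaf future: `Pending` once, then `Ready(value)`. A real
/// leaf would hand its waker to a reactor before returning `Pending`.
struct Leaf {
    value: u32,
    polled: bool,
}

impl Future for Leaf {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            Poll::Ready(self.value)
        } else {
            self.polled = true;
            Poll::Pending
        }
    }
}

/// Hand-written non-leaf future: it has no work of its own and only
/// forwards `poll` to the future it wraps, counting the calls.
struct Forward<F> {
    inner: Pin<Box<F>>,
    polls: Arc<AtomicUsize>,
}

impl<F: Future> Future for Forward<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        self.polls.fetch_add(1, Ordering::SeqCst);
        // Whatever the inner future answers is passed straight up.
        self.inner.as_mut().poll(cx)
    }
}

/// Drives the wrapper to completion; returns the output and the poll count.
fn drive() -> (u32, usize) {
    let polls = Arc::new(AtomicUsize::new(0));
    let mut outer = Box::pin(Forward {
        inner: Box::pin(Leaf { value: 7, polled: false }),
        polls: polls.clone(),
    });
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = outer.as_mut().poll(&mut cx) {
            return (value, polls.load(Ordering::SeqCst));
        }
    }
}

fn main() {
    println!("{:?}", drive());
}
```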



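Awaited one after the other, as mainfut does, the two futures print:

Future got 1 at time: 1.00.
Future got 2 at time: 3.00.

If the two futures ran concurrently instead, we would expect to see:

Future got 1 at time: 1.00.
Future got 2 at time: 2.00.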
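The two timings above correspond to awaiting the futures in sequence and to polling them side by side. The sketch below shows the same difference without real timers by logging the order of events. Join2, step, YieldOnce and run are hypothetical names, and Join2 is a deliberately naive stand-in for the join combinators that async libraries provide.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Leaf future that is `Pending` on the first poll and `Ready` on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

type Log = Arc<Mutex<Vec<String>>>;
type BoxFut = Pin<Box<dyn Future<Output = ()>>>;

/// A task that logs, waits for one simulated event, then logs again.
async fn step(name: &'static str, log: Log) {
    log.lock().unwrap().push(format!("{name} start"));
    YieldOnce(false).await;
    log.lock().unwrap().push(format!("{name} done"));
}

/// Naive two-way join: every poll gives both children a chance to run.
struct Join2 {
    a: Option<BoxFut>,
    b: Option<BoxFut>,
}

/// Polls the child in `slot`, if any, and clears the slot once it is done.
fn poll_slot(slot: &mut Option<BoxFut>, cx: &mut Context<'_>) {
    let finished = match slot.as_mut() {
        Some(fut) => fut.as_mut().poll(cx).is_ready(),
        None => false,
    };
    if finished {
        *slot = None;
    }
}

impl Future for Join2 {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = &mut *self;
        poll_slot(&mut this.a, cx);
        poll_slot(&mut this.b, cx);
        if this.a.is_none() && this.b.is_none() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Runs tasks "a" and "b" either in sequence or joined; returns the event log.
fn run(concurrent: bool) -> Vec<String> {
    let log: Log = Arc::new(Mutex::new(Vec::new()));
    let (a, b) = (step("a", log.clone()), step("b", log.clone()));
    let mut main_fut: BoxFut = if concurrent {
        Box::pin(Join2 { a: Some(Box::pin(a)), b: Some(Box::pin(b)) })
    } else {
        Box::pin(async move {
            a.await;
            b.await;
        })
    };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    // Busy polling is fine for this demo; a real executor would sleep here.
    while main_fut.as_mut().poll(&mut cx).is_pending() {}
    let entries = log.lock().unwrap().clone();
    entries
}

fn main() {
    println!("sequential: {:?}", run(false));
    println!("concurrent: {:?}", run(true));
}
```

In the sequential run "b" does not even start before "a" is done, which is why the second timer in our example only begins counting after the first one has finished.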



The next step would be to learn how more advanced runtimes work and the different ways in which they run Futures.





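As we explained earlier in this chapter, simply calling thread::park and thread::unpark is not enough to implement a proper executor. You could reach for a tool such as the Parker in crossbeam::sync::Parker instead.

Since creating such a Parker ourselves does not take many lines of code, we will show how to solve the problem using a Condvar and a Mutex.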


use std::sync::Condvar;

#[derive(Default)]
struct Parker(Mutex<bool>, Condvar);

impl Parker {
    fn park(&self) {
        // We acquire a lock to the Mutex which protects our flag indicating if we
        // should resume execution or not.
        let mut resumable = self.0.lock().unwrap();

        // We put this in a loop since there is a chance we'll get woken, but
        // our flag hasn't changed. If that happens, we simply go back to sleep.
        while !*resumable {
            // We sleep until someone notifies us
            resumable = self.1.wait(resumable).unwrap();
        }

        // We immediately set the condition to false, so that next time we call
        // `park` we'll go right to sleep.
        *resumable = false;
    }

    fn unpark(&self) {
        // We simply acquire a lock to our flag and set the condition to
        // `true` when we get it.
        *self.0.lock().unwrap() = true;

        // We notify our `Condvar` so it wakes up and resumes.
        self.1.notify_one();
    }
}

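The Condvar in Rust is designed to work together with a Mutex. Normally you would think that the lock we acquire with self.0.lock().unwrap() is not released before we go to sleep, which would mean that unpark could never acquire the lock and we would deadlock. This does not happen, because Condvar::wait takes over the guard, releases the lock while the thread sleeps, and hands the re-acquired lock back when we wake up.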
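One consequence is that a notification sent before the waiter starts to wait is not lost, because the flag remembers it. A minimal sketch of that ordering follows; the function name notify_before_wait is made up, and the flag and Condvar are used the same way as in the Parker.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Sets the flag and notifies *before* the waiter starts waiting, then
/// returns how many times the waiter actually had to sleep.
fn notify_before_wait() -> usize {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));

    // The notifying side runs to completion before we even look at the flag.
    let notifier_pair = pair.clone();
    thread::spawn(move || {
        let (flag, condvar) = &*notifier_pair;
        *flag.lock().unwrap() = true;
        condvar.notify_one();
    })
    .join()
    .unwrap();

    let (flag, condvar) = &*pair;
    let mut waits = 0;
    let mut resumable = flag.lock().unwrap();
    while !*resumable {
        // `wait` unlocks the mutex while sleeping and re-locks it before
        // returning, so the notifier can always get hold of the flag.
        resumable = condvar.wait(resumable).unwrap();
        waits += 1;
    }
    *resumable = false; // consume the notification
    waits
}

fn main() {
    println!("times slept: {}", notify_before_wait());
}
```

With a bare thread handle this ordering is exactly where a wakeup can go missing; with the flag, the waiter sees that the event already happened and never goes to sleep.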


fn block_on<F: Future>(mut future: F) -> F::Output {
    let parker = Arc::new(Parker::default()); // <--- NB!
    let mywaker = Arc::new(MyWaker { parker: parker.clone() }); // <--- NB!
    let waker = waker_into_waker(Arc::into_raw(mywaker));
    let mut cx = Context::from_waker(&waker);

    // SAFETY: we shadow `future` so it can't be accessed again.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };
    loop {
        match Future::poll(future.as_mut(), &mut cx) {
            Poll::Ready(val) => break val,
            Poll::Pending => parker.park(), // <--- NB!
        };
    }
}


struct MyWaker {
    parker: Arc<Parker>,
}

fn mywaker_wake(s: &MyWaker) {
    let waker_arc = unsafe { Arc::from_raw(s) };
    waker_arc.parker.unpark();
}

You can follow the link to see the subtle problem that park/unpark can cause, and you can see here how our final version avoids it.

