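The futures crate is the first stop for many people learning asynchronous programming in Rust. In this post I use a simple hello world program to uncover the details of how futures are executed.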

This post uses Rust 1.44.0 stable.

hello world

Let's start with an asynchronous hello world:

use futures::executor;

async fn hello() {
    println!("Hello, world!");
}

fn main() {
    let fut = hello();
    executor::block_on(fut);
}


hello::hello::{{closure}} main.rs:4
<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll mod.rs:66
futures_executor::local_pool::block_on::{{closure}} local_pool.rs:317
futures_executor::local_pool::run_executor::{{closure}} local_pool.rs:87
std::thread::local::LocalKey<T>::try_with local.rs:263
std::thread::local::LocalKey<T>::with local.rs:239
futures_executor::local_pool::run_executor local_pool.rs:83
futures_executor::local_pool::block_on local_pool.rs:317
hello::main main.rs:9
std::rt::lang_start::{{closure}} rt.rs:67
std::rt::lang_start_internal::{{closure}} rt.rs:52
std::panicking::try::do_call panicking.rs:331
std::panicking::try panicking.rs:274
std::panic::catch_unwind panic.rs:394
std::rt::lang_start_internal rt.rs:51
std::rt::lang_start rt.rs:67
main 0x0000000000401b2c
__tmainCRTStartup 0x00000000004013c7
mainCRTStartup 0x00000000004014fb



hello::main main.rs:9


pub fn block_on<F: Future>(f: F) -> F::Output {
    pin_mut!(f);
    run_executor(|cx| f.as_mut().poll(cx))
}


// Set up and run a basic single-threaded spawner loop, invoking `f` on each
// turn.
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
    CURRENT_THREAD_NOTIFY.with(|thread_notify| {
        let waker = waker_ref(thread_notify);
        let mut cx = Context::from_waker(&waker);
        loop {
            if let Poll::Ready(t) = f(&mut cx) {
                return t;
            }
            // Consume the wakeup that occurred while executing `f`, if any.
            let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
            if !unparked {
                // No wakeup occurred. It may occur now, right before parking,
                // but in that case the token made available by `unpark()`
                // is guaranteed to still be available and `park()` is a no-op.
                thread::park();
                // When the thread is unparked, `unparked` will have been set
                // and needs to be unset before the next call to `f` to avoid
                // a redundant loop iteration.
                thread_notify.unparked.store(false, Ordering::Release);
            }
        }
    })
}

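CURRENT_THREAD_NOTIFY is a thread-local variable. Because the ThreadNotify struct implements the ArcWake trait, futures::task::waker_ref() can produce a WakerRef from it, and that WakerRef is then used to construct the Context.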

pub(crate) struct ThreadNotify {
    /// The (single) executor thread.
    thread: Thread,
    /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
    /// before the next `park()`, which may otherwise happen if the code
    /// being executed as part of the future(s) being polled makes use of
    /// park / unpark calls of its own, i.e. we cannot assume that no other
    /// code uses park / unpark on the executing `thread`.
    unparked: AtomicBool,
}

thread_local! {
    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
        thread: thread::current(),
        unparked: AtomicBool::new(false),
    });
}

impl ArcWake for ThreadNotify {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Make sure the wakeup is remembered until the next `park()`.
        let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
        if !unparked {
            // If the thread has not been unparked yet, it must be done
            // now. If it was actually parked, it will run again,
            // otherwise the token made available by `unpark`
            // may be consumed before reaching `park()`, but `unparked`
            // ensures it is not forgotten.
            arc_self.thread.unpark();
        }
    }
}
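To make the interplay between run_executor and ThreadNotify easier to see in one place, here is a minimal sketch of the same park/unpark loop using only the standard library. It is not the futures implementation: it assumes std::task::Wake (stable since Rust 1.51, so newer than the 1.44.0 used in this post) in place of ArcWake, it boxes the future instead of using pin_mut!, and the names mini_block_on and this local ThreadNotify are hypothetical.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical stand-in for futures' ThreadNotify, built on std::task::Wake
// instead of futures::task::ArcWake.
struct ThreadNotify {
    thread: Thread,
    unparked: AtomicBool,
}

impl Wake for ThreadNotify {
    fn wake(self: Arc<Self>) {
        // Record the wakeup; unpark the executor thread only if no wakeup
        // was already pending.
        if !self.unparked.swap(true, Ordering::Release) {
            self.thread.unpark();
        }
    }
}

// Poll `fut` on the current thread, parking the thread between polls.
fn mini_block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let notify = Arc::new(ThreadNotify {
        thread: thread::current(),
        unparked: AtomicBool::new(false),
    });
    let waker = Waker::from(notify.clone());
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        // Sleep until a wakeup has been recorded. The flag is consumed here,
        // and the loop also guards against spurious returns from `park()`.
        while !notify.unparked.swap(false, Ordering::Acquire) {
            thread::park();
        }
    }
}

async fn hello() -> &'static str {
    "Hello, world!"
}

fn main() {
    println!("{}", mini_block_on(hello()));
}
```

For hello() the first poll already returns Ready, so the thread never parks; the parking branch only matters for futures that return Pending and are woken later.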


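We have all heard that marking a function with async is equivalent to having it return a value of an anonymous type that implements the Future trait (an opaque impl Future, not a trait object). For example, hello() above is equivalent to: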

fn hello() -> impl Future<Output = ()> {
    async {
        println!("Hello, world!");
    }
}
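One consequence of this equivalence is that calling an async function only builds the future; the body runs when the future is polled. The sketch below illustrates that with the standard library alone. The helper poll_once, the NoopWake waker, and the names hello_async and hello_desugared are hypothetical, and std::task::Wake assumes Rust 1.51 or later.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Poll a future exactly once and return the result of that single poll.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

// The async fn form.
async fn hello_async() {
    println!("Hello, world!");
}

// The same function written with an explicit `impl Future` return type.
fn hello_desugared() -> impl Future<Output = ()> {
    async {
        println!("Hello, world!");
    }
}

fn main() {
    let fut = hello_async(); // the body has not run yet
    println!("future created");
    assert!(poll_once(fut).is_ready()); // the body runs during this poll
    assert!(poll_once(hello_desugared()).is_ready());
}
```

Running it prints "future created" before the first "Hello, world!", since nothing in the body executes until poll is called.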


use std::time::Duration;

async fn hello() {
    async_std::task::sleep(Duration::from_secs(3)).await;
    println!("Hello, world!");
}



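Generators are the foundation of asynchronous execution in Rust. In the example above, the body of hello() becomes the body of the generator's resume method. If hello() cannot make progress because it is waiting on an operation, the await yields from the generator and control returns to the caller.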


/// Wrap a generator in a future.
/// This function returns a `GenFuture` underneath, but hides it in `impl Trait` to give
/// better error messages (`impl Future` rather than `GenFuture<[closure.....]>`).
pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return>
where
    T: Generator<ResumeTy, Yield = ()>,
{
    struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);

    // We rely on the fact that async/await futures are immovable in order to create
    // self-referential borrows in the underlying generator.
    impl<T: Generator<ResumeTy, Yield = ()>> !Unpin for GenFuture<T> {}

    impl<T: Generator<ResumeTy, Yield = ()>> Future for GenFuture<T> {
        type Output = T::Return;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Safety: Safe because we're !Unpin + !Drop, and this is just a field projection.
            let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };

            // Resume the generator, turning the `&mut Context` into a `NonNull` raw pointer. The
            // `.await` lowering will safely cast that back to a `&mut Context`.
            match gen.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
                GeneratorState::Yielded(()) => Poll::Pending,
                GeneratorState::Complete(x) => Poll::Ready(x),
            }
        }
    }

    GenFuture(gen)
}



loop {
    match polling_future_block() {
        Pending => yield,
        Ready(x) => break x
    }
}
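The effect of this loop can be observed from the outside without touching the unstable generator feature. The sketch below is an illustration under stated assumptions, not the compiler's lowering: YieldOnce is a hypothetical leaf future that returns Pending on its first poll, and poll_until_ready is a hypothetical busy-polling driver that counts polls. std::task::Wake assumes Rust 1.51 or later.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical leaf future: Pending on the first poll, Ready(42) afterwards.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.yielded {
            Poll::Ready(42)
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

// The `.await` here plays the role of the loop: while the inner future is
// Pending, `outer` yields and its own poll returns Pending.
async fn outer() -> u32 {
    let x = YieldOnce { yielded: false }.await;
    x + 1
}

// A waker that does nothing; the driver below polls in a busy loop anyway.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Poll until the future is ready; return its output and the number of polls.
fn poll_until_ready<F: Future>(fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}

fn main() {
    let (value, polls) = poll_until_ready(outer());
    println!("value = {}, polls = {}", value, polls); // value = 43, polls = 2
}
```

The first poll of outer() reaches the await, sees Pending and yields; the second poll resumes at the same await, receives 42, and runs the rest of the body.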




