Seastar future

promise with future

一个future对象必须绑定在一个promise对象上,当promise::set_value()被调用时,该future对象变为就绪态。

我们以一个future的常见用途为起点分析:

template <typename Clock = steady_clock_type, typename Rep, typename Period>
future<> sleep(std::chrono::duration<Rep, Period> dur) {struct sleeper {promise<> done;timer<Clock> tmr;sleeper(std::chrono::duration<Rep, Period> dur): tmr([this] { done.set_value(); }){tmr.arm(dur);}};sleeper *s = new sleeper(dur);future<> fut = s->done.get_future();return fut.then([s] { delete s; });
}

可以看到future对象是通过调用promise::get_future方法生成的。

一个promise对象有以下数据成员:

future_base* _future;
future_state_base* _state; // 始终指向_local_state实体,如果后者被移动,那么_state会更新指向。
task* _task;
future_state _local_state;

一个future对象有以下数据成员:

promise_base* _promise;
future_state _state;

调用get_future方法会返回以当前promise地址为参数来构造的一个future:

template <typename SEASTAR_ELLIPSIS T>
inline
future<T SEASTAR_ELLIPSIS>
promise<T SEASTAR_ELLIPSIS>::get_future() noexcept {assert(!this->_future && this->_state && !this->_task);return future<T SEASTAR_ELLIPSIS>(this);
}future(promise<T SEASTAR_ELLIPSIS>* pr) noexcept : future_base(pr, &_state), _state(std::move(pr->_local_state)) { }

promise(和promise_base)的构造函数做以下事情:

  • promise._state指向自身promise对象中的_local_state成员。
  • _future_task等指针成员被初始化为nullptr。

future(和future_base)的构造函数做以下事情:

  • future._promise指向当前promise对象。
  • promise._future指向当前future对象。
  • future._state被移动赋值为当前promise对象中的_local_state成员。
  • promise._state指向当前future对象中的_state成员。

至此,仅有promise._task成员未赋值,而当前的promise对象和future对象已经一对一相互绑定了。

future_state

promise和future对象中的future_state数据成员包含了当前future调用链的状态,其继承自两个类:

template <typename T>
struct future_state :  public future_state_base, private internal::uninitialized_wrapper<T> {// ...
}

future_state_base类保存当前的状态(valid, available, failed等)和exception_ptr,而internal::uninitialized_wrapper<T>类保存value。(一个future就绪时,exception_ptr和value中有且只有一个有效)。

set_value

现在让我们看看一个promise如何resolve一个future,让它成为就绪态并传递相关的值(或异常)。

sleep(std::chrono::duration<Rep, Period> dur)的例子中,定时器到期后会调用promise对象的set_value()方法:

sleeper(std::chrono::duration<Rep, Period> dur): tmr([this] { done.set_value(); }) // 这里注册定时器回调{tmr.arm(dur); // 开始定时}
template <typename... A>
void set_value(A&&... a) noexcept {if (auto *s = get_state()) {s->set(std::forward<A>(a)...);make_ready<urgent::no>();}
}

set_value()函数中get_state()会返回promise._state(目前指向future对象的_state字段),调用future_state::set()方法会原地构造一个已经就绪的future_state:

template <typename... A>
void set(A&&... a) noexcept {assert(_u.st == state::future);new (this) future_state(ready_future_marker(), std::forward<A>(a)...);
}

此时future变为就绪态,但可能需要调度then任务(见后文分析):

template <promise_base::urgent Urgent>
void promise_base::make_ready() noexcept {if (_task) {if (Urgent == urgent::yes) {::seastar::schedule_urgent(std::exchange(_task, nullptr));} else {::seastar::schedule(std::exchange(_task, nullptr));}}
}
void schedule(task* t) noexcept {engine().add_task(t);
}

then and schedule

仅仅一个future变为就绪态不会对用户产生什么影响,而如果用户用then指明了就绪后的任务,那么这个任务将会在就绪后被运行。

then方法可以追溯到call_then_impl::run()->future::then_impl()->future::then_impl_nrvo()

template <typename Func, typename Result>Result then_impl_nrvo(Func&& func) noexcept {    using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>;    typename futurator::type fut(future_for_get_promise_marker{});    using pr_type = decltype(fut.get_promise());    schedule(fut.get_promise(), std::move(func), [](pr_type&& pr, Func& func, future_state&& state) {        if (state.failed()) {            pr.set_exception(static_cast<future_state_base&&>(std::move(state)));        } else {            futurator::satisfy_with_result_of(std::move(pr), [&func, &state] {                // clang thinks that "state" is not used, below, for future<>.                // Make it think it is used to avoid an unused-lambda-capture warning.                (void)state;                return internal::future_invoke(func, std::move(state).get_value());            });        }    });    return fut;}

then_impl_nrvo()中首先构造了一个空的future对象fut,然后调用get_promise()在其上绑定构造了一个新的promise对象(promise._state指向future._state),然后调用future::schedule()并返回fut对象。

template <typename Pr, typename Func, typename Wrapper>void schedule(Pr&& pr, Func&& func, Wrapper&& wrapper) noexcept {    // If this new throws a std::bad_alloc there is nothing that    // can be done about it. The corresponding future is not ready    // and we cannot break the chain. Since this function is    // noexcept, it will call std::terminate if new throws.    memory::scoped_critical_alloc_section _;    auto tws = new continuation<Pr, Func, Wrapper, T SEASTAR_ELLIPSIS>         (std::move(pr), std::move(func), std::move(wrapper));    // In a debug build we schedule ready futures, but not in    // other build modes.    #ifdef SEASTAR_DEBUG    if (_state.available()) {        tws->set_state(std::move(_state));        ::seastar::schedule(tws);        return;    }    #endif    schedule(tws);    _state._u.st = future_state_base::state::invalid;}void schedule(continuation_base<T SEASTAR_ELLIPSIS>* tws) noexcept {        future_base::schedule(tws, &tws->_state);}

**future::schedule()**接受此fut对象对应的pr promise,func函数体和一个就绪时回调函数wrapper并把它们打包成一个continuation,然后判断_state是否已就绪:

  • 如果已就绪,那么立即调用seastar::schedule将此continuation加入工作队列。
  • (大多数情况)如果未就绪,那么调用future_base::schedule(),设置future_state为invalid态后返回。

我们现在考察future_base::schedule()做了什么:

void schedule(task* tws, future_state_base* state) noexcept {    promise_base* p = detach_promise();    p->_state = state;    p->_task = tws;}

可以看到它解除了promise与当前future对象的绑定,然后为此promise分配了future_state和task(即刚创建的continuation)。

我们再回顾上面的promise_base::make_ready():

template <promise_base::urgent Urgent>void promise_base::make_ready() noexcept {    if (_task) {        if (Urgent == urgent::yes) {            ::seastar::schedule_urgent(std::exchange(_task, nullptr));        } else {            ::seastar::schedule(std::exchange(_task, nullptr));        }    }}void schedule(task* t) noexcept {    engine().add_task(t);}

那么整个then的分配过程就很明显了:

  • 创建一个新的future对象和一个新的promise对象。
  • 设置与当前future对象所绑定的promise的_task成员为包含实际任务的continuation。
  • 当与当前future对象所绑定的promise对象调用set_value()时,_task成员被加入reactor的工作队列中。
  • 新的future对象被返回,后续的then()方法将在其上被调用。

但是还有一个问题,既然调用then后返回的是一个新的future对象,那么如何保证then的按顺序联系调用呢?即如何建立新老promise以及task之间的联系呢?

continuation

continuation有以下数据成员:

// 继承自continuation_basescheduling_group _sg;future_state _state;Promise _pr;Func _func;Wrapper _wrapper;

其构造函数会做以下事情:

  • 移动构造保存传入的pr,func,wrapper。
  • 默认初始化其他成员。

接着上文分析,既然continuation已经作为一个task被赋给了原promise的_task成员,当原promise调用set_value时会将此_task添加到reactor的工作队列中,而在reactor的事件循环中,会调用task::run_and_dispose()来执行一个任务:

void reactor::run_tasks(task_queue& tq) {    // Make sure new tasks will inherit our scheduling group    *internal::current_scheduling_group_ptr() = scheduling_group(tq._id);    auto& tasks = tq._q;    while (!tasks.empty()) {        auto tsk = tasks.front();        tasks.pop_front();        STAP_PROBE(seastar, reactor_run_tasks_single_start);        task_histogram_add_task(*tsk);        _current_task = tsk;        tsk->run_and_dispose();        // ...    }}

此处的task是一个continuation对象,那么对应的run_and_dispose()方法就是continuation::run_and_dispose():

virtual void run_and_dispose() noexcept override {    try {        _wrapper(std::move(this->_pr), _func, std::move(this->_state));    } catch (...) {        this->_pr.set_to_current_exception();    }    delete this;}

而_wrapper函数的内容如前所见:

[](pr_type&& pr, Func& func, future_state&& state) {    if (state.failed()) {        pr.set_exception(static_cast<future_state_base&&>(std::move(state)));    } else {        futurator::satisfy_with_result_of(std::move(pr), [&func, &state] {            // clang thinks that "state" is not used, below, for future<>.            // Make it think it is used to avoid an unused-lambda-capture warning.            (void)state;            return internal::future_invoke(func, std::move(state).get_value());        });    }}template<typename T>template<typename Func>SEASTAR_CONCEPT( requires std::invocable<Func> )void futurize<T>::satisfy_with_result_of(promise_base_with_type&& pr, Func&& func) {    using ret_t = decltype(func());    if constexpr (std::is_void_v<ret_t>) {        func();        pr.set_value();    } else if constexpr (is_future<ret_t>::value) {        func().forward_to(std::move(pr));    } else {        pr.set_value(func());    }}

所以_wrapper函数总会调用promise::set_value()或者promise::set_exception(),来resolve promise对应的future。

所以整个future链式异步调用的实现就明了了:

  • 初始的future经一个promise对象分配而来,promise对象的所有者要在合适的时机调用set_value()(或set_exception())。

  • 对future调用then会创建一个新的future对象和一个对应的新的promise对象,新的promise对象被一个continuation对象包装,而原来的promise._task指向该continuaiton对象,而continuaiton的run_and_dispose()方法会调用helper函数——以执行任务并调用新的promise对象的set_value()(或set_exception())方法。

    注意then返回的是新的future对象,因为future对象的作用就是链接新的then任务;可以这么理解,一个promise对应一个实际的任务,而每次then都返回当前链上的最后一个promise对应的future,以实现尾部追加任务,保证顺序性。

  • 所以当原promise的set_value()(或set_exception())方法被调用时,continuaiton的run_and_dispose()被加入工作队列,(过一会)reacotr事件循环调用run_and_dispose()方法时新的promise的set_value()(或set_exception())方法被调用。

还要注意一种情况,那就是如果then中函数体也返回一个future,那么如何令后续的then在此future就绪后才加入工作队列呢?这就是satisfy_with_result_of所做的工作,它首先使用if constexpr识别then中函数体的返回值是不是future,不是则直接执行并调用下一个执行链上的任务;如果返回值是一个future,那么将使用forward_to方法():

/// \brief Satisfy some \ref promise object with this future as a result.// Arranges so that when this future is resolve, it will be used to/// satisfy an unrelated promise.  This is similar to scheduling a/// continuation that moves the result of this future into the promise/// (using promise::set_value() or promise::set_exception(), except/// that it is more efficient.// \param pr a promise that will be fulfilled with the results of this/// future.void forward_to(promise<T SEASTAR_ELLIPSIS>&& pr) noexcept {    if (_state.available()) {        pr.set_urgent_state(std::move(_state));    } else if (&pr._local_state != pr._state) {        // The only case when _state points to _local_state is        // when get_future was never called. Given that pr will        // soon be destroyed, we know get_future will never be        // called and we can just ignore this request.        *detach_promise() = std::move(pr);    }}

forward_to()方法的原理很简单,就是将第一个then中返回的新新future的promise替换成上文提到的新promise,当(已经消失的)新新promise的提供方调用set_value()方法时,实际会调用新promise的set_value()方法,那么将再次重复上述过程。

如注释所说,*detach_promise() = std::move(pr)等价于创建一个新的continuation(该continuation会将前一个promise的值传递给下一个promise)并在当前promise就绪时启动它。

至于在promise和工作队列之外如何实现异步调用set_value()(或set_exception())方法,那就是另一个问题了。

Seastar源码阅读(三)future相关推荐

  1. 24 UsageEnvironment使用环境抽象基类——Live555源码阅读(三)UsageEnvironment

    24 UsageEnvironment使用环境抽象基类--Live555源码阅读(三)UsageEnvironment 24 UsageEnvironment使用环境抽象基类--Live555源码阅读 ...

  2. mybatis源码阅读(三):mybatis初始化(下)mapper解析

    转载自 mybatis源码阅读(三):mybatis初始化(下)mapper解析 MyBatis 的真正强大在于它的映射语句,也是它的魔力所在.由于它的异常强大,映射器的 XML 文件就显得相对简单. ...

  3. SDWebImage源码阅读(三)UIImage+GIF

    UIImage+GIF 是UIImage 类的一个GIF 分类,在之前的版本里面这个分类是用了处理GIF 动态图片的但是会有内存暴增的bug.在当前 '4.0.0-beta2' 的版本里GIF 动态图 ...

  4. Struts2源码阅读(三)_DispatcherConfigurationProvider

    首先强调一下struts2的线程程安全,在Struts2中大量采用ThreadLocal线程局部变量的方法来保证线程的安全,像Dispatcher等都是通过ThreadLocal来保存变量值,使得每个 ...

  5. SpringMVC源码阅读(三)

    先理一下Bean的初始化路线 org.springframework.beans.factory.support.AbstractBeanDefinitionReader public int loa ...

  6. redis源码阅读-持久化之RDB

    持久化介绍: redis的持久化有两种方式: rdb :可以在指定的时间间隔内生成数据集的时间点快照(point-in-time snapshot) aof : 记录redis执行的所有写操作命令 根 ...

  7. redis源码阅读-zset

    前段时间给小伙伴分享redis,顺带又把redis撸了一遍了,对其源码,又有了比较深入的了解.(ps: 分享的文章再丰富下再放出来). 数据结构 我们先看下redis 5.0的代码.本次讲解主要是zs ...

  8. redis源码阅读-持久化之aof与aof重写详解

    aof相关配置 aof-rewrite-incremental-fsync yes # aof 开关,默认是关闭的,改为yes表示开启 appendonly no # aof的文件名,默认 appen ...

  9. mysql 1260,MYSQL 源码阅读 六

    前期节要 MYSQL源码阅读 一 MYSQL源码阅读 二 MYSQL源码阅读 三 MYSQL 源码阅读 四 MYSQL 源码阅读 五 上次有两个问题没搞明白 1 是 为什么一定要开启调试线程 ? 因为 ...

最新文章

  1. 从红旗5.0提及——看Linux的内存办理
  2. wordpress 分类使用不同的模版
  3. C++ 类访问控制(public/protected/private)小结
  4. socket编程中的异常处理
  5. Nginx的nginx.conf配置文件中文注释说明
  6. Jeecg平台扩展性不好的地方收集启动。
  7. ffmbc——广播电视以及专业用途量身定制的FFmpeg
  8. 四年级打字计算机上册教案,2019四年级上信息技术教案(A)打字速度靠指法_泰山版教育.doc.docx...
  9. Spyder远程连接矩池云
  10. Linux文件类型及颜色标识整理
  11. 医院职工离职申请证明模板,共计10篇
  12. 新课改计算机论文,新课改中职计算机职业教育论文
  13. jQuery ajaxSubmit 自动重复提交表单问题解决
  14. 2020 智能零售领域最具商业合作价值企业盘点
  15. 通达OA2017版 手机签章会撑大表格的处理
  16. openjtag openocd libftd2xx
  17. C++华氏温度和摄氏温度的转换
  18. 408复习策略(强化阶段)
  19. Springboot毕设项目电商系统设计与实现t32la(java+VUE+Mybatis+Maven+Mysql)
  20. 联想一体机用u盘装linux教程,联想一体机如何用u盘装系统

热门文章

  1. 数字图像处理matlab实践
  2. C++ Lambda 表达式
  3. python获取gps数据_Python GPS模块:读取最新的GPS数据
  4. 射频识别RFID:七个问题 让你看透RFID
  5. vba报错:不能设置类worksheet的visible属性
  6. 火狐浏览器webdriver下载
  7. 简单的解释下什么是CNAME?
  8. 魔众大转盘抽奖系统PHP源码
  9. java.io.FileNotFoundException: class path resource [applicationContext.xml] cannot be opened becaus
  10. oracle中的冲销日记账,OraEBSR12GL日记账业务操作09:日记账冲销处理