Seastar源码阅读(三)future
Seastar future
promise with future
一个future对象必须绑定在一个promise对象上,当promise::set_value()被调用时,该future对象变为就绪态。
我们以一个future的常见用途为起点分析:
template <typename Clock = steady_clock_type, typename Rep, typename Period>
future<> sleep(std::chrono::duration<Rep, Period> dur) {struct sleeper {promise<> done;timer<Clock> tmr;sleeper(std::chrono::duration<Rep, Period> dur): tmr([this] { done.set_value(); }){tmr.arm(dur);}};sleeper *s = new sleeper(dur);future<> fut = s->done.get_future();return fut.then([s] { delete s; });
}
可以看到future对象是通过调用promise::get_future方法生成的。
一个promise对象有以下数据成员:
future_base* _future;
future_state_base* _state; // 始终指向_local_state实体,如果后者被移动,那么_state会更新指向。
task* _task;
future_state _local_state;
一个future对象有以下数据成员:
promise_base* _promise;
future_state _state;
调用get_future方法会返回以当前promise地址为参数来构造的一个future:
template <typename SEASTAR_ELLIPSIS T>
inline
future<T SEASTAR_ELLIPSIS>
promise<T SEASTAR_ELLIPSIS>::get_future() noexcept {assert(!this->_future && this->_state && !this->_task);return future<T SEASTAR_ELLIPSIS>(this);
}future(promise<T SEASTAR_ELLIPSIS>* pr) noexcept : future_base(pr, &_state), _state(std::move(pr->_local_state)) { }
promise(和promise_base)的构造函数做以下事情:
promise._state
指向自身promise对象中的_local_state成员。_future
,_task
等指针成员被初始化为nullptr。
future(和future_base)的构造函数做以下事情:
future._promise
指向当前promise对象。promise._future
指向当前future对象。future._state
被移动赋值为当前promise对象中的_local_state成员。promise._state
指向当前future对象中的_state成员。
至此,仅有promise._task
成员未赋值,而当前的promise对象和future对象已经一对一相互绑定了。
future_state
promise和future对象中的future_state数据成员包含了当前future调用链的状态,其继承自两个类:
template <typename T>
struct future_state : public future_state_base, private internal::uninitialized_wrapper<T> {// ...
}
future_state_base
类保存当前的状态(valid, available, failed等)和exception_ptr,而internal::uninitialized_wrapper<T>
类保存value。(一个future就绪时,exception_ptr和value中有且只有一个有效)。
set_value
现在让我们看看一个promise如何resolve一个future,让它成为就绪态并传递相关的值(或异常)。
在sleep(std::chrono::duration<Rep, Period> dur)
的例子中,定时器到期后会调用promise对象的set_value()方法:
sleeper(std::chrono::duration<Rep, Period> dur): tmr([this] { done.set_value(); }) // 这里注册定时器回调{tmr.arm(dur); // 开始定时}
template <typename... A>
void set_value(A&&... a) noexcept {if (auto *s = get_state()) {s->set(std::forward<A>(a)...);make_ready<urgent::no>();}
}
在set_value()
函数中get_state()
会返回promise._state
(目前指向future对象的_state字段),调用future_state::set()方法会原地构造一个已经就绪的future_state:
template <typename... A>
void set(A&&... a) noexcept {assert(_u.st == state::future);new (this) future_state(ready_future_marker(), std::forward<A>(a)...);
}
此时future变为就绪态,但可能需要调度then任务(见后文分析):
template <promise_base::urgent Urgent>
void promise_base::make_ready() noexcept {if (_task) {if (Urgent == urgent::yes) {::seastar::schedule_urgent(std::exchange(_task, nullptr));} else {::seastar::schedule(std::exchange(_task, nullptr));}}
}
void schedule(task* t) noexcept {engine().add_task(t);
}
then and schedule
仅仅一个future变为就绪态不会对用户产生什么影响,而如果用户用then指明了就绪后的任务,那么这个任务将会在就绪后被运行。
then方法可以追溯到call_then_impl::run()
->future::then_impl()
->future::then_impl_nrvo()
:
template <typename Func, typename Result>Result then_impl_nrvo(Func&& func) noexcept { using futurator = futurize<internal::future_result_t<Func, T SEASTAR_ELLIPSIS>>; typename futurator::type fut(future_for_get_promise_marker{}); using pr_type = decltype(fut.get_promise()); schedule(fut.get_promise(), std::move(func), [](pr_type&& pr, Func& func, future_state&& state) { if (state.failed()) { pr.set_exception(static_cast<future_state_base&&>(std::move(state))); } else { futurator::satisfy_with_result_of(std::move(pr), [&func, &state] { // clang thinks that "state" is not used, below, for future<>. // Make it think it is used to avoid an unused-lambda-capture warning. (void)state; return internal::future_invoke(func, std::move(state).get_value()); }); } }); return fut;}
在then_impl_nrvo()
中首先构造了一个空的future对象fut,然后调用get_promise()在其上绑定构造了一个新的promise对象(promise._state
指向future._state
),然后调用future::schedule()并返回fut对象。
template <typename Pr, typename Func, typename Wrapper>void schedule(Pr&& pr, Func&& func, Wrapper&& wrapper) noexcept { // If this new throws a std::bad_alloc there is nothing that // can be done about it. The corresponding future is not ready // and we cannot break the chain. Since this function is // noexcept, it will call std::terminate if new throws. memory::scoped_critical_alloc_section _; auto tws = new continuation<Pr, Func, Wrapper, T SEASTAR_ELLIPSIS> (std::move(pr), std::move(func), std::move(wrapper)); // In a debug build we schedule ready futures, but not in // other build modes. #ifdef SEASTAR_DEBUG if (_state.available()) { tws->set_state(std::move(_state)); ::seastar::schedule(tws); return; } #endif schedule(tws); _state._u.st = future_state_base::state::invalid;}void schedule(continuation_base<T SEASTAR_ELLIPSIS>* tws) noexcept { future_base::schedule(tws, &tws->_state);}
**future::schedule()**接受此fut对象对应的pr promise,func函数体和一个就绪时回调函数wrapper并把它们打包成一个continuation,然后判断_state是否已就绪:
- 如果已就绪,那么立即调用seastar::schedule将此continuation加入工作队列。
- (大多数情况)如果未就绪,那么调用future_base::schedule(),设置future_state为invalid态后返回。
我们现在考察future_base::schedule()做了什么:
void schedule(task* tws, future_state_base* state) noexcept { promise_base* p = detach_promise(); p->_state = state; p->_task = tws;}
可以看到它解除了promise与当前future对象的绑定,然后为此promise分配了future_state和task(即刚创建的continuation)。
我们再回顾上面的promise_base::make_ready():
template <promise_base::urgent Urgent>void promise_base::make_ready() noexcept { if (_task) { if (Urgent == urgent::yes) { ::seastar::schedule_urgent(std::exchange(_task, nullptr)); } else { ::seastar::schedule(std::exchange(_task, nullptr)); } }}void schedule(task* t) noexcept { engine().add_task(t);}
那么整个then的分配过程就很明显了:
- 创建一个新的future对象和一个新的promise对象。
- 设置与当前future对象所绑定的promise的_task成员为包含实际任务的continuation。
- 当与当前future对象所绑定的promise对象调用set_value()时,_task成员被加入reactor的工作队列中。
- 新的future对象被返回,后续的then()方法将在其上被调用。
但是还有一个问题,既然调用then后返回的是一个新的future对象,那么如何保证then的按顺序联系调用呢?即如何建立新老promise以及task之间的联系呢?
continuation
continuation有以下数据成员:
// 继承自continuation_basescheduling_group _sg;future_state _state;Promise _pr;Func _func;Wrapper _wrapper;
其构造函数会做以下事情:
- 移动构造保存传入的pr,func,wrapper。
- 默认初始化其他成员。
接着上文分析,既然continuation已经作为一个task被赋给了原promise的_task
成员,当原promise调用set_value时会将此_task
添加到reactor的工作队列中,而在reactor的事件循环中,会调用task::run_and_dispose()来执行一个任务:
void reactor::run_tasks(task_queue& tq) { // Make sure new tasks will inherit our scheduling group *internal::current_scheduling_group_ptr() = scheduling_group(tq._id); auto& tasks = tq._q; while (!tasks.empty()) { auto tsk = tasks.front(); tasks.pop_front(); STAP_PROBE(seastar, reactor_run_tasks_single_start); task_histogram_add_task(*tsk); _current_task = tsk; tsk->run_and_dispose(); // ... }}
此处的task是一个continuation对象,那么对应的run_and_dispose()
方法就是continuation::run_and_dispose():
virtual void run_and_dispose() noexcept override { try { _wrapper(std::move(this->_pr), _func, std::move(this->_state)); } catch (...) { this->_pr.set_to_current_exception(); } delete this;}
而_wrapper函数的内容如前所见:
[](pr_type&& pr, Func& func, future_state&& state) { if (state.failed()) { pr.set_exception(static_cast<future_state_base&&>(std::move(state))); } else { futurator::satisfy_with_result_of(std::move(pr), [&func, &state] { // clang thinks that "state" is not used, below, for future<>. // Make it think it is used to avoid an unused-lambda-capture warning. (void)state; return internal::future_invoke(func, std::move(state).get_value()); }); }}template<typename T>template<typename Func>SEASTAR_CONCEPT( requires std::invocable<Func> )void futurize<T>::satisfy_with_result_of(promise_base_with_type&& pr, Func&& func) { using ret_t = decltype(func()); if constexpr (std::is_void_v<ret_t>) { func(); pr.set_value(); } else if constexpr (is_future<ret_t>::value) { func().forward_to(std::move(pr)); } else { pr.set_value(func()); }}
所以_wrapper函数总会调用promise::set_value()或者promise::set_exception(),来resolve promise对应的future。
所以整个future链式异步调用的实现就明了了:
初始的future经一个promise对象分配而来,promise对象的所有者要在合适的时机调用set_value()(或set_exception())。
对future调用then会创建一个新的future对象和一个对应的新的promise对象,新的promise对象被一个continuation对象包装,而原来的promise._task指向该continuaiton对象,而continuaiton的run_and_dispose()方法会调用helper函数——以执行任务并调用新的promise对象的set_value()(或set_exception())方法。
注意then返回的是新的future对象,因为future对象的作用就是链接新的then任务;可以这么理解,一个promise对应一个实际的任务,而每次then都返回当前链上的最后一个promise对应的future,以实现尾部追加任务,保证顺序性。
所以当原promise的set_value()(或set_exception())方法被调用时,continuaiton的run_and_dispose()被加入工作队列,(过一会)reacotr事件循环调用run_and_dispose()方法时新的promise的set_value()(或set_exception())方法被调用。
还要注意一种情况,那就是如果then中函数体也返回一个future,那么如何令后续的then在此future就绪后才加入工作队列呢?这就是satisfy_with_result_of所做的工作,它首先使用if constexpr识别then中函数体的返回值是不是future,不是则直接执行并调用下一个执行链上的任务;如果返回值是一个future,那么将使用forward_to方法():
/// \brief Satisfy some \ref promise object with this future as a result.// Arranges so that when this future is resolve, it will be used to/// satisfy an unrelated promise. This is similar to scheduling a/// continuation that moves the result of this future into the promise/// (using promise::set_value() or promise::set_exception(), except/// that it is more efficient.// \param pr a promise that will be fulfilled with the results of this/// future.void forward_to(promise<T SEASTAR_ELLIPSIS>&& pr) noexcept { if (_state.available()) { pr.set_urgent_state(std::move(_state)); } else if (&pr._local_state != pr._state) { // The only case when _state points to _local_state is // when get_future was never called. Given that pr will // soon be destroyed, we know get_future will never be // called and we can just ignore this request. *detach_promise() = std::move(pr); }}
forward_to()方法的原理很简单,就是将第一个then中返回的新新future的promise替换成上文提到的新promise,当(已经消失的)新新promise的提供方调用set_value()方法时,实际会调用新promise的set_value()方法,那么将再次重复上述过程。
如注释所说,*detach_promise() = std::move(pr)
等价于创建一个新的continuation(该continuation会将前一个promise的值传递给下一个promise)并在当前promise就绪时启动它。
至于在promise和工作队列之外如何实现异步调用set_value()(或set_exception())方法,那就是另一个问题了。
Seastar源码阅读(三)future相关推荐
- 24 UsageEnvironment使用环境抽象基类——Live555源码阅读(三)UsageEnvironment
24 UsageEnvironment使用环境抽象基类--Live555源码阅读(三)UsageEnvironment 24 UsageEnvironment使用环境抽象基类--Live555源码阅读 ...
- mybatis源码阅读(三):mybatis初始化(下)mapper解析
转载自 mybatis源码阅读(三):mybatis初始化(下)mapper解析 MyBatis 的真正强大在于它的映射语句,也是它的魔力所在.由于它的异常强大,映射器的 XML 文件就显得相对简单. ...
- SDWebImage源码阅读(三)UIImage+GIF
UIImage+GIF 是UIImage 类的一个GIF 分类,在之前的版本里面这个分类是用了处理GIF 动态图片的但是会有内存暴增的bug.在当前 '4.0.0-beta2' 的版本里GIF 动态图 ...
- Struts2源码阅读(三)_DispatcherConfigurationProvider
首先强调一下struts2的线程程安全,在Struts2中大量采用ThreadLocal线程局部变量的方法来保证线程的安全,像Dispatcher等都是通过ThreadLocal来保存变量值,使得每个 ...
- SpringMVC源码阅读(三)
先理一下Bean的初始化路线 org.springframework.beans.factory.support.AbstractBeanDefinitionReader public int loa ...
- redis源码阅读-持久化之RDB
持久化介绍: redis的持久化有两种方式: rdb :可以在指定的时间间隔内生成数据集的时间点快照(point-in-time snapshot) aof : 记录redis执行的所有写操作命令 根 ...
- redis源码阅读-zset
前段时间给小伙伴分享redis,顺带又把redis撸了一遍了,对其源码,又有了比较深入的了解.(ps: 分享的文章再丰富下再放出来). 数据结构 我们先看下redis 5.0的代码.本次讲解主要是zs ...
- redis源码阅读-持久化之aof与aof重写详解
aof相关配置 aof-rewrite-incremental-fsync yes # aof 开关,默认是关闭的,改为yes表示开启 appendonly no # aof的文件名,默认 appen ...
- mysql 1260,MYSQL 源码阅读 六
前期节要 MYSQL源码阅读 一 MYSQL源码阅读 二 MYSQL源码阅读 三 MYSQL 源码阅读 四 MYSQL 源码阅读 五 上次有两个问题没搞明白 1 是 为什么一定要开启调试线程 ? 因为 ...
最新文章
- 从红旗5.0提及——看Linux的内存办理
- wordpress 分类使用不同的模版
- C++ 类访问控制(public/protected/private)小结
- socket编程中的异常处理
- Nginx的nginx.conf配置文件中文注释说明
- Jeecg平台扩展性不好的地方收集启动。
- ffmbc——广播电视以及专业用途量身定制的FFmpeg
- 四年级打字计算机上册教案,2019四年级上信息技术教案(A)打字速度靠指法_泰山版教育.doc.docx...
- Spyder远程连接矩池云
- Linux文件类型及颜色标识整理
- 医院职工离职申请证明模板,共计10篇
- 新课改计算机论文,新课改中职计算机职业教育论文
- jQuery ajaxSubmit 自动重复提交表单问题解决
- 2020 智能零售领域最具商业合作价值企业盘点
- 通达OA2017版 手机签章会撑大表格的处理
- openjtag openocd libftd2xx
- C++华氏温度和摄氏温度的转换
- 408复习策略(强化阶段)
- Springboot毕设项目电商系统设计与实现t32la(java+VUE+Mybatis+Maven+Mysql)
- 联想一体机用u盘装linux教程,联想一体机如何用u盘装系统
热门文章
- 数字图像处理matlab实践
- C++ Lambda 表达式
- python获取gps数据_Python GPS模块:读取最新的GPS数据
- 射频识别RFID:七个问题 让你看透RFID
- vba报错:不能设置类worksheet的visible属性
- 火狐浏览器webdriver下载
- 简单的解释下什么是CNAME?
- 魔众大转盘抽奖系统PHP源码
- java.io.FileNotFoundException: class path resource [applicationContext.xml] cannot be opened becaus
- oracle中的冲销日记账,OraEBSR12GL日记账业务操作09:日记账冲销处理