seastar介绍及源码分析
目录
1.seastar介绍
1.1.机制简介
1.2.使用方法简介
1.2.1 app_template::run
1.2.2 future promise continuation
2.源码分析
2.1.初始化过程
2.1.1.入口
2.1.2.run_deprecated
2.1.3.smp::configure
2.1.4.reactor::run
2.1.5.run_some_tasks
2.1.6. poll_once
2.2.future
2.2.1then
2.2.2then_impl
2.2.3. then_impl_nrvo
2.2.4. future::schedule
2.2.5. seastar::schedule
2.2.6. reactor::add_task
2.3. IO流程
2.3.1. write_dma
2.3.2. io_queue_submission_pollfn::poll
2.3.3. kernel_submit_work_pollfn::poll
2.3.4. reap_kernel_completions_pollfn::poll
1.seastar介绍
1.1.机制简介
在代码运行过程中,如果遇到io、网络这些阻塞的操作时,是不会占用cpu资源的,此时如果有其他任务等待调度,会切换到其他线程进行运行;而如果没有其他任务在等待调度,就会造成cpu空闲,cpu资源被浪费。
考虑一种场景:接收一些数据,进行一些处理,再写入到硬盘,写入完成后,再进行一些写入的结果。
为了避免io阻塞导致的cpu空闲,我们可以:
- 假如我们是一个单线程的设计,那么我们进行io时就需要使用异步io,将io请求下发后先进行后续的操作而不是等待io完成,后续寻找一个合适的时机(通常就是定期检查)检查io是否完成,如果完成再进行io完成后的后续处理。
- 假如我们是一个多线程的设计,或者io的完成是通过中断通知的,那么我们可以有两个线程,一个线程执行预处理、下发异步io请求,另一个线程等待io完成、进行io完成后的处理。多线程的设计需要考虑线程同步问题,或有锁、cache bouncing(就是cpu的不同core直接进行cache同步)、memory fences等额外开销。而且多线程设计通常比较复杂,需要仔细斟酌锁的设计、使用范围等,在保证线程同步的同时,避免造成死锁、锁范围过大导致接口卡顿等问题。
seastar是一个异步编程框架,大致意思就是使用了前一种设计,且可以比较简便的进行编程,io完成不需要自己编写代码进行判断,而是将后续需要进行的操作作为func参数传给seastar的io请求即可。
当然,我们的代码不可能是单线程运行的,否则就只能使用一个core了,seastar解决这个问题的方法是将内存进行拆分,每个core上运行一个线程,并分配到一部分内存,各个core之间的内存不能共享,就是说一个core不能直接访问到另一个core的内存。而线程间的通讯通过共享内存完成,这个通讯是seastar自己完成的,不需要使用者关心。
1.2.简单使用方法
这个不详细讲了,参考seastar的turorial,在网站seastar.io可以找到
1.2.1 app_template::run
#include <seastar/core/app-template.hh>
#include <seastar/core/reactor.hh>
#include <iostream>int main(int argc, char** argv) {seastar::app_template app;app.run(argc, argv, [] {std::cout << "Hello world\n";return seastar::make_ready_future<>();});
}
最简单的程序,使用app.run进入seastar框架运行
1.2.2 future
#include <seastar/core/app-template.hh>
#include <seastar/core/sleep.hh>
#include <iostream>int main(int argc, char** argv) {seastar::app_template app;app.run(argc, argv, [] {std::cout << "Sleeping... " << std::flush;using namespace std::chrono_literals;return seastar::sleep(1s).then([] {std::cout << "Done.\n";});});
}
seastar::sleep函数返回一个future,sleep到1s之后,会执行then中的内容
2.源码分析
seastar中设计的主要机制有:
a) 资源管理
线程:每个core上一个线程。
内存:每个线程使用一部分内存,不能直接访问其他线程的内存。
io:每个线程管理一部分io,例如有4个设备需要进行io,且有4个线程,那么可能就是一个线程负责一个设备的io。
b) future promise continuation
future是一个模板类,可以指定类型,future就是对这个类型进行了一下封装,增加了可异步操作的特性。可以对future设置/获取这个类型的值。
future绑定一个promise,调用promise的set_value时,future会变成ready状态。
future没有ready时,调用future的wait方法,线程会阻塞到ready状态为止;调用get方法,同样会阻塞住,等到ready之后会返回future模板类定义时指定类型的值。
future有一个then方法,其参数是一个可执行的func(例如lambda表达式),等到调用相应promise的set_value方法之后,会将func封装到continuation类中加入到任务队列中,等待seastar调度。then返回的也是一个future,这个future会在func执行完成之后变成ready状态。可以链式调用,例如:handle().then(func1).then(func2),handle()返回的是一个future,该future变为ready状态之后将会顺序执行func1、func2。
Ps.
seatar的future、promise是自己设计的,不是直接使用的C++自带的,但是概念还是有些类似的。
c) 主体循环
seastar初始化完成之后,将会进入主体循环,主要执行:执行任务队列中的任务;执行初始化时注册的所有poller的poll操作,不同poller的操作不同。
如果没有需要运行的任务,循环将暂时进入sleep。
d) poller
处理网络、IO等,下发异步请求后,调用poller的poll函数时,会查询是否有请求已经完成,有完成的请求时,就会执行后续任务。
举例io请求的poller:
io请求加入到my_io_queue队列当中,执行io_queue_submission_poller的poll函数时,会批量处理队列中的请求,并加入到_pending_io队列中
再通过kernel_submit_work_pollfn的poll函数下发aio请求到内核当中
再通过reap_kernel_completions_pollfn的poll函数调用io_getevents,查询是否有异步io请求已经完成
上图中已经画出大致的流程了,下面再贴一些代码具体看看。
2.1.初始化过程
2.1.1.入口
int
app_template::run(int ac, char ** av, std::function<future<int> ()>&& func) {return run_deprecated(ac, av, [func = std::move(func)] () mutable {auto func_done = make_lw_shared<promise<>>();engine().at_exit([func_done] { return func_done->get_future(); });// No need to wait for this future.// func's returned exit_code is communicated via engine().exit()(void)futurize_invoke(func).finally([func_done] {func_done->set_value();}).then([] (int exit_code) {return engine().exit(exit_code);}).or_terminate();});
}
参数说明:
ac和av是传入的相关配置,例如可以配置内存上限、使用的cpu核数。
func是需要运行的函数,通常是使用lambda表达式。
在主程序中调用app_template::run函数,传入需要运行的代码(func参数,可以使用lambda表达式),seastar框架在进行一些初始化操作时候,就会开始运行func。
2.1.2.run_deprecated
run函数后续调用的是run_deprecated
int app_template::run_deprecated(int ac, char ** av, std::function<void ()>&& func);
参数说明:
参数中ac和av与run函数的一样,func是run函数中定义的lambda表达式,其中调用了futurize_invoke(func),这个是主要内容,lambda表单式中的其他内容我们暂且忽略。
run_deprecated中,调用了smp::configure函数,这是一个主要内容,启动了除了cpu0以外的每个cpu core上的线程。
try {smp::configure(configuration, reactor_config_from_app_config(_cfg));} catch (...) {std::cerr << "Could not initialize seastar: " << std::current_exception() << std::endl;return 1;}
而需要运行的参数func,则通过future类的then方法挂到了任务队列当中。这里使用的future是_start_promise.get_future(),也就是对_start_promise设置值(set_value)之后,参数func才会开始运行,而对_start_promise的set_value是在engine().run()中进行的。
我们再整理一下:
run_deprecated函数接收了需要运行的任务func,将其挂到engine的任务队列当中,而任务队列当中的任务的运行条件有:1.调度到该任务;2.该任务的前置条件满足(一般就是future对应的promise值被设置)。
条件1在运行engine().run()中得到满足,该函数中有个循环,会获取任务队列中的任务进行处理。
条件2也在运行engine().run()中得到满足,在进行一些初始化之后,会对任务func前置的future设置值,设置完之后,调度到该任务时就会开始运行。
int
app_template::run_deprecated(int ac, char ** av, std::function<void ()>&& func) {
#ifdef SEASTAR_DEBUGfmt::print("WARNING: debug mode. Not for benchmarking or production\n");
#endifbpo::variables_map configuration;try {bpo::store(bpo::command_line_parser(ac, av).options(_opts).positional(_pos_opts).run(), configuration);_conf_reader(configuration);} catch (bpo::error& e) {fmt::print("error: {}\n\nTry --help.\n", e.what());return 2;}if (configuration.count("help")) {if (!_cfg.description.empty()) {std::cout << _cfg.description << "\n";}std::cout << _opts << "\n";return 1;}if (configuration["help-loggers"].as<bool>()) {log_cli::print_available_loggers(std::cout);return 1;}bpo::notify(configuration);// Needs to be before `smp::configure()`.try {apply_logging_settings(log_cli::extract_settings(configuration));} catch (const std::runtime_error& exn) {std::cout << "logging configuration error: " << exn.what() << '\n';return 1;}configuration.emplace("argv0", boost::program_options::variable_value(std::string(av[0]), false));try {smp::configure(configuration, reactor_config_from_app_config(_cfg));} catch (...) {std::cerr << "Could not initialize seastar: " << std::current_exception() << std::endl;return 1;}_configuration = {std::move(configuration)};// No need to wait for this future.// func is waited on via engine().run()(void)engine().when_started().then([this] {return seastar::metrics::configure(this->configuration()).then([this] {// set scollectd use the metrics configuration, so the later// need to be set firstscollectd::configure( this->configuration());});}).then(std::move(func)).then_wrapped([] (auto&& f) {try {f.get();} catch (std::exception& ex) {std::cout << "program failed with uncaught exception: " << ex.what() << "\n";engine().exit(1);}});auto exit_code = engine().run();smp::cleanup();return exit_code;
}
2.1.3.smp::configure
void smp::configure(boost::program_options::variables_map configuration, reactor_config reactor_cfg);
这个函数比较长,我就不贴代码了,讲一下它的主要内容。
(1)将当前线程绑定到cpu0
(2)初始化内存管理,每个cpu core分别使用不同的内存,互不干扰
(3)为除cpu0之外的所有cpu core(也可以配置成只使用部分cpu core)起一个线程,线程中分配一块内存给reactor类的对象,每个线程都有这个对象,然后再调用reactor::run方法开始运行主体内容。engine()获取到的就是reactor对象,每个线程都有一个独立的对象。
因内容较多,部分内容还不清楚具体作用,需要更详细了解的还需要阅读一下源码。
2.1.4.reactor::run
这是seastar框架的主体函数,初始化完之后,就一直在这个函数中运行了。
(1)poller
首先,会定义一些poller,下面是其中一部分:
poller smp_poller(std::make_unique<smp_pollfn>(*this));poller reap_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));poller io_queue_submission_poller(std::make_unique<io_queue_submission_pollfn>(*this));poller kernel_submit_work_poller(std::make_unique<kernel_submit_work_pollfn>(*this));poller final_real_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this));poller execution_stage_poller(std::make_unique<execution_stage_pollfn>());
smp_poller处理其他cpu core的event,io_queue_submission_poller处理io任务,其他poller没有每个具体看,因为我对io比较熟悉,所以后面以io_queue_submission_poller为例介绍一下poller的机制。
(2)主体循环
poller等初始化完成后,就进入主体循环运行了,循环中主要是:
- 调用run_some_tasks
- 调用check_for_work,check_for_work中调用了poll_once,处理poller
- 空闲时进入sleep
2.1.5.run_some_tasks
从_active_task_queues队列中取出任务运行,直到_active_task_queues变为空为止。
另外,会调用insert_activating_task_queues将activating队列中的内容插入到active队列中。
reactor::run_some_tasks() {if (!have_more_tasks()) {return;}sched_print("run_some_tasks: start");reset_preemption_monitor();sched_clock::time_point t_run_completed = std::chrono::steady_clock::now();STAP_PROBE(seastar, reactor_run_tasks_start);_cpu_stall_detector->start_task_run(t_run_completed);do {auto t_run_started = t_run_completed;insert_activating_task_queues();auto tq = _active_task_queues.front();_active_task_queues.pop_front();sched_print("running tq {} {}", (void*)tq, tq->_name);tq->_current = true;_last_vruntime = std::max(tq->_vruntime, _last_vruntime);run_tasks(*tq);tq->_current = false;t_run_completed = std::chrono::steady_clock::now();auto delta = t_run_completed - t_run_started;account_runtime(*tq, delta);sched_print("run complete ({} {}); time consumed {} usec; final vruntime {} empty {}",(void*)tq, tq->_name, delta / 1us, tq->_vruntime, tq->_q.empty());if (!tq->_q.empty()) {insert_active_task_queue(tq);} else {tq->_active = false;}} while (have_more_tasks() && !need_preempt());_cpu_stall_detector->end_task_run(t_run_completed);STAP_PROBE(seastar, reactor_run_tasks_end);*internal::current_scheduling_group_ptr() = default_scheduling_group(); // Prevent inheritance from last group runsched_print("run_some_tasks: end");
}
2.1.6. poll_once
调用所有poller的poll方法
bool
reactor::poll_once() {bool work = false;for (auto c : _pollers) {work |= c->poll();}return work;
}
2.2.future
主要讲一下then的流程,相关机制大致都可以看到了。
2.2.1then
template <typename Func, typename Result = futurize_t<std::result_of_t<Func(T&&...)>>>GCC6_CONCEPT( requires ::seastar::CanInvoke<Func, T...> )Resultthen(Func&& func) noexcept {
#ifndef SEASTAR_TYPE_ERASE_MOREreturn then_impl(std::move(func));
#elsereturn then_impl(noncopyable_function<Result (T&&...)>([func = std::forward<Func>(func)] (T&&... args) mutable {return futurize_apply(func, std::forward_as_tuple(std::move(args)...));}));
#endif}
就是直接调用的then_impl,SEASTAR_TYPE_ERASE_MORE这个宏是应该调试使用的,一般是调用的上面这一句。
2.2.2then_impl
template <typename Func, typename Result = futurize_t<std::result_of_t<Func(T&&...)>>>Resultthen_impl(Func&& func) noexcept {using futurator = futurize<std::result_of_t<Func(T&&...)>>;if (available() && !need_preempt()) {if (failed()) {return futurator::make_exception_future(static_cast<future_state_base&&>(get_available_state_ref()));} else {return futurator::apply(std::forward<Func>(func), get_available_state_ref().take_value());}}return then_impl_nrvo<Func, Result>(std::forward<Func>(func));}
两种情况:
1.future已经完成,直接运行
2.future未完成,则挂到任务队列中
available()就是future值已经获取到了,会调用futurator::apply函数直接执行。
need_preempt指有任务需要马上执行,这种情况下则不会离开运行then传入的任务,而是先放到任务队列当中。
then_impl_nrvo就是调用schedule函数将任务加入到任务队列当中。
2.2.3. then_impl_nrvo
// Keep this simple so that Named Return Value Optimization is used.template <typename Func, typename Result>Result then_impl_nrvo(Func&& func) noexcept {using futurator = futurize<std::result_of_t<Func(T&&...)>>;typename futurator::type fut(future_for_get_promise_marker{});// If there is a std::bad_alloc in schedule() there is nothing that can be done about it, we cannot break future// chain by returning ready future while 'this' future is not ready. The noexcept will call std::terminate if// that happens.[&] () noexcept {using pr_type = decltype(fut.get_promise());memory::disable_failure_guard dfg;schedule(fut.get_promise(), [func = std::forward<Func>(func)] (pr_type& pr, future_state<T...>&& state) mutable {if (state.failed()) {pr.set_exception(static_cast<future_state_base&&>(std::move(state)));} else {try {futurator::satisfy_with_result_of(std::move(pr), [&func, &state] {return ::seastar::apply(std::forward<Func>(func), std::move(state).get_value());});} catch (...) {pr.set_to_current_exception();}}});} ();return fut;}
主要就是调用schedule函数将任务加到的队列当中,已经进行了一些异常处理。这里调用的是future::schedule
2.2.4. future::schedule
void schedule(Pr&& pr, Func&& func) noexcept {if (_state.available() || !_promise) {if (__builtin_expect(!_state.available() && !_promise, false)) {_state.set_to_broken_promise();}::seastar::schedule(new continuation<Pr, Func, T...>(std::move(pr), std::move(func), std::move(_state)));} else {assert(_promise);detach_promise()->schedule(std::move(pr), std::move(func));_state._u.st = future_state_base::state::invalid;}}
这里有两种情况:
(1)promise已经完成,或者future没有设置promise,则调用seastar::schedule将任务直接加入到任务队列当中。
(2)还需要等待promise完成,则将任务封装成continuation挂到promise下的_task,等到promise的set_value方法被调用后,再将任务挂到任务队列当中
2.2.5. seastar::schedule
void schedule(task* t) noexcept {engine().add_task(t);
}
schedule函数调用了engine().add_task(t),也就是reactor::add_task方法
这里传入的参数t是task类型,看看参数类型的变化:
传入 | 传到下一层 | |
then_impl | Func&& func | std::forward<Func>(func) |
then_impl_nrvo | Func&& func |
[func = std::forward<Func>(func)] (pr_type& pr, future_state<T...>&& state) mutable { 使用lambda表达式对func进行了一些封装 |
future::schedule | Func&& func |
new continuation<Pr, Func, T...>(std::move(pr), std::move(func), std::move(_state)) 封装成continuation |
seastar::schedule | task* t |
seastar::schedule接受的是task类型,而传入的是continuation,是task的派生类,增加的内容:
(1)Promise _pr; 有一个promise成员
(2)future_state<T...> _state; 有一个future_state成员
(3)重载了run_and_dispose,进行了实现,就是调用了_func,就是需要执行的任务。
2.2.6. reactor::add_task
void add_task(task* t) noexcept {auto sg = t->group();auto* q = _task_queues[sg._id].get();bool was_empty = q->_q.empty();q->_q.push_back(std::move(t));
#ifdef SEASTAR_SHUFFLE_TASK_QUEUEshuffle(q->_q.back(), *q);
#endifif (was_empty) {activate(*q);}}
加到了_task_queues当中。
sg._id是new continuation时赋值的。
2.3. IO流程
时间原因不展开将了,这里就贴一下主要的代码,流程图中都有说明了,要详细了解的话还是需要对着完整源码再看一下。
2.3.1. write_dma
future<size_t>
posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& io_priority_class) noexcept {auto len = internal::sanitize_iovecs(iov, _disk_write_dma_alignment);auto req = internal::io_request::make_writev(_fd, pos, iov);return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {});
}
future<size_t>
reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {++_io_stats.aio_writes;_io_stats.aio_write_bytes += len;return ioq->queue_request(pc, len, std::move(req));
}
future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {auto start = std::chrono::steady_clock::now();return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {_queued_requests++;// First time will hit here, and then we create the class. It is important// that we create the shared pointer in the same shard it will be used at later.auto& pclass = find_or_create_class(pc, owner);pclass.nr_queued++;unsigned weight;size_t size;if (req.is_write()) {weight = _config.disk_req_write_to_read_multiplier;size = _config.disk_bytes_write_to_read_multiplier * len;} else if (req.is_read()) {weight = io_queue::read_request_base_count;size = io_queue::read_request_base_count * len;} else {throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));}auto desc = std::make_unique<io_desc_read_write>(this, weight, size);auto fq_ticket = desc->fq_ticket();auto fut = desc->get_future();_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), desc = desc.release(), len, this] () mutable noexcept {_queued_requests--;_requests_executing++;try {pclass.nr_queued--;pclass.ops++;pclass.bytes += len;pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);engine().submit_io(desc, std::move(req));} catch (...) {desc->set_exception(std::current_exception());}});return fut;});
}
void fair_queue::queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyable_function<void()> func) {// We need to return a future in this function on which the caller can wait.// Since we don't know which queue we will use to execute the next request - if ours or// someone else's, we need a separate promise at this point.push_priority_class(pc);_resources_queued += desc;pc->_queue.push_back(priority_class::request{std::move(func), std::move(desc)});_requests_queued++;
}
void
reactor::submit_io(kernel_completion* desc, io_request req) {req.attach_kernel_completion(desc);_pending_io.push_back(std::move(req));
}
2.3.2. io_queue_submission_pollfn::poll
class reactor::io_queue_submission_pollfn final : public reactor::pollfn {reactor& _r;
public:io_queue_submission_pollfn(reactor& r) : _r(r) {}virtual bool poll() final override {return _r.flush_pending_aio();}virtual bool pure_poll() override final {return poll(); // actually performs work, but triggers no user continuations, so okay}virtual bool try_enter_interrupt_mode() override {// This is a passive poller, so if a previous poll// returned false (idle), there's no more work to do.return true;}virtual void exit_interrupt_mode() override final {}
};
bool
reactor::flush_pending_aio() {for (auto& ioq : my_io_queues) {ioq->poll_io_queue();}return false;
}
// Dispatch requests that are pending in the I/O queuevoid poll_io_queue() {_fq.dispatch_requests();}
void fair_queue::dispatch_requests() {while (can_dispatch()) {priority_class_ptr h;do {h = pop_priority_class();} while (h->_queue.empty());auto req = std::move(h->_queue.front());h->_queue.pop_front();_resources_executing += req.desc;;_resources_queued -= req.desc;_requests_executing++;_requests_queued--;auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);auto req_cost = req.desc.normalize(_maximum_capacity) / h->_shares;auto cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;float next_accumulated = h->_accumulated + cost;while (std::isinf(next_accumulated)) {normalize_stats();// If we have renormalized, our time base will have changed. This should happen very infrequentlydelta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;next_accumulated = h->_accumulated + cost;}h->_accumulated = next_accumulated;if (!h->_queue.empty()) {push_priority_class(h);}req.func();}
}
req.func中调用engine().submit_io
void
reactor::submit_io(kernel_completion* desc, io_request req) {req.attach_kernel_completion(desc);_pending_io.push_back(std::move(req));
}
2.3.3. kernel_submit_work_pollfn::poll
class reactor::kernel_submit_work_pollfn final : public reactor::pollfn {reactor& _r;
public:kernel_submit_work_pollfn(reactor& r) : _r(r) {}virtual bool poll() override final {return _r._backend->kernel_submit_work();}virtual bool pure_poll() override final {return poll(); // actually performs work, but triggers no user continuations, so okay}virtual bool try_enter_interrupt_mode() override {return true;}virtual void exit_interrupt_mode() override {// nothing to do}
};
bool reactor_backend_aio::kernel_submit_work() {_hrtimer_poll_completion.maybe_queue(_polling_io);bool did_work = _polling_io.flush();did_work |= _storage_context.submit_work();return did_work;
}
bool
aio_storage_context::submit_work() {size_t pending = _r->_pending_io.size();size_t to_submit = 0;bool did_work = false;_submission_queue.resize(0);while ((pending > to_submit) && _iocb_pool.has_capacity()) {auto& req = _r->_pending_io[to_submit++];auto& io = _iocb_pool.get_one();prepare_iocb(req, io);if (_r->_aio_eventfd) {set_eventfd_notification(io, _r->_aio_eventfd->get_fd());}if (aio_nowait_supported) {set_nowait(io, true);}_submission_queue.push_back(&io);}size_t submitted = 0;while (to_submit > submitted) {auto nr = to_submit - submitted;auto iocbs = _submission_queue.data() + submitted;auto r = io_submit(_io_context, nr, iocbs);size_t nr_consumed;if (r == -1) {nr_consumed = handle_aio_error(iocbs[0], errno);} else {nr_consumed = size_t(r);}submitted += nr_consumed;}_r->_pending_io.erase(_r->_pending_io.begin(), _r->_pending_io.begin() + submitted);if (!_pending_aio_retry.empty()) {schedule_retry();did_work = true;}return did_work;
}
int io_submit(aio_context_t io_context, long nr, iocb** iocbs) {return ::syscall(SYS_io_submit, io_context, nr, iocbs);
}
2.3.4. reap_kernel_completions_pollfn::poll
class reactor::reap_kernel_completions_pollfn final : public reactor::pollfn {reactor& _r;
public:reap_kernel_completions_pollfn(reactor& r) : _r(r) {}virtual bool poll() final override {return _r.reap_kernel_completions();}virtual bool pure_poll() override final {return poll(); // actually performs work, but triggers no user continuations, so okay}virtual bool try_enter_interrupt_mode() override {return _r._backend->kernel_events_can_sleep();}virtual void exit_interrupt_mode() override final {}
};
bool
reactor::reap_kernel_completions() {auto reaped = _backend->reap_kernel_completions();for (auto& ioq : my_io_queues) {ioq->process_completions();}return reaped;
}
bool reactor_backend_aio::reap_kernel_completions() {bool did_work = await_events(0, nullptr);did_work |= _storage_context.reap_completions();return did_work;
}
bool aio_storage_context::reap_completions()
{struct timespec timeout = {0, 0};auto n = io_getevents(_io_context, 1, max_aio, _ev_buffer, &timeout, _r->_force_io_getevents_syscall);if (n == -1 && errno == EINTR) {n = 0;}assert(n >= 0);unsigned nr_retry = 0;for (size_t i = 0; i < size_t(n); ++i) {auto iocb = get_iocb(_ev_buffer[i]);if (_ev_buffer[i].res == -EAGAIN) {++nr_retry;set_nowait(*iocb, false);_pending_aio_retry.push_back(iocb);continue;}_iocb_pool.put_one(iocb);auto desc = reinterpret_cast<kernel_completion*>(_ev_buffer[i].data);desc->complete_with(_ev_buffer[i].res);}return n;
}
seastar介绍及源码分析相关推荐
- Stable Diffusion 原理介绍与源码分析(一)
Stable Diffusion 原理介绍与源码分析(一) 文章目录 Stable Diffusion 原理介绍与源码分析(一) 前言(与正文无关,可以忽略) 总览 说明 Stable Diffusi ...
- GAT 算法原理介绍与源码分析
GAT 算法原理介绍与源码分析 文章目录 GAT 算法原理介绍与源码分析 零. 前言 (与正文无关, 请忽略) 广而告之 一. 文章信息 二. 核心观点 三. 核心观点解读 四. 源码分析 4.1 G ...
- ThreadLocal介绍以及源码分析
ThreadLocal 线程主变量 前面部分引用其他优秀博客,后面源码自己分析的,如有冒犯请私聊我. 用Java语言开发的同学对 ThreadLocal 应该都不会陌生,这个类的使用场景很多,特别是在 ...
- Spring_AOP架构介绍与源码分析(含事务深度分析)
请见链接:http://edu.51cto.com/course/16573.html?source=so 第1章课程介绍6分钟1节 1-1课程介绍[免费试看]06:11 第2章AOP深入分析52分钟 ...
- Redis 的 Sentinel哨兵介绍与源码分析(1):初始化部分
http://www.redis.cn/topics/sentinel.html redis-6.0.8 本文是在官方中文文档的基础上进行的源码分析,其中包含完整的原文,并在此基础上,添加源码介绍. ...
- ArrayList相关方法介绍及源码分析
目录 ArrayList简介: ArrayList 相关方法介绍 代码表示 相关方法源码分析 ArrayList简介: java.util.ArrayList 是我们最常用的一个类,ArrayList ...
- SDIO_WiFi驱动学习之SDIO架构介绍及源码分析
一.引言 因为WiFi驱动比较复杂,所以WiFi驱动的博客将多分几篇来写. 本篇博客主要介绍Linux下的SDIO架构及源码分析. 本文部分内容摘抄自网络,若有侵权,请联系删除. 二.SDIO WiF ...
- python Django之 DRF(一)框架介绍、源码分析
文章目录 一.django rest framework 框架的介绍 1.什么是RESTful规范? 2.RESTful API的介绍 二.drf框架源码解读 1.drf框架的使用 2.APIView ...
- python logistic回归_logistic回归介绍与源码分析
1. 介绍(由线性模型引出logistic回归) 首先介绍一下什么是线性模型呢? 线性模型的定义如下:给定 个属性描述的样本 , 代表样本在第 个属性上的取值. 线性模型的目的是学习一个函数,它可以通 ...
最新文章
- 33.搜索旋转排序数组
- LIVE 预告 | 佐治亚理工杨笛一:少数据,多框架的自然语言处理
- 关于python的一些好的书籍推荐-如果只能推荐3本关于python的书,你会推荐哪3本?...
- UIButton-内边距
- 武汉理工大学c语言实验 编程解决鸡兔同笼问题,C语言程序设计实验指导
- IT项目管理总结:第四章 项目综合管理
- kicad最小布线宽度默认是多少_你想知道建仓库时叉车通道宽度留多少吗?
- Java Instanceof
- layui文档,镜像站
- 大学本科基于html5毕业设计题目50例
- linux-巴斯勒相机 GigE通信配置步骤
- 微信小程序父子组件传值
- Drupal7导入语言包
- 最齐全的文化石 艺术石VRay材质球素材,速来收藏
- 一个简单的例子来理解监督学习和非监督学习及其区别
- 中断系统的相关知识(二)(可位寻址、不可位寻址)
- 再生龙盘对盘拷贝Linux
- 每日一诗词 —— 忆秦娥·娄山关
- Pycharm远程访问ssh,远程访问服务器(xshell访问服务器)
- hadoop最新官网如何下载之前版本(2.7.1)
热门文章
- 快速找到python第三方库
- wkhtmltopdf 字体无效问题
- python tkinter数据库通讯录_python连接Mysql数据库写的小电话本
- php lottery,lottery.php · 韩志洋/CUPT2019_official_website - Gitee.com
- cuda无法使用nvprof命令,找不到cupti64_2021_3.0.dll
- 22.9.24 比赛
- 芯片破壁者(十六):德州仪器的“罗生门”
- oracle+union+连接,Oracle中union/union all/Intersect/Minus用法
- SeaJS 是什么?
- 【金融量化分析】#Financial Computation(利率、债券、期权相关数理知识与代码实现)