目录

1.seastar介绍

1.1.机制简介

1.2.使用方法简介

1.2.1 app_template::run

1.2.2 future promise continuation

2.源码分析

2.1.初始化过程

2.1.1.入口

2.1.2.run_deprecated

2.1.3.smp::configure

2.1.4.reactor::run

2.1.5.run_some_tasks

2.1.6. poll_once

2.2.future

2.2.1then

2.2.2then_impl

2.2.3. then_impl_nrvo

2.2.4. future::schedule

2.2.5. seastar::schedule

2.2.6. reactor::add_task

2.3. IO流程

2.3.1. write_dma

2.3.2. io_queue_submission_pollfn::poll

2.3.3. kernel_submit_work_pollfn::poll

2.3.4. reap_kernel_completions_pollfn::poll


1.seastar介绍

1.1.机制简介

在代码运行过程中,如果遇到io、网络这些阻塞的操作时,是不会占用cpu资源的,此时如果有其他任务等待调度,会切换到其他线程进行运行;而如果没有其他任务在等待调度,就会造成cpu空闲,cpu资源被浪费。

考虑一种场景:接收一些数据,进行一些处理,再写入到硬盘,写入完成后,再进行一些写入的结果。

为了避免io阻塞导致的cpu空闲,我们可以:

  • 假如我们是一个单线程的设计,那么我们进行io时就需要使用异步io,将io请求下发后先进行后续的操作而不是等待io完成,后续寻找一个合适的时机(通常就是定期检查)检查io是否完成,如果完成再进行io完成后的后续处理。
  • 假如我们是一个多线程的设计,或者io的完成是通过中断通知的,那么我们可以有两个线程,一个线程执行预处理、下发异步io请求,另一个线程等待io完成、进行io完成后的处理。多线程的设计需要考虑线程同步问题,或有锁、cache bouncing(就是cpu的不同core直接进行cache同步)、memory fences等额外开销。而且多线程设计通常比较复杂,需要仔细斟酌锁的设计、使用范围等,在保证线程同步的同时,避免造成死锁、锁范围过大导致接口卡顿等问题。

seastar是一个异步编程框架,大致意思就是使用了前一种设计,且可以比较简便的进行编程,io完成不需要自己编写代码进行判断,而是将后续需要进行的操作作为func参数传给seastar的io请求即可。

当然,我们的代码不可能是单线程运行的,否则就只能使用一个core了,seastar解决这个问题的方法是将内存进行拆分,每个core上运行一个线程,并分配到一部分内存,各个core之间的内存不能共享,就是说一个core不能直接访问到另一个core的内存。而线程间的通讯通过共享内存完成,这个通讯是seastar自己完成的,不需要使用者关心。

1.2.简单使用方法

这个不详细讲了,参考seastar的turorial,在网站seastar.io可以找到

1.2.1 app_template::run

#include <seastar/core/app-template.hh>
#include <seastar/core/reactor.hh>
#include <iostream>int main(int argc, char** argv) {seastar::app_template app;app.run(argc, argv, [] {std::cout << "Hello world\n";return seastar::make_ready_future<>();});
}

最简单的程序,使用app.run进入seastar框架运行

1.2.2 future

#include <seastar/core/app-template.hh>
#include <seastar/core/sleep.hh>
#include <iostream>int main(int argc, char** argv) {seastar::app_template app;app.run(argc, argv, [] {std::cout << "Sleeping... " << std::flush;using namespace std::chrono_literals;return seastar::sleep(1s).then([] {std::cout << "Done.\n";});});
}

seastar::sleep函数返回一个future,sleep到1s之后,会执行then中的内容

2.源码分析

seastar中设计的主要机制有:

a) 资源管理

线程:每个core上一个线程。

内存:每个线程使用一部分内存,不能直接访问其他线程的内存。

io:每个线程管理一部分io,例如有4个设备需要进行io,且有4个线程,那么可能就是一个线程负责一个设备的io。

b) future promise continuation

future是一个模板类,可以指定类型,future就是对这个类型进行了一下封装,增加了可异步操作的特性。可以对future设置/获取这个类型的值。

future绑定一个promise,调用promise的set_value时,future会变成ready状态。

future没有ready时,调用future的wait方法,线程会阻塞到ready状态为止;调用get方法,同样会阻塞住,等到ready之后会返回future模板类定义时指定类型的值。

future有一个then方法,其参数是一个可执行的func(例如lambda表达式),等到调用相应promise的set_value方法之后,会将func封装到continuation类中加入到任务队列中,等待seastar调度。then返回的也是一个future,这个future会在func执行完成之后变成ready状态。可以链式调用,例如:handle().then(func1).then(func2),handle()返回的是一个future,该future变为ready状态之后将会顺序执行func1、func2。

Ps.

seatar的future、promise是自己设计的,不是直接使用的C++自带的,但是概念还是有些类似的。

c) 主体循环

seastar初始化完成之后,将会进入主体循环,主要执行:执行任务队列中的任务;执行初始化时注册的所有poller的poll操作,不同poller的操作不同。

如果没有需要运行的任务,循环将暂时进入sleep。

d) poller
处理网络、IO等,下发异步请求后,调用poller的poll函数时,会查询是否有请求已经完成,有完成的请求时,就会执行后续任务。

举例io请求的poller:

io请求加入到my_io_queue队列当中,执行io_queue_submission_poller的poll函数时,会批量处理队列中的请求,并加入到_pending_io队列中

再通过kernel_submit_work_pollfn的poll函数下发aio请求到内核当中

再通过reap_kernel_completions_pollfn的poll函数调用io_getevents,查询是否有异步io请求已经完成

上图中已经画出大致的流程了,下面再贴一些代码具体看看。

2.1.初始化过程

2.1.1.入口

int
app_template::run(int ac, char ** av, std::function<future<int> ()>&& func) {return run_deprecated(ac, av, [func = std::move(func)] () mutable {auto func_done = make_lw_shared<promise<>>();engine().at_exit([func_done] { return func_done->get_future(); });// No need to wait for this future.// func's returned exit_code is communicated via engine().exit()(void)futurize_invoke(func).finally([func_done] {func_done->set_value();}).then([] (int exit_code) {return engine().exit(exit_code);}).or_terminate();});
}

参数说明:

ac和av是传入的相关配置,例如可以配置内存上限、使用的cpu核数。

func是需要运行的函数,通常是使用lambda表达式。

在主程序中调用app_template::run函数,传入需要运行的代码(func参数,可以使用lambda表达式),seastar框架在进行一些初始化操作时候,就会开始运行func。

2.1.2.run_deprecated

run函数后续调用的是run_deprecated

int app_template::run_deprecated(int ac, char ** av, std::function<void ()>&& func);

参数说明:

参数中ac和av与run函数的一样,func是run函数中定义的lambda表达式,其中调用了futurize_invoke(func),这个是主要内容,lambda表单式中的其他内容我们暂且忽略。

run_deprecated中,调用了smp::configure函数,这是一个主要内容,启动了除了cpu0以外的每个cpu core上的线程。

    try {smp::configure(configuration, reactor_config_from_app_config(_cfg));} catch (...) {std::cerr << "Could not initialize seastar: " << std::current_exception() << std::endl;return 1;}

而需要运行的参数func,则通过future类的then方法挂到了任务队列当中。这里使用的future是_start_promise.get_future(),也就是对_start_promise设置值(set_value)之后,参数func才会开始运行,而对_start_promise的set_value是在engine().run()中进行的。

我们再整理一下:

run_deprecated函数接收了需要运行的任务func,将其挂到engine的任务队列当中,而任务队列当中的任务的运行条件有:1.调度到该任务;2.该任务的前置条件满足(一般就是future对应的promise值被设置)。

条件1在运行engine().run()中得到满足,该函数中有个循环,会获取任务队列中的任务进行处理。

条件2也在运行engine().run()中得到满足,在进行一些初始化之后,会对任务func前置的future设置值,设置完之后,调度到该任务时就会开始运行。

int
app_template::run_deprecated(int ac, char ** av, std::function<void ()>&& func) {
#ifdef SEASTAR_DEBUGfmt::print("WARNING: debug mode. Not for benchmarking or production\n");
#endifbpo::variables_map configuration;try {bpo::store(bpo::command_line_parser(ac, av).options(_opts).positional(_pos_opts).run(), configuration);_conf_reader(configuration);} catch (bpo::error& e) {fmt::print("error: {}\n\nTry --help.\n", e.what());return 2;}if (configuration.count("help")) {if (!_cfg.description.empty()) {std::cout << _cfg.description << "\n";}std::cout << _opts << "\n";return 1;}if (configuration["help-loggers"].as<bool>()) {log_cli::print_available_loggers(std::cout);return 1;}bpo::notify(configuration);// Needs to be before `smp::configure()`.try {apply_logging_settings(log_cli::extract_settings(configuration));} catch (const std::runtime_error& exn) {std::cout << "logging configuration error: " << exn.what() << '\n';return 1;}configuration.emplace("argv0", boost::program_options::variable_value(std::string(av[0]), false));try {smp::configure(configuration, reactor_config_from_app_config(_cfg));} catch (...) {std::cerr << "Could not initialize seastar: " << std::current_exception() << std::endl;return 1;}_configuration = {std::move(configuration)};// No need to wait for this future.// func is waited on via engine().run()(void)engine().when_started().then([this] {return seastar::metrics::configure(this->configuration()).then([this] {// set scollectd use the metrics configuration, so the later// need to be set firstscollectd::configure( this->configuration());});}).then(std::move(func)).then_wrapped([] (auto&& f) {try {f.get();} catch (std::exception& ex) {std::cout << "program failed with uncaught exception: " << ex.what() << "\n";engine().exit(1);}});auto exit_code = engine().run();smp::cleanup();return exit_code;
}

2.1.3.smp::configure

void smp::configure(boost::program_options::variables_map configuration, reactor_config reactor_cfg);

这个函数比较长,我就不贴代码了,讲一下它的主要内容。

(1)将当前线程绑定到cpu0

(2)初始化内存管理,每个cpu core分别使用不同的内存,互不干扰

(3)为除cpu0之外的所有cpu core(也可以配置成只使用部分cpu core)起一个线程,线程中分配一块内存给reactor类的对象,每个线程都有这个对象,然后再调用reactor::run方法开始运行主体内容。engine()获取到的就是reactor对象,每个线程都有一个独立的对象。

因内容较多,部分内容还不清楚具体作用,需要更详细了解的还需要阅读一下源码。

2.1.4.reactor::run

这是seastar框架的主体函数,初始化完之后,就一直在这个函数中运行了。

(1)poller

首先,会定义一些poller,下面是其中一部分:

    poller smp_poller(std::make_unique<smp_pollfn>(*this));poller reap_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));poller io_queue_submission_poller(std::make_unique<io_queue_submission_pollfn>(*this));poller kernel_submit_work_poller(std::make_unique<kernel_submit_work_pollfn>(*this));poller final_real_kernel_completions_poller(std::make_unique<reap_kernel_completions_pollfn>(*this));poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this));poller execution_stage_poller(std::make_unique<execution_stage_pollfn>());

smp_poller处理其他cpu core的event,io_queue_submission_poller处理io任务,其他poller没有每个具体看,因为我对io比较熟悉,所以后面以io_queue_submission_poller为例介绍一下poller的机制。

(2)主体循环

poller等初始化完成后,就进入主体循环运行了,循环中主要是:

  • 调用run_some_tasks
  • 调用check_for_work,check_for_work中调用了poll_once,处理poller
  • 空闲时进入sleep

2.1.5.run_some_tasks

从_active_task_queues队列中取出任务运行,直到_active_task_queues变为空为止。

另外,会调用insert_activating_task_queues将activating队列中的内容插入到active队列中。

reactor::run_some_tasks() {if (!have_more_tasks()) {return;}sched_print("run_some_tasks: start");reset_preemption_monitor();sched_clock::time_point t_run_completed = std::chrono::steady_clock::now();STAP_PROBE(seastar, reactor_run_tasks_start);_cpu_stall_detector->start_task_run(t_run_completed);do {auto t_run_started = t_run_completed;insert_activating_task_queues();auto tq = _active_task_queues.front();_active_task_queues.pop_front();sched_print("running tq {} {}", (void*)tq, tq->_name);tq->_current = true;_last_vruntime = std::max(tq->_vruntime, _last_vruntime);run_tasks(*tq);tq->_current = false;t_run_completed = std::chrono::steady_clock::now();auto delta = t_run_completed - t_run_started;account_runtime(*tq, delta);sched_print("run complete ({} {}); time consumed {} usec; final vruntime {} empty {}",(void*)tq, tq->_name, delta / 1us, tq->_vruntime, tq->_q.empty());if (!tq->_q.empty()) {insert_active_task_queue(tq);} else {tq->_active = false;}} while (have_more_tasks() && !need_preempt());_cpu_stall_detector->end_task_run(t_run_completed);STAP_PROBE(seastar, reactor_run_tasks_end);*internal::current_scheduling_group_ptr() = default_scheduling_group(); // Prevent inheritance from last group runsched_print("run_some_tasks: end");
}

2.1.6. poll_once

调用所有poller的poll方法

bool
reactor::poll_once() {bool work = false;for (auto c : _pollers) {work |= c->poll();}return work;
}

2.2.future

主要讲一下then的流程,相关机制大致都可以看到了。

2.2.1then

    template <typename Func, typename Result = futurize_t<std::result_of_t<Func(T&&...)>>>GCC6_CONCEPT( requires ::seastar::CanInvoke<Func, T...> )Resultthen(Func&& func) noexcept {
#ifndef SEASTAR_TYPE_ERASE_MOREreturn then_impl(std::move(func));
#elsereturn then_impl(noncopyable_function<Result (T&&...)>([func = std::forward<Func>(func)] (T&&... args) mutable {return futurize_apply(func, std::forward_as_tuple(std::move(args)...));}));
#endif}

就是直接调用的then_impl,SEASTAR_TYPE_ERASE_MORE这个宏是应该调试使用的,一般是调用的上面这一句。

2.2.2then_impl

    template <typename Func, typename Result = futurize_t<std::result_of_t<Func(T&&...)>>>Resultthen_impl(Func&& func) noexcept {using futurator = futurize<std::result_of_t<Func(T&&...)>>;if (available() && !need_preempt()) {if (failed()) {return futurator::make_exception_future(static_cast<future_state_base&&>(get_available_state_ref()));} else {return futurator::apply(std::forward<Func>(func), get_available_state_ref().take_value());}}return then_impl_nrvo<Func, Result>(std::forward<Func>(func));}

两种情况:
1.future已经完成,直接运行

2.future未完成,则挂到任务队列中

available()就是future值已经获取到了,会调用futurator::apply函数直接执行。

need_preempt指有任务需要马上执行,这种情况下则不会离开运行then传入的任务,而是先放到任务队列当中。

then_impl_nrvo就是调用schedule函数将任务加入到任务队列当中。

2.2.3. then_impl_nrvo

    // Keep this simple so that Named Return Value Optimization is used.template <typename Func, typename Result>Result then_impl_nrvo(Func&& func) noexcept {using futurator = futurize<std::result_of_t<Func(T&&...)>>;typename futurator::type fut(future_for_get_promise_marker{});// If there is a std::bad_alloc in schedule() there is nothing that can be done about it, we cannot break future// chain by returning ready future while 'this' future is not ready. The noexcept will call std::terminate if// that happens.[&] () noexcept {using pr_type = decltype(fut.get_promise());memory::disable_failure_guard dfg;schedule(fut.get_promise(), [func = std::forward<Func>(func)] (pr_type& pr, future_state<T...>&& state) mutable {if (state.failed()) {pr.set_exception(static_cast<future_state_base&&>(std::move(state)));} else {try {futurator::satisfy_with_result_of(std::move(pr), [&func, &state] {return ::seastar::apply(std::forward<Func>(func), std::move(state).get_value());});} catch (...) {pr.set_to_current_exception();}}});} ();return fut;}

主要就是调用schedule函数将任务加到的队列当中,已经进行了一些异常处理。这里调用的是future::schedule

2.2.4. future::schedule

    void schedule(Pr&& pr, Func&& func) noexcept {if (_state.available() || !_promise) {if (__builtin_expect(!_state.available() && !_promise, false)) {_state.set_to_broken_promise();}::seastar::schedule(new continuation<Pr, Func, T...>(std::move(pr), std::move(func), std::move(_state)));} else {assert(_promise);detach_promise()->schedule(std::move(pr), std::move(func));_state._u.st = future_state_base::state::invalid;}}

这里有两种情况:

(1)promise已经完成,或者future没有设置promise,则调用seastar::schedule将任务直接加入到任务队列当中。

(2)还需要等待promise完成,则将任务封装成continuation挂到promise下的_task,等到promise的set_value方法被调用后,再将任务挂到任务队列当中

2.2.5. seastar::schedule

void schedule(task* t) noexcept {engine().add_task(t);
}

schedule函数调用了engine().add_task(t),也就是reactor::add_task方法

这里传入的参数t是task类型,看看参数类型的变化:

  传入 传到下一层
then_impl Func&& func std::forward<Func>(func)
then_impl_nrvo Func&& func

[func = std::forward<Func>(func)] (pr_type& pr, future_state<T...>&& state) mutable {
                if (state.failed()) {
                    pr.set_exception(static_cast<future_state_base&&>(std::move(state)));
                } else {
                    try {
                        futurator::satisfy_with_result_of(std::move(pr), [&func, &state] {
                            return ::seastar::apply(std::forward<Func>(func), std::move(state).get_value());
                        });
                    } catch (...) {
                        pr.set_to_current_exception();
                    }
                }
            }

使用lambda表达式对func进行了一些封装

future::schedule Func&& func

new continuation<Pr, Func, T...>(std::move(pr), std::move(func), std::move(_state))

封装成continuation

seastar::schedule task* t  

seastar::schedule接受的是task类型,而传入的是continuation,是task的派生类,增加的内容:

(1)Promise _pr; 有一个promise成员

(2)future_state<T...> _state; 有一个future_state成员

(3)重载了run_and_dispose,进行了实现,就是调用了_func,就是需要执行的任务。

2.2.6. reactor::add_task

    void add_task(task* t) noexcept {auto sg = t->group();auto* q = _task_queues[sg._id].get();bool was_empty = q->_q.empty();q->_q.push_back(std::move(t));
#ifdef SEASTAR_SHUFFLE_TASK_QUEUEshuffle(q->_q.back(), *q);
#endifif (was_empty) {activate(*q);}}

加到了_task_queues当中。

sg._id是new continuation时赋值的。

2.3. IO流程

时间原因不展开将了,这里就贴一下主要的代码,流程图中都有说明了,要详细了解的话还是需要对着完整源码再看一下。

2.3.1. write_dma

future<size_t>
posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& io_priority_class) noexcept {auto len = internal::sanitize_iovecs(iov, _disk_write_dma_alignment);auto req = internal::io_request::make_writev(_fd, pos, iov);return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {});
}
future<size_t>
reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {++_io_stats.aio_writes;_io_stats.aio_write_bytes += len;return ioq->queue_request(pc, len, std::move(req));
}
future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {auto start = std::chrono::steady_clock::now();return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {_queued_requests++;// First time will hit here, and then we create the class. It is important// that we create the shared pointer in the same shard it will be used at later.auto& pclass = find_or_create_class(pc, owner);pclass.nr_queued++;unsigned weight;size_t size;if (req.is_write()) {weight = _config.disk_req_write_to_read_multiplier;size = _config.disk_bytes_write_to_read_multiplier * len;} else if (req.is_read()) {weight = io_queue::read_request_base_count;size = io_queue::read_request_base_count * len;} else {throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));}auto desc = std::make_unique<io_desc_read_write>(this, weight, size);auto fq_ticket = desc->fq_ticket();auto fut = desc->get_future();_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), desc = desc.release(), len, this] () mutable noexcept {_queued_requests--;_requests_executing++;try {pclass.nr_queued--;pclass.ops++;pclass.bytes += len;pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);engine().submit_io(desc, std::move(req));} catch (...) {desc->set_exception(std::current_exception());}});return fut;});
}
void fair_queue::queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyable_function<void()> func) {// We need to return a future in this function on which the caller can wait.// Since we don't know which queue we will use to execute the next request - if ours or// someone else's, we need a separate promise at this point.push_priority_class(pc);_resources_queued += desc;pc->_queue.push_back(priority_class::request{std::move(func), std::move(desc)});_requests_queued++;
}
void
reactor::submit_io(kernel_completion* desc, io_request req) {req.attach_kernel_completion(desc);_pending_io.push_back(std::move(req));
}

2.3.2. io_queue_submission_pollfn::poll

class reactor::io_queue_submission_pollfn final : public reactor::pollfn {reactor& _r;
public:io_queue_submission_pollfn(reactor& r) : _r(r) {}virtual bool poll() final override {return _r.flush_pending_aio();}virtual bool pure_poll() override final {return poll(); // actually performs work, but triggers no user continuations, so okay}virtual bool try_enter_interrupt_mode() override {// This is a passive poller, so if a previous poll// returned false (idle), there's no more work to do.return true;}virtual void exit_interrupt_mode() override final {}
};
bool
reactor::flush_pending_aio() {for (auto& ioq : my_io_queues) {ioq->poll_io_queue();}return false;
}
    // Dispatch requests that are pending in the I/O queuevoid poll_io_queue() {_fq.dispatch_requests();}
void fair_queue::dispatch_requests() {while (can_dispatch()) {priority_class_ptr h;do {h = pop_priority_class();} while (h->_queue.empty());auto req = std::move(h->_queue.front());h->_queue.pop_front();_resources_executing += req.desc;;_resources_queued -= req.desc;_requests_executing++;_requests_queued--;auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);auto req_cost  = req.desc.normalize(_maximum_capacity) / h->_shares;auto cost  = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;float next_accumulated = h->_accumulated + cost;while (std::isinf(next_accumulated)) {normalize_stats();// If we have renormalized, our time base will have changed. This should happen very infrequentlydelta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);cost  = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;next_accumulated = h->_accumulated + cost;}h->_accumulated = next_accumulated;if (!h->_queue.empty()) {push_priority_class(h);}req.func();}
}

req.func中调用engine().submit_io

void
reactor::submit_io(kernel_completion* desc, io_request req) {req.attach_kernel_completion(desc);_pending_io.push_back(std::move(req));
}

2.3.3. kernel_submit_work_pollfn::poll

class reactor::kernel_submit_work_pollfn final : public reactor::pollfn {reactor& _r;
public:kernel_submit_work_pollfn(reactor& r) : _r(r) {}virtual bool poll() override final {return _r._backend->kernel_submit_work();}virtual bool pure_poll() override final {return poll(); // actually performs work, but triggers no user continuations, so okay}virtual bool try_enter_interrupt_mode() override {return true;}virtual void exit_interrupt_mode() override {// nothing to do}
};
bool reactor_backend_aio::kernel_submit_work() {_hrtimer_poll_completion.maybe_queue(_polling_io);bool did_work = _polling_io.flush();did_work |= _storage_context.submit_work();return did_work;
}
bool
aio_storage_context::submit_work() {size_t pending = _r->_pending_io.size();size_t to_submit = 0;bool did_work = false;_submission_queue.resize(0);while ((pending > to_submit) && _iocb_pool.has_capacity()) {auto& req = _r->_pending_io[to_submit++];auto& io = _iocb_pool.get_one();prepare_iocb(req, io);if (_r->_aio_eventfd) {set_eventfd_notification(io, _r->_aio_eventfd->get_fd());}if (aio_nowait_supported) {set_nowait(io, true);}_submission_queue.push_back(&io);}size_t submitted = 0;while (to_submit > submitted) {auto nr = to_submit - submitted;auto iocbs = _submission_queue.data() + submitted;auto r = io_submit(_io_context, nr, iocbs);size_t nr_consumed;if (r == -1) {nr_consumed = handle_aio_error(iocbs[0], errno);} else {nr_consumed = size_t(r);}submitted += nr_consumed;}_r->_pending_io.erase(_r->_pending_io.begin(), _r->_pending_io.begin() + submitted);if (!_pending_aio_retry.empty()) {schedule_retry();did_work = true;}return did_work;
}
int io_submit(aio_context_t io_context, long nr, iocb** iocbs) {return ::syscall(SYS_io_submit, io_context, nr, iocbs);
}

2.3.4. reap_kernel_completions_pollfn::poll

class reactor::reap_kernel_completions_pollfn final : public reactor::pollfn {reactor& _r;
public:reap_kernel_completions_pollfn(reactor& r) : _r(r) {}virtual bool poll() final override {return _r.reap_kernel_completions();}virtual bool pure_poll() override final {return poll(); // actually performs work, but triggers no user continuations, so okay}virtual bool try_enter_interrupt_mode() override {return _r._backend->kernel_events_can_sleep();}virtual void exit_interrupt_mode() override final {}
};
bool
reactor::reap_kernel_completions() {auto reaped = _backend->reap_kernel_completions();for (auto& ioq : my_io_queues) {ioq->process_completions();}return reaped;
}
bool reactor_backend_aio::reap_kernel_completions() {bool did_work = await_events(0, nullptr);did_work |= _storage_context.reap_completions();return did_work;
}
bool aio_storage_context::reap_completions()
{struct timespec timeout = {0, 0};auto n = io_getevents(_io_context, 1, max_aio, _ev_buffer, &timeout, _r->_force_io_getevents_syscall);if (n == -1 && errno == EINTR) {n = 0;}assert(n >= 0);unsigned nr_retry = 0;for (size_t i = 0; i < size_t(n); ++i) {auto iocb = get_iocb(_ev_buffer[i]);if (_ev_buffer[i].res == -EAGAIN) {++nr_retry;set_nowait(*iocb, false);_pending_aio_retry.push_back(iocb);continue;}_iocb_pool.put_one(iocb);auto desc = reinterpret_cast<kernel_completion*>(_ev_buffer[i].data);desc->complete_with(_ev_buffer[i].res);}return n;
}

seastar介绍及源码分析相关推荐

  1. Stable Diffusion 原理介绍与源码分析(一)

    Stable Diffusion 原理介绍与源码分析(一) 文章目录 Stable Diffusion 原理介绍与源码分析(一) 前言(与正文无关,可以忽略) 总览 说明 Stable Diffusi ...

  2. GAT 算法原理介绍与源码分析

    GAT 算法原理介绍与源码分析 文章目录 GAT 算法原理介绍与源码分析 零. 前言 (与正文无关, 请忽略) 广而告之 一. 文章信息 二. 核心观点 三. 核心观点解读 四. 源码分析 4.1 G ...

  3. ThreadLocal介绍以及源码分析

    ThreadLocal 线程主变量 前面部分引用其他优秀博客,后面源码自己分析的,如有冒犯请私聊我. 用Java语言开发的同学对 ThreadLocal 应该都不会陌生,这个类的使用场景很多,特别是在 ...

  4. Spring_AOP架构介绍与源码分析(含事务深度分析)

    请见链接:http://edu.51cto.com/course/16573.html?source=so 第1章课程介绍6分钟1节 1-1课程介绍[免费试看]06:11 第2章AOP深入分析52分钟 ...

  5. Redis 的 Sentinel哨兵介绍与源码分析(1):初始化部分

    http://www.redis.cn/topics/sentinel.html redis-6.0.8 本文是在官方中文文档的基础上进行的源码分析,其中包含完整的原文,并在此基础上,添加源码介绍. ...

  6. ArrayList相关方法介绍及源码分析

    目录 ArrayList简介: ArrayList 相关方法介绍 代码表示 相关方法源码分析 ArrayList简介: java.util.ArrayList 是我们最常用的一个类,ArrayList ...

  7. SDIO_WiFi驱动学习之SDIO架构介绍及源码分析

    一.引言 因为WiFi驱动比较复杂,所以WiFi驱动的博客将多分几篇来写. 本篇博客主要介绍Linux下的SDIO架构及源码分析. 本文部分内容摘抄自网络,若有侵权,请联系删除. 二.SDIO WiF ...

  8. python Django之 DRF(一)框架介绍、源码分析

    文章目录 一.django rest framework 框架的介绍 1.什么是RESTful规范? 2.RESTful API的介绍 二.drf框架源码解读 1.drf框架的使用 2.APIView ...

  9. python logistic回归_logistic回归介绍与源码分析

    1. 介绍(由线性模型引出logistic回归) 首先介绍一下什么是线性模型呢? 线性模型的定义如下:给定 个属性描述的样本 , 代表样本在第 个属性上的取值. 线性模型的目的是学习一个函数,它可以通 ...

最新文章

  1. 33.搜索旋转排序数组
  2. LIVE 预告 | 佐治亚理工杨笛一:少数据,多框架的自然语言处理
  3. 关于python的一些好的书籍推荐-如果只能推荐3本关于python的书,你会推荐哪3本?...
  4. UIButton-内边距
  5. 武汉理工大学c语言实验 编程解决鸡兔同笼问题,C语言程序设计实验指导
  6. IT项目管理总结:第四章 项目综合管理
  7. kicad最小布线宽度默认是多少_你想知道建仓库时叉车通道宽度留多少吗?
  8. Java Instanceof
  9. layui文档,镜像站
  10. 大学本科基于html5毕业设计题目50例
  11. linux-巴斯勒相机 GigE通信配置步骤
  12. 微信小程序父子组件传值
  13. Drupal7导入语言包
  14. 最齐全的文化石 艺术石VRay材质球素材,速来收藏
  15. 一个简单的例子来理解监督学习和非监督学习及其区别
  16. 中断系统的相关知识(二)(可位寻址、不可位寻址)
  17. 再生龙盘对盘拷贝Linux
  18. 每日一诗词 —— 忆秦娥·娄山关
  19. Pycharm远程访问ssh,远程访问服务器(xshell访问服务器)
  20. hadoop最新官网如何下载之前版本(2.7.1)

热门文章

  1. 快速找到python第三方库
  2. wkhtmltopdf 字体无效问题
  3. python tkinter数据库通讯录_python连接Mysql数据库写的小电话本
  4. php lottery,lottery.php · 韩志洋/CUPT2019_official_website - Gitee.com
  5. cuda无法使用nvprof命令,找不到cupti64_2021_3.0.dll
  6. 22.9.24 比赛
  7. 芯片破壁者(十六):德州仪器的“罗生门”
  8. oracle+union+连接,Oracle中union/union all/Intersect/Minus用法
  9. SeaJS 是什么?
  10. 【金融量化分析】#Financial Computation(利率、债券、期权相关数理知识与代码实现)