asio::io_context类剖析

前言

“源码面前,了无秘密”
所有使用asio的程序都需要至少有一个I/O执行上下文,例如io_context或thread_pool对象。I/O执行上下文提供对I/O功能的访问。

本文使用的asio为non-boost-asio,版本号:asio-1.24.0

pc环境:ubuntu 20

开发工具:vs2022 windows通过ssh远程unubtu调试代码

一、io_context类概述

先将io_context类源码声明贴出来。

class io_context: public execution_context
{private:typedef detail::io_context_impl impl_type;
#if defined(ASIO_HAS_IOCP)friend class detail::win_iocp_overlapped_ptr;
#endifpublic:class executor_type;friend class executor_type;#if !defined(ASIO_NO_DEPRECATED)class work;friend class work;
#endif // !defined(ASIO_NO_DEPRECATED)class service;#if !defined(ASIO_NO_EXTENSIONS)class strand;
#endif // !defined(ASIO_NO_EXTENSIONS)/// 用于计算上下文执行的处理程序数的类型。typedef std::size_t count_type;/// 构造ASIO_DECL io_context();/// 构造/*** 使用并发进行构造。** @param concurrency_hint 并发数量*/ASIO_DECL explicit io_context(int concurrency_hint);/// 析构./*** 销毁时,io_context执行以下序列* 操作:** @li 对于io_context集中的每个服务对象@c svc,按相反顺序* 服务对象生命周期开始时,执行* @c svc->shutdown().** @li 计划用于延迟调用的未调用处理程序对象* 在io_context或任何相关联的链上,都会被销毁。** @li 对于io_context集中的每个服务对象@c svc,按相反的顺序* 服务对象生命周期开始时,执行* <tt>delete static_cast<io_context::service*>(svc)</tt>.** @note 上述销毁顺序允许程序* 通过使用@cshared_ptr来简化他们的资源管理。如果* 对象的生存期与连接的生存期(或其他* 异步操作序列),对象的@c shared_ptr将* 绑定到与关联的所有异步操作的处理程序中* 它的工作原理如下:** @li 当单个连接结束时,所有关联的异步操作* 完整。相应的处理程序对象被销毁,并且* @c shared_ptr对对象的引用被销毁。** @li 要关闭整个程序,io_context函数stop()是* 调用以尽快终止任何run()调用。io_context* 上面定义的析构函数破坏所有处理程序,导致所有@c shared_ptr* 对所有要销毁的连接对象的引用。*/ASIO_DECL ~io_context();/// 获取与io_context关联的执行器。executor_type get_executor() ASIO_NOEXCEPT;/// 运行io_context对象的事件处理循环。/*** run()函数会阻塞,直到所有工作都完成,并且没有* 要调度更多的处理程序,或者直到io_context停止。** 多个线程可以调用run()函数来设置线程池* io_context可以从中执行处理程序。所有的线程    * 在池中等待是等效的,io_context可以选择任何一个    * 以调用一个处理程序。** run()函数的正常退出意味着io_context对象* 已停止(stopped()函数返回@ctrue)。对的后续调用 * run()、run_one()、poll()或poll_one()将立即返回,除非存在* 是对restart()的先前调用。** @return 已执行的处理程序数。** @note 从当前正在调用的线程调用run()函数* 上的run()、run_one()、run _for()、run_until()、poll()或poll_one()之一* 相同的io_context对象可能会引入死锁的可能性。它是* 呼叫方避免这种情况的责任。** poll()函数也可以用于调度准备就绪的处理程序,但是没有阻塞。*/ASIO_DECL count_type run();#if !defined(ASIO_NO_DEPRECATED)/// (不推荐:使用非error_code重载。)运行io_context对象的事件处理循环。/*** run()函数会阻塞,直到所有工作都完成,并且没有* 要调度更多的处理程序,或者直到io_context停止。** 多个线程可以调用run()函数来设置线程池* io_context可以从中执行处理程序。所有的线程  * 在池中等待是等效的,io_context可以选择任何一个 * 以调用一个处理程序。** run()函数的正常退出意味着io_context对象* 已停止(stopped()函数返回@ctrue)。对的后续调用* run()、run_one()、poll()或poll_one()将立即返回,除非存在* 是对restart()的先前调用。** @param ec 设置为指示发生了什么错误(如果有)。** @return 已执行的处理程序数。** @note 从当前正在调用的线程调用run()函数* 上的run()、run_one()、run _for()、run_until()、poll()或poll_one()之一* 相同的io_context对象可能会引入死锁的可能性。它是* 呼叫方避免这种情况的责任。** poll()函数也可以用于调度准备就绪的处理程序,但是没有阻塞。*/ASIO_DECL count_type run(asio::error_code& ec);
#endif // !defined(ASIO_NO_DEPRECATED)#if defined(ASIO_HAS_CHRONO) || defined(GENERATING_DOCUMENTATION)/// 为指定的对象运行io_context对象的事件处理循环///持续时间。/*** run_for()函数会阻塞,直到所有工作都完成,并且没有    * 要调度的更多处理程序,直到io_context停止,或者   * 直到指定的持续时间过去。** @param rel_time 呼叫可能被阻止的持续时间。** @return 已执行的处理程序数。*/template <typename Rep, typename Period>std::size_t run_for(const chrono::duration<Rep, Period>& rel_time);///运行io_context对象的事件处理循环,直到指定的时间。/*** run_until()函数会阻塞,直到所有工作都完成,并且* 在io_context停止之前,不再调度任何处理程序,* 或者直到达到指定的时间为止。** @param abs_time 呼叫可能被阻塞的时间点。** @return 已执行的处理程序数。*/template <typename Clock, typename Duration>std::size_t run_until(const chrono::time_point<Clock, Duration>& abs_time);
#endif // defined(ASIO_HAS_CHRONO) || defined(GENERATING_DOCUMENTATION)///运行io_context对象的事件处理循环,最多执行一个///处理程序。/*** run_one()函数会阻塞,直到调度了一个处理程序,或者* 直到io_context已经停止。** @return 已执行的处理程序数。零返回值* 意味着io_context对象已停止(stopped()函数* 返回@ctrue)。随后调用run()、run_one()、poll()或* poll_one()将立即返回,除非之前有对的调用* restart().** @note 从当前正在运行的线程调用run_one()函数* 调用run()、run_one()、run_for()、run_until()、poll()或* 对同一io_context对象的poll_one()可能会引入* 死锁。避免这种情况是呼叫方的责任。*/ASIO_DECL count_type run_one();#if !defined(ASIO_NO_DEPRECATED)/// (不推荐:使用nonerror_code overlaod。)运行io_context对象的///事件处理循环最多执行一个处理程序。/***run_one()函数会阻塞,直到调度了一个处理程序,或者*直到io_context已经停止。** @return 已执行的处理程序数。零返回值* 意味着io_context对象已停止(stopped()函数* 返回@ctrue)。随后调用run()、run_one()、poll()或* poll_one()将立即返回,除非之前有对的调用* restart().** @return 已执行的处理程序数。** @note 从当前正在运行的线程调用run_one()函数* 调用run()、run_one()、run_for()、run_until()、poll()或* 对同一io_context对象的poll_one()可能会引入* 死锁。避免这种情况是呼叫方的责任。*/ASIO_DECL count_type run_one(asio::error_code& ec);
#endif // !defined(ASIO_NO_DEPRECATED)#if defined(ASIO_HAS_CHRONO) || defined(GENERATING_DOCUMENTATION)/// 在指定的持续时间内运行io_context对象的事件处理循环/// 最多执行一个处理程序。/***函数run_one_for()阻塞,直到调度了一个处理程序,*直到io_context停止,或者直到指定的持续时间*经过。** @param rel_time 呼叫可能被阻止的持续时间。** @return 已执行的处理程序数。*/template <typename Rep, typename Period>std::size_t run_one_for(const chrono::duration<Rep, Period>& rel_time);///运行io_context对象的事件处理循环,直到指定时间///最多执行一个处理程序。/*** run_one_until()函数阻塞,直到调度了一个处理程序,* 直到io_context停止,或者直到指定的时间* 已联系到。** @param abs_time 呼叫可能被阻塞的时间点。** @return 已执行的处理程序数。*/template <typename Clock, typename Duration>std::size_t run_one_until(const chrono::time_point<Clock, Duration>& abs_time);
#endif // defined(ASIO_HAS_CHRONO) || defined(GENERATING_DOCUMENTATION)/// 运行io_context对象的事件处理循环以准备执行/// 处理程序。/*** poll()函数运行的处理程序可以在没有阻塞的情况下运行,* 直到io_context已经停止或者没有更多准备好的处理程序。** @return 已执行的处理程序数。*/ASIO_DECL count_type poll();#if !defined(ASIO_NO_DEPRECATED)///(不推荐:使用非error_code重载。)运行io_context对象的/// 用于执行就绪处理程序的事件处理循环。/*** poll()函数运行的处理程序可以在没有阻塞的情况下运行,* 直到io_context已经停止或者没有更多准备好的处理程序。** @param ec 设置为指示发生了什么错误(如果有)。** @return 已执行的处理程序数。*/ASIO_DECL count_type poll(asio::error_code& ec);
#endif // !defined(ASIO_NO_DEPRECATED)/// 运行io_context对象的事件处理循环以执行一个就绪的/// 处理程序。/*** poll_one()函数最多运行一个准备运行的处理程序,* 没有阻塞。** @return 已执行的处理程序数。*/ASIO_DECL count_type poll_one();#if !defined(ASIO_NO_DEPRECATED)///(不推荐:使用非error_code重载。)运行io_context对象的/// 事件处理循环以执行一个就绪的处理程序。/*** poll_one()函数最多运行一个准备运行的处理程序,* 没有阻塞。** @param ec 设置为指示发生了什么错误(如果有)。** @return 已执行的处理程序数。*/ASIO_DECL count_type poll_one(asio::error_code& ec);
#endif // !defined(ASIO_NO_DEPRECATED)/// 停止io_context对象的事件处理循环。/*** 此函数不阻塞,而是简单地向* 停止。所有对其run()或run_one()成员函数的调用都应该* 尽快返回。对run()、run_one()、poll()的后续调用* 或者poll_one()将立即返回,直到调用restart()为止。*/ASIO_DECL void stop();/// 确定io_context对象是否已停止。/*** 此函数用于确定io_context对象是否已* 已停止,要么是通过对stop()的显式调用,要么是由于已用完* 工作。当io_context对象停止时,调用run()、run_one(),* poll()或poll_one()将立即返回,而不调用任何* 处理程序。** @return 如果io_context对象已停止,则为@c true,否则为@c false。*/ASIO_DECL bool stopped() const;/// 重新启动io_context,为后续的run()调用做准备。/*** 此函数必须在任何第二组或以后的* 当* 之前对这些函数的调用由于io_context而返回* 被停止工作或失业。调用restart()后* io_context对象的stopped()函数将返回@c false。** 当有任何未完成的调用时,不得调用此函数* run()、run_one()、poll()或poll_one()函数。*/ASIO_DECL void restart();#if !defined(ASIO_NO_DEPRECATED)///(已弃用:请使用restart()。)重置io_context以准备///随后的run()调用。/*** 此函数必须在任何第二组或以后的* 当* 之前对这些函数的调用由于io_context而返回* 被停止工作或失业。调用restart()后* io_context对象的stopped()函数将返回@c false。* * 当有任何未完成的调用时,不得调用此函数* run()、run_one()、poll()或poll_one()函数。*/void reset();/// (已弃用:使用asio::dispatch().)向请求io_context/// 调用给定的处理程序。/*** 此函数用于要求io_context执行给定的处理程序。** io_context保证只在线程中调用处理程序* 其中run()、run_one()、poll()或poll_one()成员函数* 当前正在调用。处理程序可以在此函数内执行* 如果保证能够得到满足。** @param handler要调用的处理程序。io_context将使* 根据需要提供处理程序对象的副本。的函数签名* 处理程序必须是:@code void handler()@结束代码** @note 只有在以下情况下,此函数才会引发异常:** @li 处理程序的@casio_handler_allocate函数;或** @li 处理程序的复制构造函数** 引发异常。*/template <typename LegacyCompletionHandler>ASIO_INITFN_RESULT_TYPE(LegacyCompletionHandler, void ())dispatch(ASIO_MOVE_ARG(LegacyCompletionHandler) handler);///(已弃用:使用asio::post()。)请求要调用的io_context/// 给定的处理程序并立即返回。/*** 该函数用于要求io_ text执行给定的处理程序,* 但不允许io_context从内部调用处理程序* 功能。** io_context保证只在线程中调用处理程序* 其中run()、run_one()、poll()或poll_one()成员函数* 当前正在调用。** @param handler要调用的处理程序。io_context将使* 根据需要提供处理程序对象的副本。的函数签名* 处理程序必须是:@code void handler()@结束代码** @note 只有在以下情况下,此函数才会引发异常:** @li 处理程序的@casio_handler_allocate函数;或** @li 处理程序的复制构造函数** 引发异常。*/template <typename LegacyCompletionHandler>ASIO_INITFN_RESULT_TYPE(LegacyCompletionHandler, void ())post(ASIO_MOVE_ARG(LegacyCompletionHandler) handler);///(已弃用:请使用asio::bind_executor()。)创建一个新的处理程序/// 自动在io_context上分派包装的处理程序。/*** 此函数用于创建一个新的处理程序函数对象,当* 调用时,将自动将包装的处理程序传递给io_context* 对象的调度函数。** @param handler要包装的处理程序。io_context将进行复制* 处理程序对象的。处理程序的函数签名* 必须是:@code void handler(A1 A1,…An An)@结束代码** @return 一个函数对象,当被调用时,它将包装的处理程序传递给* io_context对象的调度函数。给定一个函数对象* 签名:* @code R f(A1 a1, ... An an); @endcode* 如果将此函数对象传递给wrap函数,如下所示:* @code io_context.wrap(f); @endcode* 则返回值是具有签名的函数对象* @code void g(A1 a1, ... An an); @endcode* 当被调用时,执行等效于以下内容的代码:* @code io_context.dispatch(boost::bind(f, a1, ... an)); @endcode*/template <typename Handler>
#if defined(GENERATING_DOCUMENTATION)unspecified
#elsedetail::wrapped_handler<io_context&, Handler>
#endifwrap(Handler handler);
#endif // !defined(ASIO_NO_DEPRECATED)private:// 用于添加实现的Helper函数。ASIO_DECL impl_type& add_impl(impl_type* impl);// Backwards compatible overload for use with services derived from// io_context::service.template <typename Service>friend Service& use_service(io_context& ioc);#if defined(ASIO_WINDOWS) || defined(__CYGWIN__)detail::winsock_init<> init_;
#elif defined(__sun) || defined(__QNX__) || defined(__hpux) || defined(_AIX) \|| defined(__osf__)detail::signal_init<> init_;
#endif// The implementation.impl_type& impl_;
};

二、io_context成员变量impl_剖析

impl_变量声明追踪

  // The implementation.impl_type& impl_;

跳转impl_type类声明

class io_context: public execution_context
{private:typedef detail::io_context_impl impl_type;
#if defined(ASIO_HAS_IOCP)friend class detail::win_iocp_overlapped_ptr;
#endif

跳转detail::io_context_impl类声明

namespace detail {#if defined(ASIO_HAS_IOCP)typedef class win_iocp_io_context io_context_impl;class win_iocp_overlapped_ptr;
#elsetypedef class scheduler io_context_impl;
#endif
} // namespace detail

追踪impl_变量到这里,我们可以知道,impl_参数其实是类scheduler
下面主要研究下类scheduler

三、scheduler类剖析

先看scheduler类声明

struct scheduler_thread_info;class scheduler: public execution_context_service_base<scheduler>,public thread_context
{public:typedef scheduler_operation operation;//构造函数。指定可能发生以下情况的并发线程数//运行调度程序。如果设置为1,则执行某些优化。ASIO_DECL scheduler(asio::execution_context& ctx,int concurrency_hint = 0);// 销毁服务所拥有的所有用户定义的处理程序对象。ASIO_DECL void shutdown();// 如果需要,初始化任务。ASIO_DECL void init_task();// 运行事件循环,直到中断或没有更多工作为止。ASIO_DECL std::size_t run(asio::error_code& ec);// 运行直到中断或执行一个操作。ASIO_DECL std::size_t run_one(asio::error_code& ec);// 运行直到超时、中断或执行一个操作。ASIO_DECL std::size_t wait_one(long usec, asio::error_code& ec);// 轮询操作而不阻塞。ASIO_DECL std::size_t poll(asio::error_code& ec);// 轮询一个操作而不阻塞。ASIO_DECL std::size_t poll_one(asio::error_code& ec);// 中断事件处理循环。ASIO_DECL void stop();// 确定计划程序是否已停止。ASIO_DECL bool stopped() const;// 重新启动,为后续的运行调用做准备ASIO_DECL void restart();// 通知某些工作已开始。void work_started(){++outstanding_work_;}//用于补偿即将到来的work_finished调用。必须调用//来自调度程序拥有的线程。ASIO_DECL void compensating_work_started();// 通知某些工作已经完成。void work_finished(){if (--outstanding_work_ == 0)stop();}// 返回是否可以立即调度处理程序。bool can_dispatch(){return thread_call_stack::contains(this) != 0;}//请求调用给定的操作并立即返回。假设//尚未为该操作调用work_started()。ASIO_DECL void post_immediate_completion(operation* op, bool is_continuation);//请求调用给定的操作并立即返回。假设//之前为该操作调用了work_started()。ASIO_DECL void post_deferred_completion(operation* op);//请求调用给定的操作并立即返回。假设//之前为每个操作调用了work_started()。ASIO_DECL void post_deferred_completions(op_queue<operation>& ops);//在尝试调度失败后,将给定的操作排队//用于立即调用的操作。ASIO_DECL void do_dispatch(operation* op);//将未完成的操作作为停机操作的一部分进行处理。假设//work_started()之前是为操作调用的。ASIO_DECL void abandon_operations(op_queue<operation>& ops);// 获取用于初始化调度程序的并发提示。int concurrency_hint() const{return concurrency_hint_;}private:// The mutex type used by this scheduler.此计划程序使用的互斥对象类型。typedef conditionally_enabled_mutex mutex;// The event type used by this scheduler.此计划程序使用的事件类型。typedef conditionally_enabled_event event;// Structure containing thread-specific data.包含线程特定数据的结构。typedef scheduler_thread_info thread_info;// 最多运行一个操作。可能会阻塞。ASIO_DECL std::size_t do_run_one(mutex::scoped_lock& lock,thread_info& this_thread, const asio::error_code& ec);// 在超时的情况下最多运行一个操作。可能会阻塞。ASIO_DECL std::size_t do_wait_one(mutex::scoped_lock& lock,thread_info& this_thread, long usec, const asio::error_code& ec);// 最多轮询一个操作。ASIO_DECL std::size_t do_poll_one(mutex::scoped_lock& lock,thread_info& this_thread, const asio::error_code& ec);// 停止任务和所有空闲线程。ASIO_DECL void stop_all_threads(mutex::scoped_lock& lock);// 唤醒单个空闲线程或任务,并始终解锁互斥锁。ASIO_DECL void wake_one_thread_and_unlock(mutex::scoped_lock& lock);// 帮助程序类,用于在块退出时执行与任务相关的操作。struct task_cleanup;friend struct task_cleanup;// 在块退出时调用与工作相关的操作的帮助程序类。struct work_cleanup;friend struct work_cleanup;// 是否针对单线程用例进行优化。const bool one_thread_;// 锁mutable mutex mutex_;// 唤醒被阻止线程的事件。event wakeup_event_;// 此服务要运行的任务。reactor* task_;// 操作对象,用于表示任务在队列中的位置。struct task_operation : operation{task_operation() : operation(0) {}} task_operation_;// 任务中断标识bool task_interrupted_;// 原子操作,未完成工作的数量。atomic_count outstanding_work_;// 任务队列op_queue<operation> op_queue_;// 停止标识bool stopped_;// 关闭标识bool shutdown_;// 用于初始化调度程序的并发提示const int concurrency_hint_;
};

3.1、scheduler类构造剖析

1、最开始我们看到io_context的构造函数

io_context::io_context(): impl_(add_impl(new impl_type(*this, ASIO_CONCURRENCY_HINT_DEFAULT)))
{}// 添加服务
io_context::impl_type& io_context::add_impl(io_context::impl_type* impl)
{asio::detail::scoped_ptr<impl_type> scoped_impl(impl);asio::add_service<impl_type>(*this, scoped_impl.get());return *scoped_impl.release();
}

2、而impl_type我们追踪到是scheduler类,如此,再看看scheduler类对应构造函数

  //构造函数。指定可能发生以下情况的并发线程数//运行调度程序。如果设置为1,则执行某些优化。ASIO_DECL scheduler(asio::execution_context& ctx,int concurrency_hint = 0);scheduler::scheduler(asio::execution_context& ctx, int concurrency_hint): asio::detail::execution_context_service_base<scheduler>(ctx),one_thread_(concurrency_hint == 1|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(SCHEDULER, concurrency_hint)|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(REACTOR_IO, concurrency_hint)),mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(SCHEDULER, concurrency_hint)),task_(0),task_interrupted_(true),outstanding_work_(0),stopped_(false),shutdown_(false),concurrency_hint_(concurrency_hint)
{ASIO_HANDLER_TRACKING_INIT;
}    

3、one_thread_

one_thread_(concurrency_hint == 1|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(SCHEDULER, concurrency_hint)|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(REACTOR_IO, concurrency_hint))...// 是否针对单线程用例进行优化。const bool one_thread_;

4、mutex_

    mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(SCHEDULER, concurrency_hint)),

5、task_追踪

  // 此服务要运行的任务。reactor* task_;...typedef class epoll_reactor reactor;...epoll_reactor类初始化// Constructor.ASIO_DECL epoll_reactor(asio::execution_context& ctx);

6、ASIO_DECL epoll_reactor(asio::execution_context& ctx);剖析

epoll_reactor::epoll_reactor(asio::execution_context& ctx): execution_context_service_base<epoll_reactor>(ctx),scheduler_(use_service<scheduler>(ctx)),mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(REACTOR_REGISTRATION, scheduler_.concurrency_hint())),interrupter_(),epoll_fd_(do_epoll_create()),timer_fd_(do_timerfd_create()),shutdown_(false),registered_descriptors_mutex_(mutex_.enabled())
{// Add the interrupter's descriptor to epoll.epoll_event ev = { 0, { 0 } };ev.events = EPOLLIN | EPOLLERR | EPOLLET;ev.data.ptr = &interrupter_;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);interrupter_.interrupt();// Add the timer descriptor to epoll.if (timer_fd_ != -1){ev.events = EPOLLIN | EPOLLERR;ev.data.ptr = &timer_fd_;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);}
}

这里可以看到epoll初始化调用流程,对epoll进行初始化。

3.2、run()函数剖析

scheduler类主要研究下run()函数,由于在ubuntu系统上,我们将会看到epoll之类的代码。
函数声明

  // 运行事件循环,直到中断或没有更多工作为止。ASIO_DECL std::size_t run(asio::error_code& ec);

实现

std::size_t scheduler::run(asio::error_code& ec)
{ec = asio::error_code();// outstanding_work_原子操作,未完成工作的数量。// 当outstanding_work_为0,程序没有要完成的工作,停止退出,这里的退出指asio的io操作被停止,直到调用restart()恢复使用为止。if (outstanding_work_ == 0){stop();return 0;}// 绑定线程操作thread_info this_thread;this_thread.private_outstanding_work = 0;thread_call_stack::context ctx(this, this_thread);// 加锁mutex::scoped_lock lock(mutex_);// do_run_one返回1为真,lock解锁,运行循环,n++;// do_run_one返回0为假,退出循环,函数结束std::size_t n = 0;for (; do_run_one(lock, this_thread, ec); lock.lock())if (n != (std::numeric_limits<std::size_t>::max)())++n;return n;
}

3.2.1、run函数剖析:

3.2.1.1、run函数先判断outstanding_work未完成工作的数量是否为0;当outstanding_work_为0,程序没有要完成的工作,停止退出,这里的退出指asio的io操作被停止,直到调用restart()恢复使用为止。
  if (outstanding_work_ == 0){stop();return 0;}
3.2.1.2、绑定线程及加锁
  // 绑定线程操作thread_info this_thread;this_thread.private_outstanding_work = 0;thread_call_stack::context ctx(this, this_thread);// 加锁mutex::scoped_lock lock(mutex_);

thread_info 为每个线程的独享变量,内部有private_op_queue, 用于临时存储触发事件的descriptor_state,以及完成io读写的reactor_op

struct scheduler_thread_info : public thread_info_base
{op_queue<scheduler_operation> private_op_queue;long private_outstanding_work;
};

context 为上下文,以链表形式存储context对象,context 的 id为schedule对象的地址,value为this_info , 所以可以根据一个io_context 找到它所有的thread_info,也即可以很容易却修改不同线程的局部operation队列 private_op_queue。

  class context: private noncopyable{public:// Push the key on to the stack.explicit context(Key* k): key_(k),next_(call_stack<Key, Value>::top_){value_ = reinterpret_cast<unsigned char*>(this);call_stack<Key, Value>::top_ = this;}// Push the key/value pair on to the stack.context(Key* k, Value& v): key_(k),value_(&v),next_(call_stack<Key, Value>::top_){call_stack<Key, Value>::top_ = this;}// Pop the key/value pair from the stack.~context(){call_stack<Key, Value>::top_ = next_;}// Find the next context with the same key.Value* next_by_key() const{context* elem = next_;while (elem){if (elem->key_ == key_)return elem->value_;elem = elem->next_;}return 0;}private:friend class call_stack<Key, Value>;// The key associated with the context.Key* key_;// The value associated with the context.Value* value_;// The next element in the stack.context* next_;};
3.2.1.3、run函数核心处理逻辑
  // do_run_one返回1为真,lock解锁,运行循环,n++;// do_run_one返回0为假,退出循环,函数结束std::size_t n = 0;for (; do_run_one(lock, this_thread, ec); lock.lock())if (n != (std::numeric_limits<std::size_t>::max)())++n;

3.2.2、do_run_one函数剖析

声明

  // 最多运行一个操作。可能会阻塞。ASIO_DECL std::size_t do_run_one(mutex::scoped_lock& lock,thread_info& this_thread, const asio::error_code& ec);

实现

std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,scheduler::thread_info& this_thread,const asio::error_code& ec)
{// stopped_结束标识while (!stopped_){// 不断读取op_queue_队列,队列有数据则处理if (!op_queue_.empty()){// 取出对列头节点,并移除operation* o = op_queue_.front();op_queue_.pop();bool more_handlers = (!op_queue_.empty());// 如果o对列头节点为最后一个节点if (o == &task_operation_){task_interrupted_ = more_handlers;if (more_handlers && !one_thread_)wakeup_event_.unlock_and_signal_one(lock);elselock.unlock();task_cleanup on_exit = { this, &lock, &this_thread };(void)on_exit;       // (void)on_exit防止编译告警,未使用函数的参数,编译期正常来说会抛出警告(warring),而在部分场景下,我们这么做可能是为了扩展等等。那么又不想看到这么多warring。// Run the task. May throw an exception. Only block if the operation// queue is empty and we're not polling, otherwise we want to return// as soon as possible.task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);}else{std::size_t task_result = o->task_result_;if (more_handlers && !one_thread_)wake_one_thread_and_unlock(lock);elselock.unlock();// Ensure the count of outstanding work is decremented on block exit.work_cleanup on_exit = { this, &lock, &this_thread };(void)on_exit;// Complete the operation. May throw an exception. Deletes the object.o->complete(this, ec, task_result);return 1;}}else{wakeup_event_.clear(lock);wakeup_event_.wait(lock);}}return 0;
}
3.2.2.1、do_run_one简单剖析

1、stopped_结束标识,判断函数是否结束

  while (!stopped_)

2、op_queue_任务队列,不断循环处理,判断队列是否有元素

      // 不断读取op_queue_队列,队列有数据则处理if (!op_queue_.empty())

3、op_queue_任务队列如果为空,重置事件并等待事件发出信号。

    else{wakeup_event_.clear(lock);  // 重置事件。wakeup_event_.wait(lock);   // 等待事件发出信号。}

4、op_queue_任务队列如果不为空,先取出队列元素

      // 取出对列头节点,并移除operation* o = op_queue_.front();op_queue_.pop();bool more_handlers = (!op_queue_.empty());

5、然后对该元素判断,是否为最后个元素,如果不是,则完成操作,删除对象。

      if (o == &task_operation_){...}else{std::size_t task_result = o->task_result_;if (more_handlers && !one_thread_)wake_one_thread_and_unlock(lock);elselock.unlock();// 确保在块退出时减少未完成工作的计数。work_cleanup on_exit = { this, &lock, &this_thread };(void)on_exit;// 完成操作。可能引发异常。删除对象。o->complete(this, ec, task_result);return 1;}

6、o->complete追踪

o->complete(this, ec, task_result);...void complete(void* owner, const asio::error_code& ec,std::size_t bytes_transferred){func_(owner, this, ec, bytes_transferred);}...func_type func_;...typedef void (*func_type)(void*,scheduler_operation*,const asio::error_code&, std::size_t);

从op中获取执行结果(注意io操作,第一次结果为触发事件类型,第二次为io操作结果)
调用op的完成函数(第一次为根据返回事件类型读写数据,第二次为将读写的结果传给用户回调)

7、op_queue_取出的元素,如果是最后一个元素

      if (o == &task_operation_){// task_interrupted_任务中断标识task_interrupted_ = more_handlers;if (more_handlers && !one_thread_)wakeup_event_.unlock_and_signal_one(lock);elselock.unlock();task_cleanup on_exit = { this, &lock, &this_thread };(void)on_exit;       // (void)on_exit防止编译告警,未使用函数的参数,编译期正常来说会抛出警告(warring),而在部分场景下,我们这么做可能是为了扩展等等。那么又不想看到这么多warring。// Run the task. May throw an exception. Only block if the operation// queue is empty and we're not polling, otherwise we want to return// as soon as possible.task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);}else{...}

8、将more_handlers赋值给task_interrupted_任务中断标识,more_handlers由判断前获取

bool more_handlers = (!op_queue_.empty());

解锁

        if (more_handlers && !one_thread_)wakeup_event_.unlock_and_signal_one(lock);elselock.unlock();

9、task_cleanup类只有一个析构函数,作用是将已完成的操作排队,并在结束时重新插入任务操作队列。

        task_cleanup on_exit = { this, &lock, &this_thread };(void)on_exit;

(void)on_exit防止编译告警,未使用函数的参数,编译期正常来说会抛出警告(warring),而在部分场景下,我们这么做可能是为了扩展等等。那么又不想看到这么多warring。
10、task->run剖析

task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);

task->run函数追踪

  // 此服务要运行的任务。reactor* task_;...#if defined(ASIO_HAS_IOCP) || defined(ASIO_WINDOWS_RUNTIME)
typedef class null_reactor reactor;
#elif defined(ASIO_HAS_IOCP)
typedef class select_reactor reactor;
#elif defined(ASIO_HAS_EPOLL)
typedef class epoll_reactor reactor;
#elif defined(ASIO_HAS_KQUEUE)
typedef class kqueue_reactor reactor;
#elif defined(ASIO_HAS_DEV_POLL)
typedef class dev_poll_reactor reactor;
#else
typedef class select_reactor reactor;
#endif
// 这里的reactor是typedef class epoll_reactor reactor;
...

task->run函数

void epoll_reactor::run(long usec, op_queue<operation>& ops)
{int timeout;if (usec == 0)timeout = 0;else{timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);if (timer_fd_ == -1){mutex::scoped_lock lock(mutex_);timeout = get_timeout(timeout);}}// Block on the epoll descriptor.epoll_event events[128];int num_events = epoll_wait(epoll_fd_, events, 128, timeout);#if defined(ASIO_HAS_TIMERFD)bool check_timers = (timer_fd_ == -1);
#else // defined(ASIO_HAS_TIMERFD)bool check_timers = true;
#endif // defined(ASIO_HAS_TIMERFD)// Dispatch the waiting events.for (int i = 0; i < num_events; ++i){void* ptr = events[i].data.ptr;if (ptr == &interrupter_){#if defined(ASIO_HAS_TIMERFD)if (timer_fd_ == -1)check_timers = true;
#else // defined(ASIO_HAS_TIMERFD)check_timers = true;
#endif // defined(ASIO_HAS_TIMERFD)}
#if defined(ASIO_HAS_TIMERFD)else if (ptr == &timer_fd_){check_timers = true;}
#endif // defined(ASIO_HAS_TIMERFD)else{// The descriptor operation doesn't count as work in and of itself, so we// don't call work_started() here. This still allows the scheduler to// stop if the only remaining operations are descriptor operations.descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);if (!ops.is_enqueued(descriptor_data)){descriptor_data->set_ready_events(events[i].events);ops.push(descriptor_data);}else{descriptor_data->add_ready_events(events[i].events);}}}if (check_timers){mutex::scoped_lock common_lock(mutex_);timer_queues_.get_ready_timers(ops);#if defined(ASIO_HAS_TIMERFD)if (timer_fd_ != -1){itimerspec new_timeout;itimerspec old_timeout;int flags = get_timeout(new_timeout);timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);}
#endif // defined(ASIO_HAS_TIMERFD)}
}

这里的run函数,看代码直观了解到,是调用epoll的逻辑
epoll_wait

  // Block on the epoll descriptor.epoll_event events[128];int num_events = epoll_wait(epoll_fd_, events, 128, timeout);

对num_events的响应处理,塞入我们传入ops队列

  // Dispatch the waiting events.for (int i = 0; i < num_events; ++i){...descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);if (!ops.is_enqueued(descriptor_data)){// 读写descriptor_data->set_ready_events(events[i].events);ops.push(descriptor_data);}else{// 新加入链接descriptor_data->add_ready_events(events[i].events);}}

run()封装了epoll,并将触发了事件的文件描述符fd绑定的descriptor_state,传给thread_info的private_op_queue,本次循环结束后添加到op_queue中。

这里再返回task_->run函数,我们对scheduler::do_run_one函数剖析完成,scheduler::run函数也是。

参考链接:https://blog.csdn.net/qq_38166063/article/details/124278707

asio剖析(一)、asio::io_context类剖析相关推荐

  1. Spring Boot 2.x 启动全过程源码分析(上)入口类剖析

    转载自   Spring Boot 2.x 启动全过程源码分析(上)入口类剖析 Spring Boot 的应用教程我们已经分享过很多了,今天来通过源码来分析下它的启动过程,探究下 Spring Boo ...

  2. Asp.Net MVC SingleServiceResolver类剖析

    Asp.Net MVC SingleServiceResolver类剖析 SingleServiceResolver一般用于类工厂创建和注入点接口留白.类工厂创建比如Controller控制依赖于此类 ...

  3. spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析

    spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析 TransportContext 首先官方文档对Transpor ...

  4. Python爬虫之Scrapy框架系列(16)——深入剖析request和response类

    目录: Request和Response类: 1. 深入剖析Request类: 利用request.meta传递参数 拓展一:FormRequest类 2. 深入剖析Response类: Reques ...

  5. boost asio 文件服务器,使用boost ASIO库封装TCP服务器类

    使用异步TCP方式,可在此基础上增加更多功能. 头文件AsioTcp.h: #pragma once #include #include #include typedef boost::asio::i ...

  6. java的内部类和匿名类剖析

    Java 1.1通过对Java语言规范进行修改,显着简化了一些实用结构的实现.在那些修改中,最引人注目的就是内部类和匿名类.如运用得当,它们可使程序更易理解和维护.本文介绍内部类和匿名类在Java代码 ...

  7. Android-源码剖析CountDownTimer(倒计时类)

    简介 CounterDownTImer是Android系统自带的一个倒计时器,特别是在做app登录时会比较有用. 用法 非常简单,比如做个倒计时60s且每隔1s会刷新一下,可以这样写 new Coun ...

  8. java.lang 源码剖析_java.lang.Void类源码解析

    在一次源码查看ThreadGroup的时候,看到一段代码,为以下: /* * @throws NullPointerException if the parent argument is {@code ...

  9. 【C++深度剖析教程38】类模板深度剖析

    加qq1126137994 微信:liu1126137994 一起学习更多技术!!! 1.多参数类模板 类模板可以定义任意多个不同的类型参数 类模板可以被特化: 指定类模板的特定实现 部分类型参数必须 ...

最新文章

  1. 无线对讲调度服务器,无线对讲系统解决方案
  2. 什么是 CMS - Content Management System
  3. 02-Django基础知识
  4. 蔡高厅老师 - 高等数学阅读笔记 - 15 广义积分和伽马函数 定积分的应用(面积和体积) -(67、68、70、71)
  5. 关于前端一周知识的总结
  6. preferredsize JAVA_Java JScrollPane.getPreferredSize方法代码示例
  7. Proxmark3教程1:用PM3解密复制M1全加密门禁IC卡图文详细介绍
  8. Android 9 (P) Zygote进程启动源码分析指南一
  9. 多层材料热压工艺探索
  10. 用python画一个正方形
  11. 盛唐气象:李白的诗与酒
  12. 深度学习平台的搭建(anaconda-pytorch-pycharm)
  13. php下载文件并重命名,通过php下载文件并重命名
  14. python 画图colorbar 颜色大全 plt.cm.get_cmap
  15. What CANN Can?一辆小车背后的智能故事
  16. 揭秘:如何用主题公园的思路做一款VR高尔夫游戏
  17. 企业即时通讯软件FreeEIM飞鸽传书
  18. 程序人生:Hello’s P2P
  19. C++ 游戏服务端代码分层
  20. mysql里面column是什么意思_column是什么意思

热门文章

  1. FX5U与MT8101IE通信设置
  2. ES2019 中 8 个非常有用的功能
  3. 如何用栈实现深度优先算法-C语言解决迷宫问题
  4. 万维网源代码将作为NFT拍卖
  5. App Store协议刷评论软件源代码
  6. 哈啰、滴滴在出行市场展开全方位争夺战
  7. Ubuntu强制删除文件
  8. 休闲娱乐的计算机配置,电竞馆,升级的不仅仅是电脑配置
  9. uboot和系统移植-第1部分-2.1 uboot学习前传
  10. 韩国ONE store平台支付集成(android)