ASIO协程彻底转变你的思维
avbot 发布了许久了, 最近突然有个用户跑来说,希望能增加个调用 “外部脚本” 的功能,方便扩展。
我一向对设计一个 plugin 机制极力的避免,不喜欢动态载入的模块扩展程序本身的功能。何况 avbot 是 c++开发的,调用脚本并不是容易的事情。(好吧,真实的原因是我被 mingw (VC 不支持 utf8源码,我已经抛弃了) 折腾怕了,不想再搞个 python 。windows实在是恐怖的平台,写点程序麻烦的要死,编译麻烦的要死。可是 avbot 又必须跨平台,结果是我一天写好的东西要在 windows (虚拟机) 里折腾好几天,累死人 )
于是我决定提供一个 JSON 接口,内置一个简单的 HTTP Server, 用脚本(python应该 HTTP JSON 模块有的是,对吧)连接到 avbot ,然后 avbot 将发生的每条消息以 json 的形式返回给 外部脚本。
另外,默认使用 HTTP 的connection: keep-alive 模式,所以保持一个长连接即可。
那么,avbot 需要支持 不确定数目的消息接收方了。
对于链接到 avbot 的客户端而言, avbot 并不保留之前的所有消息,而是从连接上的那一刻开始,后续的消息才能通知到。
一个很明显的思路就是,将链接上的客户端做成一个链表/列队, avbot 收到消息后,遍历这个列队执行消息发送。
这个思路很简单,可是如果要求 : 必须单线程异步呢?
avbot 是一个纯粹的单线程程序,绝对不允许多线程化。所有的逻辑必须使用异步处理。
那么,这个问题就复杂化了, “avbot 收到消息后,遍历这个列队执行消息发送” 这个做法,不可避免的带来了阻塞。好吧,异步遍历吧。
要是异步遍历还没遍历完,又来一个消息呢? 考虑这个问题,你会发疯的。因为异步,太多的细节需要考虑了。真的。
好吧,又有个好主意了,为每个客户端建立一个列队,每次遍历就是把要发送的消息挂入列队即可。这样也不需要异步遍历了,同步就可以。解决了异步遍历的时候又来一个消息导致的痛苦的调度。
然后细分,考虑每个客户端,就是等待 “发送列队” 不为空!等等,一直这么等待也不行,如果客户断开了链接呢? 所以要 “同时等待发送列队不为空&&客户正常在线,并且已经发送了 HTTP 请求头部”
好绕口,不过也只能如此了。
avbot 因为默认使用了 keep-alive , 所以发送是一个死循环,知道客户端主动断开链接或者网络发生错误。如果 客户端死了,那么,发送列队兴许会出现 爆队 的情况。所以要限制发送列队的大小。不是满了就不发送,而是满了后就把早的消息踢掉,也就是让 客户端发生“暂时性卡死”后,还能继续处理最后的几条信息。
诶,复杂的逻辑终于理清了,代码呢?!
啊累?
靠,这么复杂的 逻辑,得写一长段代码,调试几百年了吧?
错,我只花了几个小时, 不到 100 行的代码就轻松实现了全部要求。
!!!!!!!!!!!!!!!!!!! WHAT !!!!!!!!!!!!!!!!!!!
这种功能不可能不用个千把行代码的吧?!
如果使用以前的老办法,确实如此。
可是,自从发现了 ASIO 后,我被 ASIO 爸爸发明的协程深深的 震惊了!
利用 ASIO 爸爸提出的协程思想,我只用了不到 100行代码就全部完成了以上复杂的逻辑,而且,全部都是异步的哦~ 。
好,废话不多,先贴代码。然后解释。
- // avbot_rpc_server 由 acceptor_server 这个辅助类调用
- // 为其构造函数传入一个 m_socket, 是 shared_ptr 的.
- class avbot_rpc_server
- {
- public:
- typedef boost::signals2::signal<
- void( std::string protocol, std::string room, std::string who,
- std::string message, sender_flags )
- > on_message_signal_type;
- static on_message_signal_type on_message;
- typedef boost::asio::ip::tcp Protocol;
- typedef boost::asio::basic_stream_socket<Protocol> socket_type;
- typedef void result_type;
- avbot_rpc_server(boost::shared_ptr<socket_type> _socket)
- : m_socket(_socket)
- , m_request(new boost::asio::streambuf)
- , m_responses(new boost::circular_buffer_space_optimized<boost::shared_ptr<boost::asio::streambuf> >(20) )
- {
- m_socket->get_io_service().post(
- boost::asio::detail::bind_handler(*this, boost::coro::coroutine(), boost::system::error_code(), 0)
- );
- }
- // 数据操作跑这里,嘻嘻.
- void operator()(boost::coro::coroutine coro, boost::system::error_code ec, std::size_t bytestransfered)
- {
- boost::shared_ptr<boost::asio::streambuf> sendbuf;
- if (ec){
- m_socket->close(ec);
- // 看来不是 HTTP 客户端,诶,滚蛋啊!
- // 沉默,直接关闭链接. 取消信号注册.
- if (m_connect && m_connect->connected())
- m_connect->disconnect();
- return;
- }
- CORO_REENTER(&coro)
- {
- do{
- // 发起 HTTP 处理操作.
- _yield boost::asio::async_read_until(*m_socket, *m_request, "\r\n\r\n", boost::bind(*this, coro, _1, _2));
- m_request->consume(bytestransfered);
- // 解析 HTTP
- // 等待消息.
- if (m_responses->empty())
- {
- if (!m_connect){
- // 将自己注册到 avbot 的 signal 去
- // 等 有消息的时候,on_message 被调用,也就是下面的 operator() 被调用.
- _yield m_connect = boost::make_shared<boost::signals2::connection>
- (on_message.connect(boost::bind(*this, coro, _1, _2, _3, _4, _5)));
- // 就这么退出了,但是消息来的时候,om_message 被调用,然后下面的那个
- // operator() 就被调用了,那个 operator() 接着就会重新回调本 operator()
- // 结果就是随着 coroutine 的作用,代码进入这一行,然后退出 if 判定
- // 然后进入发送过程.
- }else{
- // 如果已经注册,直接返回。时候如果消息来了,on_message 被调用,也就
- // 是下面的 operator() 被调用. 结果就是随着 coroutine 的作用,代码
- // 进入上面那行,然后退出 if 判定。然后进入发送过程.
- return;
- }
- // signals2 回调的时候会进入到这一行.
- }
- // 进入发送过程
- sendbuf = m_responses->front();
- _yield boost::asio::async_write(*m_socket, *sendbuf, boost::bind(*this, coro, _1, _2) );
- m_responses->pop_front();
- // 写好了,重新开始我们的处理吧!
- }while(1);
- }
- }
- // signal 的回调到了这里, 这里我们要区分对方是不是用了 keep-alive 呢.
- void operator()(boost::coro::coroutine coro, std::string protocol, std::string room, std::string who, std::string message, sender_flags)
- {
- pt::ptree jsonmessage;
- boost::shared_ptr<boost::asio::streambuf> buf(new boost::asio::streambuf);
- std::ostream stream(buf.get());
- std::stringstream teststream;
- jsonmessage.put("protocol", protocol);
- jsonmessage.put("root", room);
- jsonmessage.put("who", who);
- jsonmessage.put("msg", message);
- js::write_json(teststream, jsonmessage);
- // 直接写入 json 格式的消息吧!
- stream << "HTTP/1.1 200 OK\r\n" << "Content-type: application/json\r\n";
- stream << "connection: keep-alive\r\n" << "Content-length: ";
- stream << teststream.str().length() << "\r\n\r\n";
- js::write_json(stream, jsonmessage);
- // 检查 发送缓冲区.
- if (m_responses->empty()){
- // 打通仁督脉.
- m_socket->get_io_service().post(boost::asio::detail::bind_handler(*this, coro, boost::system::error_code(), 0));
- }
- // 写入 m_responses
- m_responses->push_back(buf);
- }
- private:
- boost::shared_ptr<socket_type> m_socket;
- boost::shared_ptr<boost::signals2::connection> m_connect;
- boost::shared_ptr<boost::asio::streambuf> m_request;
- boost::shared_ptr<boost::circular_buffer_space_optimized<boost::shared_ptr<boost::asio::streambuf> > > m_responses;
- };
- }
复制代码
首先这个 avbot_rpc_server 由一个 acceptor_service 辅助类调用。 acceptor_service 是一个模板,大家可以去 acceptor_server.hpp 膜拜。
acceptor_service 以 Protocol 和一个 处理类 为模板。在 main.cpp里,我以 asio::ip::tcp 作为 Protocl 的参数 avbot_rpc_server为 ProtocolProcesser的参数 调用acceptor_service。acceptor_service 进入一个死循环(协程的)不停的 accept , 然后将 accept 到的 socket 交给 ProtocolProcesser,也就是 avbot_rpc_server 。
avbot_rpc_server 处理一下客户的请求头,然后把自己注册到 on_message 信号处理。
然后,然后就没然后了。
on_message 在 avbot 接收到消息的时候发出。结果就是 avbot_rpc_server 的 第二个 operator() 被调用。然后就继续发送了。
当然,并不是每一个 on_message 都会导致 avbot_rpc_server 的 第二个 operator() 被调用的,必须是列队为空的时候。不为空的时候就不需要调用。发送循环会继续循环的,避免竞争出现
ASIO协程彻底转变你的思维相关推荐
- asio c++20 协程在windows下和linux下设定
c++20 协程 无栈协程 co_spawn 创建 1 个新协程 co_await 设置协程可主动让出 CPU 片的代码点 1.windows 下比较简单 在vs2017 里面加上一条c++ 命令 / ...
- Boost中的协程—Boost.Asio中的coroutine类
Boost.Asio中有两处涉及协程,本文介绍其中的coroutine类. Boost.Asio中的stackless协程是由coroutine类和一些宏来实现的.coroutine类非常简单,包括四 ...
- boost asio——基于协程的TCP服务器
boost的asio提供了无锁队列和协程.基于这两者可以搭建一个用于基础通讯的tcp服务器.TCP服务器可以自己创建连接连接外部主机也可以在本地端口监听并形成回话. 首先定义接口: #pragma o ...
- 基于协程的并发框架orchid简介
2019独角兽企业重金招聘Python工程师标准>>> orchid简介 什么是orchid? orchid是一个构建于boost库基础上的C++库,类似于python下的geven ...
- java 协程框架_GitHub - yaozhang0105/dactor: Dactor是基于Java的轻量级同步异步统一处理框架,基于协程思想构建...
DActor Introduction DActor框架基于协程思想设计,可同时支持同步和异步代码,简化在线异步代码的开发,用同步代码的思维来开发异步代码,兼顾异步代码的高并发.无阻塞和同步代码的易读 ...
- C++ 协程与网络编程
协程 协程,即协作式程序,其思想是,一系列互相依赖的协程间依次使用CPU,每次只有一个协程工作,而其他协程处于休眠状态.协程可以在运行期间的某个点上暂停执行,并在恢复运行时从暂停的点上继续执行. 协 ...
- 干货 | 携程基于Quasar协程的NIO实践
作者简介 Ryan,携程Java开发工程师,对高并发.网络编程等领域有浓厚兴趣. IO密集型系统在高并发场景下,会有大量线程处于阻塞状态,性能低下,JAVA上成熟的非阻塞IO(NIO)技术可解决该问题 ...
- cmake导入so库_libgo - 协程库、并行编程库
libgo是一个使用C++11编写的协作式调度的stackful协程库, 同时也是一个强大的并行编程库, 是专为Linux服务端程序开发设计的底层框架. 目前支持三个平台: Linux (GCC4.8 ...
- 基于Boost::beast模块的HTTP客户端协程
基于Boost::beast模块的异步HTTP客户端协程 实现功能 C++实现代码 实现功能 基于Boost::beast模块的HTTP客户端协程 C++实现代码 #include <boost ...
最新文章
- Kotlin替换Dagger2/Hilt的依赖注入框架--Koin。
- 卫星参数大全_【视频】早期国外做工精良的海事卫星电话机拆解
- Vue + webpack 项目配置化、接口请求统一管理
- android listview asynctask,关于android:ListView + ArrayList + AsyncTask
- springboot整合jpa_SpringBoot与SpringDataJPA整合 Ehcache
- 基于FPGA实现ADC7768数据采集系统设计(8路)
- 万亿级新基建战场,阿里云的安全“防线”
- java自学难点_学习JAVA遇到的难点总结
- POJ-1903 Jurassic Remains
- Vue路由如何设置导航选中(高亮)
- 如何让语音芯片与功放芯片之间更好的配合,使得产品音效更好
- ym——Android从零开始(3)(常用控件+下拉框视图)(新)
- python处理FITS 3:处理头文件和数据单元
- Java使用阿里云视频点播
- 您的云,您做主:Google Distributed Cloud Hosted 全面可用
- 分布式事务(三):分布式事务解决方案之TCC(Try、Confirm、Cancel)
- python 过滤相似图片_Python过滤纯色图片,挑选视频封面
- python入门教程NO.1 用python打印你的宠物小精灵吧
- 小程序图片缓存策略(不改代码更换OSS图片)
- 一分钟教你知道乐观锁和悲观锁的区别
热门文章
- iPhone7p与iPhoneX布局出现右边白边问题
- android evaluater_android通过WebView的evaluateJavascript()调用JS
- sqlserver安装时尽量少的占用c盘_安装3dmax出现command line option 报错,如何解决
- 获取最大轮廓 opencv
- 【C++】46.宏定义##连接符和符#的使用
- 走近人脸检测:从VJ到深度学习(下)
- 日志 log4j.xml配置详解
- 论面向组合子程序设计方法 之 燃烧的荆棘
- 10个实用的但偏执的Java编程技术
- Java 8的6个问题