聊聊C++跨类通信机制之消息总线及其实现
如果没有怎么写过项目,更确切地说是没有用面向对象的语言写过项目。就不会明白跨类通信这个需求是多么刚需。
为什么要跨类通信?把所有的逻辑都写在一个类中,都是一家人,那就不需要通信了啊。是,确实是这样,当功能不是很多的时候,这样做确实好。但是,随着项目的规模增大,各种功能堆积,各种模块的增加,会使得这个类非常臃肿与庞大。臃肿庞大就意味着不好维护和扩展。
因此,我们需要把功能划分出来,把模块划分得细一些,尽量做到类的职责单一且明显。这样可以做到高内聚,低耦合。
既然需要很多类,那么类与类之间必然会存在一些关系,如何来维护?对,就是跨类通信,沟通起来。
想想之前,两个.c
文件需要通信时,我们会怎么做,搞一个全局变量就可以了啊。
对,跨类通信就是通过全局变量来做到的(好像也只有这个办法哦),这里也要说下原理:我们实现一个程序,这个程序有很多类,那么最终这些类肯定是组成了一个进程,而全局变量在一个进程中是共享的。
既然全局变量可以做到跨类通信,那么任意两个类之间需要通信就要搞一个全局变量。这样子全局变量也变得臃肿,这不是我们想要的。
概念
我们可以参考计算机硬件的一个概念:消息总线。这个概念其实是把网状结构,变成了星性结构,类与类的通信有一个中心机构(消息总线)管理。
设计
消息总线,有消息二字。这是什么意思呢?我需要你干什么,我给你发个消息,你就把这个事给干了。对,这就是消息总线的设计。大体设计就是这样,现在来看看消息总线需要哪些原料。
首先,我们要提前写好处理消息的函数,然后等待别人发消息,这里,我们要把消息和函数对应起来,因此这里选择std::multimap
,为什么不选择std::map
呢,因为,同一个消息可能有几个函数对之感兴趣;
设计的过程中一定要考虑到同步和异步,尤其是异步,因为潜意识中都会写同步,不会怎么考虑异步,考虑这样一种情况:我现在很渴,给你发个消息,让你去楼下买水,然而你不怎么锻炼,买水送给我这个过程需要一个小时,你说我是等一个小时再喝水还是等个十分钟,看看你回来没,如果没回来就叫别人去买?这里就牵扯到消息总线的同步和异步,这个在程序中就有体现。
基于上面需要异步以及等待,因此消息总线还需要一个线程池支持异步以及一个相对任务定时器来等待。
然后就跑下面的流程图就行了:
这里提供MessageBus
的声明,可以看看消息总线有哪些功能:
#include <experimental/any>class MessageBus
{public:using any = std::experimental::any;typedef std::function<void(any, any)> FUN;typedef std::function<void(any, any)> AsynTimeoutCallback;public:MessageBus() = default;~MessageBus() = default;
private:MessageBus(const MessageBus &) = delete;MessageBus(MessageBus &&) = delete;MessageBus & operator = (const MessageBus &) = delete;MessageBus & operator = (MessageBus &&) = delete;public:uint32_t RigisterMessageByMainType(uint32_t mainType, const FUN & callback);uint32_t RigisterMessage(uint32_t mainType, uint32_t minorType, const FUN & callback);void CacelMessageByID(uint32_t id);void CacelMessageByMainType(uint32_t mainType);void CacelMessage(uint32_t mainType, uint32_t mimorType);void SyncSendMessageByMainType(uint32_t mainType, any mainAny = any(), any minorAny = any());void SyncSendMessage(uint32_t mainType, uint32_t minorType, any mainAny = any(), any minorAny = any());void AsynSendMessageByMainType(uint32_t mainType, std::chrono::seconds d = std::chrono::seconds(1),AsynTimeoutCallback = [] (any, any) {}, any mainAny = any(), any minorAny = any());void AsynSendMessage(uint32_t mainType, uint32_t minorType, std::chrono::seconds d = std::chrono::seconds(1),AsynTimeoutCallback = [] (any, any) {}, any mainAny = any(), any minorAny = any());private:void DoFunctionByMainType(uint32_t mainType, any mainAny, any minorAny);void DoFunction(uint32_t mainType, uint32_t minorType, any mainAny, any minorAny);
private:struct Blob{Blob() {}Blob(uint32_t id, const FUN & fun) : id(id), fun(fun) {}uint32_t id;FUN fun = nullptr;};struct CompByMainType{bool operator () (const uint32_t val, const std::pair< std::pair<uint32_t , uint32_t>, std::shared_ptr<Blob> > & ele);bool operator () (const std::pair< std::pair<uint32_t , uint32_t>, std::shared_ptr<Blob> > & ele, const uint32_t val);};
private:std::mutex mutex_;uint32_t id_;std::multimap< std::pair<uint32_t, uint32_t>, std::shared_ptr<Blob> > blobs_;ThreadPool pool_;RelativeTimer relativeTimer_;
};
实现
bool MessageBus::CompByMainType::operator()(const std::pair<std::pair<uint32_t, uint32_t>, std::shared_ptr<MessageBus::Blob>> &ele, const uint32_t val)
{return ele.first.first < val;
}bool MessageBus::CompByMainType::operator()(const uint32_t val,const std::pair<std::pair<uint32_t, uint32_t>, std::shared_ptr<MessageBus::Blob>> &ele)
{return val < ele.first.first;
}uint32_t MessageBus::RigisterMessage(uint32_t mainType, uint32_t minorType, const MessageBus::FUN &callback)
{std::lock_guard<std::mutex> lock(mutex_);blobs_.emplace(std::make_pair(mainType, minorType), std::make_shared<Blob>(id_, callback));return id_++;
}uint32_t MessageBus::RigisterMessageByMainType(uint32_t mainType, const MessageBus::FUN &callback)
{std::lock_guard<std::mutex> lock(mutex_);blobs_.emplace(std::make_pair(mainType, -1), std::make_shared<Blob>(id_, callback));return id_++;
}void MessageBus::CacelMessage(uint32_t mainType, uint32_t mimorType)
{std::lock_guard<std::mutex> lock(mutex_);auto range = blobs_.equal_range(std::make_pair(mainType, mimorType));blobs_.erase(range.first, range.second);
}void MessageBus::CacelMessageByMainType(uint32_t mainType)
{std::lock_guard<std::mutex> lock(mutex_);auto range = std::equal_range(std::begin(blobs_), std::end(blobs_), mainType, CompByMainType {});blobs_.erase(range.first, range.second);
}void MessageBus::CacelMessageByID(uint32_t id)
{std::lock_guard<std::mutex> lock(mutex_);blobs_.erase(std::find_if(std::begin(blobs_), std::end(blobs_),[id] (const std::pair< std::pair<uint32_t , uint32_t >, std::shared_ptr<Blob> > & ele){return id == ele.second->id;}));
}void MessageBus::DoFunction(uint32_t mainType, uint32_t minorType, any mainAny, any minorAny)
{std::vector< std::shared_ptr<Blob> > vec;{std::lock_guard<std::mutex> lock(mutex_);auto range = blobs_.equal_range(std::make_pair(mainType, minorType));for (auto it = range.first; it != range.second; vec.push_back(it++->second)) {}}std::for_each(std::begin(vec), std::end(vec), [=] (const std::shared_ptr<Blob> & blob){blob->fun(mainAny, minorAny);});
}void MessageBus::DoFunctionByMainType(uint32_t mainType, MessageBus::any mainAny, MessageBus::any minorAny)
{std::vector< std::shared_ptr<Blob> > vec;{std::lock_guard<std::mutex> lock(mutex_);auto range = std::equal_range(std::begin(blobs_), std::end(blobs_), mainType, CompByMainType {});for (auto it = range.first; it != range.second; vec.push_back(it++->second)) {}}std::for_each(std::begin(vec), std::end(vec), [=] (const std::shared_ptr<Blob> & blob){blob->fun(mainAny, minorAny);});
}void MessageBus::SyncSendMessage(uint32_t mainType, uint32_t minorType, MessageBus::any mainAny, MessageBus::any minorAny)
{DoFunction(mainType, minorType, mainAny, minorAny);
}void MessageBus::SyncSendMessageByMainType(uint32_t mainType, MessageBus::any mainAny, MessageBus::any minorAny)
{DoFunctionByMainType(mainType, mainAny, minorAny);
}void MessageBus::AsynSendMessage(uint32_t mainType, uint32_t minorType, std::chrono::seconds d,MessageBus::AsynTimeoutCallback callback, MessageBus::any mainAny, MessageBus::any minorAny)
{std::shared_future<void> future = pool_.Submit([=] (){DoFunction(mainType, minorType, mainAny, minorAny);});relativeTimer_.AddTimerTask(std::to_string((static_cast<uint64_t>(mainType) << 32) + minorType), [=] (){if (future.wait_for(std::chrono::seconds::zero()) != std::future_status::ready) callback(mainAny, minorAny);}, d, false, true);
}void MessageBus::AsynSendMessageByMainType(uint32_t mainType, std::chrono::seconds d,MessageBus::AsynTimeoutCallback callback, MessageBus::any mainAny, MessageBus::any minorAny)
{std::shared_future<void> future = pool_.Submit([=] (){DoFunctionByMainType(mainType, mainAny, minorAny);});relativeTimer_.AddTimerTask(std::to_string((static_cast<uint64_t>(mainType) << 32) + static_cast<uint32_t>(-1)),[=] (){if (future.wait_for(std::chrono::seconds::zero()) != std::future_status::ready) callback(mainAny, minorAny);}, d, false, true);
}
细节
std::experimental::any
为了实现函数签名的统一性,参数类型使用any
,标准库中的any
是C++17才有的内容,这里使用<experimental>
头文件中的any
,any
可以擦除类型信息,有点类似void *
指针。
struct CompByMainType
我设置了主消息号和副消息号,这样可以同一类的消息分在一个主消息号中。有利于管理和维护,但是有时候我们需要给所有注册主消息号的函数发消息,因此需要这个比较函数。
异步
仔细看看异步发送消息那个函数。把消息处理函数提交到线程池中,然后开一个定时任务,如果在规定时间内,消息处理函数没有完成,那么就调用超时处理函数;
future
是用了std::shared_future
,这是因为std::future
是不可复制的,而lambdalambdalambda表达式需要复制一次future
,因此选用可复制的std::shared_future
。
测试
消息总线通常用来跨类通信,由于是测试的小程序,就在main
函数中测试消息总线,只需要观察控制台的输出是否符合期望就好了。
using namespace std::experimental;MessageBus messageBus;int main()
{messageBus.RigisterMessage(0, 0, [] (any, any){std::cout << "0 + 0" << std::endl;});messageBus.RigisterMessage(0, 0, [] (any, any){std::cout << "00 + 00" << std::endl;});messageBus.RigisterMessageByMainType(0, [] (any, any){std::cout << "0" << std::endl;});messageBus.RigisterMessageByMainType(2, [] (any, any){/* // */std::this_thread::sleep_for(std::chrono::seconds(5));std::cout << "2222" << std::endl;});int data = 1, data2 = 1;messageBus.RigisterMessageByMainType(3, [] (any mainAny, any){int *p = any_cast<int *>(mainAny);*p = 2;std::cout << "3333" << std::endl;});messageBus.RigisterMessageByMainType(4, [] (any mainAny, any){int *p = any_cast<int *>(mainAny);*p = 2;std::cout << "4444" << std::endl;std::this_thread::sleep_for(std::chrono::hours(1));});messageBus.SyncSendMessage(0, 0);std::cout << "-------" << std::endl;messageBus.SyncSendMessageByMainType(0);std::cout << "-------" << std::endl;messageBus.AsynSendMessageByMainType(2, std::chrono::seconds(2), [] (any, any){std::cout << "timeout" << std::endl;});std::cout << "-------" << std::endl;std::cout << "before data : " << data << std::endl;messageBus.SyncSendMessageByMainType(3, &data);std::cout << "after data : " << data << std::endl;std::cout << "-------" << std::endl;std::cout << "-------" << std::endl;std::cout << "before data2 : " << data2 << std::endl;messageBus.AsynSendMessageByMainType(4, std::chrono::seconds(2), [] (any mainAny, any){int *p = any_cast<int *>(mainAny);*p = 2;std::cout << "timeout" << std::endl;}, &data2);std::cout << "after data2 : " << data2 << std::endl;std::cout << "-------" << std::endl;while (true){std::this_thread::sleep_for(std::chrono::seconds(5));}return 0;
}
参考:
- 白话跨平台C++线程池实现
- 聊聊C++任务定时器的设计与具体实现
- Zeus框架
聊聊C++跨类通信机制之消息总线及其实现相关推荐
- HTML5项目笔记8:使用HTML5 的跨域通信机制进行数据同步
离线应用系统的设计目标就是在网络离线情况下依然可以操作我们的应用系统,并在网络畅通的情况下与服务器进行数据交互. 所以离线应用系统最终会做成类似C/S架构的客户端应用程序.这边基于Chrome或者 S ...
- springcloud微服务架构开发实战:分布式消息总线
消息总线的定义 前面在1.4.2节中强调过,在微服务架构中,经常会使用REST 服务或基于消息的通信机制. 在3.6节中也详细介绍了消息通信的实现方式.消息总线就是一种基于消息的通信机制. 消息总线是 ...
- Android跨进程通信Binder机制与AIDL实例
文章目录 进程通信 1.1 进程空间划分 1.2 跨进程通信IPC 1.3 Linux跨进程通信 1.4 Android进程通信 Binder跨进程通信 2.1 Binder简介 2.2 Binder ...
- 【朝花夕拾】Android跨进程通信总结篇
前言 原文:https://www.cnblogs.com/andy-songwei/p/10256379.html 只要是面试高级工程师岗位,Android跨进程通信就是最受面试官青睐的知识点之一. ...
- 【朝花夕拾】Android性能篇之(七)Android跨进程通信篇
前言 转载请声明,转自[https://www.cnblogs.com/andy-songwei/p/10256379.html],谢谢! 只要是面试高级工程师岗位,Android跨进程通信就是最受面 ...
- 【朝花夕拾】Android性能篇之(七)Android跨进程通信篇...
前言 原文:https://www.cnblogs.com/andy-songwei/p/10256379.html 只要是面试高级工程师岗位,Android跨进程通信就是最受面试官青睐的知识点之一. ...
- linux 跨进程读取内存,Android之Linux跨进程通信的方式
As we all know,Android是基于Linux内核开发的,而市面上几乎所有的App都离开跨进程通信.可能你会说Android是通过Binder完成进程之间的通信的.但是Binder是怎么 ...
- 再谈Android Binder跨进程通信原理
在谈Android的跨进程通信问题上时,总会问到Android的IPC机制,是指两个进程之间进行数据交换的过程.按操作系统的中的描述,线程是CPU调度最小的单元,同时线程是一种有限的系统资源,而进程是 ...
- 【Binder】Android 跨进程通信原理解析
前言 在Android开发的过程中,用到跨进程通信的地方非常非常多,我们所使用的Activity.Service等组件都需要和AMS进行跨进程通信,而这种跨进程的通信都是由Binder完成的. 甚至一 ...
最新文章
- 满足实时人工智能的计算需求
- 《HTML5网页开发实例详解》连载(四)HTML5中的FileSystem接口
- java sort()怎么实现的_Java中Array.sort()的排序原理
- 机器学习算法Python实现:word2vec 求词语相似度
- 【转载保存】java8新特性学习
- layer右下脚弹窗
- 吴恩达深度学习——浅层神经网络
- java debug try catch 打印发生错误的代码的详细信息 代码行数
- 读书笔记:普林斯顿微积分读本
- 【论文学习】10、物联网安全WiFi设备的监控与识别
- 转载.NET技术-.NET各大网站-编程技术网址
- obs摄像头模糊_【OBS虚拟摄像头插件】OBS虚拟摄像头下载OBS VirtualCam v2020 官方版-趣致软件园...
- Linux crontab 定时执行任务
- 单频点单输入功率只含基波X模型的提取与验证
- 求圆和直线之间的交点
- 6 cocos2dx粒子效果,类图关系,系统原生粒子和自定义粒子效果,粒子编译器软件,爆炸粒子效果,烟花效果,火焰效果,流星效果,漩涡粒子效果,雪花效果,烟雾效果,太阳效果,下雨效果
- centos7连接外网详细教程
- 利用闲置电脑安装虚拟机搭建hadoop集群
- 云闪付怎么对接三方php,第三方支付-银联云闪付开发教程
- java项目-第137期jsp+servlet的周公算命预测系统-java毕业设计