支持插件的消息中间件【msg broker with plugin】 - 知然 - 博客园
支持插件的消息中间件【msg broker with plugin】 - 知然 - 博客园
支持插件的消息中间件【msg broker with plugin】
支持插件的消息中间件
msg broker with plugin
Msg Broker概念:
msg broker是实现application 之间互通讯的组件。通常为实现application之间的解耦,消息都是通过msg broker完成转发。application只需知道其他applicatipn的逻辑名称,而不需要知道对方的具体位置。Broker中维护一个查找表,记录着哪个application注册在此逻辑名称之下,所以消息总是会被正确的投递到目的地。
msg broker不限于1-1的转发,也支持1-N的模式。其主要功能有:
- 实现多个application的互通讯,而隐藏彼此的位置
- 实现消息个格式的转换,如json to bin
- 安全控制,msg broker可以再转发消息前进行一定程度的安全验证
- 增大系统的可伸缩性,由于application通讯的目标变成了逻辑结点,而该逻辑结点可以对应多个物理结点,理论上可以动态的增加物理结点,来扩展该逻辑结点的吞吐量。
- msg broker可以用来集成服务,并且可以暴楼服务的部分接口
msg broker 具有的缺点是:
- 增加了复杂性,多了一层转发
- 可维护性降低,需要理清msg broker和各个application和服务的关系。
- 降低性能,主要是实时性能下降了,消息需要多转发一边,单次请求的延时大大增加了。
当前流行的Broker的特点和缺点:
Msg Broker的结构:
流行的Broker中间件介绍:
- RabbitMQ
项目地址:http://www.rabbitmq.com/
RabbitMQ是由Erlang开发的以高效、健壮以及高度伸缩性的消息服务器。其所包含的概念有Producer、Consumer、Exchange、Queue。RabbitMQ基于QMQP协议,支持的语言也非常丰富,文档也非常清晰。使用RabbitMQ可以实现订阅发布模型、RPC模型、路由模型等,参见RabbitMQ的例子:http://www.rabbitmq.com/getstarted.html。
但是它有如下局限性:
- RabbitMQ 没有针对连接做控制,它是为高效而生,它对外来的请求是信任的,不存在安全验证,如任何一个client都可以创建消息队列,所以RabbitMQ一定是放在内网的。
- 使用RabbitMQ ,我们是通过Client远程操作RabbitMQ,不能定制RabbitMQ的功能。
- ZeroMQ
项目地址:http://www.zeromq.org/
ZeroMQ是一个Socket封装库,号称是最快的消息内核。ZeroMQ可以支持TCP、UDP、IPC等多种通讯协议。ZeroMQ可以实现的通讯模型就更多了,几乎涵盖了消息通讯的所有模式,参见官网介绍http://www.zeromq.org/intro:read-the-manual 。
其局限性为:
- ZeroMQ虽然封装了消息传输的复杂性,但是它也隐藏了连接的建立、断开等过程。ZeroMQ传输消息更像是udp数据报,使用者不能知道对方何时连接建立、何时连接断开。
我们需要一个不一样的Broker
应用场景介绍
在网络游戏中,cliet和服务器是通过tcp长连接的。相对于HTTP+WebServer的不同在于:
- client连接到服务器,需要进行身份验证,通常是client第一个消息包含身份验证数据如用户名密码等,而验证通过后该连接为可信任连接。
- client 任意时间都可以向服务器发送请求,而不需要服务器立即返回,同样,服务器是在任意时间(当然会有实时性等约束)都可以像client推送消息。
- client断开连接时,服务器必须捕获该事件,以便完成一些数据清理操作。
- client对应的一般是个集群,但是client无从得知细节,因为它只连接最外层的一个,给他取个名字“MsgBroker”。
- Msg Broker 不许有一定的安全控制,如心跳、网络包频率限制等,防范某些可能的攻击。
- Msg Broker需要高度可定制。不同的游戏主要是逻辑不同,而MsgBroker大多大同小异。当然MsgBroker总是会根据需求稍作修改。
- Msg Broker 主要瓶颈是IO操作,因为它涉及大量的网络连接、断开、心跳、广播消息等。而它具有的领域逻辑则非常非常少。所以Msg Broker的逻辑可以使用动态脚本实现,其实时性、效率都能满足要求。
需要的broker具有的功能:
- 能够捕获client连接事件
- 能够捕获client断开事件
- 具有网络心跳功能
- 方便的消息发送接口
- broker可以以client的角色连接到其他Server,因为从其他逻辑角度看,Broker可能是其他服务的使用者。
- Broker 提供消息收发框架,逻辑层通过插件实现。
- 实现插件的方式有
- 动态链接库,可以将逻辑层封装到so链接库中
- python脚本,逻辑层可以有python脚本实现,Broker封装了载入python、调用python,封装消息发送接口到Pyhton
- Lua脚本,逻辑层也可以又Lua脚本实现,Broker封装了载入lua、调用lua、封装消息接口给lua。
Msg Broker 结构图
Msg Broker 的安装使用:
安装依赖库:
由于msg broker支持Python和lua作为插件,那么必须确保linux下安装了相应的头文件。示例中的插件均只实现了echo功能。
- 确保Linux系统安装了Python,推荐python2.6
- 确保安装了Python-devel,如果是centos,直接yum即可。
- 确保安装了Lua-5.1.4, 其他版本没有测试过
- 下载Msg Broker最新源码,目前处于0.1版本
svn co https://ffown.googlecode.com/svn/trunk/
- 编译源码:
- cd trunk/example/plugin_msg_broker/
- make
- 编译动态连接库插件
- cd plugin/plugin_echo_dll/
- sh gen_dll.sh
运行示例插件:
- 运行动态链接库
- ./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_dll/libechoso
- 另开终端,telent 127.0.01 10241, 收入5 回车,再输入5个字符,通讯协议是body长度加回车加body,如图:
- 运行Python 脚本示例程序
- ./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_py/echo.py
- 同样使用telnet 测试echo功能
- 运行Lua脚本示例程序
- ./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_lua/lua.py
- 同样使用telnet 测试echo功能
插件层设计分析:
插件接口:
#ifndef _PLUGIN_H_ #define _PLUGIN_H_#include "channel.h" #include "message.h"class plugin_i { public:virtual ~plugin_i(){}virtual int start() = 0;virtual int stop() = 0;virtual int handle_broken(channel_ptr_t channel_) = 0;virtual int handle_msg(const message_t& msg_, channel_ptr_t channel_) = 0; };typedef plugin_i* plugin_ptr_t; typedef int (*handle_channel_msg_func_t)(const message_t& msg_, channel_ptr_t); typedef int (*handle_channel_broken_func_t)(channel_ptr_t);#define HANDLE_CHANNEL_MSG "handle_channel_msg" #define HANDLE_CHANNEL_BROKEN "handle_channel_broken" #endif各个接口作用如下:
- start 实现插件载入,环境初始化
- stop实现优雅的退出
- handle msg 为消息到来通知
- handle_broken 为对方连接关闭
Channel 设计
channel 用来表示一个连接,可以理解成socket的抽象,也可直接理解成远程client。
#ifndef _CHANNEL_H_ #define _CHANNEL_H_#include "socket_i.h"class channel_t { public:channel_t(socket_ptr_t sock_);~channel_t();void set_data(void* p);void* get_data() const;template<typename T>T* get_data() const { return (T*)this->get_data(); }void async_send(const string& buff_);void close();private:socket_ptr_t m_socket;void* m_data; };typedef channel_t* channel_ptr_t; #endif各个接口作用如下:
- 构造,channel必须绑定一个socket
- set_data get_data用来操作channel私有数据,如我们可以在channel私有数据中存放该channel对应的uid,这样每个channel之需验证一次,以后自然知道到来的消息属于哪个channel。
- async_send 异步发送消息
- close 关闭连接
动态链接库插件:
流程如下:
- 载入动态库
- 获取动态库的接口,记录函数指针地址
- 若有msg到来,调用动态链接库的handle_msg
- 若连接关闭,调用动态链接库的handl_broken
int plugin_dll_t::start() {m_dll_handler = ::dlopen(m_dll_name.c_str(), RTLD_NOW|RTLD_GLOBAL);if (NULL == m_dll_handler){logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s>", dlerror()));return -1;}m_msg_cb = (handle_channel_msg_func_t)::dlsym(m_dll_handler, HANDLE_CHANNEL_MSG);m_broken_cb = (handle_channel_broken_func_t)::dlsym(m_dll_handler, HANDLE_CHANNEL_BROKEN);if (NULL == m_msg_cb){logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s> not exist", HANDLE_CHANNEL_MSG));return -1;}if (NULL == m_broken_cb){logerror((PLUGIN_IMPL, "plugin_dll_t::start dlopen failed:<%s> not exist", HANDLE_CHANNEL_BROKEN));return -1;}return 0; }int plugin_dll_t::stop() {::dlclose(m_dll_handler);return 0; }int plugin_dll_t::handle_broken(channel_ptr_t channel_) {return m_broken_cb(channel_); }int plugin_dll_t::handle_msg(const message_t& msg_, channel_ptr_t channel_) {return m_msg_cb(msg_, channel_); }Python 插件
其工作流程如下:
- 初始化Python解释权,将封装的发送消息接口注册到Python虚拟机中
- 设置PythonPath
- 载入python文件
- 若msg到来,调用python全局函数handle_msg
- 若channel断开,调用Python 全局handle_broken 函数
#include "plugin_impl/plugin_python.h" #include "plugin_impl/pyext.h" #include "log_module.h"plugin_python_t::plugin_python_t(const string& name_):m_py_mod(NULL) {string pythonpath = "./";int pos = name_.find_last_of('/');if (-1 == pos){m_py_name = name_;}else{m_py_name = name_.substr(pos+1);pythonpath = name_.substr(0, pos+1);}pos = m_py_name.find_first_of('.');m_py_name = m_py_name.substr(0, pos);Py_InitializeEx(0);Py_SetPythonHome((char*)pythonpath.c_str());initpyext(this);PyRun_SimpleString("import channel;import sys;sys.path.append('./plugin/plugin_echo_py/')"); }plugin_python_t::~plugin_python_t() {Py_Finalize(); }int plugin_python_t::start() {if(load_py_mod()){return -1;} return 0; }int plugin_python_t::stop() {return 0; }int plugin_python_t::load_py_mod() {PyObject *pName, *pModule;pName = PyString_FromString(m_py_name.c_str());pModule = PyImport_Import(pName);if (!pModule ) {Py_DECREF(pName);logerror((PLUGIN_IMPL, "can't find %s.py\n", m_py_name.c_str()));if (PyErr_Occurred()){PyErr_Print();PyErr_Clear();return -1;} return -1;}m_py_mod = PyModule_GetDict(pModule);Py_DECREF(pName);Py_DECREF(pModule);return 0; }int plugin_python_t::handle_broken(channel_ptr_t channel_) { m_channel_mgr.erase(long(channel_));delete channel_;return call_py_handle_broken(long(channel_)); } int plugin_python_t::handle_msg(const message_t& msg_, channel_ptr_t channel_) {m_channel_mgr.insert(make_pair((long)channel_, channel_));return call_py_handle_msg((long)channel_, msg_.get_body().c_str()); }int plugin_python_t::call_py_handle_msg(long val, const char* msg) {PyObject *pDict = m_py_mod;const char* func_name = "handle_msg";PyObject *pFunc, *arglist, *pRetVal;pFunc = PyDict_GetItemString(pDict, func_name);if (!pFunc || !PyCallable_Check(pFunc)) { logerror((PLUGIN_IMPL, "can't find function [%s]\n", func_name));return -1;}arglist = Py_BuildValue("ls", val, msg);pRetVal = PyObject_CallObject(pFunc, arglist);Py_DECREF(arglist);if (pRetVal){Py_DECREF(pRetVal);}if (PyErr_Occurred()){PyErr_Print();PyErr_Clear();return -1;}return 0; }int plugin_python_t::call_py_handle_broken(long val) {PyObject *pDict = m_py_mod;const char* func_name = "handle_broken";PyObject *pFunc, *arglist, *pRetVal;pFunc = PyDict_GetItemString(pDict, func_name);if (!pFunc || !PyCallable_Check(pFunc)) { logerror((PLUGIN_IMPL, "can't find function [%s]\n", func_name));return -1;}arglist = Py_BuildValue("l", val);pRetVal = PyObject_CallObject(pFunc, arglist);Py_DECREF(arglist);if (pRetVal){Py_DECREF(pRetVal);}if (PyErr_Occurred()){PyErr_Print();PyErr_Clear();return -1;}return 0; }channel_ptr_t plugin_python_t::get_channel(long p) {map<long, channel_ptr_t>::iterator it = m_channel_mgr.find(p);if (it != m_channel_mgr.end()){return it->second;}return NULL; }Lua 插件:
工作流程如下:
- 初始化lua虚拟机
- 注册发送消息接口给lua
- 载入Lua脚本
- 有msg到来,调用lua的hanle_msg接口
- 有channel断开,调用lua的handle_broken接口
static plugin_lua_t* g_plugin_lua_obj = NULL; static int channel_send_msg(lua_State* ls_) {long ptr = (long)luaL_checknumber(ls_, 1);size_t len = 0;const char* msg = luaL_checklstring(ls_, 2, &len);channel_ptr_t c = g_plugin_lua_obj->get_channel(ptr);if (c){c->async_send(msg);}return 0; }plugin_lua_t::plugin_lua_t(const string& name_):m_ls(NULL) {g_plugin_lua_obj = this;string luapath = "./";int pos = name_.find_last_of('/');if (-1 == pos){m_lua_name = name_;}else{m_lua_name = name_.substr(pos+1);luapath = name_.substr(0, pos+1);}pos = m_lua_name.find_first_of('.');m_lua_name = m_lua_name.substr(0, pos);m_ls = lua_open();lua_checkstack(m_ls, 20);lua_pushcfunction(m_ls, channel_send_msg);lua_setglobal(m_ls, "_tmp_func_");luaL_dostring(m_ls, "channel = {} channel.send = _tmp_func_ _tmp_func_ = nil");string lua_str = "package.path = package.path .. \"" + luapath + "?.lua\"";luaL_openlibs(m_ls);if (luaL_dostring(m_ls, lua_str.c_str())){lua_pop(m_ls, 1);}m_lua_name = name_; }plugin_lua_t::~plugin_lua_t() { }int plugin_lua_t::start() {if (load_lua_mod()){logerror((PLUGIN_IMPL, "can't find %s.lua\n", m_lua_name.c_str()));return -1;}return 0; }int plugin_lua_t::stop() {return 0; }int plugin_lua_t::handle_broken(channel_ptr_t channel_) {m_channel_mgr.erase(long(channel_));delete channel_;return call_lua_handle_broken(long(channel_)); }int plugin_lua_t::handle_msg(const message_t& msg_, channel_ptr_t channel_) {m_channel_mgr.insert(make_pair((long)channel_, channel_));return call_lua_handle_msg((long)channel_, msg_.get_body()); }int plugin_lua_t::load_lua_mod() {if (luaL_dofile(m_ls, m_lua_name.c_str())){lua_pop(m_ls, 1);return -1;}return 0; }int plugin_lua_t::call_lua_handle_msg(long val, const string& msg) {lua_checkstack(m_ls, 20);lua_getglobal(m_ls, "handle_msg");lua_pushnumber(m_ls, val);lua_pushlstring(m_ls, msg.c_str(), msg.size());if (lua_pcall(m_ls, 2, 0, 0) != 0){lua_pop(m_ls, 1);return -1;}return 0; }int plugin_lua_t::call_lua_handle_broken(long val) {lua_checkstack(m_ls, 20);lua_getglobal(m_ls, "handle_broken");lua_pushnumber(m_ls, val);if (lua_pcall(m_ls, 1, 0, 0) != 0){lua_pop(m_ls, 1);return -1;}return 0; }channel_ptr_t plugin_lua_t::get_channel(long p) {map<long, channel_ptr_t>::iterator it = m_channel_mgr.find(p);if (it != m_channel_mgr.end()){return it->second;}return NULL; }msg_broker 待完善的地方:
- 心跳层还未加入
- 插件层报错不够友好
- Python 中封装的channel使用long型,调用send接口时需要从long转化到channel,需要优化一下,直接封装一个channel对象到Python
- Lua中channel的封装暂时也是使用long来表示,具有和上面一样的性能损耗问题
支持插件的消息中间件【msg broker with plugin】 - 知然 - 博客园相关推荐
- 让博客园博客中的图片支持fancybox浏览
我给博客中的图片设置了最大宽度,如果有大点的图片内容会模糊. 所以想实现点击图片可弹窗浏览.还好博客园支持自定义js和css.这样我们可以引入fancybox插件. 实现: 1. 进到配置页面http ...
- 博客园开始对X++语言语法高亮的支持
关注X++的博客园博友有福啦,现在发布X++代码也同样可以实现语法高亮.目前仅支持SyntaxHighlighter插件.请看如下: public void update(boolean _updat ...
- [chrome插件分享] 博客园是个好图床 image-uploader
说明 去年做了一款 windows 下的图片上传程序,[自制小工具含源码--博客园图床ImageBed], 但是这个小工具只适用于 windows 平台,自从换了 Mac 以后就不能用了.于是萌生了再 ...
- 刚刚做了个chrome浏览器 博客园转载插件,欢迎试用,多提意见!
刚刚做了个chrome浏览器 博客园转载插件,欢迎试用,多提意见! https://chrome.google.com/extensions/detail/jplfeohlkffepcpolmenml ...
- EasyUC博客助手 [支持:博客园,MSN/Live空间,CSDN, 博客之家,PJBlog,Z-Blog...]
今天终于可以发布一个版本了.因为之前的考试和春节耽误了一些时间,不过总算还是初步完成了.之前也看到了园里已经有朋友Share了MSN发布博客的机器人的作品.我的这个程序也差不多,只不过是按照自己的想法 ...
- 发布一个博客园专用Windows Live Writer代码插件
一直用Windows Live Writer写博客,不过没找到能与博客园配合得很好的代码插件,每次写完文章发布到博客园总要手动修改代码.所以我自己写了一个博客园专用的Windows Live Writ ...
- 在线文本替换工具 、支持正则表达式(博客园文章里添加Javascript或<script>语句)
概况与介绍 在博客园发布一篇文章,文章就是<在线文本替换工具 .支持正则表达式>https://www.cnblogs.com/lsllll44/articles/15522697.htm ...
- 博客园美化[SimpleMemory主题+tctip插件]
美化前置条件 SimpleMemory主题 官网地址:Document 点击"安装使用"和选择公告 2.参考这里进行设置侧边栏 js部分 css部分 3.我的代码及其效果对照 js ...
- 为博客园选择一个小巧霸气的语法高亮插件
博客园的语法高亮简直蛋疼,于是乎就打算找一个靠谱的插件来改造下. 各种百度谷歌,大致得到几个推荐:SyntaxHighlighter,Snippet,Google Code Pretiffy,High ...
最新文章
- java 正则匹配 sql星号,18. 正则表达式:开头、结尾、任意一个字符、星号和加号匹配...
- indesign如何画弧线_彩铅画入门教程,如何给独角兽设计一款好发型
- 微信朋友圈删除后服务器还有吗,删了的朋友圈还可以找回来吗
- TQ210——核心板和底板
- java中事务的管理
- mysql事务的acid、三种并发问题与四种隔离级别
- 常用的Webserver接口
- 国内首份千款主流安卓应用耗电指标评测报告新鲜出炉!
- python第一周作业--------模拟登录
- 华为关闭telnet命令_华为交换机关闭Telnet
- steam换头像出现服务器错误_steam测试中国版 单机游戏强制防沉迷
- 给视频加水印的软件有哪些?推荐两种软件快速加水印
- Python笔记。超详细的基本语法
- python微信接龙转Excel表格
- java jmf mp3,java播发mp3(不用jmf)
- python分三行将你的学号姓名班级_python第三次作业——叶耀宗
- VMware虚拟机备份和恢复
- python黑色背景rbg_PIL图像转换为RGB,保存为纯黑色图像(python)
- QUIC 协议的简单分析
- 推荐最适合IT人自学的视频网站、社区网站
热门文章
- 【 Linux 】常用命令总结(更新)
- JavaScript入门经典(第4版)
- 【HIMI转载推荐之三】基于Cocos2dx引擎UI扩展引擎包[cocos2d-x-3c]
- Android 系统提供的文件下载
- spring实例教程
- PHP “Warning: session_start()...”、correct (..\..\php5\Temp) in Unknown on line 0 的解决方法...
- ionic开发:第一步
- ASP.NET 5系列教程 (四):向视图中添加服务和发布应用到公有云
- [C#学习]多线程编程——多线程基础
- 记一次python升级版本遇到的事