TeamTalk源码分析之win-client
windows的程序的基本原理:
- windows消息机制(消息如何产生、如何发送、如何处理,常见的消息有哪些、消息的优先级、如何自定义消息、窗体消息、常用控件消息)
- gdi原理(要熟悉gdi的各种对象,如画笔、画刷、字体、区域、裁剪、位图等,熟悉它们的API,熟悉各种gdi绘图API、当然最好也要熟悉一整套的gdi+的类,gdi与gdi+的区别)
- windows进程与线程的概念(进程的概念、如何创建、如何结束、跨进程如何通信;线程的创建与销毁、线程间的同步与资源保护,熟悉windows常用的线程同步对象:临界区、事件、互斥体、信号量等)
- windows内存管理(清晰地掌握一个进程地址空间的内存分布、windows堆的创建与管理等)
- dll技术(dll的生成、变量的导出、函数的导出、类的导出、如何查看dll导出哪些函数、隐式dll的加载、显示dll的加载、远程dll注入技术等)
- PE文件(一个PE文件的结构、有哪些节、如何修改、分别映射到进程地址空间的什么位置等)
- windows SEH(结构化异常处理)
- windows socket编程
- windows读写文件技术(像CreateFile、WriteFile、GetFileSize等这些API应该熟练掌握、内存映射技术)
当然很多必备的技术也不好归类到windows技术下面,比如socket编程,这涉及到很多网络的知识,比如tcp的三次握手,数据的收发等,还有就是各种字符编码的知识、以及之间的相互转换,又比如一系列的CRT函数及其对应的宽字符版本。当然如果你搞windows开发,一定要熟悉开发工具Visual Studio,熟悉其工程项目的大多数属性配置,而且要做到知其然也知其所以然。
一、win-client:
1、解析参数:采用jsoncpp;
示例:登陆msg_server服务时候从config.dat中取配置信息;
2、界面:从XML中获取控件等UI界面是用的Duilib界面库画出来的;
示例:登陆界面的tipText框,提示内容取自:chinese.ini
Duilib是teamtalk使用的一款开源界面库,该界面库模仿web开发中的布局技术,使用xml文件来布局windows界面,并且在主窗口上绘制所有子控件,也就是所谓的directUI技术;
3、日志:yaolog记录日志;格式:LOG__(szLogID, szFormat, …);
示例:LOG__(ERR,_T(“bad function call-m_keyId:%d”),m_keyId);
4、音频处理:
4.1、libogg是一个C++库,用来处理 Ogg 多流传输格式的解码器;libogg是一个语音库,用来解析声音文件的,因为pc客户端可能会收到移动端的语音聊天,相比较传统的*.wav、.mp3、.wma,*.ogg格式的不仅音质高,而且音频文件的体积小,腾讯的QQ游戏英雄杀中的语音也是使用这个格式的。
4.2、Speex是一套专门用于压缩声音的库,压缩声音的性能非常高;
5、消息处理:Tcp Socket;netlib库;
6、数据报文的结构:采用Protocol Buffer(简称PB)是google 的一种数据交换的格式;
7、http请求:采用httpclient;httpclient功能是程序中使用的http请求库,登录前程序会先连接服务器的login_server以获得后续需要登录的msg_server的ip地址和端口号 等信息,这里就是使用的http协议,同时聊天过程中收发的聊天图片与图片服务器msfs也使用http协议来收发这些图片;
8、表情处理:采用GifSmiley动态gif,bmp,jpg等图片文件;GifSmiley是程序中用来解析和显示gif格式的图片的库,以支持gif图片的动画效果;
9、Modules就是TeamTalk中使用的各种库了;
10、network是teamtalk使用的网络通信的代码,其实teamtalk pc端和服务器端使用的是同一套网络通信库,只不过如果服务器运行在linux下,其核心的IO复用模型是epoll,而pc客户端使用的IO复用模型是select;
11、speexdec 也是和ogg格式相关的编码和解码器;
12、teamtalk是主程序入口工程;
13、utility包含了teamtalk中用到的一些工具类工程,比如sqlite的包装接口、md5工具类等。
二、php_Server:采用CodeIgniter框架;
三、以下是Duilib的一些网址
很多都需要climb over the wall后才能访问。
官网: www.duilib.com
论坛: bbs.duilib.com
qq群: 153787916(1群),79145400(2群),1507570(3群)
google code项目托管地址: http://code.google.com/p/duilib/
svn: http://duilib.googlecode.com/svn/trunk/
新浪微博: http://weibo.com/duilib
腾讯微博: http://t.qq.com/duilib
四、客户端的具体页面布局
xml文件在VS工具中的目录如图1所示:
图1
这些xml源代码文件放在文件夹E:\tools\mogu.io\TeamTalk-master\win-client\bin\teamtalk\gui\下面。
修改举例:详细信息页面的用户名显示不全,我用UIDesigner工具打开了UserDetailInfoDialog.xml这个页面,并将width改为了80。如图2
图2
接下来具体分析一下win-client的源码包:VS打开后,如图3,红色标注的我在第一篇博客里面讲到过的涉及采用到其他第三方库。这里就先不去深入学习了,后面涉及到那块就学哪块吧。
图3
接下来重点讲一下其他部分:
Sln_define:解决方案的全局性配置(GlobalConfig.h)和全局变量(GlobalDefine.h)的定义。
Utility顾名思义:公用模块,如图4:
图4
1、CppSQLite3是对SQLite的API进行了二次封装的类。
参考资料:
http://blog.csdn.net/stan1989/article/details/8589293
http://www.codeproject.com/Articles/6343/CppSQLite-C-Wrapper-for-SQLite/
2、IniOperation:对ini文件进行读写操作的API的封装。
3、Md5:对md5加密算法的封装。
4、Multilingual:多语言支持包,当前只有一个从win-client\bin\teamtalk\chinese.ini文件里面取中文翻译的函数。
5、TTThread:多线程的封装(创建、销毁、挂起等)。
6、utilCommonAPI:常规的自定义公用函数
原作者 快刀kuaidao@mogujie.com 同志,一个函数的注释也没有,还好都可以顾名思义,我注释了如下,不对的地方请网友指正:
//二进制转十六进制再转字符串
CString binToHexToCString(const unsigned char * data, size_t len)//获取md5字符串
CString getMd5CString(const char* pSrc, size_t length)//获取当前进程已加载模块的文件的完整路径
CString getAppPath()//获取当前进程已加载模块的文件的父路径
CString getParentAppPath()//创建路径中的全部文件夹
BOOL createAllDirectories(CString & csDir)//基于BKDR的哈希算法
UInt32 hash_BKDR(const char* str)//消息泵(抽取消息、翻译消息、分发消息)
void messagePump()//将空间单位转换为人类可读的单位
std::string getHumanReadableSizeString(double size)//文件是否存在
BOOL isFileExist(IN const LPCTSTR csFileName)//注册dll控件
BOOL registerDll(const CString& sFilePath)//线程等待函数(这里500ms作为判断时间)
BOOL waitSingleObject(HANDLE handle, Int32 timeout)
7、utilstrCodingAPI:自定义的字符串编码公用函数
//32位整型转字符串
CString int32ToCString(Int32 n)//字符串转32位整型(要求输入的参数字符串是符合int范围[-2147483648, 2147483647])
Int32 cstringToInt32(LPCTSTR buff)//无符号32位整型转字符串
std::string uint32ToString(UInt32 n)//字符串转32位整型(不做参数检查)
Int32 stringToInt32(const std::string& src)//utf8转字符串
CString utf8ToCString(const char* src)//字符串转utf8
std::string cStringToUtf8(const CString& src)//宽字符转窄字符
const std::string ws2s(const std::wstring& src)//窄字符转宽字符
const std::wstring s2ws(const std::string& src)//是否包含中文
BOOL isIncludeChinese(const std::string& str)//获取第一个字母
std::string GetFirstLetter(const char* strChs)//汉字转首字母拼音
CString HZ2FirstPY(IN std::string szHZ)//汉字转拼音
char* ConvertChineseUnicodeToPyt(wchar_t* chrstr)//整句翻译成拼音
CString HZ2AllPY(IN CString szHZ)//拆分字符串
Int32 splitString(__in std::wstring src, __in std::vector<std::wstring> _vecSpliter,__out std::vector<std::wstring> &_splitList)
五、代码示例
整个程序使用了mfc框架来做一个架子,而所有的窗口和对话框都使用的是duilib,关于duilib网上有很多资料,这里不介绍duilib细节的东西了。一个mfc程序框架,使用起来也很简单,就是定义一个类集成mfc的CWinApp类,并改写其InitInstance()方法,mfc内部会替我们做好消息循环的步骤。TeamTalk相关的代码如下:
#include "stdafx.h"
#include "teamtalk.h"
#include "UI/MainDialog.h"
#include "GlobalConfig.h"
#include "versioninfo.h"
#include "google/protobuf/stubs/common.h"
#include "utility/utilCommonAPI.h"
#include "utility/Multilingual.h"
#include "utility/utilStrCodingAPI.h"
#include "Modules/IHttpPoolModule.h"
#include "Modules/ILoginModule.h"
#include "Modules/IMiscModule.h" //一些比较杂的公用的接口函数
#include "Modules/ISysConfigModule.h"
#include "Modules/ITcpClientModule.h"
#include "Modules/UIEventManager.h"
#include "network/OperationManager.h"
#include "network/ImCore.h" //IM核心类:线程、TcpSockets的操作之类的/** 指定重启管理器的风格* 重启管理器可以支持多种风格,例如,有的风格(AFX_RESTART_MANAGER_SUPPORT_RESTART)只是将崩溃的应用程序重新启动,* 而有的风格(AFX_RESTART_MANAGER_SUPPORT_ALL_ASPECTS)不仅将应用程序重新启动,* 还可以打开原来自动保存的文档,恢复应用程序到崩溃前的状态。* 我们可以在应用程序类的构造函数中,通过指定CWinAppEx的dwRestartManagerSupportFlags成员变量来指定重启管理器的风格。*/CteamtalkApp::CteamtalkApp()
:m_pMainDialog(0)
{m_dwRestartManagerSupportFlags = AFX_RESTART_MANAGER_SUPPORT_RESTART;
}CteamtalkApp theApp; //MFC类实例/** 应用程序入口* 所有的初始化工作就是写在CteamtalkApp::InitInstance()方法中了*/
BOOL CteamtalkApp::InitInstance()
{INITCOMMONCONTROLSEX InitCtrls;InitCtrls.dwSize = sizeof(InitCtrls);InitCtrls.dwICC = ICC_WIN95_CLASSES; //用来注册InitCommonControls函数所注册的所有类。InitCommonControlsEx(&InitCtrls);//如果一个运行在 Windows XP 上的应用程序清单指定要使用 ComCtl32.dll 版本 6 或更高版本来启用可视化方式,//则需要 InitCommonControlsEx();否则,将无法创建窗口。//YAOLOG 初始化_InitLog();//验证GOOGLE_PROTOBUF版本兼容性// Verify that the version of the library that we linked against is// compatible with the version of the headers we compiled against.GOOGLE_PROTOBUF_VERIFY_VERSION;//输出一行日志LOG__(APP, _T("===================VersionNO:%d======BulidTime:%s--%s=========================="), TEAMTALK_VERSION, util::utf8ToCString(__DATE__), util::utf8ToCString(__TIME__));if (!__super::InitInstance()) //__super关键字:本类的基类CWinApp{LOG__(ERR, _T("__super::InitInstance failed."));return FALSE;}//调用在应用程序对象的 InitInstance 函数中对此函数启用 OLE 控件包容的支持。AfxEnableControlContainer();//应用程序是否已经启动if (_IsHaveInstance()){LOG__(ERR, _T("Had one instance,this will exit"));HWND hwndMain = FindWindow(_T("TeamTalkMainDialog"), NULL); //寻找标题为TeamTalkMainDialog的进程if (hwndMain){::SendMessage(hwndMain, WM_START_MOGUTALKINSTANCE, NULL, NULL);//该函数将指定的消息发送到一个或多个窗口。此函数为指定的窗口调用窗口程序,直到窗口程序处理完消息再返回。//而和函数PostMessage不同,PostMessage是将一个消息寄送到一个线程的消息队列后就立即返回。}return FALSE;}//start imcore lib //启动蓝狐大神写的IM核心类(线程启动,互斥锁。。。有兴趣可以深入去了解ImCore)//在这里启动任务队列和网络IO线程if (!imcore::IMLibCoreRunEvent()){LOG__(ERR, _T("start imcore lib failed!"));}LOG__(APP, _T("start imcore lib done"));//start ui event//在这里创建代理窗口并启动定时器定时处理任务 if (module::getEventManager()->startup() != imcore::IMCORE_OK){LOG__(ERR, _T("start ui event failed"));}LOG__(APP, _T("start ui event done"));//create user folders_CreateUsersFolder();//duilib初始化CPaintManagerUI::SetInstance(AfxGetInstanceHandle()); //将程序实例与皮肤绘制管理器挂钩//获取可执行程序的所在路径,指的是.exe文件路径//track这个设置了路径,会导致base里设置的无效。CPaintManagerUI::SetResourcePath(CPaintManagerUI::GetInstancePath() + _T("..\\gui\\"));//COM在OLE32.DLL和 OLE32.LIB定义了一些常用的函数。在使用这些函数前要先调用CoInitialize来初始化COM库::CoInitialize(NULL);//OleInitialize是初始化Ole的运行环境,Ole是在Com的基础上作的扩展,是ActiveX运行的基础,//OleInitialize肯定会调用CoInitialize。::OleInitialize(NULL);//无需配置server(这里是初始化)module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig();if (pCfg && pCfg->loginServIP.IsEmpty()){if (!module::getSysConfigModule()->showServerConfigDialog(NULL)){LOG__(APP, _T("server config canceled"));return FALSE;}}//创建登陆窗口,并启动它if (!module::getLoginModule()->showLoginDialog()){LOG__(ERR, _T("login canceled"));return FALSE;}LOG__(APP,_T("login success"));//创建主窗口,只是初始化,这里不会ShowModal显示处理if (!_CreateMainDialog()){LOG__(ERR, _T("Create MianDialog failed"));return FALSE;}LOG__(APP, _T("Create MianDialog done"));CPaintManagerUI::MessageLoop(); //DuiLib之消息循环。CPaintManagerUI::Term(); //关闭压缩包return TRUE;
}//销毁主窗口
BOOL CteamtalkApp::_DestroyMainDialog()
{delete m_pMainDialog;m_pMainDialog = 0;return TRUE;
}//创建主窗口
BOOL CteamtalkApp::_CreateMainDialog()
{m_pMainDialog = new MainDialog(); //新建主窗体类对象PTR_FALSE(m_pMainDialog); //assert 该值为FALSE时中断当前操作//从bin\teamtalk\chinese.ini 配置文件中获取窗体标题CString csTitle = util::getMultilingual()->getStringById(_T("STRID_GLOBAL_CAPTION_NAME")); //创建主窗口(600宽,800高)if (!m_pMainDialog->Create(NULL, csTitle, UI_CLASSSTYLE_DIALOG, WS_EX_STATICEDGE /*| WS_EX_APPWINDOW*/ | WS_EX_TOOLWINDOW, 0, 0, 600, 800))return FALSE;m_pMainDialog->BringToTop(); //主窗体置于所有窗体上面return TRUE;
}//退出应用程序
BOOL CteamtalkApp::ExitInstance()
{LOG__(APP, _T("Exit Instance"));//close httppoolmodule::getHttpPoolModule()->shutdown();LOG__(APP, _T("close http pool done"));//close network socket iomodule::getTcpClientModule()->shutdown();LOG__(APP, _T("close tcpclient socket done"));//stop imcore libimcore::IMLibCoreStopEvent();LOG__(APP, _T("stop imcore lib done"));//stop ui eventmodule::getEventManager()->shutdown();LOG__(APP, _T("stop ui event done"));//销毁主窗口_DestroyMainDialog();LOG__(APP,_T("MainDialog Destory done")); //终止Winsock 2 DLL (Ws2_32.dll) 的使用.WSACleanup();// Optional: Delete all global objects allocated by libprotobuf.google::protobuf::ShutdownProtobufLibrary();LOG__(APP, _T("Exit OK"));YAOLOG_EXIT;return __super::ExitInstance(); ///__super关键字:本类的基类CWinApp
}//创建用户目录
BOOL CteamtalkApp::_CreateUsersFolder()
{module::IMiscModule* pModule = module::getMiscModule(); //定义一些比较杂的公用的接口函数的类实例//users目录if (!util::createAllDirectories(pModule->getUsersDir())){LOG__(ERR, _T("_CreateUsersFolder users direcotry failed!"));return FALSE;}//下载目录if (!util::createAllDirectories(pModule->getDownloadDir())){LOG__(ERR, _T("_CreateUsersFolder download direcotry failed!"));return FALSE;}return TRUE;
}//应用程序的HANDLE标识号
#ifdef _DEBUG#define AppSingletonMutex _T("{7A666640-EDB3-44CC-954B-0C43F35A2E17}")
#else#define AppSingletonMutex _T("{5676532A-6F70-460D-A1F0-81D6E68F046A}")
#endifBOOL CteamtalkApp::_IsHaveInstance()
{// 单实例运行HANDLE hMutex = ::CreateMutex(NULL, TRUE, AppSingletonMutex);if (hMutex != NULL && GetLastError() == ERROR_ALREADY_EXISTS){MessageBox(0, _T("上次程序运行还没完全退出,请稍后再启动!"), _T("TeamTalk"), MB_OK);return TRUE;}return FALSE;
}//初始化YAOLOG日志
void CteamtalkApp::_InitLog()
{std::string logConfig = util::cStringToString(util::getAppPath()) + "\\ttlogconfig.ini";YAOLOG_INIT;YAOLOG_CREATE(APP, true, YaoUtil::LOG_TYPE_TEXT);YAOLOG_CREATE(NET, true, YaoUtil::LOG_TYPE_TEXT);YAOLOG_CREATE(DEBG, true, YaoUtil::LOG_TYPE_TEXT);YAOLOG_CREATE(ERR, true, YaoUtil::LOG_TYPE_TEXT);YAOLOG_CREATE(SOCK, true, YaoUtil::LOG_TYPE_FORMATTED_BIN);YAOLOG_SET_ATTR_FROM_CONFIG_FILE(APP, logConfig.c_str());YAOLOG_SET_ATTR_FROM_CONFIG_FILE(NET, logConfig.c_str());YAOLOG_SET_ATTR_FROM_CONFIG_FILE(DEBG, logConfig.c_str());YAOLOG_SET_ATTR_FROM_CONFIG_FILE(ERR, logConfig.c_str());YAOLOG_SET_ATTR_FROM_CONFIG_FILE(SOCK, logConfig.c_str());
}
上述代码大致做了以下工作:
// 1. 初始化yaolog日志库
// 2. google protobuf的版本号检测
// 3. 启动网络通信线程检测网络数据读写,再启动一个线程创建一个队列,如果队列中有任务,则取出该任务执行
// 4. 创建支线程与UI线程的桥梁——代理窗口
// 5. 创建用户文件夹
// 6. 配置duilib的资源文件路径、初始化com库、初始化ole库
// 7. 如果没有配置登录服务器的地址,则显示配置对话框
// 8. 显示登录对话框
// 9. 登录成功后,登录对话框销毁,显示主对话框
// 10. 启动duilib的消息循环(也就是说不使用mfc的消息循环)
其它的没什么好介绍的,我们来重点介绍下第3点和第4点。先说第3点,在第3点中又会牵扯出第4点,网络通信线程的启动:
//start imcore lib
//在这里启动任务队列和网络IO线程
if (!imcore::IMLibCoreRunEvent())
{
<span style="white-space:pre"> </span>LOG__(ERR, _T("start imcore lib failed!"));
}
LOG__(APP, _T("start imcore lib done"));
bool IMLibCoreRunEvent()
{ LOG__(NET, _T("===============================================================================")); //在这里启动任务队列处理线程 getOperationManager()->startup(); CAutoLock lock(&g_lock); if (!netlib_is_running()) {
#ifdef _MSC_VER unsigned int m_dwThreadID; //在这里启动网络IO线程 g_hThreadHandle = (HANDLE)_beginthreadex(0, 0, event_run, 0, 0, (unsigned*)&m_dwThreadID); if (g_hThreadHandle < (HANDLE)2) { m_dwThreadID = 0; g_hThreadHandle = 0; } return g_hThreadHandle >(HANDLE)1;
#else pthread_t pt; pthread_create(&pt, NULL, event_run, NULL);
#endif } return true;
}
先看getOperationManager()->startup();:
IMCoreErrorCode OperationManager::startup()
{ m_operationThread = std::thread([&] { std::unique_lock <std::mutex> lck(m_cvMutex); Operation* pOperation = nullptr; while (m_bContinue) { if (!m_bContinue) break; if (m_vecRealtimeOperations.empty()) m_CV.wait(lck); if (!m_bContinue) break; { std::lock_guard<std::mutex> lock(m_mutexOperation); if (m_vecRealtimeOperations.empty()) continue; pOperation = m_vecRealtimeOperations.front(); m_vecRealtimeOperations.pop_front(); } if (!m_bContinue) break; if (pOperation) { pOperation->process(); pOperation->release(); } } }); return IMCORE_OK;
}
这里利用一个C++11的新语法lamda表达式来创建一个线程,线程函数就是lamda表达式的具体内容:先从队列中取出任务,然后执行。所有的任务都继承其基类Operation,而Operation又继承接口类IOperatio,任务类根据自己具体需要做什么来改写process()方法:
class NETWORK_DLL Operation : public IOperation
{ enum OperationState { OPERATION_IDLE = 0, OPERATION_STARTING, OPERATION_RUNNING, OPERATION_CANCELLING, OPERATION_FINISHED }; public: /** @name Constructors and Destructor*/ //@{ /** * Constructor */ Operation(); Operation(const std::string& name); /** * Destructor */ virtual ~Operation(); //@} public: virtual void processOpertion() = 0; public: virtual void process(); virtual void release(); inline std::string name() const { return m_name; } inline void set_name(__in std::string name){ m_name = name; } private: OperationState m_state; std::string m_name;
};
struct NETWORK_DLL IOperation
{
public: virtual void process() = 0;
//private: /** * 必须让容器来释放自己 * * @return void * @exception there is no any exception to throw. */ virtual void release() = 0;
};
这里我们介绍的任务队列我们称为队列A,下文中还有一个专门做http请求的队列,我们称为队列B。
后半部分代码其实就是启动网络检测线程,检测网络数据读写:
g_hThreadHandle = (HANDLE)_beginthreadex(0, 0, event_run, 0, 0, (unsigned*)&m_dwThreadID);
unsigned int __stdcall event_run(void* threadArgu)
{ LOG__(NET, _T("event_run")); netlib_init(); netlib_set_running(); netlib_eventloop(); return NULL;
}
void netlib_eventloop(uint32_t wait_timeout)
{ CEventDispatch::Instance()->StartDispatch(wait_timeout);
}
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{ fd_set read_set, write_set, excep_set; timeval timeout; timeout.tv_sec = 1; //wait_timeout 1 second timeout.tv_usec = 0; while (running) { //_CheckTimer(); //_CheckLoop(); if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count) { Sleep(MIN_TIMER_DURATION); continue; } m_lock.lock(); FD_ZERO(&read_set); FD_ZERO(&write_set); FD_ZERO(&excep_set); memcpy(&read_set, &m_read_set, sizeof(fd_set)); memcpy(&write_set, &m_write_set, sizeof(fd_set)); memcpy(&excep_set, &m_excep_set, sizeof(fd_set)); m_lock.unlock(); if (!running) break; //for (int i = 0; i < read_set.fd_count; i++) { // LOG__(NET, "read fd: %d\n", read_set.fd_array[i]); //} int nfds = select(0, &read_set, &write_set, &excep_set, &timeout); if (nfds == SOCKET_ERROR) { //LOG__(NET, "select failed, error code: %d\n", GetLastError()); Sleep(MIN_TIMER_DURATION); continue; // select again } if (nfds == 0) { continue; } for (u_int i = 0; i < read_set.fd_count; i++) { //LOG__(NET, "select return read count=%d\n", read_set.fd_count); SOCKET fd = read_set.fd_array[i]; CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd); if (pSocket) { pSocket->OnRead(); pSocket->ReleaseRef(); } } for (u_int i = 0; i < write_set.fd_count; i++) { //LOG__(NET, "select return write count=%d\n", write_set.fd_count); SOCKET fd = write_set.fd_array[i]; CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd); if (pSocket) { pSocket->OnWrite(); pSocket->ReleaseRef(); } } for (u_int i = 0; i < excep_set.fd_count; i++) { LOG__(NET, _T("select return exception count=%d"), excep_set.fd_count); SOCKET fd = excep_set.fd_array[i]; CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd); if (pSocket) { pSocket->OnClose(); pSocket->ReleaseRef(); } } }
}
我们举个具体的例子来说明这个三个线程的逻辑(任务队列A、网络线程和下文要介绍的专门处理http请求的任务队列B)和代理窗口的消息队列,以在登录对话框输入用户名和密码后接下来的步骤:
//位于LoginDialog.cpp中
void LoginDialog::_DoLogin()
{ LOG__(APP,_T("User Clicked LoginBtn")); m_ptxtTip->SetText(_T("")); CDuiString userName = m_pedtUserName->GetText(); CDuiString password = m_pedtPassword->GetText(); if (userName.IsEmpty()) { CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_USERNAME_EMPTY")); m_ptxtTip->SetText(csTip); return; } if (password.IsEmpty()) { CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_PASSWORD_EMPTY")); m_ptxtTip->SetText(csTip); return; } module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig(); pCfg->userName = userName; if (m_bPassChanged) { std::string sPass = util::cStringToString(CString(password)); char* pOutData = 0; uint32_t nOutLen = 0; int retCode = EncryptPass(sPass.c_str(), sPass.length(), &pOutData, nOutLen); if (retCode == 0 && nOutLen > 0 && pOutData != 0) { pCfg->password = std::string(pOutData, nOutLen); Free(pOutData); } else { LOG__(ERR, _T("EncryptPass Failed!")); CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_ENCRYPT_PASE_FAIL")); m_ptxtTip->SetText(csTip); return; } } pCfg->isRememberPWD = m_pChkRememberPWD->GetCheck(); module::getSysConfigModule()->saveData(); CString csTxt = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_BTN_DOLOGIN")); m_pBtnLogin->SetText(csTxt); m_pBtnLogin->SetEnabled(false); //连接登陆服务器 DoLoginServerParam param; DoLoginServerHttpOperation* pOper = new DoLoginServerHttpOperation( BIND_CALLBACK_1(LoginDialog::OnHttpCallbackOperation), param); module::getHttpPoolModule()->pushHttpOperation(pOper);
}
点击登录按钮之后,程序先对用户名和密码进行一些有效性校验,接着产生一个DoLoginServerHttpOperation对象,该类继承IHttpOperation,IHttpOperation再继承ICallbackOpertaion,ICallbackOpertaion再继承Operation类。这个任务会绑定一个任务完成之后的回调函数,即宏BIND_CALLBACK_1,这个宏实际上就是std::bind:
#define BIND_CALLBACK_1(func) std::bind(&func, this, placeholders::_1)
#define BIND_CALLBACK_2(func) std::bind(&func, this, placeholders::_1, placeholders::_2)
往任务队列中放入任务的动作如下:
void HttpPoolModule_Impl::pushHttpOperation(module::IHttpOperation* pOperaion, BOOL bHighPriority /*= FALSE*/)
{ if (NULL == pOperaion) { return; } CAutoLock lock(&m_mtxLock); if (bHighPriority) m_lstHttpOpers.push_front(pOperaion); else m_lstHttpOpers.push_back(pOperaion); _launchThread(); ::ReleaseSemaphore(m_hSemaphore, 1, NULL); return;
}
其中_launchThread()会启动一个线程,该线程函数是另外一个任务队列,专门处理http任务:
BOOL HttpPoolModule_Impl::_launchThread()
{ if ((int)m_vecHttpThread.size() >= MAX_THEAD_COUNT) { return TRUE; } TTHttpThread* pThread = new TTHttpThread(); PTR_FALSE(pThread); if (!pThread->create()) { return FALSE; } Sleep(300); m_vecHttpThread.push_back(pThread); return TRUE;
}
线程函数最终实际执行代码如下:
UInt32 TTHttpThread::process()
{ module::IHttpOperation * pHttpOper = NULL; HttpPoolModule_Impl *pPool = m_pInstance; while (m_bContinue) { if (WAIT_OBJECT_0 != ::WaitForSingleObject(pPool->m_hSemaphore, INFINITE)) { break; } if (!m_bContinue) { break; } { CAutoLock lock(&(pPool->m_mtxLock)); if (pPool->m_lstHttpOpers.empty()) pHttpOper = NULL; else { pHttpOper = pPool->m_lstHttpOpers.front(); pPool->m_lstHttpOpers.pop_front(); } } try { if (m_bContinue && pHttpOper) { pHttpOper->process(); pHttpOper->release(); } } catch (...) { LOG__(ERR, _T("TTHttpThread: Failed to execute opertaion(0x%p)"), pHttpOper); } } return 0;
}
当这个http任务被任务队列执行时,实际执行DoLoginServerHttpOperation::processOpertion(),代码如下:
void DoLoginServerHttpOperation::processOpertion()
{ module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig(); LOG__(APP, _T("loginAddr = %s"), pCfg->loginServIP); std::string& loginAddr = util::cStringToString(pCfg->loginServIP); std::string url = loginAddr; DoLoginServerParam* pPamram = new DoLoginServerParam(); pPamram->resMsg = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_HTTP_DEFERROR")); Http::HttpResponse response; Http::HttpClient client; //对于登录:url=http://192.168.226.128:8080/msg_server Http::HttpRequest request("get", url); if (!client.execute(&request, &response)) { CString csTemp = util::stringToCString(url); pPamram->result = DOLOGIN_FAIL; LOG__(ERR,_T("failed %s"), csTemp); asyncCallback(std::shared_ptr<void>(pPamram)); client.killSelf(); return; } /** { "backupIP" : "localhost", "code" : 0, "discovery" : "http://127.0.0.1/api/discovery", "msfsBackup" : "http://127.0.0.1:8700/", "msfsPrior" : "http://127.0.0.1:8700/", "msg" : "", "port" : "8000", "priorIP" : "localhost" } */ std::string body = response.getBody(); client.killSelf(); //json解析 try { Json::Reader reader; Json::Value root; if (!reader.parse(body, root)) { CString csTemp = util::stringToCString(body); LOG__(ERR, _T("parse data failed,%s"), csTemp); pPamram->result = DOLOGIN_FAIL; pPamram->resMsg = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_HTTP_JSONERROR")); goto End; } int nCode = root.get("code", "").asInt(); if (0 == nCode)//登陆成功 { LOG__(APP, _T("get msgSvr IP succeed!")); pCfg->msgSevPriorIP = root.get("priorIP", "").asString(); pCfg->msgSevBackupIP = root.get("backupIP", "").asString(); std::string strPort = root.get("port", "").asString(); pCfg->msgServPort = util::stringToInt32(strPort); pCfg->fileSysAddr = util::stringToCString(root.get("msfsPrior", "").asString()); pCfg->fileSysBackUpAddr = util::stringToCString(root.get("msfsBackup", "").asString()); pPamram->result = DOLOGIN_SUCC; } else { LOG__(ERR, _T("get msgSvr IP failed! Code = %d"),nCode); pPamram->result = DOLOGIN_FAIL; CString csRetMsgTemp = util::stringToCString(root.get("msg", "").asString()); if (!csRetMsgTemp.IsEmpty()) pPamram->resMsg = csRetMsgTemp; } } catch (...) { CString csTemp = util::stringToCString(body); LOG__(ERR,_T("parse json execption,%s"), csTemp); pPamram->result = DOLOGIN_FAIL; pPamram->resMsg = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_HTTP_JSONERROR")); } End: asyncCallback(std::shared_ptr<void>(pPamram));
}
实际上是向login_server发送一个http请求,这是一个同步请求。得到的结果是一个json字符串,代码注释中已经给出。然后调用asyncCallback(std::shared_ptr(pPamram));参数pPamram携带了当前任务的回调函数指针:
/**
* 异步回调,借助UIEvent
*
* @param std::shared_ptr<void> param
* @return void
* @exception there is no any exception to throw.
*/ void asyncCallback(std::shared_ptr<void> param)
{ CallbackOperationEvent* pEvent = new CallbackOperationEvent(m_callback, param); module::getEventManager()->asynFireUIEvent(pEvent);
}
这实际上产生了一个回调事件。也就是说队列B做http请求,操作完成后往代理窗口的消息队列中放入一个回调事件,这个事件通过代理窗口过程函数来处理的(这就是上文中第4点介绍的代理窗口过程的作用,实际上是利用windows消息队列来做任务处理(系统有现成的任务队列系统,为何不利用呢?)):
module::IMCoreErrorCode UIEventManager::asynFireUIEvent(IN const IEvent* const pEvent)
{ assert(m_hWnd); assert(pEvent); if (0 == m_hWnd || 0 == pEvent) return IMCORE_ARGUMENT_ERROR; if (FALSE == ::PostMessage(m_hWnd, UI_EVENT_MSG, reinterpret_cast<WPARAM>(this), reinterpret_cast<WPARAM>(pEvent))) return IMCORE_WORK_POSTMESSAGE_ERROR; return IMCORE_OK;
}
看到没有?向代理窗口的消息队列中投递一个UI_EVENT_MSG事件,并在消息参数LPARAM中传递了回调事件的对象指针。这样代理窗口过程函数就可以处理这个消息了:
LRESULT _stdcall UIEventManager::_WindowProc(HWND hWnd , UINT message , WPARAM wparam , LPARAM lparam)
{ switch (message) { case UI_EVENT_MSG: reinterpret_cast<UIEventManager*>(wparam)->_processEvent(reinterpret_cast<IEvent*>(lparam), TRUE); break; case WM_TIMER: reinterpret_cast<UIEventManager*>(wparam)->_processTimer(); break; default: break; } return ::DefWindowProc(hWnd, message, wparam, lparam);
}
void UIEventManager::_processEvent(IEvent* pEvent, BOOL bRelease)
{ assert(pEvent); if (0 == pEvent) return; try { pEvent->process(); if (bRelease) pEvent->release(); } catch (imcore::Exception *e) { LOG__(ERR, _T("event run exception")); pEvent->onException(e); if (bRelease) pEvent->release(); if (e) { LOG__(ERR, _T("event run exception:%s"), util::stringToCString(e->m_msg)); assert(FALSE); } } catch (...) { LOG__(ERR, _T("operation run exception,unknown reason")); if (bRelease) pEvent->release(); assert(FALSE); }
}
根据C++的多态特性,pEvent->process()实际上调用的是CallbackOperationEvent.process()。代码如下:
virtual void process()
{ m_callback(m_param);
}
m_callback(m_param);调用的就是上文中介绍DoLoginServerHttpOperation操作的回调函数LoginDialog::OnHttpCallbackOperation():
void LoginDialog::OnHttpCallbackOperation(std::shared_ptr<void> param)
{ DoLoginServerParam* pParam = (DoLoginServerParam*)param.get(); if (DOLOGIN_SUCC == pParam->result) { module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig(); PTR_VOID(pCfg); LoginParam loginparam; loginparam.csUserName = pCfg->userName; loginparam.password = pCfg->password; loginparam.csUserName.Trim(); LoginOperation* pOperation = new LoginOperation( BIND_CALLBACK_1(LoginDialog::OnOperationCallback), loginparam); imcore::IMLibCoreStartOperation(pOperation); } else { m_ptxtTip->SetText(pParam->resMsg); module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig(); LOG__(ERR, _T("get MsgServer config faild,login server addres:%s:%d"), pCfg->loginServIP,pCfg->loginServPort); CString csTxt = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_BTN_LOGIN")); m_pBtnLogin->SetText(csTxt); m_pBtnLogin->SetEnabled(true); }
}
ok,终于到家了。但是这并没结束,我们只介绍了队列B和代理窗口消息队列,还有队列A呢?LoginDialog::OnHttpCallbackOperation()会根据获取的msg_server的情况来再次产生一个新的任务LoginOperation来放入队列A中,这次才是真正的用户登录,根据上面的介绍,LoginOperation任务从队列A中取出来之后,实际执行的是LoginOperation::processOpertion():
void LoginOperation::processOpertion()
{ LOG__(APP,_T("login start,uname:%s,status:%d"), m_loginParam.csUserName , m_loginParam.mySelectedStatus); LoginParam* pParam = new LoginParam; pParam->csUserName = m_loginParam.csUserName; pParam->mySelectedStatus = m_loginParam.mySelectedStatus; //连接消息服务器 module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig(); CString server = util::stringToCString(pCfg->msgSevPriorIP); LOG__(APP, _T("MsgServeIp:%s,Port:%d"), server, pCfg->msgServPort); //8000端口 IM::Login::IMLoginRes* pImLoginResp = (IM::Login::IMLoginRes*)module::getTcpClientModule() ->doLogin(server, pCfg->msgServPort,m_loginParam.csUserName,m_loginParam.password); if (0 == pImLoginResp || pImLoginResp->result_code() != IM::BaseDefine::REFUSE_REASON_NONE || !pImLoginResp->has_user_info()) { //TODO,若失败,尝试备用IP LOG__(ERR,_T("add:%s:%d,uname:%s,login for msg server failed"),server,pCfg->msgServPort, m_loginParam.csUserName); if (pImLoginResp) { CString errInfo = util::stringToCString(pImLoginResp->result_string()); pParam->errInfo = errInfo; pParam->result = LOGIN_FAIL; pParam->server_result = pImLoginResp->result_code(); LOG__(ERR, _T("error code :%d,error info:%s"), pImLoginResp->result_code(), errInfo); } else { pParam->result = IM::BaseDefine::REFUSE_REASON_NO_MSG_SERVER; LOG__(ERR, _T("login msg server faild!")); } asyncCallback(std::shared_ptr<void>(pParam)); return; } pParam->result = LOGIN_OK; pParam->serverTime = pImLoginResp->server_time(); pParam->mySelectedStatus = pImLoginResp->online_status(); //存储服务器端返回的userId IM::BaseDefine::UserInfo userInfo = pImLoginResp->user_info(); pCfg->userId = util::uint32ToString(userInfo.user_id()); pCfg->csUserId = util::stringToCString(pCfg->userId); //登陆成功,创建自己的信息 module::UserInfoEntity myInfo; myInfo.sId = pCfg->userId; myInfo.csName = m_loginParam.csUserName; myInfo.onlineState = IM::BaseDefine::USER_STATUS_ONLINE; myInfo.csNickName = util::stringToCString(userInfo.user_nick_name()); myInfo.avatarUrl = userInfo.avatar_url(); myInfo.dId = util::uint32ToString(userInfo.department_id()); myInfo.department = myInfo.dId; myInfo.email = userInfo.email(); myInfo.gender = userInfo.user_gender(); myInfo.user_domain = userInfo.user_domain(); myInfo.telephone = userInfo.user_tel(); myInfo.status = userInfo.status(); myInfo.signature = userInfo.sign_info(); module::getUserListModule()->createUserInfo(myInfo); asyncCallback(std::shared_ptr<void>(pParam)); LOG__(APP, _T("login succeed! Name = %s Nickname = %s sId = %s status = %d") , m_loginParam.csUserName , util::stringToCString(userInfo.user_nick_name()) , module::getSysConfigModule()->UserID() , m_loginParam.mySelectedStatus); //开始发送心跳包 module::getTcpClientModule()->startHeartbeat();
}
同理,数据包发生成功以后,会再往代理窗口的消息队列中产生一个回调事件,最终调用刚才说的LoginOperation绑定的回调函数:
void asyncCallback(std::shared_ptr<void> param)
{ CallbackOperationEvent* pEvent = new CallbackOperationEvent(m_callback, param); module::getEventManager()->asynFireUIEvent(pEvent);
}
void LoginDialog::OnOperationCallback(std::shared_ptr<void> param)
{ LoginParam* pLoginParam = (LoginParam*)param.get(); if (LOGIN_OK == pLoginParam->result) //登陆成功 { Close(IDOK); //创建用户目录 _CreateUsersFolder(); //开启同步消息时间timer module::getSessionModule()->startSyncTimeTimer(); module::getSessionModule()->setTime(pLoginParam->serverTime); //通知服务器客户端初始化完毕,获取组织架构信息和群列表 module::getLoginModule()->notifyLoginDone(); } else //登陆失败处理 { module::getTcpClientModule()->shutdown(); if (IM::BaseDefine::REFUSE_REASON_NO_MSG_SERVER == pLoginParam->server_result) { CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_MSGSVR_FAIL")); m_ptxtTip->SetText(csTip); } else if (!pLoginParam->errInfo.IsEmpty()) { m_ptxtTip->SetText(pLoginParam->errInfo); } else { CString errorCode = util::int32ToCString(pLoginParam->server_result); CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_UNKNOWN_ERROR")); m_ptxtTip->SetText(csTip + CString(":") + errorCode); } } CString csTxt = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_BTN_LOGIN")); m_pBtnLogin->SetText(csTxt); m_pBtnLogin->SetEnabled(true);
}
至此,登录才成功。等等,那数据包是怎么发到服务器的呢?这也是一个重点,我们来详细地介绍一下,LoginOperation::processOpertion()中有这一行代码:
doLogin函数代码如下:
IM::Login::IMLoginRes* TcpClientModule_Impl::doLogin(CString &linkaddr, UInt16 port ,CString& uName,std::string& pass)
{ m_socketHandle = imcore::IMLibCoreConnect(util::cStringToString(linkaddr), port); imcore::IMLibCoreRegisterCallback(m_socketHandle, this); if(util::waitSingleObject(m_eventConnected, 5000)) { IM::Login::IMLoginReq imLoginReq; string& name = util::cStringToString(uName); imLoginReq.set_user_name(name); imLoginReq.set_password(pass); imLoginReq.set_online_status(IM::BaseDefine::USER_STATUS_ONLINE); imLoginReq.set_client_type(IM::BaseDefine::CLIENT_TYPE_WINDOWS); imLoginReq.set_client_version("win_10086"); if (TCPCLIENT_STATE_OK != m_tcpClientState) return 0; sendPacket(IM::BaseDefine::SID_LOGIN, IM::BaseDefine::CID_LOGIN_REQ_USERLOGIN, ++g_seqNum , &imLoginReq); m_pImLoginResp->Clear(); util::waitSingleObject(m_eventReceived, 10000); } return m_pImLoginResp;
}
这段代码先连接服务器,然后调用sendPacket()发送登录数据包。如何连接服务器使用了一些“奇技淫巧”,我们后面单独介绍。我们这里先来看sendPacket()发包代码:
void TcpClientModule_Impl::sendPacket(UInt16 moduleId, UInt16 cmdId, UInt16 seq, google::protobuf::MessageLite* pbBody)
{ m_TTPBHeader.clear(); m_TTPBHeader.setModuleId(moduleId); m_TTPBHeader.setCommandId(cmdId); m_TTPBHeader.setSeqNumber(seq); _sendPacket(pbBody);
}
void TcpClientModule_Impl::_sendPacket(google::protobuf::MessageLite* pbBody)
{ UInt32 length = imcore::HEADER_LENGTH + pbBody->ByteSize(); m_TTPBHeader.setLength(length); std::unique_ptr<byte> data(new byte[length]); memset(data.get(), 0, length); memcpy(data.get(), m_TTPBHeader.getSerializeBuffer(), imcore::HEADER_LENGTH); if (!pbBody->SerializeToArray(data.get() + imcore::HEADER_LENGTH, pbBody->ByteSize())) { LOG__(ERR, _T("pbBody SerializeToArray failed")); return; } imcore::IMLibCoreWrite(m_socketHandle, data.get(), length);
}
其实就是序列化成protobuf要求的格式,然后调用imcore::IMLibCoreWrite(m_socketHandle, data.get(), length);发出去:
int IMLibCoreWrite(int key, uchar_t* data, uint32_t size)
{ int nRet = -1; int nHandle = key; CImConn* pConn = TcpSocketsManager::getInstance()->get_client_conn(nHandle); if (pConn) { pConn->Send((void*)data, size); } else { LOG__(NET, _T("connection is invalied:%d"), key); } return nRet;
}
先尝试着直接发送,如果目前tcp窗口太小发不出去,则暂且将数据放在发送缓冲区里面,并检测socket可写事件。这里就是和服务器一样的网络库的代码了,前面一系列的文章,我们已经介绍过了。
int CImConn::Send(void* data, int len)
{ if (m_busy) { m_out_buf.Write(data, len); return len; } int offset = 0; int remain = len; while (remain > 0) { int send_size = remain; if (send_size > NETLIB_MAX_SOCKET_BUF_SIZE) { send_size = NETLIB_MAX_SOCKET_BUF_SIZE; } int ret = netlib_send(m_handle, (char*)data + offset, send_size); if (ret <= 0) { ret = 0; break; } offset += ret; remain -= ret; } if (remain > 0) { m_out_buf.Write((char*)data + offset, remain); m_busy = true; LOG__(NET, _T("send busy, remain=%d"), m_out_buf.GetWriteOffset()); } return len;
}
数据发出去以后,服务器应答登录包,网络线程会检测到socket可读事件:
void CBaseSocket::OnRead()
{ if (m_state == SOCKET_STATE_LISTENING) { _AcceptNewSocket(); } else { u_long avail = 0; if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) ) { m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL); } else { m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL); } }
}
void imconn_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{ NOTUSED_ARG(handle); NOTUSED_ARG(pParam); CImConn* pConn = TcpSocketsManager::getInstance()->get_client_conn(handle); if (!pConn) { //LOG__(NET, _T("connection is invalied:%d"), handle); return; } pConn->AddRef(); // LOG__(NET, "msg=%d, handle=%d\n", msg, handle); switch (msg) { case NETLIB_MSG_CONFIRM: pConn->onConnect(); break; case NETLIB_MSG_READ: pConn->OnRead(); break; case NETLIB_MSG_WRITE: pConn->OnWrite(); break; case NETLIB_MSG_CLOSE: pConn->OnClose(); break; default: LOG__(NET, _T("!!!imconn_callback error msg: %d"), msg); break; } pConn->ReleaseRef();
}
void CImConn::OnRead()
{ for (;;) { uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset(); if (free_buf_len < READ_BUF_SIZE) m_in_buf.Extend(READ_BUF_SIZE); int ret = netlib_recv(m_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE); if (ret <= 0) break; m_in_buf.IncWriteOffset(ret); while (m_in_buf.GetWriteOffset() >= imcore::HEADER_LENGTH) { uint32_t len = m_in_buf.GetWriteOffset(); uint32_t length = CByteStream::ReadUint32(m_in_buf.GetBuffer()); if (length > len) break; try { imcore::TTPBHeader pbHeader; pbHeader.unSerialize((byte*)m_in_buf.GetBuffer(), imcore::HEADER_LENGTH); LOG__(NET, _T("OnRead moduleId:0x%x,commandId:0x%x"), pbHeader.getModuleId(), pbHeader.getCommandId()); if (m_pTcpSocketCB) m_pTcpSocketCB->onReceiveData((const char*)m_in_buf.GetBuffer(), length); LOGBIN_F__(SOCK, "OnRead", m_in_buf.GetBuffer(), length); } catch (std::exception& ex) { assert(FALSE); LOGA__(NET, "std::exception,info:%s", ex.what()); if (m_pTcpSocketCB) m_pTcpSocketCB->onReceiveError(); } catch (...) { assert(FALSE); LOG__(NET, _T("unknown exception")); if (m_pTcpSocketCB) m_pTcpSocketCB->onReceiveError(); } m_in_buf.Read(NULL, length); } }
}
收取数据,并解包:
void TcpClientModule_Impl::onReceiveData(const char* data, int32_t size)
{ if (m_pServerPingTimer) m_pServerPingTimer->m_bHasReceivedPing = TRUE; imcore::TTPBHeader header; header.unSerialize((byte*)data, imcore::HEADER_LENGTH); if (IM::BaseDefine::CID_OTHER_HEARTBEAT == header.getCommandId() && IM::BaseDefine::SID_OTHER == header.getModuleId()) { //模块器端过来的心跳包,不跳到业务层派发 return; } LOG__(NET, _T("receiveData message moduleId:0x%x,commandId:0x%x") , header.getModuleId(), header.getCommandId()); if (g_seqNum == header.getSeqNumber()) { m_pImLoginResp->ParseFromArray(data + imcore::HEADER_LENGTH, size - imcore::HEADER_LENGTH); ::SetEvent(m_eventReceived); return; } //将网络包包装成任务放到逻辑任务队列里面去 _handlePacketOperation(data, size);
}
void TcpClientModule_Impl::_handlePacketOperation(const char* data, UInt32 size)
{ std::string copyInBuffer(data, size); imcore::IMLibCoreStartOperationWithLambda( [=]() { imcore::TTPBHeader header; header.unSerialize((byte*)copyInBuffer.data(),imcore::HEADER_LENGTH); module::IPduPacketParse* pModule = (module::IPduPacketParse*)__getModule(header.getModuleId()); if (!pModule) { assert(FALSE); LOG__(ERR, _T("module is null, moduleId:%d,commandId:%d") , header.getModuleId(), header.getCommandId()); return; } std::string pbBody(copyInBuffer.data() + imcore::HEADER_LENGTH, size - imcore::HEADER_LENGTH); pModule->onPacket(header, pbBody); });
}
根据不同的命令号来做相应的处理:
void UserListModule_Impl::onPacket(imcore::TTPBHeader& header, std::string& pbBody)
{ switch (header.getCommandId()) { case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_RECENT_CONTACT_SESSION_RESPONSE: _recentlistResponse(pbBody); break; case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_STATUS_NOTIFY: _userStatusNotify(pbBody); break; case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_USER_INFO_RESPONSE: _usersInfoResponse(pbBody); break; case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_REMOVE_SESSION_RES: _removeSessionResponse(pbBody); break; case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_ALL_USER_RESPONSE: _allUserlistResponse(pbBody); break; case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_USERS_STATUS_RESPONSE: _usersLineStatusResponse(pbBody); break; case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_CHANGE_AVATAR_RESPONSE: _changeAvatarResponse(pbBody); break; case IM::BaseDefine::CID_BUDDY_LIST_REMOVE_SESSION_NOTIFY: _removeSessionNotify(pbBody); break; case IM::BaseDefine::CID_BUDDY_LIST_DEPARTMENT_RESPONSE: _departmentResponse(pbBody); break; case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_AVATAR_CHANGED_NOTIFY: _avatarChangeNotify(pbBody); break; case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_CHANGE_SIGN_INFO_RESPONSE: _changeSignInfoResponse(pbBody); break; case IM::BaseDefine::BuddyListCmdID::CID_BUDDY_LIST_SIGN_INFO_CHANGED_NOTIFY: _signInfoChangedNotify(pbBody); break; default: LOG__(ERR, _T("Unknow commandID:%d"), header.getCommandId()); return; }
}
每一个处理分支,都最终会产生一个事件放入代理窗口的消息队列中。这前面已经介绍过了。这里我不得不说一点,teamtalk对于其它数据包的应答都是走的上面的介绍的流程,但是对于登录的应答却是使用了一些特殊处理。听我慢慢道来:
上文中发送了登录数据包之后,在那里等一个事件10秒钟,如果10秒内这个事件有信号,则认为登录成功。那么什么情况该事件会有信号呢?
该事件在构造函数里面创建,默认无信号:
当网络线程收到数据以后(上文逻辑流中介绍过了):
除了心跳包直接过滤以外,通过一个序列号(Seq,变量g_seqNum)唯一标识了登录数据包的应答,如果收到这个序列号的数据,则置信m_eventReceived。这样等待在那里的登录流程就可以返回了,同时也得到了登录应答,登录应答数据记录在成员变量m_pImLoginResp中。如果是其它的数据包,则走的流程是_handlePacketOperation(data, size);,处理逻辑上文也介绍了。
至此,整个客户端程序结构就介绍完了,我们总结一下,实际上程序有如下几类线程:
- 网络事件检测线程,用于接收和发送网络数据;
- http任务处理线程用于处理http操作;
- 普通的任务处理线程,用于处理一般性的任务,比如登录;
- UI线程,界面逻辑处理,同时在UI线程里面有一个代理窗口的窗口过程函数,用于非UI线程与UI线程之间的数据流和逻辑中转,核心是利用PostMessage往代理线程投递事件,事件消息参数携带任务信息。
至于,像聊天、查看用户信息这些业务性的内容,留给有兴趣的读者自己去研究吧。
六、程序中使用的一些比较有意思的技巧摘录
- 唯一实例判断
很多程序只能启动一个实例,当你再次启动某个程序的实例时,会激活前一个实例,其实实现起来很简单,就是新建一个命名的Mutex,因为Mutex可以跨进程,当再次启动程序实例时,创建同名的Mutex,会无法创建,错误信息是已经存在。这是windows上非常常用的技巧,如果你从事windows开发,请你务必掌握它。看teamtalk的实现:
#ifdef _DEBUG #define AppSingletonMutex _T("{7A666640-EDB3-44CC-954B-0C43F35A2E17}")
#else #define AppSingletonMutex _T("{5676532A-6F70-460D-A1F0-81D6E68F046A}")
#endif
BOOL CteamtalkApp::_IsHaveInstance()
{ // 单实例运行 HANDLE hMutex = ::CreateMutex(NULL, TRUE, AppSingletonMutex); if (hMutex != NULL && GetLastError() == ERROR_ALREADY_EXISTS) { MessageBox(0, _T("上次程序运行还没完全退出,请稍后再启动!"), _T("TeamTalk"), MB_OK); return TRUE; } return FALSE;
}
- socket函数connect()连接等待时长设定
传统的做法是将socket设置为非阻塞的,调用完connect函数之后,调用select函数检测socket是否可写,在select函数里面设置超时时间。代码如下:
//为了调试方便,暂且注释掉
int ret = ::connect(m_hSocket, (struct sockaddr*)&addrSrv, sizeof(addrSrv));
if (ret == 0)
{ m_bConnected = TRUE; return TRUE;
} if (ret == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
{ return FALSE;
} fd_set writeset;
FD_ZERO(&writeset);
FD_SET(m_hSocket, &writeset);
struct timeval tv = { timeout, 0 };
if (::select(m_hSocket + 1, NULL, &writeset, NULL, &tv) != 1)
{ return FALSE;
}
return TRUE;
我们看看teamtalk里面怎么做的:
红色箭头的地方调用connect函数连接服务器,然后绿色的箭头等待一个事件有信号(内部使用WaitForSingleObject函数),那事件什么时候有信号呢?
网络线程检测第一次到socket可写时,调用onConnectDone函数:
实际做的事情还是和上面介绍的差不多。其实对于登录流程做成同步的,也是和这个类似,上文中我们介绍过。我早些年刚做windows网络通信方面的项目时,开始总是找不到好的处理等待登录请求应答的方法。这里是一种很不错的设置超时等待的方法。
3.teamtalk的截图功能
不知道,你在使用qq这样的截图工具时,QQ截图工具能自动检测出某个窗口的范围。这个功能在teamtalk中也有实现,实现代码如下:
BOOL ScreenCapture::initCapture(__in HWND hWnd)
{ //register hot key const std::wstring screenCaptureHotkeyName = L"_SCREEN_CAPTURE_HOTKEY"; int iHotkeyId = (int)GlobalAddAtom(screenCaptureHotkeyName.c_str()); if (!RegisterHotKey(hWnd, iHotkeyId, MOD_CONTROL | MOD_SHIFT, 0x51)) //ctrl + shift + Q { GlobalDeleteAtom(iHotkeyId); } m_iHotkeyId = iHotkeyId; m_hRegisterHotkeyWnd = hWnd; return createMsgWindow();
}
程序初始化时,注册截屏快捷键,这里是ctrl+shift+Q(QQ默认是ctrl+alt+A)。当点击截屏按钮之后,开始启动截图:
HWND hDesktopWnd = GetDesktopWindow();
HDC hScreenDC = GetDC(hDesktopWnd); RECT rc = { 0 };
GetWindowRect(hDesktopWnd, &rc);
int cx = rc.right - rc.left;
int cy = rc.bottom - rc.top; HBITMAP hBitmap = CreateCompatibleBitmap(hScreenDC, cx, cy);
m_hMemDC = CreateCompatibleDC(hScreenDC);
HGDIOBJ hOldBitmap = SelectObject(m_hMemDC, (HGDIOBJ)hBitmap);
BitBlt(m_hMemDC, 0, 0, cx, cy, hScreenDC, 0, 0, SRCCOPY); m_hBkgMemDC = CreateCompatibleDC(hScreenDC);
HBITMAP hBkgBitmap = CreateCompatibleBitmap(hScreenDC, cx, cy);
SelectObject(m_hBkgMemDC, (HGDIOBJ)hBkgBitmap);
BitBlt(m_hBkgMemDC, 0, 0, cx, cy, hScreenDC, 0, 0, SRCCOPY); HDC hMaskDC = CreateCompatibleDC(hScreenDC);
HBITMAP hMaskBitmap = CreateCompatibleBitmap(hScreenDC, cx, cy);
SelectObject(hMaskDC, (HGDIOBJ)hMaskBitmap); BLENDFUNCTION ftn = { AC_SRC_OVER, 0, 100, 0};
AlphaBlend(m_hBkgMemDC, 0, 0, cx, cy, hMaskDC, 0, 0, cx, cy, ftn);
DeleteObject(hMaskBitmap);
DeleteDC(hMaskDC); m_hDrawMemDC = CreateCompatibleDC(hScreenDC);
HBITMAP hDrawBitmap = CreateCompatibleBitmap(hScreenDC, cx, cy);
SelectObject(m_hDrawMemDC, hDrawBitmap); ReleaseDC(hDesktopWnd, hScreenDC);
实际上就是在桌面窗口上画图。再遍历当前所有有显示区域的窗口,并记录这些窗口的窗口句柄和矩形区域:
for (HWND hWnd = GetTopWindow(NULL); NULL != hWnd; hWnd = GetWindow(hWnd, GW_HWNDNEXT))
{ if (!IsWindow(hWnd) || !IsWindowVisible(hWnd) || IsIconic(hWnd)) { continue; } RECT rcWnd = { 0 }; GetWindowRect(hWnd, &rcWnd); adjustRectInScreen(rcWnd); if (ScreenCommon::isRectEmpty(rcWnd)) { continue; } wchar_t szTxt[MAX_PATH] = { 0 }; GetWindowText(hWnd, szTxt, MAX_PATH); if (wcslen(szTxt) <= 0) { continue; } //combine the rect with the screen rect m_lsWndList.push_back(ScreenCaptureWndInfo(hWnd, rcWnd));
} return m_lsWndList.size() > 0;
然后显示一个截图工具:
BOOL UIScreenCaptureMgr::createWindows()
{ m_hBkgUI = BkgroundUI::Instance()->createWindow(); wchar_t szImg[MAX_PATH] = {0}; GetModuleFileName(NULL, szImg, MAX_PATH); PathRemoveFileSpec(szImg); PathRemoveFileSpec(szImg); std::wstring strBkgPic = std::wstring(szImg) + L"\\gui\\ScreenCapture\\sc_toolbar_normal.png"; std::wstring strHoverPic = std::wstring(szImg) + L"\\gui\\ScreenCapture\\sc_toolbar_hover.png"; std::wstring strSelPic = std::wstring(szImg) + L"\\gui\\ScreenCapture\\sc_toolbar_select.png"; EditToolbarInfo toolBarInfo = { 0, 0, 193, 37, strBkgPic, strHoverPic, strSelPic, { { 9, 5, 35, 31 }, { 43, 5, 69, 31 }, { 85, 5, 112, 31 }, { 119, 5, 185, 31 } } }; m_hEditToolBarUI = EditToolbarUI::Instance()->createWindow(toolBarInfo, m_hBkgUI); SetWindowPos(m_hBkgUI, HWND_TOPMOST, 0, 0, 0, 0, SWP_NOSIZE | SWP_NOMOVE); forceForgroundWindow(m_hBkgUI); ShowWindow(m_hBkgUI, SW_SHOW); return TRUE;
}
然后安装一个消息钩子(hook):
BOOL ScreenCapture::installMsgHook(BOOL bInstall)
{ BOOL result = FALSE; if (bInstall) { if (!m_hMouseHook) { m_hMouseHook = SetWindowsHookEx(WH_MOUSE, MouseProc, NULL, GetCurrentThreadId()); result = (NULL != m_hMouseHook); } } else { UnhookWindowsHookEx(m_hMouseHook); m_hMouseHook = NULL; result = TRUE; } return result;
}
LRESULT ScreenCapture::MouseProc(_In_ int nCode, _In_ WPARAM wParam, _In_ LPARAM lParam)
{ PMOUSEHOOKSTRUCT pHookInfo = (PMOUSEHOOKSTRUCT)lParam; int xPos = pHookInfo->pt.x; int yPos = pHookInfo->pt.y; LRESULT lResHandled = CallNextHookEx(ScreenCapture::getInstance()->getMouseHook(), nCode, wParam, lParam); if (WM_LBUTTONDBLCLK == wParam ) { ScreenCommon::postNotifyMessage(WM_SNAPSHOT_FINISH_CAPTURE, 0, 0); } else if (WM_RBUTTONDBLCLK == wParam) { ScreenCommon::postNotifyMessage(WM_SNAPSHOT_CANCEL_CPATURE, 0, 0); } else if (WM_LBUTTONDOWN == wParam) { if (CM_AUTO_SELECT == CaptureModeMgr::Instance()->getMode()) { CaptureModeMgr::Instance()->changeMode(CM_MANAL_SELECT); } } CaptureModeMgr::Instance()->handleMouseMsg(wParam, xPos, yPos); return lResHandled;
}
在钩子函数中,如果出现鼠标双击事件,则表示取消截图;如果出现双击事件,则表示完成截图。如果鼠标按下则表示开始绘制截图区域,然后处理鼠标移动事件:
void CaptureModeMgr::handleMouseMsg(__in UINT uMsg, __in int xPos, __in int yPos)
{ IModeMsgHandler *msgHandler = getModeHandler(); if (!msgHandler) return; if (WM_MOUSEMOVE == uMsg) { msgHandler->onMouseMove(xPos, yPos); } else if (WM_LBUTTONDOWN == uMsg) { msgHandler->onLButtonDown(xPos, yPos); } else if (WM_LBUTTONUP == uMsg) { msgHandler->onLButtonUp(xPos, yPos); } else if (WM_LBUTTONDBLCLK == uMsg) { msgHandler->onLButtonDBClick(xPos, yPos); }
}
选取区域结束时,将选择的区域保存为位图并存至某个路径下:
void ScreenCapture::finishCapture()
{
<span style="white-space:pre"> </span>RECT rcSelect = {0}; UIScreenCaptureMgr::Instance()->sendBkgMessage(WM_SNAPSHOT_TEST_SELECT_RECT, (WPARAM)&rcSelect, 0); rcSelect.left += 2; rcSelect.top += 2; rcSelect.right -= 2; rcSelect.bottom -= 2; if (!ScreenCommon::isRectEmpty(rcSelect)) { ScreenSnapshot::Instance()->saveRect(rcSelect, m_strSavePath); } cancelCapture(); if (m_callBack) m_callBack->onScreenCaptureFinish(m_strSavePath);
}
BOOL ScreenSnapshot::saveRect(__in RECT &rc, __in std::wstring &savePath)
{ snapshotScreen(); CxImage img; int cx = rc.right - rc.left; int cy = rc.bottom - rc.top; HDC hSaveDC = CreateCompatibleDC(m_hMemDC); HBITMAP hBitmap = CreateCompatibleBitmap(m_hMemDC, cx, cy); HBITMAP hSaveBitmap = (HBITMAP)SelectObject(hSaveDC, (HGDIOBJ)hBitmap); BitBlt(hSaveDC, 0, 0, cx, cy, m_hMemDC, rc.left, rc.top, SRCCOPY); hBitmap = (HBITMAP)SelectObject(hSaveDC, (HBITMAP)hSaveBitmap); BOOL result = FALSE; do { if (!img.CreateFromHBITMAP(hBitmap)) { break; } if (!img.Save(savePath.c_str(), CXIMAGE_FORMAT_BMP)) { break; } result = TRUE; } while (FALSE); DeleteObject((HGDIOBJ)hBitmap); DeleteDC(hSaveDC); return result;
}
注意整个过程使用了一个神奇的windows API,你没看错,它叫mouse_event,很少有windows API长成这个样子。利用这个api可以用程序模拟鼠标很多事件,后面有时间我会专门介绍一下这个有用的API函数。当然,关于截图的描述,你可能有点迷糊。没关系,后面我会专门写一篇文章细致地探究下teamtalk的屏幕截图效果实现,因为这里面有价值的东西很多。
4.线程的创建
IMCoreErrorCode OperationManager::startup()
{ m_operationThread = std::thread([&] { std::unique_lock <std::mutex> lck(m_cvMutex); Operation* pOperation = nullptr; while (m_bContinue) { if (!m_bContinue) break; if (m_vecRealtimeOperations.empty()) m_CV.wait(lck); if (!m_bContinue) break; { std::lock_guard<std::mutex> lock(m_mutexOperation); if (m_vecRealtimeOperations.empty()) continue; pOperation = m_vecRealtimeOperations.front(); m_vecRealtimeOperations.pop_front(); } if (!m_bContinue) break; if (pOperation) { pOperation->process(); pOperation->release(); } } }); return IMCORE_OK;
}
这是利用lamda表达式创建一个线程典型的语法,其中m_operationThread是一个成员变量,类型是std::thread,std::thread([&]中括号中的&符号表示该lamda表达式以引用的方式捕获了所有外部的自动变量,这是在一个成员函数里面,也就是说在线程函数里面可以以引用的方式使用该类的所有成员变量。这个语法值得大家学习。
5.teamtalk的httpclient工程可以直接拿来使用,作者主页:http://xiangwangfeng.com,github链接:https://github.com/xiangwangfeng/httpclient。
另外teamtalk pc端大量使用C++11的语法和一些替代原来平常的写法,这个就不专门列出来了,后面我将会专门写一篇文章来介绍c++11中那些好用的工程级技巧。
七、框架分解
DDLogic框架着重解决如下这几个点:
- 基于Task的任务调度
- 事件的订阅与发布
- pdu通信协议以及拆装包过程
- 基于WSAAsyncSelect模型的网络异步I/O TCP/IP长连接
- 业务模块拆分以及模块与模块之间通过接口交互
- 持久化数据以及基于此数据之上的一层数据监听机制(类似IDE工具调试的 Watch)
下面针对每个点分别做描述:
1.基于Task的任务调度(Task 调度)
任何应用程序都会存在一个个需要处理的业务,只有如此你的应用程序才是活的,才能完成用户的业务需求。这些任务或是后台计算性、或是网络通信的拆包/装包、又或是前端交互的如动画计算,可以说整个应用程序就是由这样的一个个task跑起来的。**那么如何来合理的调度这些任务呢?**之前写过的一篇《TT和chrome线程模型对比分析》,同学们可以去看下,这里把任务调度相关的文字直接挪过来下。
一图胜千言,先上下DDLogic的执行逻辑模型图如下:
这张图告诉我们几点:
TT是多线程的,线程分为UI主线程、网络异步I/O线程、逻辑任务执行器线程池、http线程池等
1.1 主线程(UI线程):负责界面的显示和交互,以及借助消息循环来做事件的派发. 1.2 网络异步I/O线程:负责TCP/IP长连接以及消息服务器数据包的收发. 1.3 逻辑任务执行器线程池:一个简单的可伸缩的任务执行池,FIFO task list thread线程执行一些正常任务, Priority queue thread可以执行一些优先级调度或者dependency调度,Priority queue thread也可以在某个重任务把常驻线程耗掉的时候,开启一个新线程来执行后续饥渴任务。 1.4 http线程池:由于除主线程外所有子线程都没有MessagePump,逻辑任务执行器线程池只能负责一些后台计算性的任务(因为如果在逻辑执行器里面执行http任务,有可能会被同步http请求,卡住导致后续的任务不能够得到及时响应),所以只能再做个http线程池来专门处理http相关的任务.
任务执行单位——Task
2.1 task的创建和执行是分开的(command模式),可以在任何的线程中创建一个task,然后通过调用TaskPool的pushTask将任务放到TaskPool的线程池中执行。 2.2 整个过程只有在pushTask的时候才加锁,等到开始执行的时候是无锁的,所以在设计task的时候,开发者需要考虑到task中的数据对象管辖的范围。 2.3 task执行过程中产生的事件通知都是利用主线程的消息循环dispatch出去的(这一点与chrome有很大的不同)
这块接下来的目标会尽量和chrome的思想靠齐,特别是在线程任务的设计上chrome允许创建的每个线程都有执行各种任务的能力,并且也为之创建了各种的任务执行队列来异步执行,这样的轮子便于整个项目功能和业务的分解。
2 事件的订阅与发布 (Event Watch机制)
在一个框架里面有一套统一的、方便使用的事件订阅与发布是非常有必要的。看过一些优秀的开源代码、框架都有各自的不同程度不同方式的实现,如libevent的event-driven,一个高性能的服务器网络库;如.net framework 委托与事件;如delphi(object pascal) VCL的回调函数指针与事件等,同学们可以自行去研究下,特别是libevent的实现值得一看。**DDLogic对于这块的设计需要达到这样的效果——即观察者可以通过监听某个业务模块的某个唯一属性(MKN=module key name)的变化,当该属性发生变化的时候,观察者能够及时的获得同步或者异步方式的处理。**基于此目的mac TT和windows TT分别用不同的技术达到了DDLogic的设计需求。
mac TT
mac TT依托于强大的OC运行时库支持动态创建类、c语言原始的函数指针、函数调用在运行时才去做二进制重定位即编译时调用者不需要确保被调用函数的存在,实现Event机制的方式可以多种多样,我知道的有协议与委托、类别与委托、C语言的函数指针与回调、target/action、键值观察(KVO)、RunRoop(和windows的消息循环差不多),还有NS库提供的NotificationCenter等。PS:同学们可以去膜拜下《深入浅出Cocoa》( 深入浅出Cocoa)。
首先,先看下DDLogic Event Watch机制的使用好有个初步感受,描述如下:
1.首先将众多事件根据业务模块(module)来拆分,如会话module里面定义的事件属性包括:
//module key names
static NSString* const MKN_DDSESSIONMODULE_GROUPMSG = @"DDSESSIONMODULE_GROUPMSG"; //群消息到达static NSString* const MKN_DDSESSIONMODULE_SINGLEMSG = @"DDSESSIONMODULE_SGINGLEMSG"; //个人息到达
2.需要监听事件的地方调用如下,实现
[[DDLogic instance] addObserver:MODULE_ID_SESSION name: MKN_DDSESSIONMODULE_SINGLEMSG observer:self selector:@selector(onHandleSingleMsg:)];
onHandleSingleMsg函数,即具体的事件处理函数。
3. 在群信息/个人信息到达的时候发布事件,调用如下发布通知
[self uiAsyncNotify:MKN_DDSESSIONMODULE_SINGLEMSG userInfo:userInfo];
咋样上面使用起来很简单吧,典型的观察者模式接口设计。再完善一点可以像.net framework、Delphi VCL可视化订阅事件一样,将事件源的定义和事件和事件处理函数的绑定集成到xcode上去。
接下来讲讲DDLogic Event Watch机制在mac上是如何实现的。
DDLogic是借助了上文描述的NS库提供的NSNotificationCenter来实现,其实和NSNotificationCenter原生态的使用没啥区别,所以有些同学会问了NSNotificationCenter 接口使用文档。我这里想着重回答下同学们的一个疑问:因为肯定有会有同学问,本身NSNotificationCenter就已经很好用了而且你的框架也是简单包装了下而已,为啥要这样做呢?这里我的解释也不想套用啥高大上的理论,我自己的理解是:
DDLogic去包装NSNotificationCenter主要目的是定制一套统一的规则即定义module key
name、监听module key name的事件通知与处理、以及统一的事件发布。对于框架的层面不应该与某种技术选型耦合太深,就拿NSNotificationCenter技术选型来讲,当未来的某一天这套通知机制不够用的时候,可以方便的替换掉选择更适合的技术选型,这个时候可以尽量把替换封装在框架内而不用因此去重构业务层代码。
在技术选型上做一层适配,其实还有个好处是可以对你的技术选型做一个定制,比如你选择了NSNotificationCenter技术,但是发现NSNotificationCenter库很强大支持各种场景,但是你的项目其实不需要那么重,通过适配是可以降低使用者对NSNotificationCenter的学习成本。
还有一点是开发mac TT DDLogic的时候,windows TT的框架已经成型了,为了保持一致的使用体验,我就特地去包装了下,宽恕我吧_
windows TT
windows平台由于没有类似NSNotificationCenter这样的优秀的平台库,不可避免对于DDLogic Event Watch机制的封装需要自己去造轮子,当然会麻烦许多工作量也上升了一个指数。使用方式和上面写的差不多,这里就不重复写了,大家可以去看下具体的源码。
接下来讲讲DDLogic Event Watch机制在windows上是具体实现。它借助了
一层三元组[module_id,module_item,module_tag]来组成一个数据集(DataSet也可以称作Document)。
fastDelegate(一套开源的用c++实现的委托,比成员函数指针回调效率更高,有兴趣的可以自己去研究下(http://www.codeproject.com/Articles/7150/Member-Function-Pointers-and-the-Fastest-Possible))实现函数回调即调用到具体的事件处理函数。
操作系统的消息循环,包装成事件通知。
一图胜千言如下图:
通过图示具体实现如下:创建一个无窗口的句柄用来作为异步事件派发的基础,即底层最终是借助windows操作系统的消息循环来封装上层的Event事件的派发的,支持同步/异步派发(即SendMessage/PostMessage)。
生成一个全局唯一的三元组 DataSet实例用来存储各个业务模块观察者关心的唯一属性,类似mac TT的MKN(module key
name),存储格式按照三元组[module_id,module_item,module_tag],module_id对应业务模块ID,module_item对应登陆者信息,module_tag则对应MKN。在需要监听事件的地方调用
logic::GetLogic()->addWatch(this, MAKE_DELEGATE(this,&SessionChat::OnEvaluateWatch,serv::DID_EVALUTATE_CONFIG)
OnEvaluateWatch函数,即具体的事件处理函数。
- 在发布事件的地方调用
logic::GetLogic()->asyncPostEvent(serv::DID_EVALUTATE_CONFIG,module_item,TAG_EVALUTATE_CONFIG,pData);
3 PDU通信协议以及拆装包过程
(协议数据单元(Protocol Data Unit))PDU通信协议走的是二进制协议——即固定长度的协议头(16个字节) + 协议体方式。协议头包括整个协议包的大小、版本、模块号(moduleid)、命令号(commandid)等。模块号(moduleid)和业务模块对应,commandid对应具体的网络传输命令,这样做的好处是通过包头就可以知道这个包是属于那个业务模块处理的。对于协议这块的技术选型,我们当时也讨论了许多,我、大子腾、大子烨分别都提出了各自的解决方案,最终选择了大子腾的PDU协议,这个过程考虑的因素很多,所以我准备专门写一篇blog来分写下当时的情景,另外这篇博文还会分析PDU通信协议和chrome的对比,敬请期待…这里就不再深入描述了。
接下来讲下协议的拆包/装包与DDLogic的分层吧,虽然和具体通信协议交集不是那么大,但是想想还是放这里比较适。
如图:
从这幅图可以简单看出:
1.协议层:协议的拆包和协议任务的分配都封装在协议层,对业务层是透明的
2.协议层:协议拆包完成后,会生成一个task放入任务执行池,做任务派发的工作
3.业务层:根据module_id会分派到相应的业务模块
4.业务层:收到通知后,根据协议里面的command_id处理具体业务
4 TCP/IP长连接
大部分客户端应用程序的网络I/O模型采用阻塞模式就够用了,如遇到UI和网络需要异步,很常用的一种实现方式是启用多线程将网络数据的收发放到工作者线程中去。但是对网于IM这种应用场景来说阻塞模式就不适用了,试想聊天过程中你和服务器之间的交互是多么的频繁,你可以同时和几十位用户一起聊天,为了不阻塞难道每次聊天收发信息都需要建立一个线程来实现吗?这当然是不现实的,所以我们需要选择非阻塞模式异步socket IO。下面分别讲讲mac pro 和 windows的网络异步I/O的实现。
mac TT
mac TT得益于oc提供的良好平台目前借助的是CFNetwork和NSStream类实现TCP/IP 异步I/O socket,利用CFNetwork创建socket通信通道,利用NSStream传递单向的数据流,具体实现如下: 通过在NSStream中增加一个类方法扩展用于建立TCP/IP连接的一系列过程。
+ (void)getStreamsToHostNamed:(NSString *)hostName port:(NSInteger)port inputStream:(NSInputStream **)inputStream outputStream:(NSOutputStream **)outputStream
{CFHostRef host;CFReadStreamRef readStream;CFWriteStreamRef writeStream;host = CFHostCreateWithName(NULL, (__bridge CFStringRef) hostName);CFStreamCreatePairWithSocketToCFHost(NULL, host, (SInt32)port, &readStream, &writeStream);CFRelease(host);...
}
NSStream的两个派生类NSInputStream/NSOutputStream把整个socket通信抽象成了一个输入/输出流,通过oc平台的RunLoop将异步I/O事件通知到如下回调函数中:-(void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode
{switch(eventCode) {...}
}
回调通知的事件有:typedef NS_OPTIONS(NSUInteger, NSStreamEvent)
{NSStreamEventNone = 0,NSStreamEventOpenCompleted = 1UL << 0, //输入or输出流打开成功即socket连接建立成NSStreamEventHasBytesAvailable = 1UL << 1, //可以接受数据通知即输入缓冲区有内容了NSStreamEventHasSpaceAvailable = 1UL << 2, //可以发送数据通知即输出缓冲区空了NSStreamEventErrorOccurred = 1UL << 3, //错误通知NSStreamEventEndEncountered = 1UL << 4
};
以上具体代码可以参见mac tt代码NSStream+NStreamAddtion.m 以及 MGJMTalkClient.m(现在换成DDTcpClientManager.m)
咋样整个过程看下来是不都看不到socket的影子?这样做有啥好处呢?我自己的理解最大的好处是足够简单,对于调用者来说socket的整个过程是透明的,调用者不需要去理解操作系统对异步socket的I/O模型的支持,不需要去理解socket建立的整个过程等。类似的还有java的NIO甚至netty库都把整个socket过程隐藏在了一个流的概念中。 尽说好处了,差点忘记目前DDLogic的这种实现还有一个很大的问题(PS:是不是我们使用上有问题,同学们也可以帮忙看下),即connet TCP服务器的时候,回调事件里面肿么也收不到连接断失败的事件通知,导致整个TCP/IP流程不流畅,我们暂时采用了一个很龌龊的方式是:connet TCP服务器的时候设置个定时器,如果3秒钟没有收到连接建立成功的通知就认为连接失败了。这里的代码我担心也会为将来开发IOS TT埋下一个隐患,建议IOS开发同学去深入研究下或者寻找更好的技术选型。这里提供几个参考OC平台OS层的基于C的 BSD socket,这一层面提供的是socket原生态的方法,可以最大程度的控制网络编程,但是工作量也是最大的,和windows TT采用C/C++ 进行socket编程差不多。
OC平台Core Foundation层提供的CFNetwork C ,对OS层的BSD socket做了一层简单的包装,并且和系统的run loop结合起来,使得异步socket I/O实现起来很方便。上面mac TT用的其实就是这一层,所以这里还是需要去深入研究上面的坑。
OC平台最上层提供的Bonjour库,同学们可以自行去看下 Networking and Bonjour on iPhone
另辟蹊径不走OC平台提供的库,用libevent来实现,不过对于客户端来说使用该库可能略重,但是它良好的封装使得使用起来非常简单而且本身也是轻量级高性能的网络库,客户端选择POSIX select或windows select模型足够用了。
windows TT
windows TT是基于windows的WSAAsyncSelect模型建立的异步I/O,利用这个模型应用程序可在一个套接字上,接收以Windows消息为基础的网络事件通知,对于一个客户端程序已经足够用了。code projct有个对该模型很好的包装,同学们可以去看下( http://www.codeproject.com/Articles/3855/CAsyncSocketEx-Replacement-for-CAsyncSocket-with-p )。具体的实现windows并没有像oc平台这样好的抽象,但是实现起来其似乎也是差不多的思想,异步I/O消息通知的事件包括:FD_READ、FD_WRITE、FD_FORCEREAD、FD_CONNECT、FD_ACCEPT、FD_CLOSE这些,每个事件都相应的能通知到socket数据处理层就可以了。
比较下来两个系统平台对于TCP/IP异步socket IO的封装是差不多的,差别只是抽象的层次mac pro平台更加高一点,windows更加接近原生态的socket。 以上讲的是利用各自平台的网络库实现与服务器之间通信的技术.
接下来一起看下在内存中收发数据的两个buffer,因为数据传递是异步的,发送/接收数据都有可能是还没有真正发送/接收成功,所以需要在socket数据处理层维护两块buffer——inBuffer(接收数据缓存)/outBuffer(发送数据缓存)。以outBuffer(发送数据缓存)为例子,当你调用sendSocketData的时候,由于操作系统发送缓存区满了导致调用失败 ,由于是异步socket IO,系统的send过程并不会等待系统的发送缓存区空了再发送数据,而是会让send过程失败,等到系统的发送缓存区空的时候通过一个可写的事件通知你,所以在sendSocketData过程send失败的情况下,你所需要做的就是将数据缓存到outBuffer(发送数据缓存)中,等到可写事件收到了再将outBuffer(发送数据缓存)的数据发送出去,上个流程图吧:
5 业务模块拆分以及模块与模块之间通过接口交互
任何应用程序从业务角度讲都不是单一的,是由许多业务组装起来的(比如mac TT有登陆业务、文件传输业务、消息管理业务、会话管理业务等),那么这些业务需要如何有机的结合起来完成一个应用程序的所有需求呢?同学们应该会首先想到MVC(Model、View、Controller)/MVP(Model、View、Presenter),嗯没错,在OC平台中本身就是按照MVC来实现具体业务的开发的,DDLogic在MVC基础之上再加了一个Module的概念,为的是和前面:基于Task的任务调度、pdu通信协议以及拆装包过程、事件的订阅与发布、持久化数据以及基于此数据之上的一层数据监听机制(类似IDE工具调试的 Watch)这些有机的结合起来,回头看看是否还记得前面PDU协议面的module id和存储格式按照三元里面的module id呢?先上个简单的图吧:
DDLogic的思路是这样的(以登陆业务模块为例子):
// 现在的好像是DDLoginManager.m
1.所有模块的对外接口都通过DDLogic Modules Manager来管理。
2.模块与模块之间通过接口来调用,
模块内部实现对外不可见。比如外部只能调用DDloginModule的doLogin()来实现登陆操作,调用方是不知道具体如何实现登陆的。
3.每个独立的业务创建成为业务module——DDLoginModule,有一个全局唯一的业务模块ID——MODULE_ID_LOGIN
4.调用业务模块的接口函数通过全局唯一业务模块ID——MODULE_ID_LOGIN来,DDLoginModule*
loginModule = getDDModule(MODULE_ID_LOGIN);[loginModule doLogin];
5.支持插件管理,n(n >=1)个模块合作来组装成1个插件,并且支持动态加载/卸载(未实现的目标)
是不感觉DDLogic框架连这种东西也拿出来分享,没啥技术含量是个程序员都知道用类似的方式来拆分业务?是的你的感觉是对的,但是只对了一半,确实看起来没啥营养,但是请你再往下看你会发觉这个点才是整个DDLigic框架的精髓,如果说上面讲的每个设计点是DDLogic框架的一条条河流的话,那么这里就应该是它们的汇聚地,下面逐个点来分析
1.基于Task的任务调度:每个task的执行都会绑定一个module_id来知道具体是哪个模块的task在执行,并且通过module_id将任务执行的结果反馈给模块,这条河流就汇聚到module了。
2.事件的订阅与发布:每个事件都是通过指定module_id和MKN来订阅的,等到被订阅事件发布的时候同样通过指定module_id和MKN来通知出去,这条河流也汇聚到moudule了
3.pdu通信协议以及拆装包过程:通过解析pdu协议头获取module_id和command_id,然后生成NetworkTask派发到相应的模块中区,这条河流也汇聚到module了
4.持久化数据以及基于此数据之上的一层数据监听机制(类似IDE工具调试的 Watch):通过储格式三元组[module_id,module_item,module_tag],这条河流也汇聚到module了
6 持久化存储以及基于此数据模型数据监听机制(类似IDE调试工具的Watch)
DDLogic数据持久化用的是NSCoder可以支持基于业务模块的数据序列化/反序列化。基于数据模型的监听机制(暂且称作data watch机制)对一个应用程序来说是非常实用的,举个例子:你的好友管理模块的数据新增了一个好友,好友列表数据发生add事件,监听此数据变化的模块如好友列表控件、消息管理模块等都会收到相应的通知并作出及时的处理。
转自:http://www.jianshu.com/p/1359165bae4e
TeamTalk源码分析之win-client相关推荐
- TeamTalk源码分析之login_server
login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是80 ...
- TeamTalk源码分析(十一) —— pc客户端源码分析
--写在前面的话 在要不要写这篇文章的纠结中挣扎了好久,就我个人而已,我接触windows编程,已经六七个年头了,尤其是在我读研的三年内,基本心思都是花在学习和研究windows程序上 ...
- Teamtalk源码分析
(TeamTalk服务端源码分析一)TeamTalk服务端部署 - 灰信网(软件开发博客聚合) TeamTalk源码分析(四) -- 服务器端db_proxy_server源码分析_左雪菲的专栏-CS ...
- 【投屏】Scrcpy源码分析三(Client篇-投屏阶段)
Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...
- 【投屏】Scrcpy源码分析二(Client篇-连接阶段)
Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...
- TeamTalk源码分析(三) —— 服务器端的程序架构介绍
通过上一节的编译与部署,我们会得到TeamTalk服务器端以下部署程序: db_proxy_server file_server http_msg_server login_server msfs m ...
- TeamTalk源码分析(二) —— 服务器端的程序的编译与部署
写在前面的话,如果您在部署teamtalk过程中遇到困难,可以关注我的微信公众号『easyserverdev』,在微信公众号中回复『teamtalk部署求助』,我将与你取得联系并协助您解决.或者您对高 ...
- openxr runtime Monado 源码解析 源码分析:CreateInstance流程(设备系统和合成器系统)Compositor comp_main client compositor
monado系列文章索引汇总: openxr runtime Monado 源码解析 源码分析:源码编译 准备工作说明 hello_xr解读 openxr runtime Monado 源码解析 源码 ...
- 【投屏】Scrcpy源码分析一(编译篇)
Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...
最新文章
- Caddy-基于go的微型serve用来做反向代理和Gateway
- UVA10905孩子们的游戏
- Java 8 - Lambda从兴趣盎然到索然无味
- 前端面试中常见的算法问题
- 和为K的子数组—leetcode560
- node服务:日志、配置、路由与控制器
- 如何在PowerPoint演示文稿中使用iTunes音乐
- 2018-12 jdk_JDK 12新闻(2018年9月13日)
- 【算法分析与设计】快速排序
- 训练(training)和推理\推断(inference)的关系?
- 【华为云技术分享】LiteAI四大绝招,解锁物联网智能设备AI开发难关
- css根据屏幕大小切换样式
- 4K 海思 联咏 芯片_强悍芯片,重装来袭-海美迪H7 Plus旗舰4K电视盒子体验
- 在云上搭建大规模实时数据流处理系统
- “停课不停学”钉钉被刷一星在线求饶,这波公关我给满分
- Jenkins+GitLab+Docker持续集成LNMP
- winpe加载raid_为WinPE添加RAID卡驱动的几种步骤
- P - Balanced Stone Heaps
- 数字信号处理matlab相关实验
- 共模信号之静电是如何让影响电路工作的