目录

0.测试代码路径与说明

1.helloworld客户端的线程情况

1.0 线程总体分布

1.1线程一(main线程)

1.2 线程二(StatReport线程)

1.3 线程三(TC_TimeProvider线程)

1.4 线程四(CommunicatorEpoll线程)

1.5 线程五(异步处理线程)

1.6 线程六(异步处理线程)

1.7 线程七(异步处理线程)

1.8 帮助理解的一些图

2.线程相关源码分析

2.1 从调用流程看线程分布

2.1.1 图一(comm.stringToProxy)

2.1.2 图二(getServantProxy)

2.1.3 图三(Communicator::initialize)

2.1.4 图四(所有七个线程浮出水面)

2.1.5 同步方法进行调用

2.1.6 调用testHello

2.1.7 调用ServantProxy::tars_invoke()进行远程方法调用(以同步方式)

2.1.8 调用ServantProxy::tars_invoke_async()进行远程方法调用(以异步方式)

2.2 getStatReport()->createPropertyReport其实没有产生线程

2.3 _communicatorEpoll[i]->start()产生两个线程的细节阅读

2.3.1 初始化CommunicatorEpoll,​​CommunicatorEpoll本身继承自TC_Thread就有一个线程

2.3.2 CommunicatorEpoll::run()中调用CommunicatorEpoll::doTimeout()等函数用到了TC_TimeProvider中的线程

3 客户端流程分析

3.0 总览

3.1 执行stringToProxy

3.2 执行Communicator的初始化函数

3.3尝试获取ServantProxy和ObjectProxy指针数组(即ServantProxy)

3.4 获取ObjectProxy

3.4.1 获取CommunicatorEpoll

3.4.2 获取ObjectProxy

3.5 建立ObjectProxy与AdapterProxy的关系

3.6 继续完成ServantProxy的创建

4.同步调用

4.0 前言

4.1 网络线程发送请求

4.2 网络线程接收响应

​5.异步调用

5.1 初始化

5.1.1初始化逻辑

5.1.2 网络线程数与CommunicatorEpoll个数的关系

5.2 接收响应与函数回调

6.客户端的负载均衡实现方法


0.测试代码路径与说明

/home/tarsproto/TestApp/HelloClientcd /home/muten/module/TARS/TarsFramework/build
cmake .. -DCMAKE_BUILD_TYPE=debug
make -j8
make install

本文中的大量篇幅取自 TARS基金会-微服务开源框架TARS的RPC源码解析之初识TARS-C++客户端 中的内容,感谢TARS基金会!

1.helloworld客户端的线程情况

1.0 线程总体分布

1.1线程一(main线程)

1.2 线程二(StatReport线程)

1.3 线程三(TC_TimeProvider线程)

1.4 线程四(CommunicatorEpoll线程)

1.5 线程五(异步处理线程)

1.6 线程六(异步处理线程)

1.7 线程七(异步处理线程)

1.8 帮助理解的一些图

2.线程相关源码分析

2.1 从调用流程看线程分布

2.1.1 图一(comm.stringToProxy)

2.1.2 图二(getServantProxy)

2.1.3 图三(Communicator::initialize)

2.1.4 图四(所有七个线程浮出水面)

2.1.5 同步方法进行调用

2.1.6 调用testHello

2.1.7 调用ServantProxy::tars_invoke()进行远程方法调用(以同步方式)

2.1.8 调用ServantProxy::tars_invoke_async()进行远程方法调用(以异步方式)

本节主要关注异步调用逻辑,注意在2.1.5中调用的同步方法prx->testHello(sReq, sRsp)要改成prx->async_testHello.

2.2 getStatReport()->createPropertyReport其实没有产生线程

2.3 _communicatorEpoll[i]->start()产生两个线程的细节阅读

2.3.1 初始化CommunicatorEpoll,​​CommunicatorEpoll本身继承自TC_Thread就有一个线程

2.3.2 CommunicatorEpoll::run()中调用CommunicatorEpoll::doTimeout()等函数用到了TC_TimeProvider中的线程

3 客户端流程分析

3.0 总览

3.1 执行stringToProxy

在客户端程序中,一开始会执行下面的代码进行整个客户端代理的初始化:Communicator comm;
HelloPrx prx;
comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 1.1.1.1 -p 20001" , prx);先声明一个Communicator变量comm(其实不建议这么做)以及一个ServantProxy类的指针变量prx,在此
处,服务为Hello,因此声明一个HelloPrx prx。注意一个客户端只能拥有一个Communicator。为了能够获
得RPC的服务句柄,我们调用Communicator::stringToProxy(),并传入服务端的信息与prx变量,函数返
回后,prx就是RPC服务的句柄。
进入Communicator::stringToProxy()函数中,我们通过Communicator::getServantProxy()来依据
objectName与setName获取服务代理ServantProxy:/**
* 生成代理
* @param T
* @param objectName
* @param setName 指定set调用的setid
* @param proxy
*/
template<class T> void stringToProxy(const string& objectName, T& proxy,const string& setName="")
{ServantProxy * pServantProxy = getServantProxy(objectName,setName);proxy = (typename T::element_type*)(pServantProxy);
}

3.2 执行Communicator的初始化函数

进入Communicator::getServantProxy(),首先会执行Communicator::initialize()来初始化
Communicator,需要注意一点,Communicator:: initialize()只会被执行一次,下一次执行
Communicator::getServantProxy()将不会再次执行Communicator:: initialize()函数:void Communicator::initialize()
{TC_LockT<TC_ThreadRecMutex> lock(*this);if (_initialized) //已经被初始化则直接返回return;......
}进入Communicator::initialize()函数中,在这里,将会new出上文介绍的与Communicator密切相关的类
ServantProxyFactory与n个CommunicatorEpoll,n为客户端的网络线程数,最小为1,最大为
MAX_CLIENT_THREAD_NUM:
void Communicator::initialize()
{......_servantProxyFactory = new ServantProxyFactory(this);......for(size_t i = 0; i < _clientThreadNum; ++i){_communicatorEpoll[i] = new CommunicatorEpoll(this, i);_communicatorEpoll[i]->start(); //启动网络线程}......
}在CommunicatorEpoll的构造函数中,ObjectProxyFactory被创建出来,这是构造图(1-2)关系的前提。
除此之外,还可以看到获取相应配置,创建并启动若干个异步回调后的处理线程。创建完成后,调用
CommunicatorEpoll::start()启动网络线程。至此,Communicator::initialize()顺利执行。说明:本段摘自「TARS基金会」的原创文章
————————————————
版权声明:本文为CSDN博主「TARS基金会」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/TARSFoundation/article/details/107208534

3.3尝试获取ServantProxy和ObjectProxy指针数组(即ServantProxy)

进入ServantProxyFactory::getServantProxy(),首先会加锁,从map<string, ServantPrx>
_servantProxy中查找目标,若查找成功直接返回.若查找失败,TARS需要构造出相应的ServantProxy,
ServantProxy的构造需要如图(1-3)所示的相对应的ObjectProxy作为构造函数的参数,由此可见,在
ServantProxyFactory::getServantProxy()中有如下获取ObjectProxy指针数组的代码:ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()];
assert(ppObjectProxy != NULL);
for(size_t i = 0; i < _comm->getClientThreadNum(); ++i)
{ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName);
}

3.4 获取ObjectProxy

看看如何获取ObjectProxy:

代码来到ObjectProxyFactory::getObjectProxy(),同样,会首先加锁,再从map<string,ObjectProxy*>
_objectProxys中查找是否已经拥有目标ObjectProxy,若查找成功直接返回。若查找失败,需要新建一个新的
ObjectProxy.通过类图可知,ObjectProxy需要一个CommunicatorEpoll对象进行初始化,由此关联管理自己
的CommunicatorEpoll,CommunicatorEpoll之后便可以通过getObjectProxy()接口获取属于自己的
ObjectProxy.

3.4.1 获取CommunicatorEpoll

3.4.2 获取ObjectProxy

3.5 建立ObjectProxy与AdapterProxy的关系

新建ObjectProxy的过程同样非常值得关注,在ObjectProxy::ObjectProxy()中,关键代码是:
_endpointManger.reset(new EndpointManager(this, _communicatorEpoll->getCommunicator(),
sObjectProxyName, pCommunicatorEpoll->isFirstNetThread(), setName));用ObjectProxy构造中 _endpointManger.reset来实现ObjectProxy与AdapterProxy的关系的建立.

逻辑说明:
每个ObjectProxy都有属于自己的EndpointManager负责管理到服务端的所有socket连接AdapterProxy,每个
AdapterProxy连接到一个提供相应服务的服务端物理机socket上。通过EndpointManager还可以以不同的负载
均衡方式获取与服务器的socket连接AdapterProxy。
ObjectProxy:: ObjectProxy()是图(1-6)或者图(1-8)中的略1,具体的代码流程如图(1-9)所示。
ObjectProxy创建一个EndpointManager对象,在EndpointManager的初始化过程中,依据客户端提供的信息,
直接创建连接到服务端物理机的TCP/UDP连接AdapterProxy或者从代理中获取服务端物理机socket列表后再创
建TCP/UDP连接AdapterProxy。————————————————
版权声明:本文为CSDN博主「TARS基金会」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/TARSFoundation/article/details/107208534

3.6 继续完成ServantProxy的创建

退出层层的函数调用栈,代码再次回 ServantProxyFactory::getServantProxy()--本文的2.1.2图,此时,
ServantProxyFactory已经获得相应的ObjectProxy数组ObjectProxy** ppObjectProxy,接着便可以调
用:
ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());进行ServantProxy的构造。构造完成便可以呈现出如图(2-1)的关系。在ServantProxy的构造函数中可以看
到,ServantProxy在新建一个EndpointManagerThread变量,这是对外获取路由请求的类,是TARS为调用逻辑
而提供的多种解决跨地区调用等问题的方案。同时可以看到:for(size_t i = 0;i < _objectProxyNum; ++i)
{(*(_objectProxy + i))->setServantProxy(this);
}建立了ServantProxy与ObjectProxy的相互关联关系. 剩下的是读取配置文件,获取相应的信息.
构造ServantProxy变量完成后,ServantProxyFactory::getServantProxy()获取一些超时参数,赋值给
ServantProxy变量,同时将其放进map<string, ServantPrx> _servantProxy中,方便下次直接查找获
取。
ServantProxyFactory::getServantProxy()将刚刚构造的ServantProxy指针变量返回给调用他的
Communicator::getServantProxy(),在Communicator::getServantProxy()中:ServantProxy * Communicator::getServantProxy(const string& objectName,const string& setName)
{……return _servantProxyFactory->getServantProxy(objectName,setName);
}直接将返回值返回给调用起Communicator::getServantProxy()的Communicator::stringToProxy()。可
以看到:template<class T> void stringToProxy(const string& objectName, T& proxy,const string&
setName="")
{ServantProxy * pServantProxy = getServantProxy(objectName,setName);proxy = (typename T::element_type*)(pServantProxy);
}Communicator::stringToProxy()将返回值强制转换为客户端代码中与HelloPrx prx同样的类型
HelloPrx。由于函数参数proxy就是prx的引用。那么实际就是将句柄prx成功初始化了,用户可以利用句柄
prx进行RPC调用了.————————————————
版权声明:本段为CSDN博主「TARS基金会」的原创文章的摘选,遵循CC 4.0 BY-SA版权协议,转载请附上原
文出处链接及本声明。
原文链接:https://blog.csdn.net/TARSFoundation/article/details/107208534

4.同步调用

4.0 前言

当我们获得一个ServantProxy句柄后,便可以进行RPC调用了.Tars提供四种RPC调用方式,分别是同步调用,
异步调用,promise调用与协程调用. 其中最简单最常见的RPC调用方式是同步调用,接下来,将简单分析Tars
的同步调用.现假设有一个MyDemo.StringServer.StringServantObj的服务,提供一个RPC接口是append,传入两个
string类型的变量,返回两个string类型变量的拼接结果。而且假设有两台服务器,socket标识分别是
192.168.106.129:34132与192.168.106.130:34132,设置客户端的网络线程数为3,那么执行如下代码:Communicator _comm;
StringServantPrx _proxy;
_comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.129 -p 34132",
_proxy);
_comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.130 -p 34132",_proxy);————————————————
版权声明:本文为CSDN博主「TARS基金会」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/TARSFoundation/article/details/107208534经过上文关于客户端初始化的分析介绍可知,可以得出如下图所示的关系图:

获取StringServantPrx _proxy后,直接调用:

string str1(abc-), str2(defg), rStr;
int retCode = _proxy->append(str1, str2, rStr);

成功进行RPC同步调用后,返回的结果是rStr = “abc-defg”. 同样,我们先看看与同步调用相关的类图,如下图所示:

StringServantProxy是继承自ServantProxy的,StringServantProxy提供了RPC同步调用的接口Int32 append(),当用户发起同步调用_proxy->append(str1, str2, rStr)时,

所进行的函数调用过程如下图所示:

在函数StringServantProxy::append()中,程序会先构造ServantProxy::tars_invoke()所需要的参数,如请求包类型,RPC方法名,方法参数等,需要值得注意的是,

传递参数中有一个ResponsePacket类型的变量,在同步调用中,最终的返回结果会放置在这个变量上。接下来便直接调用了ServantProxy::tars_invoke()方法:

tars_invoke(tars::TARSNORMAL, "append", _os.getByteBuffer(), context, _mStatus, rep);

在ServantProxy::tars_invoke()方法中,先创建一个ReqMessage变量msg,初始化msg变量,给变量赋值,如Tars版本号,数据包类型,服务名,RPC方法名,Tars的上下文容器,同步调用的超时时间(单位为毫秒)等。最后,调用ServantProxy::invoke()进行远程方法调用。

无论同步调用还是各种异步调用,ServantProxy::invoke()都是RPC调用的必经之地。在ServantProxy::invoke()中,继续填充传递进来的变量ReqMessage msg。此外,还需要获取调用者caller线程的线程私有数据ServantProxyThreadData,用来指导RPC调用。客户端的每个caller线程都有属于自己的维护调用上下文的线程私有数据,如hash属性,消息染色信息。最关键的还是每条caller线程与每条客户端网络线程CommunicatorEpoll进行信息交互的桥梁——通信队列ReqInfoQueue数组,数组中的每个ReqInfoQueue元素负责与一条网络线程进行交互,如图(1-13)所示,图中橙色阴影代表数组ReqInfoQueue[],阴影内的圆柱体代表数组元素ReqInfoQueue。假如客户端create两条线程(下称caller线程)发起StringServant服务的RPC请求,且客户端网络线程数设置为2,那么两条caller线程各自有属于自己的线程私有数据请求队列数组ReqInfoQueue[],数组里面的ReqInfoQueue元素便是该数组对应的caller线程与两条网络线程的通信桥梁,一条网络线程对应着数组里面的一个元素,通过网络线程ID进行数组索引。整个关系有点像生产者消费者模型,生产者Caller线程向自己的线程私有数据**ReqInfoQueue[]**中的第N个元素ReqInfoQueue[N] push请求包,消费者客户端第N个网络线程就会从这个队列中pop请求包。

阅读代码可能会发现几个常量值,如MAX_CLIENT_THREAD_NUM=64,这是最大网络线程数,在图(1-13)中为单个请求队列数组ReqInfoQueue[]的最大size;

MAX_CLIENT_NOTIFYEVENT_NUM=2048,在图(1-13)中,可以看作caller线程的最大数量,或者请求队列数组ReqInfoQueue[]的最大数量

(反正两者一一对应,每个caller线程都有自己的线程私有数据ReqInfoQueue[])。

接着依据caller线程的线程私有数据进行第一次的负载均衡——选取ObjectProxy(即选择网络线程CommunicatorEpoll)和与之相对应的ReqInfoQueue:

ObjectProxy * pObjProxy = NULL;
ReqInfoQueue * pReqQ = NULL;
//选择网络线程
selectNetThreadInfo(pSptd, pObjProxy, pReqQ);

在ServantProxy::selectNetThreadInfo()中,通过轮询的形式来选取ObjectProxy与ReqInfoQueue。

退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy类型的pObjProxy及其对应的ReqInfoQueue类型的ReqInfoQueue,稍后通过pObjectProxy来发送RPC请求,请求信息会暂存在ReqInfoQueue中。

由于是同步调用,需要新建一个条件变量去监听RPC的完成,可见:

//同步调用 new 一个ReqMonitor
assert(msg->pMonitor == NULL);
if(msg->eType == ReqMessage::SYNC_CALL)
{msg->bMonitorFin = false;if(pSptd->_sched){msg->bCoroFlag = true;msg->sched  = pSptd->_sched;msg->iCoroId   = pSptd->_sched->getCoroutineId();}else{msg->pMonitor = new ReqMonitor;}
}

创建完条件变量,接下来往ReqInfoQueue中push_back()请求信息包msg,并通知pObjProxy所属的CommunicatorEpoll进行数据发送:

if(!pReqQ->push_back(msg,bEmpty))
{TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);delete msg;msg = NULL;pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);throw TarsClientQueueException("client queue full");
}pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

来到CommunicatorEpoll::notify()中,往请求事件通知数组NotifyInfo _notify[]中添加请求事件,通知CommunicatorEpoll进行请求包的发送。注意了,这个函数的作用仅仅是通知网络线程准备发送数据,通过TC_Epoller::mod()或者TC_Epoller::add()触发一个EPOLLIN事件,从而促使阻塞在TC_Epoller::wait()(在CommunicatorEpoll::run()中阻塞)的网络线程CommunicatorEpoll被唤醒,并设置唤醒后的epoll_event中的联合体epoll_data变量为&_notify[iSeq].stFDInfo:

void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue)
{assert(iSeq < MAX_CLIENT_NOTIFYEVENT_NUM);if(_notify[iSeq].bValid){_ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);}else{_notify[iSeq].stFDInfo.iType   = FDInfo::ET_C_NOTIFY;_notify[iSeq].stFDInfo.p     = (void*)msgQueue;_notify[iSeq].stFDInfo.fd    = _notify[iSeq].eventFd;_notify[iSeq].stFDInfo.iSeq    = iSeq;_notify[iSeq].notify.createSocket();_notify[iSeq].bValid        = true;_ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);}
}

就是经过这么一个操作,网络线程就可以被唤醒,唤醒后通过epoll_event变量可获得&_notify[iSeq].stFDInfo。接下来的请求发送与响应的接收会在后面会详细介绍。

随后,代码再次回到ServantProxy::invoke(),阻塞于:

if(!msg->bMonitorFin)
{TC_ThreadLock::Lock lock(*(msg->pMonitor));//等待直到网络线程通知过来if(!msg->bMonitorFin){msg->pMonitor->wait();}
}

等待网络线程接收到数据后,对其进行唤醒。
接收到响应后,检查是否成功获取响应,是则直接退出函数即可,响应信息在传入的参数msg中:

if(msg->eStatus == ReqMessage::REQ_RSP && msg->response.iRet == TARSSERVERSUCCESS)
{snprintf(pSptd->_szHost, sizeof(pSptd->_szHost), "%s", msg->adapter->endpoint().desc().c_str());//成功return;
}

若接收失败,会抛出异常,并删除msg:

TarsException::throwException(ret, os.str());

若接收成功,退出ServantProxy::invoke()后,回到ServantProxy::tars_invoke(),获取ResponsePacket类型的响应信息,并删除msg包:

rsp = msg->response;
delete msg;
msg = NULL;

代码回到StringServantProxy::append(),此时经过同步调用,可以直接获取RPC返回值并回到客户端中.

4.1 网络线程发送请求

上面提到,当在ServantProxy::invoke()中,调用CommunicatorEpoll::notify()通知网络线程进行请求发送后,接下来,网络线程的具体执行流程如下图所示:

由于CommunicatorEpoll继承自TC_Thread,在上文1.2.2节中的第2小点的初始化CommunicatorEpoll之后便调用其CommunicatorEpoll::start()函数启动网络线程,

网络线程在CommunicatorEpoll::run()中一直等待_ep.wait(iTimeout)。由于在上一节的描述中,在CommunicatorEpoll::notify(),caller线程发起了通知notify,网络

线程在CommunicatorEpoll::run()就会调用CommunicatorEpoll::handle()处理通知:

void CommunicatorEpoll::run()
{......try{int iTimeout = ((_waitTimeout < _timeoutCheckInterval) ? _waitTimeout : _timeoutCheckInterval);int num = _ep.wait(iTimeout);if(_terminate){break;}//先处理epoll的网络事件for (int i = 0; i < num; ++i){//获取epoll_event变量的data,就是1.3.1节中提过的&_notify[iSeq].stFDInfoconst epoll_event& ev = _ep.get(i);uint64_t data = ev.data.u64;if(data == 0){continue; //data非指针, 退出循环}handle((FDInfo*)data, ev.events);}}......}

在CommunicatorEpoll::handle()中,通过传递进来的epoll_event中的data成员变量获取前面被选中的ObjectProxy并调用其ObjectProxy::invoke()函数:

void CommunicatorEpoll::handle(FDInfo * pFDInfo, uint32_t events)
{try{assert(pFDInfo != NULL);//队列有消息通知过来if(FDInfo::ET_C_NOTIFY == pFDInfo->iType){ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo->p;ReqMessage * msg = NULL;try{while(pInfoQueue->pop_front(msg)){......try{msg->pObjectProxy->invoke(msg);}......}}......}......
}

在ObjectProxy::invoke()中将进行第二次的负载均衡,每个ObjectProxy通过EndpointManager可以以不同的负载均衡方式对AdapterProxy进行选取选择:

void ObjectProxy::invoke(ReqMessage * msg)
{......//选择一个远程服务的Adapter来调用AdapterProxy * pAdapterProxy = NULL;bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy);......
}

在EndpointManager:: selectAdapterProxy()中,有多种负载均衡的方式来选取AdapterProxy,如getHashProxy(),getWeightedProxy(),getNextValidProxy()等。

获取AdapterProxy之后,便将选择到的AdapterProxy赋值给EndpointManager:: selectAdapterProxy()函数中的引用参数pAdapterProxy,随后执行:

void ObjectProxy::invoke(ReqMessage * msg)
{......msg->adapter = pAdapterProxy;pAdapterProxy->invoke(msg);
}

调用pAdapterProxy将请求信息发送出去。而在AdapterProxy::invoke()中,AdapterProxy将调用Transceiver::sendRequset()进行请求的发送。
至此,对应同步调用的网络线程发送请求的工作就结束了,网络线程会回到CommunicatorEpoll::run()中,继续等待数据的收发。

4.2 网络线程接收响应

当网络线程CommunicatorEpoll接收到响应数据之后,如同之前发送请求那样, 在CommunicatorEpoll::run()中,程序获取活跃的epoll_event的变量,并将其中的epoll_data_t data传递给CommunicatorEpoll::handle():

//先处理epoll的网络事件
for (int i = 0; i < num; ++i)
{const epoll_event& ev = _ep.get(i);uint64_t data = ev.data.u64;if(data == 0){continue; //data非指针, 退出循环}handle((FDInfo*)data, ev.events);
}

接下来的程序流程如下图所示:

在CommunicatorEpoll::handle()中,从epoll_data::data中获取Transceiver指针,并调用CommunicatorEpoll::handleInputImp():

Transceiver *pTransceiver = (Transceiver*)pFDInfo->p;
//先收包
if (events & EPOLLIN)
{try{
handleInputImp(pTransceiver);}catch(exception & e){
TLOGERROR("[TARS]CommunicatorEpoll::handle exp:"<<e.what()<<" ,line:"<<__LINE__<<endl);}catch(...){
TLOGERROR("[TARS]CommunicatorEpoll::handle|"<<__LINE__<<endl);}
}

在CommunicatorEpoll::handleInputImp()中,除了对连接的判断外,主要做两件事,调用Transceiver::doResponse()以及AdapterProxy::finishInvoke(ResponsePacket&),前者的工作是从socket连接中获取响应数据并判断接收的数据是否为一个完整的RPC响应包。后者的作用是将响应结果返回给客户端,同步调用的会唤醒阻塞等待在条件变量中的caller线程,异步调用的会在异步回调处理线程中执行回调函数。
在AdapterProxy::finishInvoke(ResponsePacket&)中,需要注意一点,假如是同步调用的,需要获取响应包rsp对应的ReqMessage信息,在Tars中,执行:

ReqMessage * msg = NULL;
// 获取响应包rsp对应的msg信息,并在超时队列中剔除该msg
bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);

在找回响应包对应的请求信息msg的同时,将其在超时队列中剔除出来。接着执行:

msg->eStatus = ReqMessage::REQ_RSP;
msg->response = rsp;
finishInvoke(msg);

程序调用另一个重载函数AdapterProxy::finishInvoke(ReqMessage*),在AdapterProxy::finishInvoke(ReqMessage*)中,

不同的RPC调用方式会执行不同的动作,例如同步调用会唤醒对应的caller线程:

//同步调用,唤醒ServantProxy线程
if(msg->eType == ReqMessage::SYNC_CALL)
{if(!msg->bCoroFlag){assert(msg->pMonitor);TC_ThreadLock::Lock sync(*(msg->pMonitor));msg->pMonitor->notify();msg->bMonitorFin = true;}else{msg->sched->put(msg->iCoroId);}return ;
}

至此,对应同步调用的网络线程接收响应的工作就结束了,网络线程会回到CommunicatorEpoll::run()中,继续等待数据的收发。
综上,客户端同步调用的过程如下图所示。

5.异步调用

在Tars中,除了最常见的同步调用之外,还可以进行异步调用,异步调用可分三种:普通的异步调用,promise异步调用与协程异步调用,这里简单介绍普通的异步调用,看看其与上文介绍的同步调用有何异同。

异步调用不会阻塞整个客户端程序,调用完成(请求发送)之后,用户可以继续处理其他事情,等接收到响应之后,Tars会在异步处理线程当中执行用户实现好的回调函数。在这里,会用到《Effective C++》中条款35所介绍的“藉由Non-Virtual Interface手法实现Template Method模式”,用户需要继承一个XXXServantPrxCallback基类,并实现里面的虚函数,异步回调线程会在收到响应包之后回调这些虚函数,具体的异步调用客户端示例这里不作详细介绍,在Tars的Example中会找到相应的示例代码。

5.1 初始化

5.1.1初始化逻辑

本文第一章已经详细介绍了客户端的初始化,这里再简单提一下,在第一章的“1.2.2初始化代码跟踪- 2.执行Communicator的初始化函数”中,已经提到说,在每一个网络线程CommunicatorEpoll的初始化过程中,会创建_asyncThreadNum条异步线程,等待异步调用的时候处理响应数据:

CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq)
{......//异步线程数_asyncThreadNum = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncthread", "3"));if(_asyncThreadNum == 0){_asyncThreadNum = 3;}if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM){_asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;}......//异步队列的大小size_t iAsyncQueueCap = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncqueuecap", "10000"));if(iAsyncQueueCap < 10000){iAsyncQueueCap = 10000;}......//创建异步线程for(size_t i = 0; i < _asyncThreadNum; ++i){_asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);_asyncThread[i]->start();}......
}

在开始讲述异步调用与接收响应之前,先看看大致的调用过程,与同步调用来个对比:

跟同步调用的示例一样,现在有一MyDemo.StringServer.StringServantObj的服务,提供一个RPC接口是append,传入两个string类型的变量,返回两个string类型变量的拼接结果。在执行tars2cpp而生成的文件中,定义了回调函数基类StringServantPrxCallback,用户需要public继承这个基类并实现自己的方法,例如:

class asyncClientCallback : public StringServantPrxCallback {
public:void callback_append(Int32 ret, const string& rStr) {cout <<  "append: async callback success and retCode is " << ret << " ,rStr is " << rStr << "\n";}void callback_append_exception(Int32 ret) {cout <<  "append: async callback fail and retCode is " << ret << "\n";}
};

然后,用户就可以通过proxy->async_append(new asyncClientCallback(), str1, str2)进行异步调用了,调用过程与上文的同步调用差不多,函数调用流程如下图所示,可以与同步调用的进行比较,看看同步调用与异步调用的异同。

在异步调用中,客户端发起异步调用_proxy->async_append(new asyncClientCallback(), str1, str2)后,在函数StringServantProxy::async_append()中,程序同样会先构造ServantProxy::tars_invoke_async()所需要的参数,如请求包类型,RPC方法名,方法参数等,与同步调用的一个区别是,还传递了承载回调函数的派生类实例。接下来便直接调用了ServantProxy::tars_invoke_async()方法:

tars_invoke_async(tars::TARSNORMAL,"append", _os.getByteBuffer(), context, _mStatus, callback)

在ServantProxy::tars_invoke_async()方法中,先创建一个ReqMessage变量msg,初始化msg变量,给变量赋值,如Tars版本号,数据包类型,服务名,RPC方法名,Tars的上下文容器,异步调用的超时时间(单位为毫秒)以及异步调用后的回调函数ServantProxyCallbackPtr callback(等待异步调用返回响应后回调里面的函数)等。最后,与同步调用一样,调用ServantProxy::invoke()进行远程方法调用。

在ServantProxy::invoke()中,继续填充传递进来的变量ReqMessage msg。此外,还需要获取调用者caller线程的线程私有数据ServantProxyThreadData,用来指导RPC调用。与同步调用一样,利用ServantProxy::selectNetThreadInfo()来轮询选取ObjectProxy(网络线程CommunicatorEpoll)与对应的ReqInfoQueue,详细可看同步调用中的介绍,注意区分客户端中的调用者caller线程与网络线程,以及之间的通信桥梁。

退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy类型的pObjProxy及其对应的ReqInfoQueue类型的ReqInfoQueue,在异步调用中,不需要建立条件变量来阻塞进程,直接通过pObjectProxy来发送RPC请求,请求信息会暂存在ReqInfoQueue中:

if(!pReqQ->push_back(msg,bEmpty))
{TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);delete msg;msg = NULL;pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);throw TarsClientQueueException("client queue full");
}pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

在之后,就不需要做任何的工作,退出层层函数调用,回到客户端中,程序可以继续执行其他动作.

5.1.2 网络线程数与CommunicatorEpoll个数的关系

Communicator根据client配置的线程数_clientThreadNum来决定创建CommunicatorEpoll的个数,_clientThreadNum是多少,就创建多少个CommunicatorEpoll(取值逻辑在Communicator::initialize()中,可看本文2.1.4中的源码或自己机器上的源码)

5.2 接收响应与函数回调

异步调用的请求发送过程与同步调用的一致,都是在网络线程中通过ObjectProxy去调用AdapterProxy来发送数据。但是在接收到响应之后,通过图(1-15)可以看到,在函数AdapterProxy::finishInvoke(ReqMessage*)中,同步调用会通过msg->pMonitor->notify()唤醒客户端的caller线程来接收响应包,而在异步调用中,则是如图(1-19)所示,CommunicatorEpoll与AsyncProcThread的关系如图(1-20)所示。

在函数AdapterProxy::finishInvoke(ReqMessage*)中,程序通过:

//异步回调,放入回调处理线程中
_objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);

将信息包msg(带响应信息)放到异步回调处理线程中,在CommunicatorEpoll::pushAsyncThreadQueue()中,通过轮询的方式选择异步回调处理线程处理接收到的响应包,异步处理线程数默认是3,最大是1024。

void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg)
{
    //先不考虑每个线程队列数目不一致的情况
    _asyncThread[_asyncSeq]->push_back(msg);
    _asyncSeq ++;
 
    if(_asyncSeq == _asyncThreadNum)
    {
        _asyncSeq = 0;
    }
}

选取之后,通过AsyncProcThread::push_back(),将msg包放在响应包队列AsyncProcThread::_msgQueue中,然后通过AsyncProcThread:: notify()函数通知本异步回调处理线程进行处理,AsyncProcThread:: notify()函数可以令阻塞在AsyncProcThread:: run()中的AsyncProcThread::timedWait()的异步处理线程被唤醒。

在AsyncProcThread::run()中,主要执行下面的程序进行函数回调:

if (_msgQueue->pop_front(msg))
{......try{ReqMessagePtr msgPtr = msg;msg->callback->onDispatch(msgPtr);}catch (exception& e){TLOGERROR("[TARS][AsyncProcThread exception]:" << e.what() << endl);}catch (...){TLOGERROR("[TARS][AsyncProcThread exception.]" << endl);}
}

通过msg->callback,程序可以调用回调函数基类StringServantPrxCallback里面的onDispatch()函数。在StringServantPrxCallback:: onDispatch()中,分析此次响应所对应的RPC方法名,获取响应结果,并通过动态多态,执行用户所定义好的派生类的虚函数。通过ReqMessagePtr的引用计数,还可以将ReqNessage* msg删除掉,与同步调用不同,同步调用的msg的新建与删除都在caller线程中,而异步调用的msg在caller线程上构造,在异步回调处理线程中析构.

6.客户端的负载均衡实现方法

不同的网络线程CommunicatorEpoll均可以发起同一RPC请求,对于同一RPC服务,选取不同的ObjectProxy(或可认为选取不同的网络线程CommunicatorEpoll)是第一层的负载均衡,

而对于同一个被选中的ObjectProxy,通过EndpointManager选择不同的socket连接AdapterProxy(假如ObjectProxy有大于1个的AdapterProxy,如图(1-4)的fooObjectProxy)是第二层的负载均衡.

可回顾本文5.1.2中的内容.

————————————————
版权声明:本文中很大篇幅为CSDN博主「TARS基金会」的原创文章节选内容,遵循CC 4.0 BY-SA版权协议,转载请附上TARS基金会的原文出处链接及本声明。
原文链接:https://blog.csdn.net/TARSFoundation/article/details/107208534

【TARS】TARS-CPP客户端学习二相关推荐

  1. 【TARS】TARS-CPP客户端学习一

    目录 0.入门学习链接 1.客户端中涉及的类的介绍 1.1 TarsClient(客户端类) 1.2 Communicator(通信器类) 1.3 ServantProxyFactory&Se ...

  2. ios客户端学习-二维码扫描和应用跳转

    转载至链接:http://sindrilin.com/ios-dev/2015/11/01/二维码扫描和应用跳转.html 序言 在iOS7之前,大部分应用中使用的二维码扫描是第三方的扫描框架,例如Z ...

  3. 开源中国iOS客户端学习

    开源中国iOS客户端学习 续写前言 <开源中国iOS客户端学习>续写前系列博客    http://blog.csdn.net/column/details/xfzl-kykhd.html ...

  4. 开源中国iOS客户端学习——(十二)用户登陆

    ---------------------------------------------------------------------------------------------------- ...

  5. hadoop hive hbase 入门学习 (二)

    hadoop 自学系列                hadoop hive hbase 入门学习 (一) hadoop安装.hdfs学习及mapreduce学习 hadoop 软件下载 (hadoo ...

  6. Docker学习二:Docker镜像与容器

    前言 本次学习来自于datawhale组队学习: 教程地址为: https://github.com/datawhalechina/team-learning-program/tree/master/ ...

  7. 深度学习二(Pytorch物体检测实战)

    深度学习二(Pytorch物体检测实战) 文章目录 深度学习二(Pytorch物体检测实战) 1.PyTorch基础 1.1.基本数据结构:Tensor 1.1.1.Tensor数据类型 1.1.2. ...

  8. webrtc服务器janus通信方法学习二

    webrtc服务器janus通信方法学习二 网关部署了一个客户端可以利用的接口.这个janus.js库以透明的方式使用它,其中与之交流的接口都封装好了,也可以自己使用其他方式进行通信,我不使用js接口 ...

  9. Elasticsearch 学习(二).实战使用

    Elasticsearch 学习(二).实战使用 参考:http://www.passjava.cn/#/01.PassJava/02.PassJava_Architecture/15.Elastic ...

最新文章

  1. 回调函数与Delphi的事件模型
  2. 【转】C# 中的委托和事件
  3. 一个老忘且非常有用的jquery动画方法 网页上卷
  4. boost::multiprecision模块测试 cpp_dec_float_50 是否符合我们的 概念要求
  5. JavaWeb学习总结(五十)——文件上传和下载
  6. 漫步者蓝牙驱动_有什么平价好用的蓝牙耳机?双11不踩雷高性价比蓝牙耳机推荐...
  7. primefaces 查询 点击按钮 加载 动画 ajax loader
  8. 小型微型计算机系统退回修改,小型微型计算机系统
  9. FD.io VPP startup.conf配置文件示例:安装后第一次配置
  10. 你还在盲目做抖音吗?
  11. Linux命令解释之sed
  12. php简单富文本,JS简易版富文本编辑器实现代码
  13. 解析dex2oat的实现
  14. 银行的USB KEY里面包含的是什么
  15. DELL存储SCv3020组件概念
  16. 看不到可用网络,网络适配器出现黄色感叹号(代码56)
  17. JavaScript逻辑训练题(一)
  18. 通过跟踪源码证明在Java中通过执行Start()方法创建线程
  19. python第五章总结
  20. Visual studio2022还原VC6环境设置

热门文章

  1. 101/103/104规约应用典型问题例举【转】
  2. MUSIC算法推导及代码实现
  3. bzero与memset
  4. Linux 高并发服务器实战 - 2 Linux多进程开发
  5. TestBench编写_激励产生
  6. 数据挖掘算法与原理(第三版)
  7. 踔厉奋发,笃行不怠,我的第一篇CSDN文章
  8. 深度学习:自编码进行模式分类
  9. 用python来玩科学计算
  10. 基于Java的课程管理系统