本节将介绍第一个实现具体传输功能的类TSocket,这个类是基于TCP socket实现TTransport的接口。下面具体介绍这个类的相关函数功能实现。
1.构造函数
分析一个类的功能首先看它的定义和构造函数实现,先看看它的定义:

class TSocket : public TVirtualTransport<TSocket> { ......}

由定义可以看书TSocket继承至虚拟传输类,并且把自己当做模板参数传递过去,所以从虚拟传输类继承下来的虚拟函数(如read_virt)调用非虚拟函数(如read)就是TSocket自己实现的。
TSocket类的构造函数有4个,当然还有一个析构函数。四个构造函数就是根据不同的参数来构造,它们的声明如下:

  TSocket();//所有参数都默认TSocket(std::string host, int port);//根据主机名和端口构造一个socketTSocket(std::string path);//构造unix域的一个socketTSocket(int socket);//构造一个原始的unix句柄socket

四个构造函数分别用于不同的情况下来产生不同的TSocket对象,不过这些构造函数都只是简单的初始化一些最基本的成员变量,而没有真正的连接socket。它们初始化的变量基本如下:

  TSocket::TSocket() :host_(""),port_(0),path_(""),socket_(-1),connTimeout_(0),sendTimeout_(0),recvTimeout_(0),lingerOn_(1),lingerVal_(0),noDelay_(1),maxRecvRetries_(5) {recvTimeval_.tv_sec = (int)(recvTimeout_/1000);recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;}

大部分简单的参数都采用初始化列表初始化了,需要简单计算的就放在函数体内初始化,其他几个都是这种情况。下面需要单独介绍一下的是unix domain socket。
socket API原本是为网络通讯设计的,但后来在socket的框架上发展出一种IPC机制,就是UNIX Domain Socket。虽然网络socket也可用于同一台主机的进程间通讯(通过loopback地址127.0.0.1),但是UNIX Domain Socket用于IPC更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。这是因为,IPC机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。UNIX Domain Socket也提供面向流和面向数据包两种API接口,类似于TCP和UDP,但是面向消息的UNIX Domain Socket也是可靠的,消息既不会丢失也不会顺序错乱。
UNIX Domain Socket是全双工的,API接口语义丰富,相比其它IPC机制有明显的优越性,目前已成为使用最广泛的IPC机制,比如X Window服务器和GUI程序之间就是通过UNIX Domain Socket通讯的。
使用UNIX Domain Socket的过程和网络socket十分相似,也要先调用socket()创建一个socket文件描述符,address family指定为AF_UNIX,type可以选择SOCK_DGRAM或SOCK_STREAM,protocol参数仍然指定为0即可。
UNIX Domain Socket与网络socket编程最明显的不同在于地址格式不同,用结构体sockaddr_un表示,网络编程的socket地址是IP地址加端口号,而UNIX Domain Socket的地址是一个socket类型的文件在文件系统中的路径,这个socket文件由bind()调用创建,如果调用bind()时该文件已存在,则bind()错误返回。
打开连接函数open
首先看这个函数的代码实现,如下:

  void TSocket::open() {if (isOpen()) {//如果已经打开就直接返回return;}if (! path_.empty()) {//如果unix路径不为空就打开unix domian socketunix_open();} else {local_open();//打开通用socket}}

Open函数又根据路径为不为空(不为空就是unix domain socket)调用相应的函数来继续打开连接,首先看看打开unix domain socket,代码如下:

  void TSocket::unix_open(){if (! path_.empty()) {//保证path_不为空// Unix Domain SOcket does not need addrinfo struct, so we pass NULLopenConnection(NULL);//调用真正的打开连接函数}}

由代码可以看出,真正实现打开连接的函数是openConnection,这个函数根据传递的参数来决定是否是打开unix domain socket,实现代码如下(这个函数代码比较多,其中除了错误部分代码省略):

  void TSocket::openConnection(struct addrinfo *res) {if (isOpen()) {return;//如果已经打开了直接返回}if (! path_.empty()) {//根据路径是否为空创建不同的socketsocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);//创建unix domain socket} else {socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);//创建通用的网络通信socket}if (sendTimeout_ > 0) {//如果发生超时设置大于0就调用设置发送超时函数设置发送超时setSendTimeout(sendTimeout_);}if (recvTimeout_ > 0) {//如果接收超时设置大于0就调用设置接收超时函数设置接收超时setRecvTimeout(recvTimeout_);}setLinger(lingerOn_, lingerVal_);//设置优雅断开连接或关闭连接参数setNoDelay(noDelay_);//设置无延时#ifdef TCP_LOW_MIN_RTOif (getUseLowMinRto()) {//设置是否使用较低的最低TCP重传超时 int one = 1;setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));}#endif//如果超时已经存在设置连接为非阻塞int flags = fcntl(socket_, F_GETFL, 0);//得到socket_的标识if (connTimeout_ > 0) {//超时已经存在if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {//设置为非阻塞}} else {if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {//设置为阻塞}}// 连接socketint ret;if (! path_.empty()) {//unix domain socket#ifndef _WIN32 //window不支持struct sockaddr_un address;socklen_t len;if (path_.length() > sizeof(address.sun_path)) {//path_长度不能超过最长限制}address.sun_family = AF_UNIX;snprintf(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());len = sizeof(address);ret = connect(socket_, (struct sockaddr *) &address, len);//连接unix domain socket#else//window不支持unix domain socket#endif} else {ret = connect(socket_, res->ai_addr, res->ai_addrlen);//连接通用的非unix domain socket}if (ret == 0) {//失败了就会执行后面的代码,用poll来监听写事件goto done;//成功了就直接跳转到完成处}struct pollfd fds[1];//定于用于poll的描述符std::memset(fds, 0 , sizeof(fds));//初始化为0fds[0].fd = socket_;//描述符为socketfds[0].events = POLLOUT;//接收写事件ret = poll(fds, 1, connTimeout_);//调用poll,有一个超时值if (ret > 0) {// 确保socket已经被连接并且没有错误被设置int val;socklen_t lon;lon = sizeof(int);int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);//得到错误选项参数if (val == 0) {// socket没有错误也直接到完成处了goto done;}} else if (ret == 0) {// socket 超时//相应处理代码省略} else {// poll()出错了,相应处理代码省略}done:fcntl(socket_, F_SETFL, flags);//设置socket到原来的模式了(阻塞)if (path_.empty()) {//如果是unix domain socket就设置缓存地址setCachedAddress(res->ai_addr, res->ai_addrlen);}}

上面这个函数代码确实比较长,不过还好都是比较简单的代码实现,没有什么很绕的代码,整个流程也很清晰,在代码中也有比较详细的注释了。下面继续看通用socket打开函数local_open(它也真正的执行打开功能也是调用上面刚才介绍的那个函数,只是传递了具体的地址信息):

  void TSocket::local_open(){#ifdef _WIN32TWinsockSingleton::create();//兼容window平台#endif // _WIN32if (isOpen()) {//打开了就直接返回return;}if (port_ < 0 || port_ > 0xFFFF) {//验证端口是否为有效值throw TTransportException(TTransportException::NOT_OPEN, "Specified port is invalid");}struct addrinfo hints, *res, *res0;res = NULL;res0 = NULL;int error;char port[sizeof("65535")];std::memset(&hints, 0, sizeof(hints));//内存设置为0hints.ai_family = PF_UNSPEC;hints.ai_socktype = SOCK_STREAM;hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;sprintf(port, "%d", port_);error = getaddrinfo(host_.c_str(), port, &hints, &res0);//根据主机名得到所有网卡地址信息// 循环遍历所有的网卡地址信息,直到有一个成功打开for (res = res0; res; res = res->ai_next) {try {openConnection(res);//调用打开函数break;//成功就退出循环} catch (TTransportException& ttx) {if (res->ai_next) {//异常处理,是否还有下一个地址,有就继续close();} else {close();freeaddrinfo(res0); // 清除地址信息内存和资源throw;//抛出异常}}}freeaddrinfo(res0);//释放地址结构内存}

整个local_open函数就是根据主机名得到所有的网卡信息,然后依次尝试打开,直到打开一个为止就退出循环,如果所有都不成功就抛出一个异常信息。
读函数read
在实现读函数的时候需要注意区分返回错误为EAGAIN的情况,因为当超时和系统资源耗尽都会产生这个错误(没有明显的特征可以区分它们),所以Thrift在实现的时候设置一个最大的尝试次数,如果超过这个了这个次数就认为是系统资源耗尽了。下面具体看看read函数的实现,代码如下(省略一些参数检查和错误处理的代码):

  uint32_t TSocket::read(uint8_t* buf, uint32_t len) {int32_t retries = 0;//重试的次数uint32_t eagainThresholdMicros = 0;if (recvTimeout_) {//如果设置了接收超时时间,那么计算最大时间间隔来判断是否系统资源耗尽eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2);}try_again:struct timeval begin;if (recvTimeout_ > 0) {gettimeofday(&begin, NULL);//得到开始时间} else {begin.tv_sec = begin.tv_usec = 0;//默认为0,不需要时间来判断是超时了}int got = recv(socket_, cast_sockopt(buf), len, 0);//从socket接收数据int errno_copy = errno; //保存错误代码++g_socket_syscalls;//系统调用次数统计加1if (got < 0) {//如果读取错误if (errno_copy == EAGAIN) {//是否为EAGAINif (recvTimeout_ == 0) {//如果没有设置超时时间,那么就是资源耗尽错误了!抛出异常throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (unavailable resources)");}struct timeval end;gettimeofday(&end, NULL);//得到结束时间,会改变errno,所以前面需要保存就是这个原因uint32_t readElapsedMicros =  (((end.tv_sec - begin.tv_sec) * 1000 * 1000)//计算消耗的时间+ (((uint64_t)(end.tv_usec - begin.tv_usec))));if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {if (retries++ < maxRecvRetries_) {//重试次数还小于最大重试次数usleep(50);//睡眠50毫秒goto try_again;//再次尝试从socket读取数据} else {//否则就认为是资源不足了throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (unavailable resources)");}} else {//推测为超时了throw TTransportException(TTransportException::TIMED_OUT, "EAGAIN (timed out)");}}if (errno_copy == EINTR && retries++ < maxRecvRetries_) {//如果是中断并且重试次数没有超过goto try_again;//那么重试}#if defined __FreeBSD__ || defined __MACH__if (errno_copy == ECONNRESET) {//FreeBSD和MACH特殊处理错误代码return 0;}#endif#ifdef _WIN32if(errno_copy == WSAECONNRESET) {//win32平台处理错误代码return 0; // EOF}#endifreturn got;}

整个读函数其实没有什么特别的,主要的任务就是错误情况的处理,从这里可以看出其实实现一个功能是很容易的,但是要做到稳定和容错性确实需要发很大功夫。
写函数write
写函数和读函数实现差不多,主要的代码还是在处理错误上面,还有一点不同的是写函数写的内容可能一次没有发送完毕,所以是在一个while循环中一直发送直到指定的内容全部发送完毕。代码实现如下:

  void TSocket::write(const uint8_t* buf, uint32_t len) {uint32_t sent = 0;//记录已经发送了的字节数while (sent < len) {//是否已经发送了指定的字节长度uint32_t b = write_partial(buf + sent, len - sent);//调部分写入函数if (b == 0) {//发送超时过期了throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired");}sent += b;//已经发送的字节数}}上面的函数还没有这种的调用send函数发送写入的内容,而是调用部分写入函数write_partial写入,这个函数实现如下:uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {uint32_t sent = 0;int flags = 0;#ifdef MSG_NOSIGNAL //使用这个代替SIGPIPE 错误,代替我们检查返回EPIPE错误条件和关闭socket的情况flags |= MSG_NOSIGNAL;//设置这个标志位#endif int b = send(socket_, const_cast_sockopt(buf + sent), len - sent, flags);//发送数据++g_socket_syscalls;//系统调用计数加1if (b < 0) { //错误处理if (errno == EWOULDBLOCK || errno == EAGAIN) {return 0;//应该阻塞错误直接返回}int errno_copy = errno;//保存错误代码if (errno_copy == EPIPE || errno_copy == ECONNRESET || errno_copy == ENOTCONN) {close();//连接错误关闭掉socket}}return b;//返回写入的字节数}

这个写入的实现逻辑和过程也是非常简单的,只是需要考虑到各种错误的情况并且相应的处理之。
其他函数
TSocket类还有一些其他函数,不过功能都比较简单,比如设置一些超时和得到一些成员变量值的函数,哪些函数一般都是几句代码完成了。

thrift之TTransport层的堵塞的套接字I/O传输类TSocket相关推荐

  1. Day09: socket网络编程-OSI七层协议,tcp/udp套接字,tcp粘包问题,socketserver

    今日内容:socket网络编程     1.OSI七层协议     2.基于tcp协议的套接字通信     3.模拟ssh远程执行命令     4.tcp的粘包问题及解决方案     5.基于udp协 ...

  2. Java基础23 网络编程 socket套接字流 TCP传输总结

    一.网络编程的概念 1.计算机网络:将不同地区的计算机,使用网络来进行连接 实现不同地区的数据的交互与共享(互联时代) 2. 网络编程的三要素:IP地址 端口号 协议 3. ip地址:是在网络连接中 ...

  3. Python网络编程—TCP套接字之HTTP传输

    HTTP协议 (超文本传输协议) 1.用途 : 网页获取,数据的传输 2.特点: 应用层协议,传输层使用tcp传输 简单,灵活,很多语言都有HTTP专门接口 无状态,协议不记录传输内容 http1.1 ...

  4. 套接字编程--1(UDP协议编程,端口号,传输层协议,网络字节序)

    传输层的协议: ip地址: 在网络中唯一标识一台主机 IPV4:uint32_t DHCP NAT IPV6 : uint8_t addr[16] -向前并不兼容IPV4 每一条数据都必须包含源地址和 ...

  5. 原始套接字学习笔记(1)

    一般来说,我们会用到如下三种套接字: TCP:SOCK_STREAM套接字 UDP:SOCK_DGRAM套接字 原始套接字:SOCK_RAW套接字 对于TCP和UDP两种套接字,相对来说只要配置好IP ...

  6. Linux原始套接字学习总结

    Linux网络编程:原始套接字的魔力[上] http://blog.chinaunix.net/uid-23069658-id-3280895.html 基于原始套接字编程        在开发面向连 ...

  7. Linux原始套接字实现分析---转

    http://blog.chinaunix.net/uid-27074062-id-3388166.html 本文从IPV4协议栈原始套接字的分类入手,详细介绍了链路层和网络层原始套接字的特点及其内核 ...

  8. Unix 网络编程(四)- 典型TCP客服服务器程序开发实例及基本套接字API介绍

    转载:http://blog.csdn.net/michael_kong_nju/article/details/43457393 写在开头: 在上一节中我们学习了一些基础的用来支持网络编程的API, ...

  9. Linux 套接字编程中的 5 个隐患

    Linux 套接字编程中的 5 个隐患 (2011-05-03 17:50) 分类: Socket编程 在 4.2 BSD UNIX® 操作系统中首次引入,Sockets API 现在是任何操作系统的 ...

最新文章

  1. KETTLE数据上传
  2. 《中国人工智能学会通讯》——12.58 大数据不确定性学习的研究
  3. php修改html,关于html:用PHP设置innerHTML?
  4. windows下 安装 rabbitMQ 及操作常用命令
  5. Thinkphp ajax分页
  6. 在webclient UI page里嵌入external view
  7. MQ保证消息的可靠性传输
  8. Eclipse Java注释模板设置详解
  9. matlab三角形分割,MATLAB 2014b及以上版本中带有画家渲染器的三角形拆分补丁
  10. webstore报 ESLint: Expected space or tab after '//' in comment.(spaced-comment)
  11. 运维管理成中小企业“心头大患” 飞塔“安接入”一步解决
  12. 深入剖析Android音频(四)AudioTrack
  13. 关于mssql的学习体会,仅供参考!
  14. 显卡的测试软件是什么,你的显卡是什么水平,用这个软件就知道了
  15. 北京五大不可不去的隐秘餐厅
  16. 深入了解 Squid 代理服务器及应用
  17. [经验教程]拼多多直接免拼是什么意思?
  18. 应用于音箱领域中的音频功放IC型号推荐
  19. 科技云报道:“奇袭”混合云,青云QingCloud站上C位
  20. org.hibernate.MappingException: Unknown entity 该怎么解决

热门文章

  1. .net mvc 获取url中controller和action
  2. 【Val】对于博客使用些许意见
  3. vrml行走和静止的人代码_CAE二次开发的核心不是代码
  4. 基于模板的通用代码生成器LKGenerator(四)-核心技术之各种数据库查询表信息sql整理...
  5. 开发一款高端大气上档次的android应用需要必备的知识——记于2013年末
  6. myeclipse里使用fat jar生成可执行jar
  7. redis源码dict.c simple reading
  8. DELL T410服务器U盘安装Centos7
  9. java线程——详解Callable、Future和FutureTask
  10. 阿里云、天津开启多项合作,区域经济大脑落地津南