上一篇BLOG已经介绍了revolver RUDP的传输性能、基本的框架和接口,这篇文章我重点讲述RUDP的实现细节。在RUDP的模块中最为重要的是其收发缓冲控制和CCC发送窗口控制、CCC发送慢启动控制、CCC快恢复控制等几个过程。(关于RUDP源代码实现在revolver开源项目的RUDP目录:点击打开链接)

数据块定义

在RUDP模块中,所有发送的数据被定义成RUDPRecvSegment 和 RUDPSendSegment结构,其中RUDPSendSegment是发送块定义,RUDPRecvSegment 是接收块定义。如下:
//发送数据片
typedef struct tagRUDPSendSegment
{uint64_t   seq_;                          //块序号uint64_t    push_ts_;              //进入发送队列的时刻uint64_t  last_send_ts_;             //最后一次发送的时刻uint16_t  send_count_;               //发送的次数uint8_t       data_[MAX_SEGMENT_SIZE];       //块数据    uint16_t    data_size_;                    //块数据长度
}RUDPSendSegment;
typedef struct tagRUDPRecvSegment
{uint64_t   seq_;                          //块序号uint8_t     data_[MAX_SEGMENT_SIZE];       //块数据uint16_t    data_size_;                //块数据长度
}RUDPRecvSegment;

块的最大尺寸为MAX_SEGMENT_SIZE = 1408(不能大于MTU,一般MTU是1492)。为了加快内存分配的速度,RUDP模块中使用了对象池来保证块对象的快速申请,对象池定义:

ObjectPool<RUDPSendSegment, RUDP_SEGMENT_POOL_SIZE>  SENDSEGPOOL;
ObjectPool<RUDPRecvSegment, RUDP_SEGMENT_POOL_SIZE>   RECVSEGPOOL;#define GAIN_SEND_SEG(seg) \RUDPSendSegment* seg = SENDSEGPOOL.pop_obj();\seg->reset()#define RETURN_SEND_SEG(seg) \if(seg != NULL)\SENDSEGPOOL.push_obj(seg)#define GAIN_RECV_SEG(seg) \RUDPRecvSegment* seg = RECVSEGPOOL.pop_obj(); \seg->reset()#define RETURN_RECV_SEG(seg) \if(seg != NULL)\RECVSEGPOOL.push_obj(seg)
几个宏是作为块申请和释放的宏。以上就是块的定义介绍,更具体的只有去查看相关源代码了。

发送缓冲区

发送缓冲区,定义如下:
class RUDPSendBuffer
{
public:...//发送数据接口int32_t               send(const uint8_t* data, int32_t data_size);//ACK处理void                on_ack(uint64_t ack_seq);//NACK处理void               on_nack(uint64_t base_seq, const LossIDArray& loss_ids);//定时器接口void             on_timer(uint64_t now_ts);//检查BUFFER是否可以写入数据void                check_buffer();...public:uint64_t           get_buffer_seq() {return buffer_seq_;};//设置NAGLE算法  void                set_nagle(bool nagle = true){nagle_ = nagle;};bool                get_nagle() const {return nagle_;};//设置发送缓冲区的大小void             set_buffer_size(int32_t buffer_size){buffer_size_ = buffer_size;};int32_t              get_buffer_size() const {return buffer_size_;};...protected:IRUDPNetChannel*    net_channel_;//正在发送的数据片SendWindowMap        send_window_;//正在发送的报文的丢包集合LossIDSet            loss_set_;//等待发送的数据片SendDataList        send_data_;//发送缓冲区的大小int32_t                buffer_size_;//当前缓冲数据的大小int32_t             buffer_data_size_;//当前BUFFER中最大的SEQuint64_t         buffer_seq_;//当前WINDOW中最大的SEQuint64_t           cwnd_max_seq_;//接收端最大的SEQuint64_t           dest_max_seq_;//速度控制器RUDPCCCObject*     ccc_;//是否启动NAGLE算法bool              nagle_;
}

其中send函数是数据写入函数,在这个函数里面,缓冲区对象先会对写入的数据进行报文拼接成发送块,让发送数据尽量接近MAX_SEGMENT_SIZE,如果发送的数据大于MAX_SEGMENT_SIZE,也会进行MAX_SEGMENT_SIZE为单元的分片。然后写入到对应的发送缓冲列表send_data_当中。最后尝试进行网络发送。伪代码如下:

int32_t RUDPSendBuffer::send(const uint8_t* data, int32_t data_size)
{int32_t copy_pos = 0;int32_t copy_size = 0;uint8_t* pos = (uint8_t *)data;uint64_t now_timer = CBaseTimeValue::get_time_value().msec();if(!send_data_.empty()) //拼接报文,让其接近MAX_SEGMENT_SIZE{//取出send_data_中的最后一片,如果它没有达到MAX_SEGMENT_SIZE,数据追加到MAX_SEGMENT_SIZE大小为止。RUDPSendSegment* last_seg = send_data_.back();if(last_seg != NULL && last_seg->data_size_ < MAX_SEGMENT_SIZE){copy_size = MAX_SEGMENT_SIZE - last_seg->data_size_;if( copy_size > data_size) copy_size = data_size;memcpy(last_seg->data_ + last_seg->data_size_, pos, copy_size);copy_pos += copy_size;pos += copy_size;last_seg->data_size_ += copy_size;}}//剩余数据分成MAX_SEGMENT_SIZE为单位的若干分片while(copy_pos < data_size){GAIN_SEND_SEG(last_seg);//设置初始化的的时刻last_seg->push_ts_ = now_timer; //记录压入时间戳last_seg->seq_ = buffer_seq_;buffer_seq_ ++;//确定拷贝的块长度copy_size = (data_size - copy_pos);if(copy_size > MAX_SEGMENT_SIZE)copy_size = MAX_SEGMENT_SIZE;memcpy(last_seg->data_, pos, copy_size);copy_pos += copy_size;pos += copy_size;last_seg->data_size_ = copy_size;//压入发送队列send_data_.push_back(last_seg);}//记录缓冲区的数据长度buffer_data_size_ += copy_pos;//尝试发送,立即发送attempt_send(now_timer);return copy_pos;
}

这里会触发attempt_send()函数。这个函数是尝试发送的核心函数。在后面的几个过程里面也会调用到这个函数。以上就是发送函数的过程。

除了发送函数以外,发送缓冲区对象还会响应来自网络的on_ack和on_nack消息,这两个消息分别是处理正常的状态报告和丢包情况下的网络报告。如果收到on_ack,缓冲区对象会把已经接收端报告过来的报文ID全部从发送窗口中删除,然后调用attempt_send尝试新的块发送。如果收到的是on_nack,表示对端有丢包,则先会记录丢包的ID到loss_set中,再调用on_ack进行处理。
触发attempt_send还有可能是定时器Timer,定时器每5MS会检查一下发送缓冲区,并调用attempt_send尝试发送并且会检查缓冲区是否可写。

attempt_send函数伪代码如下:
void RUDPSendBuffer::attempt_send(uint64_t now_timer)
{uint32_t cwnd_size = send_window_.size();uint32_t rtt = ccc_->get_rtt();uint32_t ccc_cwnd_size = ccc_->get_send_window_size();RUDPSendSegment* seg = NULL;uint32_t send_packet_number  = 0;if(!loss_set_.empty()) //重发丢失的片段{//发送丢包队列中的报文uint64_t loss_last_ts = 0;uint64_t loss_last_seq = 0;for(LossIDSet::iterator it = loss_set_.begin(); it != loss_set_.end();) //检查丢失报文是否要重发{if(send_packet_number >= ccc_cwnd_size) //超过发送窗口break;SendWindowMap::iterator cwnd_it = send_window_.find(*it);if(cwnd_it != send_window_.end() && cwnd_it->second->last_send_ts_ + rtt < now_timer) //丢失报文必须在窗口中{seg = cwnd_it->second;//UDP网络发送net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer);if(cwnd_max_seq_ < seg->seq_)cwnd_max_seq_ = seg->seq_;//判断是否可以更改TSif(loss_last_ts < seg->last_send_ts_){loss_last_ts = seg->last_send_ts_;if(loss_last_seq < *it) loss_last_seq = *it;}seg->last_send_ts_ = now_timer;seg->send_count_ ++;send_packet_number ++;loss_set_.erase(it ++);//报告CCC有重发ccc_->add_resend();}else++ it;}//更新重发包范围内未重发报文的时刻,防止下一次定时器到来时重复发送for(SendWindowMap::iterator it = send_window_.begin(); it != send_window_.end(); ++it){if(it->second->push_ts_ < loss_last_ts && loss_last_seq >= it->first)it->second->last_send_ts_ = now_timer;else if(loss_last_seq < it->first)break;}}else if(send_window_.size() > 0)//丢包队列为空,重发所有窗口中超时的分片{//发送间时间隔阈值uint32_t rtt_threshold = (uint32_t)ceil(rtt * 1.25);rtt_threshold = (core_max(rtt_threshold, 30));SendWindowMap::iterator end_it = send_window_.end();for(SendWindowMap::iterator it = send_window_.begin(); it != end_it; ++it){if(send_packet_number >= ccc_cwnd_size || (it->second->push_ts_ + rtt_threshold > now_timer))break;seg = it->second;//重发块的触发条件是上一次发送的时间距离现在大于特定的阈值或者压入时间很长并且是属于发送缓冲区靠前的块if(seg->last_send_ts_ + rtt_threshold < now_timer || (seg->push_ts_ + rtt_threshold * 5 < now_timer && seg->seq_ < dest_max_seq_ + 3 && seg->last_send_ts_ + rtt_threshold / 2 < now_timer)) {net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer);if(cwnd_max_seq_ < seg->seq_)cwnd_max_seq_ = seg->seq_;seg->last_send_ts_ = now_timer;seg->send_count_ ++;send_packet_number ++;//报告CCC有重发块ccc_->add_resend();}}}//判断是否可以发送新的报文if(ccc_cwnd_size > send_packet_number){while(!send_data_.empty()){RUDPSendSegment* seg = send_data_.front();//判断NAGLE算法,NAGLE最少需要在100MS凑1024个字节报文if(cwnd_size > 0 && nagle_ && seg->push_ts_ + NAGLE_DELAY > now_timer && seg->data_size_ < MAX_SEGMENT_SIZE - 256)break;//判断发送窗口if(cwnd_size < ccc_cwnd_size){send_data_.pop_front();send_window_.insert(SendWindowMap::value_type(seg->seq_, seg));cwnd_size ++;seg->push_ts_ = now_timer;seg->last_send_ts_ = now_timer;seg->send_count_ = 1;//UDP网络发送net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer);if(cwnd_max_seq_ < seg->seq_)cwnd_max_seq_ = seg->seq_;}else //发送窗口满,则停止发送break;}}
}

从上可得知,attempt_send是首先检查是否可以发送丢失的报文,然后再检查窗口中太老的报文是否要重发,最后才加入新的发送报文。所有的前提约束是不超过发送窗口。这个函数里CCC决定的发送窗口大小和RTT直接控制着发送速度和发送策略。 在这里值得一提的是NAGLE的实现,RUDP为了防止小包过多,实现了一个nagle算法,如果设置了此开关,假如只有1个块在缓冲队列中,会等数据达到1024的长度才进行发送。如果等100MS没到1024长度也会发送,也就是最大等100MS.开关可以通过rudp interface设置的。

接收缓冲区

接收缓冲区相对比较简单,其主要功能是接收发送方的数据并生成接收块、块排序、丢包判断和反馈、读事件通知等。以下是接收缓冲区的定义:
class RUDPRecvBuffer
{
public:...//来自网络中的数据int32_t             on_data(uint64_t seq, const uint8_t* data, int32_t data_size);//定时事件void                on_timer(uint64_t now_timer, uint32_t rtc);//读取BUFFER中的数据int32_t                read(uint8_t* data, int32_t data_size);//检查缓冲区是否可读void              check_buffer();//检查丢包bool               check_loss(uint64_t now_timer, uint32_t rtc);...
protected:IRUDPNetChannel*  net_channel_;//接收窗口RecvWindowMap        recv_window_;//已完成的连续数据片RecvDataList        recv_data_;//丢包序列LossIDTSMap            loss_map_;//当前BUFFER中最大连续数据片的SEQuint64_t            first_seq_;//当期BUFFER中受到的最大的数据片IDuint64_t           max_seq_;//最后一次发送ACK的时刻uint64_t         last_ack_ts_;//在上次发送ACK到现在,受到新的连续报文的标志   bool                recv_new_packet_;...
};

在上面定义中,核心的函数主要是on_data和on_timer。on_data是接收来自发送端的RUDP数据报文,在这个函数里面首先会进行接收到报文和缓冲去里面的报文进行比较判断是否丢包和重复包。如果有丢包,记录到loss_map中。如果是重复包,则丢弃。如果接收到的包和缓冲区里的报文可以组成连续的块序列。则对上层触发on_read读事件。一下是这个函数的伪代码:

int32_t RUDPRecvBuffer::on_data(uint64_t seq, const uint8_t* data, int32_t data_size)
{//报文合法性检测if(seq > first_seq_ + MAX_SEQ_INTNAL || data_size > MAX_SEGMENT_SIZE){//报告异常RUDP_RECV_DEBUG("on data exception!!");net_channel_->on_exception();return -1;}RUDPRecvSegment* seg = NULL;if(first_seq_ + 1 == seq)//连续报文{recv_new_packet_= true;//将数据缓冲到队列中GAIN_RECV_SEG(seg);seg->seq_ = seq;seg->data_size_ = data_size;memcpy(seg->data_, data, data_size);recv_data_.push_back(seg);first_seq_ = seq;//判断缓冲区中的块是否连续,并进行排序check_recv_window();//触发可读事件net_channel_->on_read();//删除丢包loss_map_.erase(seq);}else if(seq > first_seq_ + 1) //非连续报文{RecvWindowMap::iterator it = recv_window_.find(seq);if(it == recv_window_.end()) //记录到接收窗口中{//将数据缓冲到队列中GAIN_RECV_SEG(seg);seg->seq_ = seq;seg->data_size_ = data_size;memcpy(seg->data_, data, data_size);recv_window_[seq] = seg;}//判断丢包if(seq > max_seq_ + 1){uint64_t ts = CBaseTimeValue::get_time_value().msec();for(uint64_t i = max_seq_ + 1; i < seq;  ++ i) //缓冲区中最大的报文和收到的报文之间的报文全部列入丢包范围中,并记录丢包时刻loss_map_[i] = ts;}else{//删除丢包loss_map_.erase(seq);}}//更新缓冲区最大SEQif(max_seq_ < seq)max_seq_ = seq;return 0;
}

on_timer是定时触发的,一般是5MS触发一次。主要是向发送端发送报告消息(ack/nack)、检查缓冲区是否可读两个操作。发送ack状态消息的条件是

uint32_t rtc_threshold = core_min(20, rtc / 2);
if(last_ack_ts_ + rtc_threshold <= now_timer && recv_new_packet_){
发送ack
}
其中rtc是RTT的修正值,由CCC计算得来。间隔不大于20MS发送一次。recv_new_packet_是一个收到正常连续报文的标志。如果发送了NACK,就不发送ACK,如果有丢包的话,就会触发发送nack,在on_timer的时候就会检测是本定时周期是否有丢包,如果有,就将丢包的序号通过nack发往发送端做丢包补偿。
void RUDPRecvBuffer::on_timer(uint64_t now_timer, uint32_t rtc)
{       //检查丢包if(check_loss(now_timer, rtc))recv_new_packet_ = false;//检查是否需要发送ackuint32_t rtc_threshold = core_min(20, rtc / 2);if(last_ack_ts_ + rtc_threshold <= now_timer && recv_new_packet_)send_ack();//检查缓冲区是否可读if(!recv_data_.empty() && net_channel_ != NULL)net_channel_->on_read();
}

CCC核心控制

CCC的核心控制就是慢启动、快恢复、RTT评估三个部分。
慢启动过程描述如下:
1、发送端的初始化发送窗口(send_wnd)为16
2、当发送端收到第一个ACK时,send_wnd = send_wnd + (本ACK周期内发送成功的报文数量)
3、继续发送数据报文,直到下一个ACK。重复2和3步骤,如果send_wnd >= MAX_WND.慢启动结束,或者慢启动时间超过10个RTT和出现丢包情况,慢启动也结束。
其中MAX_WND是通过RTT决定的。RTT与MAX_WND的对照
RTT                                              MAX_WND
< 10ms                                       2048
< 50ms                                       6144
< 100ms                                     8192
其他                                             12288
从上面可得知,RTT越大MAX_WND越大,这样做的目的是提高高延迟稳定网络之间的吞吐量。
快恢复过程描述如下:
在慢启动结束后,数据传输过程会随着网络变化策略也要变化。
1、如果1个ACK周期没有丢包,发送窗口send_wnd = snd_cwnd * 1.5
2、如果1个ACK周期有丢包,send_wnd  = send_wnd  / 1.25;最小不能低于8
3、如果本地触发on_timer事件,检查本地重发报文resend_count > send_wnd  / 8,如果条件满足send_wnd  = send_wnd  / 1.25;最小不能低于8。
RTT的评估是通过RUDP_KEEPLIVE的回路得到一个keeplive_rtt为参数如数计算得到rtt和rtt_var.伪代码如下:
void RUDPCCCObject::set_rtt(uint32_t keep_live_rtt)
{...//第一次计算rtt和rtt修正if(rtt_first_){rtt_first_ = false;rtt_ = keep_live_rtt;rtt_var_ = rtt_ / 2;}else //参考了tcp的rtt计算{rtt_var_ = (rtt_var_ * 3 + core_abs(rtt_, keep_live_rtt)) / 4;rtt_ = (7 * rtt_ + keep_live_rtt) / 8;}rtt_ = core_max(5, rtt_);rtt_var_ = core_max(3, rtt_var_);...
}
总结,revolver RUDP模块在传输速度和稳定性上表现还算优秀,在带宽达到30M/s以上,CPU上升比较高,一般占用一个CORE的30%,造成这个原因主要是一个UDP socket发送比较耗CPU,还有就是大数据量造成发送和接收窗口增长,使得丢包判定、窗口移动等效率明显下降。关于窗口移动和发送以后可以考虑用存C来实现,不依赖C++和STL,应该效率有比较大的提升。

C++高性能服务框架revolver:RUDP(可靠UDP)算法详解相关推荐

  1. C++高性能服务框架revover:rudp总体介绍(可靠UDP传输)

    在revolver框架中实现了一个高效可靠的RUDP通信方式,这个通信方式是基于UDP实现一种模拟TCP传输数据的行为.在很多实际应用中,udp/TCP都不是最好的通信方式,例如:点对点文件传输.视频 ...

  2. MapReduce框架下的FP Growth算法详解

    转载自:http://blog.sina.com.cn/s/blog_68ffc7a40100uebk.html Sharding 这一步没什么好讲的,将数据库分成连续的大小相等的几个块,放置在不同的 ...

  3. zys高性能服务框架

    zys是基于yaf和swoole的高性能服务框架 核心特性 1.基于swoole提供分布式服务器通讯服务 2.基于thrift提供rpc远程调用服务 3.基于HTML5提供在线网络直播平台服务 4.基 ...

  4. java udp 协议_网络协议 - UDP 协议详解

    ¶ 网络协议 - UDP 协议详解 基于TCP和UDP的协议非常广泛,所以也有必要对UDP协议进行详解.@pdai ¶ UDP概述 UDP(User Datagram Protocol)即用户数据报协 ...

  5. php事件和行为,Yii框架组件和事件行为管理详解

    Yii框架组件和事件行为管理详解 来源:中文源码网    浏览: 次    日期:2018年9月2日 [下载文档:  Yii框架组件和事件行为管理详解.txt ] (友情提示:右键点上行txt文档名- ...

  6. SAP UI5 初学者教程之二十六 - OData 服务配合 Mock 服务器的使用步骤详解试读版

    一套适合 SAP UI5 初学者循序渐进的学习教程 教程目录 SAP UI5 本地开发环境的搭建 SAP UI5 初学者教程之一:Hello World SAP UI5 初学者教程之二:SAP UI5 ...

  7. 三大框架题目整合考试题(含详解)

    三大框架题目整合考试题(含详解) 1.在Hibernate的关联关系映射配置中,下列选项对于inverse说法错误的是(bd). (选择二项) A. inverse属性指定了关联关系中的方向 //in ...

  8. linux中apache配置文件在哪,linux网站服务Apache的安装与配置方法详解

    这篇文章介绍下linux网站服务apache的安装与配置方法,包括挂载光盘,安装http服务,管理httpd服务,httpd的配置文件几大部分.具体详情可以参考下文. 1.挂载光盘 自己习惯将光盘挂载 ...

  9. 自动化测试框架[Cypress命令行执行测试详解]

    前提 已经熟练掌握了Cypress的基本知识,请参考自动化测试框架[Cypress概述]和自动化测试框架[各自动化测试框架比较] 已经熟练掌握Cypress环境配置,请参考自动化测试框架[Cypres ...

最新文章

  1. ROS与Arduino学习(三)订阅与发布
  2. 双击.exe文件出现Debug Error: abort() has been called解决办法(之一)
  3. ssl1614-医院设置【图论,最短路】
  4. 字典超详细--python
  5. PyQt5的QAction多次响应triggered信号的处理方式
  6. 授权码模式、Token登录认证
  7. CASS11最新版免狗下载安装教程
  8. web项目缺少web组件才能与vs一起运行,带razor语法的 asp.net web pages 2.0.0.1
  9. 数学作图工具_非常实用的九个程序员工具网站
  10. 英文邮件礼仪:向教授请教学术问题
  11. 【转】火影10大秘密最终之解(杜撰)
  12. 超简单的方法完整保留原有所有样式拆分Excel表
  13. 单机版Fate安装教程(含虚拟机搭建)
  14. 什么是GPT,如何克隆GPT类型的磁盘?
  15. 精华 | 恩墨学院侯圣文:大数据时代下的 DBA 该何去何从?
  16. Jq-滚动条插件写法(二)
  17. 怎么入驻印象淘宝短视频 申请通过印象淘宝条件要求
  18. WebRTC 桌面共享:
  19. 基于Python+opencv实现的人脸识别系统
  20. opencore添加Linux引导,让OpenCore的引导界面更好看,已更新详细教程

热门文章

  1. 深富策略:A股市场处于震荡颠簸期
  2. 一文详解激光SLAM框架LeGO-LOAM
  3. atom linux64下载,ATOM下载
  4. jsp标签jsp:useBean用法
  5. 计算机技术 食堂管理,一套出色的智慧食堂消费系统,应该具备哪些突出特点?...
  6. 收集的签名档的好去处
  7. 【ELectron】electron应用任务栏图标闪烁提醒
  8. 发出商品及差异调整方案
  9. kiss原则包括什么_KISS原则以及介绍
  10. Centos7 忘记密码的情况下,修改root或其他用户密码