rdr是winvnc用来io操作的一个库工程。

IO操作无非两种,写入和读取数据,所以这里每个类都相应地分成了IN和OUT两种操作,先分析IN。

view plain
  1. class InStream {  public:    virtual ~InStream() {}    // check() ensures there is buffer data for at least one item of size    // itemSize bytes.  Returns the number of items in the buffer (up to a    // maximum of nItems).    inline int check(int itemSize, int nItems=1)    {      if (ptr + itemSize * nItems > end) {        if (ptr + itemSize > end)          return overrun(itemSize, nItems);        nItems = (end - ptr) / itemSize;      }      return nItems;    }    // readU/SN() methods read unsigned and signed N-bit integers.    inline U8  readU8()  { check(1); return *ptr++; }    inline U16 readU16() { check(2); int b0 = *ptr++; int b1 = *ptr++;                           return b0 << 8 | b1; }    inline U32 readU32() { check(4); int b0 = *ptr++; int b1 = *ptr++;                                     int b2 = *ptr++; int b3 = *ptr++;                           return b0 << 24 | b1 << 16 | b2 << 8 | b3; }    inline S8  readS8()  { return (S8) readU8();  }    inline S16 readS16() { return (S16)readU16(); }    inline S32 readS32() { return (S32)readU32(); }    // readString() reads a string - a U32 length followed by the data.    // Returns a null-terminated string - the caller should delete[] it    // afterwards.    char* readString();    // maxStringLength protects against allocating a huge buffer.  Set it    // higher if you need longer strings.    static U32 maxStringLength;    inline void skip(int bytes) {      while (bytes > 0) {        int n = check(1, bytes);        ptr += n;        bytes -= n;      }    }    // readBytes() reads an exact number of bytes.    virtual void readBytes(void* data, int length) {      U8* dataPtr = (U8*)data;      U8* dataEnd = dataPtr + length;      while (dataPtr < dataEnd) {        int n = check(1, dataEnd - dataPtr);        memcpy(dataPtr, ptr, n);        ptr += n;        dataPtr += n;      }    }    // readOpaqueN() reads a quantity without byte-swapping.    inline U8  readOpaque8()  { return readU8(); }    inline U16 readOpaque16() { check(2); U16 r; ((U8*)&r)[0] = *ptr++;                                ((U8*)&r)[1] = *ptr++; return r; }    inline U32 readOpaque32() { check(4); U32 r; ((U8*)&r)[0] = *ptr++;                                ((U8*)&r)[1] = *ptr++; ((U8*)&r)[2] = *ptr++;                                ((U8*)&r)[3] = *ptr++; return r; }    inline U32 readOpaque24A() { check(3); U32 r=0; ((U8*)&r)[0] = *ptr++;                                 ((U8*)&r)[1] = *ptr++; ((U8*)&r)[2] = *ptr++;                                 return r; }    inline U32 readOpaque24B() { check(3); U32 r=0; ((U8*)&r)[1] = *ptr++;                                 ((U8*)&r)[2] = *ptr++; ((U8*)&r)[3] = *ptr++;                                 return r; }    // pos() returns the position in the stream.    virtual int pos() = 0;    // getptr(), getend() and setptr() are "dirty" methods which allow you to    // manipulate the buffer directly.  This is useful for a stream which is a    // wrapper around an underlying stream.    inline const U8* getptr() const { return ptr; }    inline const U8* getend() const { return end; }    inline void setptr(const U8* p) { ptr = p; }  private:    // overrun() is implemented by a derived class to cope with buffer overrun.    // It ensures there are at least itemSize bytes of buffer data.  Returns    // the number of items in the buffer (up to a maximum of nItems).  itemSize    // is supposed to be "small" (a few bytes).    virtual int overrun(int itemSize, int nItems) = 0;  protected:    InStream() {}    const U8* ptr;    const U8* end;  };

构造函数为protected权限,摆明是给子类提供接口规范,derived class。

两个变量根据名字来分析,ptr指向的应该是当前位置,end则是最后一个字节。

inline int check(int itemSize, int nItems=1)

检查剩余缓存是否够用,代码对三种情况进行了处理,当前缓存不足一个itemSize的时候会运行overrun。overrun在这里是一个纯虚函数,稍后我们可以在子类中它的应用。

readU系列

以readU32为例,这里没有用memcpy进行拷贝,而是将源低地址的8位赋值给目标变量的高8位,既默认大端字节序(网络字节序)。

readS只是将readU的返回值作一个有符号类型的强制转换。

char* InStream::readString()

这里默认当前缓存头32位保存的是将要读取内容的长度,然后动态分配了相应长度(+1保证C STYLE字符串),拷贝后返回头指针,意味着由调用函数者来释放。上限是U32 InStream::maxStringLength = 65535;

inline void skip(int bytes)

用来快进当前指针bytes个字节。

virtual void readBytes(void* data, int length)

读取指定长度length的内容到data。这里用了while循环,虽然之前看到了有overrun函数,所以貌似有不读到就阻塞的嫌疑。

readOpaque系列则

这里就是一个字节一个字节的原始拷贝了。因为一个类型指针在内存中永远指向的是低地址。附一个判断CPU大小端的函数

view plain
  1. bool IsBig_Endian(){unsigned short test = 0x1100;if(*( (unsigned char*) &test ) == 0x11){#define HAVE_BIGENDIANreturn true;}else{#undef  HAVE_BIGENDIANreturn false;}}

class FdInStream : public InStream

这个类是IO端口的一个缓存,从IO端口读取数据,并且计算流量。

view plain
  1. int FdInStream::readWithTimeoutOrCallback(void* buf, int len){  struct timeval before, after;  if (timing)    gettimeofday(&before, 0);  int n = checkReadable(fd, timeout);//查看端口是否有数据到达  if (n < 0) throw SystemException("select",errno);  if (n == 0) {    if (timeout) throw TimedOut();    if (blockCallback) (*blockCallback)(blockCallbackArg);//如果IO没有数据,则调用回调函数blockCallback  }  while (true) {    n = ::read(fd, buf, len);    if (n != -1 || errno != EINTR)      break;//从IO读到数据就退出循环,出错则阻塞尝试    fprintf(stderr,"read returned EINTR/n");  }  if (n < 0) throw SystemException("read",errno);  if (n == 0) throw EndOfStream();  if (timing) {    gettimeofday(&amp;after, 0);//      fprintf(stderr,"%d.%06d/n",(after.tv_sec - before.tv_sec),//              (after.tv_usec - before.tv_usec));    int newTimeWaited = ((after.tv_sec - before.tv_sec) * 10000 +                         (after.tv_usec - before.tv_usec) / 100);//单位100微秒    int newKbits = n * 8 / 1000;//单位1000位      if (newTimeWaited == 0) {        fprintf(stderr,"new kbps infinite t %d k %d/n",                newTimeWaited, newKbits);      } else {        fprintf(stderr,"new kbps %d t %d k %d/n",                newKbits * 10000 / newTimeWaited, newTimeWaited, newKbits);      }//计算本次IO读取的kbps    // limit rate to between 10kbit/s and 40Mbit/s    if (newTimeWaited > newKbits*1000) newTimeWaited = newKbits*1000;//在10kbit/s以下的计为10kbit/s    if (newTimeWaited < newKbits/4)    newTimeWaited = newKbits/4;//在40Mbit/s以上的计为40Mbit/s    timeWaitedIn100us += newTimeWaited;    timedKbits += newKbits;//累计读取时间和数据量  }  return n;}

这个是从IO读取长度len数据到buf。

1 用select函数检查端口是否有数据到达。

2 如果没有的话调用构造函数传入的回调函数blockCallback。

3 然后读取数据,虽然::read(fd, buf, len);用了全局符号,但是头文件有定义#define read(s,b,l) recv(s,(char*)b,l,0)。

4 计算流量,先看下获取时间的函数:尝试用两种办法获取当前时间

static void gettimeofday(struct timeval* tv, void*)

view plain
  1. #ifdef _WIN32static void gettimeofday(struct timeval* tv, void*){  LARGE_INTEGER counts, countsPerSec;  static double usecPerCount = 0.0;  if (QueryPerformanceCounter(&amp;counts)) {    if (usecPerCount == 0.0) {      QueryPerformanceFrequency(&amp;countsPerSec);      usecPerCount = 1000000.0 / countsPerSec.QuadPart;  //一个CPU时钟 时间单位微秒    }    LONGLONG usecs = (LONGLONG)(counts.QuadPart * usecPerCount);//计算系统运行到现在过了多少微秒    tv->tv_usec = (long)(usecs % 1000000);    tv->tv_sec = (long)(usecs / 1000000);  } else {    struct timeb tb;    ftime(&tb);    tv->tv_sec = tb.time;    tv-&gt;tv_usec = tb.millitm * 1000;//tv保存的是1970.1.1到现在经过的时间  }}#endif

startTiming、stopTiming和kbitsPerSecond分别用于开始计算、停止计算和返回从开始至今的传输率。

但都作了最小和最大的传输率限制,暂时猜不透用意。

现在我们看看基类中没有实现的虚函数

FdInStream::overrun(int itemSize, int nItems)

view plain
  1. int FdInStream::overrun(int itemSize, int nItems){  if (itemSize > bufSize)    throw Exception("FdInStream overrun: max itemSize exceeded");  if (end - ptr != 0)    memmove(start, ptr, end - ptr);//把缓冲区的未读数据拷贝到缓冲区的头部  offset += ptr - start;  end -= ptr - start;  ptr = start;  while (end &lt; start + itemSize) {    int n = readWithTimeoutOrCallback((U8*)end, start + bufSize - end);//从IO读取数据填充缓冲区剩余部分    end += n;  }  if (itemSize * nItems &gt; end - ptr)    nItems = (end - ptr) / itemSize;  return nItems;}

对于比缓冲还大的itemsize只能吼下“OUT”

overrun是在缓冲区剩余未读长度小于itemsize的时候运行的,我们先得理解下几个变量:

ptr 缓冲区中未读数据的首字节地址

end 缓冲区中未读数据的尾字节地址

start 永远都指向了缓冲区的首字节地址

offset 数据传输总量。

首先memmove(start, ptr, end - ptr);把缓冲区的未读数据拷贝到缓冲区的头部,然后调用FdInStream::readWithTimeoutOrCallback从IO读取数据填充缓冲区剩余部分。

对于大量的数据,我们首先把缓冲区中未读数据读取出来,然后调用FdInStream::readWithTimeoutOrCallback直接读取到目标地址,而不再经过缓存区,这样可以提高效率。

也就是说缓冲区虽然方便了管理数据读写管理,但对效率相应地有些影响。

class MemInStream : public InStream

int overrun(int itemSize, int nItems) { throw EndOfStream(); }

可以看出来这个类用于对内存数据的管理。

ZlibInStream : public InStream

本来想到研究zlib库的时候再说,但zlib的接口类非常简洁。它增加的几个变量

view plain
  1. InStream* underlying;//压缩前数据存放的缓冲区,暂时把它叫做A,ZlibInStream是压缩后数据存放的缓冲区,为了区分叫C。z_stream_s* zs;//zlib接口类指针,把它看成是一个压缩加工车间B。int bytesIn;//记录当前A中需要压缩的数据长度

构造函数:

view plain
  1. ZlibInStream::ZlibInStream(int bufSize_)  : underlying(0), bufSize(bufSize_ ? bufSize_ : DEFAULT_BUF_SIZE), offset(0),    bytesIn(0){  zs = new z_stream;  zs->zalloc    = Z_NULL;  zs->zfree     = Z_NULL;  zs->opaque    = Z_NULL;  zs->next_in   = Z_NULL;  zs->avail_in  = 0;  if (inflateInit(zs) != Z_OK) {    delete zs;    throw Exception("ZlibInStream: inflateInit failed");  }  ptr = end = start = new U8[bufSize];}

B的初始化。

void ZlibInStream::setUnderlying(InStream* is, int bytesIn_)

view plain
  1. void ZlibInStream::setUnderlying(InStream* is, int bytesIn_){  underlying = is;  bytesIn = bytesIn_;  ptr = end = start;}

A的初始化。

int ZlibInStream::overrun(int itemSize, int nItems)

view plain
  1. int ZlibInStream::overrun(int itemSize, int nItems){  if (itemSize > bufSize)    throw Exception("ZlibInStream overrun: max itemSize exceeded");  if (!underlying)    throw Exception("ZlibInStream overrun: no underlying stream");  if (end - ptr != 0)    memmove(start, ptr, end - ptr);  offset += ptr - start;  end -= ptr - start;  ptr = start;  while (end - ptr < itemSize) {    decompress();  }  if (itemSize * nItems > end - ptr)    nItems = (end - ptr) / itemSize;  return nItems;}

对照FdInStream::overrun函数,看出它是调用了核心处理函数decompress来解决无数据可读的情况

void ZlibInStream::decompress()

view plain
  1. void ZlibInStream::decompress(){  zs->next_out = (U8*)end;  zs->avail_out = start + bufSize - end;  //用C中保存被解压数据的控件指针初始化B的接口变量  underlying->check(1);  zs->next_in = (U8*)underlying->getptr();  zs->avail_in = underlying->getend() - underlying->getptr();  if ((int)zs->avail_in > bytesIn)    zs->avail_in = bytesIn;  int rc = inflate(zs, Z_SYNC_FLUSH);  if (rc != Z_OK) {    throw Exception("ZlibInStream: inflate failed");  }  bytesIn -= zs-&gt;next_in - underlying->getptr();  end = zs->next_out;   //解压后zs->next_out会发生偏移,偏移量为缓冲接收到的被解压的数据量  underlying->setptr(zs->next_in);  //解压后zs->next_in会发生偏移,偏移量为underlying中被解压的数据量}

它是逻辑很简单,压缩前初始化B的几个关键变量,然后直接调用压缩函数inflate,压缩完后,我们要相应地修改A和C的成员指针到相应地偏移量。

void ZlibInStream::reset()

view plain
  1. void ZlibInStream::reset(){  ptr = end = start;  if (!underlying) return;  while (bytesIn > 0) {    decompress();    end = start; // throw away any data  }  underlying = 0;}

把A中需要压缩的数据全部压缩完并输出到C中,然后初始化成员变量。我们可以通过函数setUnderlying来切换A,但是记住在之前调用RESET。

这个时候看OUT系列就轻松很多了。

class FdOutStream : public OutStream

void FdOutStream::flush()

view plain
  1. void FdOutStream::flush(){  U8* sentUpTo = start;  while (sentUpTo < ptr) {    int n = write(fd, (const void*) sentUpTo, ptr - sentUpTo);    if (n < 0) throw SystemException("write",errno);    sentUpTo += n;    offset += n;  }  ptr = start;}

所谓的flush就是把缓冲区的数据写入IO。

FdOutStream::writeBytes(const void* data, int length)

view plain
  1. void FdOutStream::writeBytes(const void* data, int length){  if (length < MIN_BULK_SIZE) {    OutStream::writeBytes(data, length);    return;  }  const U8* dataPtr = (const U8*)data;  flush();  while (length > 0) {    int n = write(fd, dataPtr, length);    if (n < 0) throw SystemException("write",errno);    length -= n;    dataPtr += n;    offset += n;  }}

发现对于大量数据写入也是先刷新缓冲区,然后绕过缓冲区直接IO写入。

在这里可以清晰地看出offset的用途

它就是用来记录写入IO的数据总量。

int FdOutStream::length()

{

return offset + ptr - start;

}

当然对于外界得到的只能是通过OutStream接口写入的数据总量,因为它会把缓冲区的当前数据量加上去。

MemOutStream

唯一要注意的就是int overrun(int itemSize, int nItems)

在这里如果原来的缓冲不够写,那我们就重新分配一个足够大的缓冲区,把原来缓冲区已写入的数据拷贝过来,然后释放掉旧的缓冲区。

class NullOutStream : public OutStream

这个家伙根本就没有作任何数据的操作,仅仅是计量,猜测是用来计算数据吞吐量的。

class ZlibOutStream : public OutStream

成员变量:

view plain
  1. OutStream* underlying;//压缩后数据存放的缓冲区,暂时把它叫做C,ZlibOutStream 是压缩前数据存放的缓冲区,为了区分叫A。z_stream_s* zs;zlib接口类指针,把它看成是一个解压加工车间B。

构造函数同样是初始化了B。但这里调用的deflateInit。

void ZlibOutStream::flush()

view plain
  1. void ZlibOutStream::flush(){  zs->next_in = start;  zs->avail_in = ptr - start;  //用A需要压缩的数据头指针和长度初始化B接口//    fprintf(stderr,"zos flush: avail_in %d/n",zs->avail_in);  while (zs->avail_in != 0) {//循环保证A中数据能完全压缩完    do {      underlying->check(1);      zs->next_out = underlying-&gt;getptr();      zs->avail_out = underlying->getend() - underlying->getptr();//        fprintf(stderr,"zos flush: calling deflate, avail_in %d, avail_out %d/n",//                zs->avail_in,zs->avail_out);      int rc = deflate(zs, Z_SYNC_FLUSH);      if (rc != Z_OK) throw Exception("ZlibOutStream: deflate failed");//        fprintf(stderr,"zos flush: after deflate: %d bytes/n",//                zs->next_out-underlying->getptr());      underlying->setptr(zs->next_out);    } while (zs->avail_out == 0);//第二个循环,检查C是否还有空余的空间,如果没有了,则通过check来刷新C获取足够空间来接收压缩后的数据。//deflate被循环调用,可以理解它是智能的,如果输出空间不够,它会自动记录上次压缩数据的状态,不用调用者操心next_in重置,next_in总是指向已压缩的数据源的后一个字节。  }  offset += ptr - start;  ptr = start;}

保证了A中所有需要压缩的数据全部压缩并输出。

int ZlibOutStream::overrun(int itemSize, int nItems)

view plain
  1. int ZlibOutStream::overrun(int itemSize, int nItems){//    fprintf(stderr,"ZlibOutStream overrun/n");  if (itemSize > bufSize)    throw Exception("ZlibOutStream overrun: max itemSize exceeded");  while (end - ptr < itemSize) {    zs->next_in = start;    zs->avail_in = ptr - start;    do {      underlying->check(1);      zs->next_out = underlying->getptr();      zs->avail_out = underlying->getend() - underlying->getptr();//        fprintf(stderr,"zos overrun: calling deflate, avail_in %d, avail_out %d/n",//                zs->avail_in,zs->avail_out);      int rc = deflate(zs, 0);      if (rc != Z_OK) throw Exception("ZlibOutStream: deflate failed");//        fprintf(stderr,"zos overrun: after deflate: %d bytes/n",//                zs->next_out-underlying->getptr());      underlying->setptr(zs->next_out);    } while (zs->avail_out == 0);    // output buffer not full    if (zs->avail_in == 0) {      offset += ptr - start;      ptr = start;    } else {      // but didn't consume all the data?  try shifting what's left to the      // start of the buffer.      fprintf(stderr,"z out buf not full, but in data not consumed/n");      memmove(start, zs->next_in, ptr - zs->next_in);      offset += zs->next_in - start;      ptr -= zs->next_in - start;    }  }  if (itemSize * nItems > end - ptr)    nItems = (end - ptr) / itemSize;  return nItems;}

这个函数并没直接调用flush来解决A空间不足的问题,因为只要保证A中有空间满足itemSize就好,而不是flush这种函数完全清空。

void ZlibOutStream::setUnderlying(OutStream* os)

view plain
  1. void ZlibOutStream::setUnderlying(OutStream* os){  underlying = os;}

重置A,记得在它之前调用ZlibOutStream::flush。

WINVNC源码分析(四)——IO之rdr库相关推荐

  1. 【投屏】Scrcpy源码分析四(最终章 - Server篇)

    Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...

  2. Rxjava源码分析之IO.Reactivex.Observer

    Android 中的观察者模式,Rxjava中有两个重要的类Observable和Observer,函数响应式编程具体表现为一个观察者(Observer)订阅一个可观察对象(Observable).通 ...

  3. ABP源码分析四十七:ABP中的异常处理

    ABP源码分析四十七:ABP中的异常处理 参考文章: (1)ABP源码分析四十七:ABP中的异常处理 (2)https://www.cnblogs.com/1zhk/p/5538983.html (3 ...

  4. gSOAP 源码分析(四)

    gSOAP 源码分析(四) 2012-6-2 邵盛松 前言 本文主要说明gSOAP中对Client的认证分析 gSOAP中包含了HTTP基本认证,NTLM认证等,还可以自定义SOAP Heard实现认 ...

  5. Rxjava源码分析之IO.Reactivex.Observable

    Rxjava 源码系列目录 Rxjava源码分析之IO.Reactivex.Observer Rxjava源码分析之IO.Reactivex.CompositeDisposable Rxjava源码分 ...

  6. Rxjava源码分析之IO.Reactivex.CompositeDisposable

    Rxjava 源码系列目录 Rxjava源码分析之IO.Reactivex.Observer Rxjava源码分析之IO.Reactivex.CompositeDisposable Rxjava源码分 ...

  7. Spring 源码分析(四) ——MVC(二)概述

    随时随地技术实战干货,获取项目源码.学习资料,请关注源代码社区公众号(ydmsq666) from:Spring 源码分析(四) --MVC(二)概述 - 水门-kay的个人页面 - OSCHINA ...

  8. 【转】ABP源码分析四十七:ABP中的异常处理

    ABP 中异常处理的思路是很清晰的.一共五种类型的异常类. AbpInitializationException用于封装ABP初始化过程中出现的异常,只要抛出AbpInitializationExce ...

  9. 【转】ABP源码分析四:Configuration

    核心模块的配置 Configuration是ABP中设计比较巧妙的地方.其通过AbpStartupConfiguration,Castle的依赖注入,Dictionary对象和扩展方法很巧妙的实现了配 ...

  10. springfox源码_springfox 源码分析(四) 配置类初始化

    时间:2019-5-23 12:46:50 地点:单位.家中 @EnableSwagger2 有了二三章的理解,此时我们再来看EnableSwagger2注解的内容 @Retention(value ...

最新文章

  1. echarts怎么用在php,在Vue.JS中怎样使用echarts
  2. python 中 sorted() 和 list.sort() 的用法
  3. 网络带宽由什么决定_加工中心价格分析,加工中心价格是由什么决定的呢?
  4. mysql-自动备份数据库服务
  5. h.265编码库x265实例
  6. php静态资源服务器,Node实现静态资源服务器
  7. Java中的ConcurrentHashMap
  8. Docker 基础 ( 二十 ) 部署Redis集群,问题记录
  9. 图解设计模式-Flyweight模式
  10. 决策树分析例题经典案例_分析模型案例解析:决策树分析法
  11. python四级考试时间_2016年四六级英语考试注意事项四六级考试建议
  12. Redis-事务(集成SpringBoot工程)
  13. 联想ThinkPad笔记本Fn键关闭与启用方法
  14. 小程序分账系统是什么?能解决二清吗?
  15. 4480: [Jsoi2013]快乐的jyy
  16. 关于单边账的解释及解决
  17. 2019年,转行到互联网行业,还有必要么?
  18. 单行文本溢出和多行文本溢出省略...
  19. 下三角矩阵线性方程的求解
  20. 计算机毕业设计ssm红旗家具城管理系统29a0m系统+程序+源码+lw+远程部署

热门文章

  1. java 实现word文档的在线签批圈阅(手写批注)
  2. 业界 | 摩根大通报告12个亮点总结:金融领域的机器学习工具有哪些?
  3. SAP FICO 第一节 后台配置
  4. 静态路由 动态路由 php,静态路由汇总(路由聚合)
  5. oracle hot patch david,Hot Patch工具Rollout
  6. python判断是否回文_Python语言判断输入的是否是回文数的方法
  7. 学习搜狗workflow心路历程(1)Windows版本的环境搭建
  8. 梅特勒托利多电子秤显示EEP服务器错误,托利多电子秤TCII故障维修方法(一)...
  9. 三星发布全球首款太阳能笔记本
  10. Vue过滤器filter和filters的使用详解