Lab 1: stitching substrings into a byte stream

  • Lab Guide: Checkpoint 1: stitching substrings into a byte stream
  • Lab Code: https://github.com/peakcrosser7/sponge/tree/lab1-startercode

3 Putting substrings in sequence

要点

  • 实现 StreamReassembler 类, 用于将接收到的子串重组成顺序的字节流.
  • StreamReassembler 的容量 capacity 既包括未被读取的字节流 ByteStream 中的字节数, 也包括未被重排的字节流的最大数目.
  • 子串可能相互重叠, 且不能重复对其进行存储.

优化前

思路

数据结构

首先也是较为关键的是数据结构的选择. 需要对乱序的子串存储到缓冲区 _buffer, 并重组成有序的字节流. 笔者首先想到的是使用哈希表存储 index 和子串 data 的映射, 但存在的问题在于子串之间可能是相互重叠的, 而哈希表的方式便无法判断, 进而会造成重复存储重叠部分.
而为解决重叠不重复存的问题, 就需要考虑可随机访问的顺序容器(由于乱序因此要求能够随机访问), 且以字节为单位而非整个子串. 此外由于会频繁的将重排后的子串写入 ByteStream 中, 即频繁对容器头部进行删除操作, 因此最终选择使用 deque.
在选定了容器后还要考虑乱序带来的一个问题, 即区分缓冲区的一个字节是数据还是为空. 很显然在子串中不能保证一定没有 \0 字符, 因此直接利用在原字符位置无法直接判别, 因此笔者又使用了一个 deque<bool> _map 用于标识每个字节是否数据.

重组过程

实验的最关键部分是子串重组过程, 即 push_substring() 函数的实现. 在确定了数据结构之后, 重组过程实际上就相对比较清晰了. 先将子串存入缓冲区 _buffer 中, 然后将缓冲区头部所有重排好的子串写入 ByteStream 中. 在写入缓冲区时, 需要记录字节数, 用于统计未重排的字节总数, 即对应字段 _unassembled_bytes, 同时在写入 ByteStream 后也需要进行调整.

临界判断

由于子串可乱序可重复, 因此有较多临界情况需要判断, 即对于不在未重组范围的字节, 可以直接丢弃. 具体而言, 这包括子串 index 超出 capacity, 整个子串丢弃; 整个子串已被写入 ByteStream 中, 也无需再处理; 同样对于部分超出 capacity 或部分已经写入的字节, 仍无需处理.
而为了保证 capacity 的限制, 需要字段 _buffer_begin 来记录缓冲区未排序的字节的起始索引.

EOF

在子串标有 EOF 标识时, 表明该子串时字节流的结尾部分. 需要在这部分子串都被重排写入 ByteStream 后, 调用 ByteStream::end_input() 方法来停止 ByteStream 的写入.
这里需要注意的是, 由于子串可能由于超出容量被截短, 因此只有保证带 EOF 的整个子串都能被存入缓冲区后才能记录下 EOF 标识.

代码

libsponge/stream_reassembler.hh
#ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH#include "byte_stream.hh"#include <cstdint>
#include <string>
#include <deque>//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
class StreamReassembler {private:// Your code here -- add private members as necessary.std::deque<char> _buffer;       //!< The buffer to store unassembled bytesstd::deque<bool> _map;          //!< The map to identify whether the byte is datasize_t _unassembled_bytes = 0;  //!< Total number of unassembled bytessize_t _buffer_begin = 0;       //!< Starting index of the `_buffer`.bool _eof = false;              //!< Flag indicating that the end of bytes has been stored into `_buffer`ByteStream _output;  //!< The reassembled in-order byte streamsize_t _capacity;    //!< The maximum number of bytespublic://! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.//! \note This capacity limits both the bytes that have been reassembled,//! and those that have not yet been reassembled.StreamReassembler(const size_t capacity);//! \brief Receive a substring and write any newly contiguous bytes into the stream.//!//! The StreamReassembler will stay within the memory limits of the `capacity`.//! Bytes that would exceed the capacity are silently discarded.//!//! \param data the substring//! \param index indicates the index (place in sequence) of the first byte in `data`//! \param eof the last byte of `data` will be the last byte in the entire streamvoid push_substring(const std::string &data, const uint64_t index, const bool eof);//! \name Access the reassembled byte stream//!@{const ByteStream &stream_out() const { return _output; }ByteStream &stream_out() { return _output; }//!@}//! The number of bytes in the substrings stored but not yet reassembled//!//! \note If the byte at a particular index has been pushed more than once, it//! should only be counted once for the purpose of this function.size_t unassembled_bytes() const;//! \brief Is the internal state empty (other than the output stream)?//! \returns `true` if no substrings are waiting to be assembledbool empty() const;
};#endif  // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
libsponge/stream_reassembler.cc
#include "stream_reassembler.hh"// Dummy implementation of a stream reassembler.// For Lab 1, please replace with a real implementation that passes the
// automated checks run by `make check_lab1`.// You will need to add private members to the class declaration in `stream_reassembler.hh`template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}using namespace std;StreamReassembler::StreamReassembler(const size_t capacity): _buffer(capacity), _map(capacity), _output(capacity), _capacity(capacity) {}//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {// the maximum index of byte that `_buffer` can storeconst size_t buffer_end = _buffer_begin + _output.remaining_capacity();// if the whole substring exceed the capacity, it will be silently discarded.if (index >= buffer_end) {return;}// set the `_eof` flag if the end byte can be storedif (eof && index + data.size() <= buffer_end) {_eof = true;}// traverse each byte of the substring in `capacity` range for reassembling.for (size_t buf_idx = max(index, _buffer_begin), data_idx = buf_idx - index;data_idx < data.size() && buf_idx < buffer_end;++data_idx, ++buf_idx) {// not store bytes repeatedlyif (_map[buf_idx - _buffer_begin]) {continue;}_buffer[buf_idx - _buffer_begin] = data[data_idx];_map[buf_idx - _buffer_begin] = true;++_unassembled_bytes;}string bytes;// write the assembled bytes to `_output`while (_map.front()) {bytes.push_back(_buffer.front());_buffer.pop_front();_map.pop_front();// keep the size of `_buffer` and `_map` always `_capacity`_buffer.push_back('\0');_map.push_back(false);}size_t len = bytes.size();if (len > 0) {// adjust `_buffer_begin` and `_unassembled_bytes` if some bytes have been reassembled _buffer_begin += len;_unassembled_bytes -= len;_output.write(bytes);}// end the input of `_output` if `_eof` flag has been set and no unassembled bytesif (_eof && _unassembled_bytes == 0) {_output.end_input();}
}size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }bool StreamReassembler::empty() const { return _unassembled_bytes == 0; }

代码需要注意的是:

  1. 中间遍历的子串的字节范围被限定在 _buffer_beginbuffer_end 之间, 以确保已重排写入 _output 的字节以及超出 capacity 上限的字节不予处理.
  2. 由于子串可能是乱序的, 因此在存储时使用的是 deque::operator [] 下标操作去存, 下标索引为字节的索引 index 相对缓冲区起始索引 _buffer_begin 的差值.
  3. 由于存储是直接使用 [] 下标存, 这就需要保证访问的位置不能超过 deque 的大小, 因此在出队写入 _output 的同时要入队元素以保持 _buffer 的大小不变, 防止后续存字节时出现问题.

遇到问题

  • Test #18: The reassembler was expected to be at EOF, but was not.

    解决: 根据提供的测试用例 data "", index 0, eof 1以及报错信息, 可以看到是由于 ByteStream 没有设置 EOF. 原因是笔者在 push_substring() 方法开始使用 index+data.size()<=_buffer_begin 对于已被重排过的子串直接过滤掉, 但这存在 data.size()==0 即空串的特殊情况, 此时若返回的话便不会设置 EOF 标识. 而考虑到后续重排时有检查, 因此应该直接去掉该语句.
  • Test #18: The reassembler was expected to be at EOF, but was not.

    解决: 根据报错信息可以看到同样与 EOF 有关. 最终发现是 _eof 标识设置的判断条件有误, index+data.size() 需要小于等于 buffer_end, 笔者误设置为了"小于".

优化后

先前方法的缺点

上述方法虽然能够通过测试, 但是在 Lab4 经过测试后显示其实际性能并不优秀, 于是笔者后来有对此部分进行了修改.
先前方法使用了两个 deque<char> 的队列作为核心数据结构, 用于存放乱序的字符串, 目的在于利用 deque 本身的随机访问, 这样可以以字节为单位较为方便的去重. 但这实际上也是一个问题, 即在放入 _buffer 时, 会逐字节遍历整个接收的字符串, 同时判断 _buffer 中是否重叠并放入, 同理在_buffer 转移至 _output 输出字节流时, 同样是逐字节的从 _buffer 中取出. 这两个过程是与字符串长度成线性关系的时间复杂度. 换句话说, 若字符串长度很长, 则会浪费大量时间在逐字节的扫描中. 因此也成为了 TCP 连接接收的一大性能瓶颈.

优化思路

这里重点介绍该优化方法相较之前方法的不同, 共同的部分则不再赘述.

数据结构

优化的核心在于将 _buffer 的数据结构由 deque<char> 改为 set<Block>. 首先介绍 Block, 其为一个结构体, 有两个字段: 一个用于记录接收到的部分字符串 data, 另一个则记录该部分字符串的索引 index.
在上文中提到过, 笔者开始想到的哈希表的问题在于无需性, 而 set 底层基于红黑树, 数据结构是有序的. 而利用 Block 实际上就是以片段为单位存储字符串, 而非字节, 这样在处理时自然更快速. 这里笔者选择了 set, 实际上选择 map 也是可以的, 这样就是将 index 作为键名 data 作为键值.
而由于是以片段为单位的, 相较于先前的方法, 需要额外的一步操作则是去除重复部分, 这里只需要遍历 _buffer 的每个片段, 根据已存储的片段来去除当前需要插入片段的头部或者尾部即可.

BufferPlus

在做到 Lab4 时, 对 Lab0 中 ByteStream 优化的一点即将 string 数据结构改为 BufferList, 以减少复制的开销. 这里实际上是相同的考虑, 在裁剪字符串时即调用 string::substr() 方法, 即从新开辟一块内存来存储字符串, 且在向 _buffer 中转移时也均为字符串的复制, 有较大的开销. 因此容易想到使用 Buffer 通过共享内存和偏移量(具体可以见其实现)来替代 substr(), 将复制改为移动以减少开销.
这里未使用原本的 Buffer 的原因在于其只有 remove_prefix() 方法去除字符串的首部, 而不能去除尾部. 而自然在字符串去除重复部分时也可能需要在尾部去除, 因此笔者参照原本的 Buffer 写了一个新的 BufferPlus 类, 可以同时在首尾去除字符串.
注意, 在 BufferPlusBuffer 中的首尾去除字符串并不是真正的去除, 而是通过一个偏移指针来隐藏去除过的字符, 只有当整个字符串都去除时才会直接清除整个内存, 这样可以减少内存释放的开销, 只需要进行偏移值的改变, 从而提高性能.

代码

libsponge/buffer_plus.hh
#ifndef SPONGE_BUFFER_PLUS_HH
#define SPONGE_BUFFER_PLUS_HH#include <memory>
#include <string>//! \brief A reference-counted read-only string that can discard bytes from the front
class BufferPlus {private:std::shared_ptr<std::string> _storage{};size_t _starting_offset{};size_t _ending_offset{};public:BufferPlus() = default;//! \brief Construct by taking ownership of a stringBufferPlus(std::string &&str) noexcept : _storage(std::make_shared<std::string>(std::move(str))) {}//! \name Expose contents as a std::string_view//!@{std::string_view str() const {if (!_storage) {return {};}return {_storage->data() + _starting_offset, _storage->size() - _starting_offset - _ending_offset};}operator std::string_view() const { return str(); }//!@}//! \brief Get character at location `n`uint8_t at(const size_t n) const { return str().at(n); }//! \brief Size of the stringsize_t size() const {return _storage ? _storage->size() - _starting_offset - _ending_offset : 0;}//! \brief Make a copy to a new std::stringstd::string copy() const { return std::string(str()); }//! \brief Discard the first `n` bytes of the string (does not require a copy or move)//! \note Doesn't free any memory until the whole string has been discarded in all copies of the Buffer.void remove_prefix(size_t n) {if(!_storage) {return;}if (n >= str().size()) {_storage.reset();return;}_starting_offset += n;}//! \brief Discard the last `n` bytes of the string (does not require a copy or move)void remove_suffix(size_t n) {if(!_storage) {return;}if(n >= str().size()) {_storage.reset();return;}_ending_offset += n;}
};#endif  // SPONGE_BUFFER_PLUS_HH
libsponge/stream_reassembler.hh
#ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH#include "buffer_plus.hh"
#include "byte_stream.hh"#include <cstdint>
#include <set>
#include <string>//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
class StreamReassembler {private:// Your code here -- add private members as necessary.struct Block {size_t index;     //!< The index of the data in BufferPlusBufferPlus data;  //!< The buffer to store the stringBlock(size_t idx, std::string &&str) noexcept : index(idx), data(std::move(str)) {}bool operator<(const Block &block) const { return index < block.index; }//! \brief the size of string in blocksize_t size() const { return data.size(); }};std::set<Block> _buffer{};     //!< The set to store unassembled stringssize_t _unassembled_bytes{0};  //!< Total number of unassembled bytesbool _eof{false};              //!< Flag indicating that the end of bytes has been stored into `_buffer`ByteStream _output;            //!< The reassembled in-order byte streamsize_t _capacity;              //!< The maximum number of bytesvoid insert_block(size_t index, std::string &&data);public://! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.//! \note This capacity limits both the bytes that have been reassembled,//! and those that have not yet been reassembled.StreamReassembler(const size_t capacity);//! \brief Receive a substring and write any newly contiguous bytes into the stream.//!//! The StreamReassembler will stay within the memory limits of the `capacity`.//! Bytes that would exceed the capacity are silently discarded.//!//! \param data the substring//! \param index indicates the index (place in sequence) of the first byte in `data`//! \param eof the last byte of `data` will be the last byte in the entire streamvoid push_substring(const std::string &data, const uint64_t index, const bool eof);//! \name Access the reassembled byte stream//!@{const ByteStream &stream_out() const { return _output; }ByteStream &stream_out() { return _output; }//!@}//! The number of bytes in the substrings stored but not yet reassembled//!//! \note If the byte at a particular index has been pushed more than once, it//! should only be counted once for the purpose of this function.size_t unassembled_bytes() const;//! \brief Is the internal state empty (other than the output stream)?//! \returns `true` if no substrings are waiting to be assembledbool empty() const;
};#endif  // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
libsponge/stream_reassembler.cc
#include "stream_reassembler.hh"// Dummy implementation of a stream reassembler.// For Lab 1, please replace with a real implementation that passes the
// automated checks run by `make check_lab1`.// You will need to add private members to the class declaration in `stream_reassembler.hh`template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}using namespace std;StreamReassembler::StreamReassembler(const size_t capacity) : _output(capacity), _capacity(capacity) {}//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {size_t first_unassembled = _output.bytes_read() + _output.buffer_size();size_t first_unacceptable = _output.bytes_read() + _capacity;// the index of data after removing the reassembled bytessize_t idx = index;// the length of data after removing the reassembled or unacceptable bytessize_t len = data.size();// the eof flag of data after removingbool eof_flag = eof;// if the whole data string is reassembled or unacceptable, it will be discardedif (idx + len < first_unassembled || idx >= first_unacceptable) {return;}// remove the unassembled and unacceptable bytes in data// only update the `idx` and `len`if (idx < first_unassembled) {len -= first_unassembled - idx;idx = first_unassembled;}if (idx + len > first_unacceptable) {len = first_unacceptable - idx;// the eof must be false when removing the tail of dataeof_flag = false;}// update the `_eof` flag, only when the end byte can be stored, will `_eof` be true_eof = _eof || eof_flag;// insert the substr of data after removing to `_buffer`insert_block(idx, data.substr(idx - index, len));// traverse the beginning of `_buffer` and write the assembled strings to `_output`while (!_buffer.empty()) {const Block &b = *_buffer.begin();// if the block `b` is not the assembled stringif (first_unassembled != b.index) {break;}_output.write(b.data.copy());_unassembled_bytes -= b.size();first_unassembled += b.size();_buffer.erase(_buffer.begin());}// end the input of `_output` if `_eof` flag has been set and no unassembled bytesif (_eof && _unassembled_bytes == 0) {_output.end_input();}
}size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }bool StreamReassembler::empty() const { return _unassembled_bytes == 0; }//! use the data and its index to build a `Block` and insert it to `_buffer`
void StreamReassembler::insert_block(size_t index, string &&data) {// the size of `_unassembled_bytes` should updatesize_t delta_bytes = data.size();// do nothing if no dataif (delta_bytes == 0) {return;}Block block(index, move(data));// a flag to record whether to insertbool hasInsert = false;// traverse all blocks in `_buffer`// note: using iterator because some blocks will be erased, use `for-range` is wrongfor (auto it = _buffer.begin(); it != _buffer.end();) {const Block &b = *it;// if the whole `block` is to the left of `b`, it can be inserted allif (block.index + block.size() <= b.index) {_buffer.insert(block);hasInsert = true;break;}// if the whole `block` is to the right of `b`, it should compare with next oneif (block.index >= b.index + b.size()) {++it;continue;}// if `b` is part of the `block`, we should erase `b`if (block.index < b.index && block.index + block.size() > b.index + b.size()) {// update `delta_bytes`delta_bytes -= b.size();it = _buffer.erase(it);} else if (block.index < b.index) {// if the tail of `block` overlaps with `b`, remove its suffix and insert to `_buffer`size_t delta = block.index + block.size() - b.index;block.data.remove_suffix(delta);_buffer.insert(block);delta_bytes -= delta;hasInsert = true;break;} else if (block.index + block.size() > b.index + b.size()) {// if the head of `block` overlaps with `b`, remove its prefix and compare with the next blocksize_t delta = b.index + b.size() - block.index;block.data.remove_prefix(delta);block.index += delta;delta_bytes -= delta;++it;} else {// if the `block` is part of `b`, it should not be inserted to `_buffer`delta_bytes = 0;hasInsert = true;break;}}// insert `block` to `_buffer` if it has not been inserted after traversing `_buffer`if (!hasInsert) {_buffer.insert(block);}_unassembled_bytes += delta_bytes;
}

测试

build 目录下执行 make 后执行 make check_lab1:

[CS144] Lab 1: stitching substrings into a byte相关推荐

  1. Stanford CS144: Lab 4

    1 文档解读 如果本lab中测试点99及之后有问题,考虑是环境问题!! 1.1 Receiving segments 如果RST标志位被设置,把 inbound and outbound stream ...

  2. Stanford University CS144 Lab2 The TCP Receiver

    文章目录 前引 Lab2 The TCP Receiver 获取实验文档+调试方法介绍 1.Overview(总览) 2.Getting started(开始) 编译报错 获取LIBPCAP-DEV ...

  3. Stanford University CS144 Lab0 Networking Warmup

    文章目录 前引 二次编辑补引 Lab0 Networking Warmup 获取实验文档 Ubuntu20.04安装(Vmware Workstation) Ubuntu20.04安装G++8 切换G ...

  4. 国内高校计算机教育存在哪些问题?

    最近我在逛知乎的时候看到一个问题: 随手写了个回答,没想到很多读者都表示说出了心声,所以也同步发到公众号. 以下是原回答 我自己是 CS 科班的,读者里也有很多各大高校计算机的同学,覆盖了上交.北邮. ...

  5. CS144 计算机网络 lab1

    CS144 计算机网络 lab1 上一学期刚学习了计算机网络这门课程,感觉还是挺有意思的,了解到CS144的lab所以便打算利用假期练练手,顺便也尝试着开始写博客.代码风格还很新手,多多包涵,欢迎讨论 ...

  6. CS144 计算机网络实验 lab3 笔记

    CS144 计算机网络实验 lab3 笔记 介绍 本实验中,我们将会在之前实验的基础上,实现一个TCP sender ----将字节流转换成数据报并发送. TCP协议是一个在不可靠的协议上提供可靠的, ...

  7. CS144 lab0 笔记

    CS144 lab0 笔记 下学期就学习计算机网络了,假期正好找个lab预习一下 配置 直接用 WSL2 + Clion(安装在WSL2上) 做的实验,还是比用vscode方便一些的 直接fork仓库 ...

  8. 【斯坦福计网CS144项目】环境配置 Lab0: ByteStream

    前言 感觉学了不少 C++ 编程的知识和技术但比较缺少实践,于是打算找一些项目跟着做一做. 首先安利一个自学网站 CS自学指南,北大的同学做的,汇总了很多国内外高校 CS 相关的高质量公开课,其中大部 ...

  9. data lab 1(暂时只放题目)

    2019独角兽企业重金招聘Python工程师标准>>> /* * CS:APP Data Lab * * <Please put your name and userid he ...

最新文章

  1. ASP.NET缓存 Cache之数据缓存
  2. android 强制打开gps定位_Android 6.0 默认关闭定位和GPS,开启后默认选省电
  3. 【做题记录】CF1444A Division
  4. (15)HTML面试题集锦
  5. PAT (Basic Level) Practice (中文)1043 输出PATest (20 分)
  6. shellcode编写技巧
  7. Linux/Unix服务端和客户端Socket编程入门实例(含源码下载)
  8. SVD原理及代码实现
  9. java 设置启动参数设置_1.java程序启动参数配置
  10. Azure School女神相邀,把每分钟都过的更充实
  11. 【闸机】KEIL安装pack包
  12. couldn't connect to the device trackpad
  13. 计算机局域网地址设置方法,怎么设置局域网电脑的ip地址和DNS?
  14. 解析P2P金融的业务安全
  15. SpringBoot exclude的使用
  16. 年终总结:华为|字节|腾讯|京东|网易|滴滴面经分享(斩获6个offer)
  17. 从学生到社会人_EmbeddedLove
  18. 推荐闪电王子和非洲王子鱼
  19. AI工具是帮手还是助手:
  20. qt——widget

热门文章

  1. 【数据聚类】第五章第一节:基于网格的聚类算法概述
  2. java指令工具_jvm 指令工具 jcmd 命令(Java多功能命令行)
  3. 微信小程序 监听手势滑动切换页面
  4. 嵌入式软件面试题整理
  5. C++ filesystem 文件系统初体验
  6. 趋势面法优缺点_趋势面分析法
  7. OpenCV实战——拟合直线
  8. Google chrome谷歌浏览器,打开后是百度搜索或其他搜索怎么办?
  9. 关于《职场路上》专栏介绍
  10. GEE--LandTrendr