Stanford CS144 Lab1.stitching substrings into a byte stream
原文:https://databrains.cn/2021/11/27/Stanford-CS144-Lab1/
该lab要求我们实现一个流重组类,可以将Sender发来的带索引号的字节碎片重组成有序的字节写入到byte_stream。接收端从发送端读取数据,调用流重组器,流重组器对数据进行排序,排序好后写入byte_stream。
流重组器也是有capasity的,也就是说流重组器也有一定的容量,一次性处理太多的信息会导致流重组器不能够正常地工作.同样的我们把流处理器当成一个双端队列即可.

private类中还有一个ByteStream类型的变量,所有的内容都输出给ByteStream,还有一个容量变量.其中ByteStream中的bytes_read返回ByteStream处理了多少元素.

因为重组类的函数中,支持的index是first unread=_output.bytes_read()(已经读取的元素)到first unacceptable的这一块区域,我们要保证输入的字节编号是在这个区域里面的.
在重组器类的函数中,push_substring函数完成重组工作。其参数有data(string),index(size_t),eof(bool),data是待排序的数据,index是数据首元素的序号,eof用于判断是否结束。结束后将不再有数据输入。
在重组的过程中,我们会遇到重复、丢包、重叠、乱序的现象。为了使乱序的数据找到有序的位置,我使用’\0’维护重组器中数据的相对序号,例如,第一次data为make,index为0,第二次data为great,index为13,而处于两组数据中间的数据未知,我们就用’\0’代替,即make\0\0\0\0\0\0\0\0\0great。这样就维护了已经进入重组器的数据的有序。当然,写入的data中也有可能含有\0,这是,我们就需要一个bool双端队列,来记录相应位置的数据是否有序,在上述例子中,队列的bool值为111100000000011111。
所以说我们在数据结构中添加几项,一个是_unassembled_byte,是一个std::deque<char>,暂时存储还乱序的字符串,_check_byte是std::deque<bool>,这个元素与_unassembled_byte一一对应,当un[i]存储着还没有发送的字符的时候,ch[I]=true,否则为false,还有一个_lens_un,这个记录乱序的字符的长度.
程序的总体结构:
发送端的数据->流重组器(重组成有序的数据)->Bytestream(在Lab0就做好的队列)->TCP接收端.
流重组器需要做的是,把所有有序的数据写入到接收端.
其中字符的编号是从1一直往后延伸的,因为队列的首和尾都可以记录.TCP的发送端发送的数据也是(字符号、字符串)字符的编号一直往后延伸.
这个时候我们回忆一下对应数据的表示:
output.bytes_read()
:接收端从ByteStream获得的字符数量.
output.bytes_write()
:流重组器写入ByteStream的字符数量-1.而且是流重组器的有效数据中index最小的序号
_lens_un
指的还在流重组器里面的数据的长度.
其中:output.bytes_read()+_capacity
是ByteStream可以接受的范围,output.bytes_write()+_lens_un
是流重组器的有效数据中index最大的序号.
1.我们判断输入序号是否大于内存与已读取数据之和,也就是说,该数据是否属于unacceptable中的数据,如果是这样的数据,我们没有足够的内存写入,因为写入这样的数据需要添加\0,从而超过capasity的大小。代码如下:
1 2 3 | if(index>_output.bytes_read()+_capacity){ return; } |
2.字符串部分在区域内,但是部分在区域外,那就把区域外的内容舍弃,只读取区域内的内容.
我们需要判断data中最后一个数据的序号是否大于内存与已读取数据之和,如果大于,我们就要将能写入的部分写入,也就是按data的顺序尽可能地写入数据而不超过capasity,在写入的过程中,我们也会遇到两种情况,一种是序号index大于此时已经在流重组器的最后一个数据的序号,在这种情况下我们要在流重组器最后一个序号与index之间填入’\0′,同时将相应的bool双端队列(_check_byte)设置为false,做完这些工作后,才开始写入新的数据。另一种情况是index的小于或者等于流重组器最后一个数据的序号,我们需要弹出冲突的数据,举个例子就是,index序号为5,此时流重组器中的数据为stanford,我们就要从序号5的数据也就是o开始弹出,变成stanf,再写入data中的数据。代码如下:
if(index+data.length()>_capacity+_output.bytes_read()){
for(size_t i=_lens_un+_output.bytes_written();i<_capacity+_output.bytes_read();i++){
if(i<index){
_unassembled_byte.push_back('\0');
_check_byte.push_back(false);
}else{
_unassembled_byte.push_back(data[i-index]);
_check_byte.push_back(true);
}
_lens_un++;
}
}
3.我们要判断index是否等于已经写入byte_stream(_output)中的数据,如果是的,我们就直接将data中的数据写入byte_stream,然后在重组器中弹出data.length()个数据,值得注意的是,当重组器中的数据个数小于data.length(),我们就全部弹出。但是后面的数据会被当成无效数据而不进行处理,代码如下:
if(index==_output.bytes_written()){
//直接写
_output.write(data);
size_t temp_len=std::min(_lens_un,data.length());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;
}
4.我们要判断index是否大于流重组器中的最后一个数据的序号和写入byte_stream中的数据个数之和,如果大于,我们就可以参考1的处理,代码如下:
if(index>_output.bytes_written()+_lens_un){
for(size_t i=_output.bytes_written()+_lens_un;i<index;i++){
_unassembled_byte.push_back('\0');
_check_byte.push_back(false);
_lens_un++;
}
[原来的data][空][新的data]
for(char i : data){
_unassembled_byte.push_back(i);
_lens_un++;
_check_byte.push_back(true);
}
}
5.我们要判断data中的数据是否已经被写入byte_stream,这个说法有些不准确,准确的说是相应序号的数据被写入,如果data中的所有数据都被写入了byte_stream,我们就直接返回,如果只是部分被写入,我们就将data中未被写入的部分写入。代码如下:
if(index<_output.bytes_written()){
if(_output.bytes_written()>index+data.length()){
return;
}
//[已经写入Byte_stream的][bytes_written()][新传来的data在bytes_written()之后的,入队][原来在_output.bytes_written()+_lens_un之后的data]
//还是要写,一直写到data最后.
std::string data_cut(data.begin()+_output.bytes_written()-index,data.end());
_output.write(data_cut);
size_t temp_len=std::min(_lens_un,data_cut.length());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;
6.不是任何情况:首先我们知道要把_output.bytes_written()~index这一部分的内容保存好,然后再把data加入进去即可
//在中间插入元素
//先弹出一部分数据保存到栈中
std::stack<char> temp;
std::stack<bool> temp_check;
for(size_t i=0;i<index-_output.bytes_written();i++){
temp.push(_unassembled_byte.at(i));
temp_check.push(_check_byte.at(i));
}
[原data,入队][index][新传来的data,入队][原来在_output.bytes_written()+_lens_un之后的data]
//这里是看数据的最后一个index有没有达到_output.bytes_written()+_lens_un,达到的话后面的内容要保留,没达到就全部删除即可
size_t temp_len=std::min(_lens_un,data.length()+index-_output.bytes_written());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;
for(int i=data.length()-1;i>=0;i--){
_unassembled_byte.push_front(data[i]);
_check_byte.push_front(true);
_lens_un++;
}
while(!temp.empty()){
_unassembled_byte.push_front(temp.top());
_check_byte.push_front(temp_check.top());
_lens_un++;
temp.pop();
temp_check.pop();
}
7.输入字符串到ByteStream中:
size_t i=0;
while(i<_lens_un){
if(!_check_byte.at(i)){
break;
}
i++;
}
std::string n(_unassembled_byte.begin(),_unassembled_byte.begin()+i);
_output.write(n);
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+i);
_lens_un-=i;
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+i);
if(eof) input_end_index=index+data.length();
if(input_end_index==_output.bytes_written()) _output.end_input();

0 条评论