原文:https://databrains.cn/2021/11/27/Stanford-CS144-Lab1/

该lab要求我们实现一个流重组类,可以将Sender发来的带索引号的字节碎片重组成有序的字节写入到byte_stream。接收端从发送端读取数据,调用流重组器,流重组器对数据进行排序,排序好后写入byte_stream。

流重组器也是有capasity的,也就是说流重组器也有一定的容量,一次性处理太多的信息会导致流重组器不能够正常地工作.同样的我们把流处理器当成一个双端队列即可.

private类中还有一个ByteStream类型的变量,所有的内容都输出给ByteStream,还有一个容量变量.其中ByteStream中的bytes_read返回ByteStream处理了多少元素.

因为重组类的函数中,支持的index是first unread=_output.bytes_read()(已经读取的元素)到first unacceptable的这一块区域,我们要保证输入的字节编号是在这个区域里面的.

在重组器类的函数中,push_substring函数完成重组工作。其参数有data(string),index(size_t),eof(bool),data是待排序的数据,index是数据首元素的序号,eof用于判断是否结束。结束后将不再有数据输入。

在重组的过程中,我们会遇到重复、丢包、重叠、乱序的现象。为了使乱序的数据找到有序的位置,我使用’\0’维护重组器中数据的相对序号,例如,第一次data为make,index为0,第二次data为great,index为13,而处于两组数据中间的数据未知,我们就用’\0’代替,即make\0\0\0\0\0\0\0\0\0great。这样就维护了已经进入重组器的数据的有序。当然,写入的data中也有可能含有\0,这是,我们就需要一个bool双端队列,来记录相应位置的数据是否有序,在上述例子中,队列的bool值为111100000000011111。

所以说我们在数据结构中添加几项,一个是_unassembled_byte,是一个std::deque<char>,暂时存储还乱序的字符串,_check_byte是std::deque<bool>,这个元素与_unassembled_byte一一对应,当un[i]存储着还没有发送的字符的时候,ch[I]=true,否则为false,还有一个_lens_un,这个记录乱序的字符的长度.

程序的总体结构:

发送端的数据->流重组器(重组成有序的数据)->Bytestream(在Lab0就做好的队列)->TCP接收端.
流重组器需要做的是,把所有有序的数据写入到接收端.
其中字符的编号是从1一直往后延伸的,因为队列的首和尾都可以记录.TCP的发送端发送的数据也是(字符号、字符串)字符的编号一直往后延伸.

这个时候我们回忆一下对应数据的表示:

output.bytes_read():接收端从ByteStream获得的字符数量.

output.bytes_write():流重组器写入ByteStream的字符数量-1.而且是流重组器的有效数据中index最小的序号

_lens_un指的还在流重组器里面的数据的长度.

其中:output.bytes_read()+_capacity是ByteStream可以接受的范围,output.bytes_write()+_lens_un是流重组器的有效数据中index最大的序号.

1.我们判断输入序号是否大于内存与已读取数据之和,也就是说,该数据是否属于unacceptable中的数据,如果是这样的数据,我们没有足够的内存写入,因为写入这样的数据需要添加\0,从而超过capasity的大小。代码如下:

1
2
3
if(index>_output.bytes_read()+_capacity){
return;
}

2.字符串部分在区域内,但是部分在区域外,那就把区域外的内容舍弃,只读取区域内的内容.

我们需要判断data中最后一个数据的序号是否大于内存与已读取数据之和,如果大于,我们就要将能写入的部分写入,也就是按data的顺序尽可能地写入数据而不超过capasity,在写入的过程中,我们也会遇到两种情况,一种是序号index大于此时已经在流重组器的最后一个数据的序号,在这种情况下我们要在流重组器最后一个序号与index之间填入’\0′,同时将相应的bool双端队列(_check_byte)设置为false,做完这些工作后,才开始写入新的数据。另一种情况是index的小于或者等于流重组器最后一个数据的序号,我们需要弹出冲突的数据,举个例子就是,index序号为5,此时流重组器中的数据为stanford,我们就要从序号5的数据也就是o开始弹出,变成stanf,再写入data中的数据。代码如下:

if(index+data.length()>_capacity+_output.bytes_read()){
    for(size_t i=_lens_un+_output.bytes_written();i<_capacity+_output.bytes_read();i++){
        if(i<index){
            _unassembled_byte.push_back('\0');
            _check_byte.push_back(false);

        }else{
            _unassembled_byte.push_back(data[i-index]);
            _check_byte.push_back(true);

        }
        _lens_un++;
    }
}

3.我们要判断index是否等于已经写入byte_stream(_output)中的数据,如果是的,我们就直接将data中的数据写入byte_stream,然后在重组器中弹出data.length()个数据,值得注意的是,当重组器中的数据个数小于data.length(),我们就全部弹出。但是后面的数据会被当成无效数据而不进行处理,代码如下:

if(index==_output.bytes_written()){
//直接写
    _output.write(data);
    size_t temp_len=std::min(_lens_un,data.length());
    _unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
    _check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
    _lens_un-=temp_len;
}

4.我们要判断index是否大于流重组器中的最后一个数据的序号和写入byte_stream中的数据个数之和,如果大于,我们就可以参考1的处理,代码如下:

if(index>_output.bytes_written()+_lens_un){
    for(size_t i=_output.bytes_written()+_lens_un;i<index;i++){
        _unassembled_byte.push_back('\0');
        _check_byte.push_back(false);
        _lens_un++;
    }
[原来的data][空][新的data]
    for(char i : data){
        _unassembled_byte.push_back(i);
        _lens_un++;
        _check_byte.push_back(true);
    }
}

5.我们要判断data中的数据是否已经被写入byte_stream,这个说法有些不准确,准确的说是相应序号的数据被写入,如果data中的所有数据都被写入了byte_stream,我们就直接返回,如果只是部分被写入,我们就将data中未被写入的部分写入。代码如下:

if(index<_output.bytes_written()){
    if(_output.bytes_written()>index+data.length()){
        return;
    }
//[已经写入Byte_stream的][bytes_written()][新传来的data在bytes_written()之后的,入队][原来在_output.bytes_written()+_lens_un之后的data]
//还是要写,一直写到data最后.
    std::string data_cut(data.begin()+_output.bytes_written()-index,data.end());
    _output.write(data_cut);
    size_t temp_len=std::min(_lens_un,data_cut.length());
    _unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
    _check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
    _lens_un-=temp_len;

6.不是任何情况:首先我们知道要把_output.bytes_written()~index这一部分的内容保存好,然后再把data加入进去即可

//在中间插入元素
//先弹出一部分数据保存到栈中
std::stack<char> temp;
std::stack<bool> temp_check;
for(size_t i=0;i<index-_output.bytes_written();i++){
    temp.push(_unassembled_byte.at(i));
    temp_check.push(_check_byte.at(i));
}
[原data,入队][index][新传来的data,入队][原来在_output.bytes_written()+_lens_un之后的data]
//这里是看数据的最后一个index有没有达到_output.bytes_written()+_lens_un,达到的话后面的内容要保留,没达到就全部删除即可
size_t temp_len=std::min(_lens_un,data.length()+index-_output.bytes_written());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;
for(int i=data.length()-1;i>=0;i--){
    _unassembled_byte.push_front(data[i]);
    _check_byte.push_front(true);
    _lens_un++;
}
while(!temp.empty()){
    _unassembled_byte.push_front(temp.top());
    _check_byte.push_front(temp_check.top());
    _lens_un++;
    temp.pop();
    temp_check.pop();
}

7.输入字符串到ByteStream中:

    size_t i=0;
    while(i<_lens_un){
        if(!_check_byte.at(i)){
            break;
        }
        i++;
    }
    std::string n(_unassembled_byte.begin(),_unassembled_byte.begin()+i);
    _output.write(n);
    _unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+i);
    _lens_un-=i;
    _check_byte.erase(_check_byte.begin(),_check_byte.begin()+i);
    if(eof) input_end_index=index+data.length();
    if(input_end_index==_output.bytes_written()) _output.end_input();

0 条评论

发表评论

Avatar placeholder

您的电子邮箱地址不会被公开。 必填项已用*标注

隐藏