这一次我们要实现 TCP 的发送方（TCPSender），必要的注释我已经直接写在代码里面了。

1.头文件:

class TCPSender {
  private:
    //! our initial sequence number, the number for our SYN.
    WrappingInt32 _isn;
    uint64_t base{0};
    //! outbound queue of segments that the TCPSender wants sent
    std::queue<TCPSegment> _segments_out{};
    //cached TCPSegment.
    std::queue<TCPSegment> _segments_out_cached{};
    //! retransmission timer for the connection
    unsigned int _initial_retransmission_timeout;

    //! outgoing stream of bytes that have not yet been sent
    ByteStream _stream;
    //nextseq numbers as the absolute TCP number.
    uint64_t _next_seqnum{0};
    //slide windows size
    uint16_t _curr_window_size;
    //isfinished?
    bool _isfin;
    size_t _times;
    //ticking?
    bool _time_waiting;
    //remission times.
    int _consecutive_remission;
    // when is time out?
    size_t _time_out;
    //empty windows?
    bool _window_zero;
    //! the (absolute) sequence number for the next byte to be sent
    uint64_t _next_seqno{0};

  public:
    //! Initialize a TCPSender
    TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
              const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
              const std::optional<WrappingInt32> fixed_isn = {});

    //! \name "Input" interface for the writer
    //!@{
    ByteStream &stream_in() { return _stream; }
    const ByteStream &stream_in() const { return _stream; }
    //!@}

    //! \name Methods that can cause the TCPSender to send a segment
    //!@{

    //! \brief A new acknowledgment was received
    void ack_received(const WrappingInt32 ackno, const uint16_t window_size);

    //! \brief Generate an empty-payload segment (useful for creating empty ACK segments)
    void send_empty_segment();

    //! \brief create and send segments to fill as much of the window as possible
    void fill_window();

    //! \brief Notifies the TCPSender of the passage of time
    void tick(const size_t ms_since_last_tick);
    //!@}

    //! \name Accessors
    //!@{

    //! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
    //! \note count is in "sequence space," i.e. SYN and FIN each count for one byte
    //! (see TCPSegment::length_in_sequence_space())
    size_t bytes_in_flight() const;

    //! \brief Number of consecutive retransmissions that have occurred in a row
    unsigned int consecutive_retransmissions() const;

    //! \brief TCPSegments that the TCPSender has enqueued for transmission.
    //! \note These must be dequeued and sent by the TCPConnection,
    //! which will need to fill in the fields that are set by the TCPReceiver
    //! (ackno and window size) before sending.
    std::queue<TCPSegment> &segments_out() { return _segments_out; }
    //!@}

    //! \name What is the next sequence number? (used for testing)
    //!@{

    //! \brief absolute seqno for the next byte to be sent
    uint64_t next_seqno_absolute() const { return _next_seqno; }

    //! \brief relative seqno for the next byte to be sent
    WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
    //!@}
};

2.发送数据函数:

void TCPSender::fill_window() {
    // windows is full or the programe is finished.
    if(_curr_window_size==0||_isfin){
        return;
    }
    //haven't send any bytes.
    if(_next_seqno==0){
        TCPSegment seg;
        // the TCP transmission start from _isn.
        seg.header().seqno = _isn;
        seg.header().syn = true;
        // the TCP first connection just send 1 bytes;
        _next_seqno = 1;
        _curr_window_size--;
        _segments_out.push(seg);
        _segments_out_cached.push(seg);
    }
    //the end of the file
    else if(_stream.eof()){
        //set the finish flag to true;
        _isfin = true;
        TCPSegment seg;
        seg.header().syn=false;
        seg.header().fin=true;
        //convert the absolute TCP number to TCP number.
        seg.header().seqno = wrap(_next_seqno,_isn);
        //the fin packet only send a byte.
        _next_seqno++;
        _curr_window_size--;
        _segments_out.push(seg);
        _segments_out_cached.push(seg);
    }
    //normal file
    else{
        //make sure the windows is not full and there's any data to convert.
        while(!_stream.buffer_empty()&&_curr_window_size>0){
            //decide the length of the TCP Segment.
            //make sure the length of TCP segment is below the silde windows size and data length.
            uint64_t lens_byte=std::min(_stream.buffer_size(),uint64_t (_curr_window_size));
            lens_byte=std::min(lens_byte,TCPConfig::MAX_PAYLOAD_SIZE);
            TCPSegment seg;
            seg.header().seqno = wrap(_next_seqno,_isn);
            seg.header().syn = false;
            //get the lens_byte data to the payload.
            seg.payload()=_stream.read(lens_byte);
            // increase the next seq_no;
            _next_seqno += lens_byte;
            _curr_window_size -= lens_byte;
            // get the end of the file.
            if(_stream.eof()&&_curr_window_size>0){
                _isfin = true;
                seg.header().fin=true;
                //the fin packet only send a byte.
                _next_seqno++;
                _curr_window_size--;
            }
            _segments_out.push(seg);
            _segments_out_cached.push(seg);
            if(_isfin){
                break;
            }        
        }
    }
    //start ticking...
    if(!_time_waiting){
        _time_out = _initial_retransmission_timeout;
        _time_waiting = true;
        _times = 0;
    }
}

3.接收ACK:

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) { 
    DUMMY_CODE(ackno, window_size); 
    // get the absolute TCP number of ACK...
    uint64_t acknos = unwrap(ackno,_isn,base);
    //thrid connection...
    //means the 0th bytes gets and desire to 1st bytes...
    if(base==0&&acknos==1){
        base=1;
        _segments_out_cached.pop();
        _consecutive_remission=0;
    }
    else if(acknos > _next_seqno){
        return;
    }
    //the ack number is bigger than first cached segment...
    //means the cached data gets by the reciever...
    else if(!_segments_out_cached.empty() && acknos >= base + _segments_out_cached.front().length_in_sequence_space()){
        //first segment in cache, and get the seqno and length of the segment...
        uint64_t copy_seg_seqno = unwrap(_segments_out_cached.front().header().seqno, _isn, base);
        uint64_t copy_seg_len = _segments_out_cached.front().length_in_sequence_space();
        //find the segments that acked by recevier...
        //hint:if seqno+len<=ackno:means the data is acked by recevier...
        while(copy_seg_len+copy_seg_seqno<=acknos){
            //move the base, base is the 1st bytes that nor acked...
            base += _segments_out_cached.front().length_in_sequence_space();
            _segments_out_cached.pop();
            if(_segments_out_cached.empty()) break;
            // judge the 2nd segs...
            copy_seg_seqno = unwrap(_segments_out_cached.front().header().seqno, _isn, base);
            copy_seg_len = _segments_out_cached.front().length_in_sequence_space();
        }
        _time_out = _initial_retransmission_timeout;
        _times = 0;
        _consecutive_remission = 0;
    }
    // 3rd disconnection.
    else if(acknos == _next_seqno && _isfin){
        base = acknos;
        _segments_out_cached.pop();
    }
    //the windows is empty
    if(_next_seqno-base==0){
        _time_waiting = false;
    }
    // 流量控制,发送方窗口不大于接受方窗口
    else if(_next_seqno-base>=window_size){
        _curr_window_size = 0;
        return;
    }
    if(window_size==0){
        _curr_window_size = 1;
        _window_zero = true;
    }
    else{
        _curr_window_size = window_size;
        _window_zero = false;
        _consecutive_remission = 0;
    }
    fill_window();
}

4. 构造函数

//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , base(0)
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity)
    , _curr_window_size(1)
    , _isfin(false)
    , _times(0)
    , _time_waiting(false)
    , _consecutive_remission(0)
    , _time_out(0)
    , _window_zero(false)
    {

    }

5.超时处理:


//! \brief Account for elapsed time and resend the oldest outstanding segment when the timer expires.
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
    // Elapsed time is accumulated unconditionally.
    _times += ms_since_last_tick;

    // Retransmit only when the timer is running, has expired, and something is outstanding.
    const bool timer_expired = _time_waiting && _times >= _time_out;
    if (_segments_out_cached.empty() || !timer_expired) {
        return;
    }

    // Resend the oldest segment that has not been acknowledged.
    _segments_out.push(_segments_out_cached.front());

    // Back off exponentially, except while probing a zero window.
    if (!_window_zero) {
        ++_consecutive_remission;
        _time_out *= 2;
    }

    // Restart the timer for the next retransmission.
    _times = 0;
}


0 条评论

发表评论

Avatar placeholder

您的电子邮箱地址不会被公开。 必填项已用*标注

隐藏