大致的处理过程
TCP的接收流程:在tcp_v4_do_rcv中的相关处理(网卡收到报文触发)中,会首先通过tcp_check_urg设置tcp_sock的urg_data为TCP_URG_NOTYET(urgent point指向的可能不是本报文,而是后续报文或者前面收到的乱序报文),并保存最新的urgent data的sequence和对于的1 BYTE urgent data到tcp_sock的urg_data (如果之前的urgent data没有读取,就会被覆盖)。
用户接收流程:在tcp_recvmsg流程中,如果发现当前的skb的数据中有urgent data,首先拷贝urgent data之前的数据,然后tcp_recvmsg退出,提示用户来接收OOB数据;在用户下一次调用tcp_recvmsg来接收数据的时候,会跳过urgent data,并设置urgent data数据接收完成
/* This is the 'fast' part of urgent handling. Linxu内核在默认情况下,把urgent data实现为OOB数据 TCP的接收流程:在tcp_v4_do_rcv中的相关处理(网卡收到报文触发)中,会首先通过tcp_check_urg设置tcp_sock的urg_data为 TCP_URG_NOTYET(urgent point指向的可能不是本报文,而是后续报文或者前面收到的乱序报文),并保存最新的urgent data的sequence 和对于的1 BYTE urgent data到tcp_sock的urg_data (如果之前的urgent data没有读取,就会被覆盖)。 用户接收流程:在tcp_recvmsg流程中,如果发现当前的skb的数据中有urgent data,首先拷贝urgent data之前的数据, 然后tcp_recvmsg退出,提示用户来接收OOB数据;在用户下一次调用tcp_recvmsg来接收数据的时候,会跳过urgent data, 并设置urgent data数据接收完成。 相关的数据结构和定义 urg_data成员,其高8bit为urgent data的接收状态;其低8位为保存的1BYTE urgent数据。 urgent data的接收状态对应的宏的含义描述: #defineTCP_URG_VALID 0x0100 //*urgent data已经读到了tcp_sock::urg_data #defineTCP_URG_NOTYET 0x0200 //*已经发现有urgent data,还没有读取到tcp_sock::urg_data #defineTCP_URG_READ 0x0400 //*urgent data已经被用户通过MSG_OOB读取了 */ static void tcp_urg(struct sock *sk, struct sk_buff *skb, const struct tcphdr *th) { struct tcp_sock *tp = tcp_sk(sk); /* Check if we get a new urgent pointer - normally not. */ if (th->urg)/*收到了urgent data,则检查和设置urg_data和urg_seq成员*/ tcp_check_urg(sk, th); /* Do we wait for any urgent data? - normally not... */ if (tp->urg_data == TCP_URG_NOTYET) { u32 ptr = tp->urg_seq - ntohl(th->seq) + (th->doff * 4) - th->syn; /* Is the urgent pointer pointing into this packet? 发现了有urgent data,但是还没有保存到tp->urg_data*/ */ if (ptr < skb->len) { u8 tmp; if (skb_copy_bits(skb, ptr, &tmp, 1)) BUG(); tp->urg_data = TCP_URG_VALID | tmp; if (!sock_flag(sk, SOCK_DEAD)) sk->sk_data_ready(sk); } } }
用户接收数据接口
用户接收URG数据的接口
static int tcp_recv_urg(struct sock *sk, struct msghdr *msg, int len, int flags) { struct tcp_sock *tp = tcp_sk(sk); /* No URG data to read. 用户已经读取过 或者没数据*/ if (sock_flag(sk, SOCK_URGINLINE) || !tp->urg_data || tp->urg_data == TCP_URG_READ) return -EINVAL; /* Yes this is right ! */ if (sk->sk_state == TCP_CLOSE && !sock_flag(sk, SOCK_DONE)) return -ENOTCONN; /*当前的tp->urg_data为合法的数据,可以读取*/ if (tp->urg_data & TCP_URG_VALID) { int err = 0; char c = tp->urg_data; if (!(flags & MSG_PEEK)) tp->urg_data = TCP_URG_READ; /* Read urgent data. */ msg->msg_flags |= MSG_OOB; if (len > 0) { if (!(flags & MSG_TRUNC)) err = memcpy_to_msg(msg, &c, 1); len = 1; } else msg->msg_flags |= MSG_TRUNC; return err ? -EFAULT : len; } if (sk->sk_state == TCP_CLOSE || (sk->sk_shutdown & RCV_SHUTDOWN)) return 0; /* Fixed the recv(..., MSG_OOB) behaviour. BSD docs and * the available implementations agree in this case: * this call should never block, independent of the * blocking state of the socket. * Mike <pall@rz.uni-karlsruhe.de> */ return -EAGAIN; }
用户接收普通数据的接口中的相关处理
在用户接收数据的tcp_recvmsg函数中,在查找到待拷贝的skb后,首先拷贝urgent data数据前的数据,然后退出接收过程,在用户下一次执行tcp_recvmsg的时候跳过urgent data,设置urgent data读取结束
查找到准备拷贝的skb后的处理:
int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock, int flags, int *addr_len) { ------------------------- /* Urgent data needs to be handled specially. */ if (flags & MSG_OOB) goto recv_urg; --------------------------------------------- /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. 在接收完urgent data数据前的所有数据之后, tcp_recvmsg的以下代码片段得到执行, 这段代码退出当前接收过程,提示用户有urgent data数据到来,需要用MSG_OOB来接收 */ if (tp->urg_data && tp->urg_seq == *seq) { if (copied) break; if (signal_pending(current)) { copied = timeo ? sock_intr_errno(timeo) : -EAGAIN; break; } } ------------------------------------- /* Do we have urgent data here? */ if (tp->urg_data) {/* 当前有urg_data数据*/ u32 urg_offset = tp->urg_seq - *seq; if (urg_offset < used) {/*urgent data在当前待拷贝的数据范围内*/ if (!urg_offset) {/*待拷贝的数据就是urgent data,跨过该urgent data, 只给用户读取后面的数据*/ if (!sock_flag(sk, SOCK_URGINLINE)) { ++*seq; urg_hole++; offset++; used--; if (!used) goto skip_copy; } } else/*指定只拷贝urgent data数据之前的,完成后在下一次循环 开始的位置,会退出循环,返回用户;下一次用户调用tcp_recvmsg 就进入到上面的分支了*/ used = urg_offset; } } --------------------- skip_copy:/*用户读取的数据跨过了urgent point,设置读取结束 开启fast path*/ if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) { tp->urg_data = 0; tcp_fast_path_check(sk); } ----------------------------------- out: release_sock(sk); return err; recv_urg: err = tcp_recv_urg(sk, msg, len, flags); goto out; }
TCP的urg数据,由于定义和实现上的混乱,当前已经不建议使用,但是为了兼容之前已经已经存在的实现,该机制会长期在内核中存在,如果不了解该机制及其内核行为,有可能就很难解释一些奇怪的问题:比如某段代码不小心地造成send接口事实上设置了MSG_OOB,就会造成接收端少了一个BYTE
} /* This is what the send packet queuing engine uses to pass * TCP per-packet control information to the transmission code. * We also store the host-order sequence numbers in here too. * This is 44 bytes if IPV6 is enabled. * If this grows please adjust skbuff.h:skbuff->cb[xxx] size appropriately. */ struct tcp_skb_cb { __u32 seq; /* Starting sequence number // 当前tcp包的第一个序列号*/ __u32 end_seq; /* SEQ + FIN + SYN + datalen// 表示当前tcp 包结束的序列号:seq + FIN + SYN + 数据长度len */ union { /* Note : tcp_tw_isn is used in input path only * (isn chosen by tcp_timewait_state_process()) * * tcp_gso_segs/size are used in write queue only, * cf tcp_skb_pcount()/tcp_skb_mss() */ __u32 tcp_tw_isn; struct { u16 tcp_gso_segs; u16 tcp_gso_size; }; }; __u8 tcp_flags; /* TCP header flags. (tcp[13]) tcp头的flag */ __u8 sacked; /* State flags for SACK/FACK. SACK/FACK的状态flag或者是sack option的偏移 */ #define TCPCB_SACKED_ACKED 0x01 /* SKB ACK'd by a SACK block */ #define TCPCB_SACKED_RETRANS 0x02 /* SKB retransmitted */ #define TCPCB_LOST 0x04 /* SKB is lost */ #define TCPCB_TAGBITS 0x07 /* All tag bits */ #define TCPCB_REPAIRED 0x10 /* SKB repaired (no skb_mstamp) */ #define TCPCB_EVER_RETRANS 0x80 /* Ever retransmitted frame */ #define TCPCB_RETRANS (TCPCB_SACKED_RETRANS|TCPCB_EVER_RETRANS| TCPCB_REPAIRED) __u8 ip_dsfield; /* IPv4 tos or IPv6 dsfield */ __u8 txstamp_ack:1, /* Record TX timestamp for ack? */ eor:1, /* Is skb MSG_EOR marked? */ unused:6; __u32 ack_seq; /* Sequence number ACK'd ack(确认)的序列号*/ union { struct { /* There is space for up to 24 bytes */ __u32 in_flight:30,/* Bytes in flight at transmit */ is_app_limited:1, /* cwnd not fully used? */ unused:1; /* pkts S/ACKed so far upon tx of skb, incl retrans: */ __u32 delivered; /* start of send pipeline phase */ struct skb_mstamp first_tx_mstamp; /* when we reached the "delivered" count */ struct skb_mstamp delivered_mstamp; } tx; /* only used for outgoing skbs */ union { struct inet_skb_parm h4; #if IS_ENABLED(CONFIG_IPV6) struct inet6_skb_parm h6; #endif } header; /* For incoming skbs */ }; }; static void tcp_data_queue(struct sock *sk, struct sk_buff *skb) { struct tcp_sock *tp = tcp_sk(sk); bool fragstolen = false; int eaten = -1; //没有数据部分,直接释放 if (TCP_SKB_CB(skb)->seq == TCP_SKB_CB(skb)->end_seq) { __kfree_skb(skb); return; } skb_dst_drop(skb); __skb_pull(skb, tcp_hdr(skb)->doff * 4); tcp_ecn_accept_cwr(tp, skb); tp->rx_opt.dsack = 0; /* Queue data for delivery to the user. * Packets in sequence go to the receive queue. * Out of sequence packets to the out_of_order_queue. *///如果该数据包刚好是下一个要接收的数据,则可以直接copy到用户空间(如果存在且可用 if (TCP_SKB_CB(skb)->seq == tp->rcv_nxt) {//非乱序包 if (tcp_receive_window(tp) == 0)//接受窗口满了,不能接受 goto out_of_window; /* Ok. In sequence. In window. */ if (tp->ucopy.task == current &&//应用程序上下文中,在tcp_recvmsg tp->copied_seq == tp->rcv_nxt && tp->ucopy.len &&//正要读取这个包,并且应用程序还有剩余缓存 sock_owned_by_user(sk) && !tp->urg_data) {//应用程序持有锁 int chunk = min_t(unsigned int, skb->len, tp->ucopy.len);/* 带读取长度和数据段长度的较小值 */ __set_current_state(TASK_RUNNING); if (!skb_copy_datagram_msg(skb, 0, tp->ucopy.msg, chunk)) {//copy数据到ucopy中 tp->ucopy.len -= chunk; tp->copied_seq += chunk; eaten = (chunk == skb->len); tcp_rcv_space_adjust(sk);//每次copy数据到ucopy都要调用该函数,动态更新sk_rcvbuf大小 } } if (eaten <= 0) {//没有copy到ucopy,或者没有全部copy queue_and_out: if (eaten < 0) {//没有copy /* 没有拷贝到用户空间,对内存进行检查 */ if (skb_queue_len(&sk->sk_receive_queue) == 0)//sk_receive_queue没有数据 sk_forced_mem_schedule(sk, skb->truesize);//检查是否需要分配配额 else if (tcp_try_rmem_schedule(sk, skb, skb->truesize))//查看接收缓存空间够不够 goto drop; } eaten = tcp_queue_rcv(sk, skb, 0, &fragstolen);//添加到sk_receive_queue } tcp_rcv_nxt_update(tp, TCP_SKB_CB(skb)->end_seq);//尝试更新rcv_nxt if (skb->len) tcp_event_data_recv(sk, skb); if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN) tcp_fin(sk); //fin处理 if (!RB_EMPTY_ROOT(&tp->out_of_order_queue)) {//ofo队列非空 tcp_ofo_queue(sk);//尝试把ofo中的包移动到sk_receive_queue中 /* RFC2581. 4.2. SHOULD send immediate ACK, when * gap in queue is filled. */ if (RB_EMPTY_ROOT(&tp->out_of_order_queue)) inet_csk(sk)->icsk_ack.pingpong = 0;//ofo队列被清空了,需要立即发送ack } if (tp->rx_opt.num_sacks)// 新数据到达导致返回给对方的SACK Block 调整 tcp_sack_remove(tp);//如果当前收到的包带sack,则删除其中不需要的部分,有可能ofo填充造成了rcv_next的移动 tcp_fast_path_check(sk);//当前是slow path, 尝试开启快速路径 重新设置 tcp 首部预测标志 if (eaten > 0) kfree_skb_partial(skb, fragstolen);//已经全部copy到ucopy,可以释放skb了 if (!sock_flag(sk, SOCK_DEAD)) sk->sk_data_ready(sk);//通知poll数据到达 return; } /* 重传 */ if (!after(TCP_SKB_CB(skb)->end_seq, tp->rcv_nxt)) {//判断是否是重传的旧包,标记dsack /* A retransmit, 2nd most common case. Force an immediate ack. */ NET_INC_STATS(sock_net(sk), LINUX_MIB_DELAYEDACKLOST); tcp_dsack_set(sk, TCP_SKB_CB(skb)->seq, TCP_SKB_CB(skb)->end_seq);//设置dsack out_of_window: tcp_enter_quickack_mode(sk);//进入快速ack inet_csk_schedule_ack(sk);///标记需要ack drop: tcp_drop(sk, skb); return; } /* Out of window. F.e. zero window probe. *///收到的包在接收窗口范围之外 /* 窗口以外的数据,比如零窗口探测报文段 */ if (!before(TCP_SKB_CB(skb)->seq, tp->rcv_nxt + tcp_receive_window(tp))) goto out_of_window; //乱序包或者需要dsack都要快速ack tcp_enter_quickack_mode(sk); /* 数据段重叠 */ if (before(TCP_SKB_CB(skb)->seq, tp->rcv_nxt)) { /* Partial packet, seq < rcv_next < end_seq *///seq < rcv_next < end_seq,部分旧数据 SOCK_DEBUG(sk, "partial packet: rcv_next %X seq %X - %X ", tp->rcv_nxt, TCP_SKB_CB(skb)->seq, TCP_SKB_CB(skb)->end_seq); tcp_dsack_set(sk, TCP_SKB_CB(skb)->seq, tp->rcv_nxt);//设置dsack /* If window is closed, drop tail of packet. But after * remembering D-SACK for its head made in previous line. */ if (!tcp_receive_window(tp))//接收窗口不足,快速ack通知对方 goto out_of_window; goto queue_and_out;//调用tcp_queue_rcv添加到sk_receive_queue,会尝试合并 } tcp_data_queue_ofo(sk, skb);//乱续包,添加到ofo队列 }