在处理 ngbe 驱动问题时,发现 hook netif_receive_skb(即 netifrecv)路径会使用 GRO 功能:
GRO(Generic Receive Offload)的功能将多个 TCP 数据聚合在一个skb结构,然后作为一个大数据包交付给上层的网络协议栈,以减少上层协议栈处理skb的开销,提高系统接收TCP数据包的性能。这个功能需要网卡驱动程序的支持。合并了多个skb的超级 skb能够一次性通过网络协议栈,从而减轻CPU负载。
GRO是针对网络收包流程进行改进的,并且只有NAPI类型的驱动才支持此功能。因此如果要支持GRO,不仅要内核支持,驱动也必须调用相应的接口来开启此功能。用ethtool -K gro on来开启GRO,如果报错就说明网卡驱动本身就不支持GRO。
GRO与TSO类似,但TSO只支持发送数据包。支持GRO的驱动会在NAPI的回调poll方法中读取数据包,然后调用GRO的接口napi_gro_receive或者napi_gro_frags来将数据包送进协议栈。
GRO 将数据送进协议栈的点有两处:一处是在 napi_skb_finish 里,它会通过判断 dev_gro_receive 的返回值,来决定是否需要将数据包送入协议栈;另一处是当 napi 的循环执行完毕执行 napi_complete,或者主动调用 napi_gro_complete 的时候。
/*
 * napi_gro_receive - driver-facing entry point into GRO.
 *
 * Resets the per-skb GRO offsets, hands the skb to dev_gro_receive()
 * which tries to merge it into a flow held on napi->gro_list, then lets
 * napi_skb_finish() act on the merge verdict (deliver / free / hold).
 */
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
	trace_napi_gro_receive_entry(skb);

	/* Start GRO parsing from the beginning of the packet data. */
	skb_gro_reset_offset(skb);

	return napi_skb_finish(dev_gro_receive(napi, skb), skb);
}
/*
 * napi_skb_finish - act on the verdict returned by dev_gro_receive().
 *
 * GRO_NORMAL      : skb could not be merged, deliver it to the stack now.
 * GRO_DROP        : skb must be dropped (e.g. out of resources), free it.
 * GRO_MERGED_FREE : skb's payload was merged into another skb, so this
 *                   skb itself can be freed.
 * GRO_HELD/MERGED : skb is kept on the gro_list (held as a new flow head,
 *                   or already linked in), nothing to do here.
 */
static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)
{
	switch (ret) {
	case GRO_NORMAL:
		/* Deliver the packet into the protocol stack. */
		if (netif_receive_skb_internal(skb))
			ret = GRO_DROP;
		break;

	case GRO_DROP:
		/* Packet is dropped; release the skb. */
		kfree_skb(skb);
		break;

	case GRO_MERGED_FREE:
		/* Data was merged into an aggregated skb; free this one.
		 * If the linear head was stolen into a frag of the
		 * aggregate, only the sk_buff shell needs recycling. */
		if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
			napi_skb_free_stolen_head(skb);
		else
			__kfree_skb(skb);
		break;

	/* Held by GRO (new flow head) or merged in place: skb must
	 * stay alive, nothing to free or deliver yet. */
	case GRO_HELD:
	case GRO_MERGED:
		break;
	}

	return ret;
}
dev_gro_receive函数用于合并skb,并决定是否将合并后的大skb送入网络协议栈
/*
 * dev_gro_receive - try to merge @skb into an existing GRO flow.
 *
 * Walks the offload (protocol) list to find the L3 gro_receive callback
 * matching skb->protocol, lets the protocol layers decide whether the
 * packet belongs to a flow already held on napi->gro_list, and returns
 * a verdict for napi_skb_finish():
 *   GRO_NORMAL      - cannot be GRO'd, deliver to the stack;
 *   GRO_MERGED_FREE - merged, skb can be freed;
 *   GRO_MERGED      - kept by GRO, skb still needed;
 *   GRO_HELD        - skb becomes the head of a new flow on gro_list.
 */
static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
	struct sk_buff **pp = NULL;
	struct packet_offload *ptype;
	__be16 type = skb->protocol;
	struct list_head *head = &offload_base; /* protocol offload callbacks */
	int same_flow;
	enum gro_result ret;
	int grow;

	/* Device did not enable GRO: deliver directly. */
	if (!(skb->dev->features & NETIF_F_GRO))
		goto normal;

	/* GRO does not handle already-GSO'd skbs, frag-listed skbs or
	 * packets with a bad checksum: IP fragment reassembly is done by
	 * the IP layer anyway, so merging here would add complexity for
	 * no gain. */
	if (skb_is_gso(skb) || skb_has_frag_list(skb) || skb->csum_bad)
		goto normal;

	/* Walk napi->gro_list and mark, per held flow, whether it MAY be
	 * the same flow as @skb.  same_flow is refined at every layer
	 * (link, IP, TCP); this link-layer pass checks two things only:
	 *   1. same net_device;
	 *   2. identical MAC header.
	 * If a matching flow exists, the protocol callbacks below merge
	 * @skb into it; otherwise @skb later becomes a new flow head. */
	gro_list_prepare(napi, skb);

	rcu_read_lock(); /* iterate the protocol offload table */
	list_for_each_entry_rcu(ptype, head, list) {
		if (ptype->type != type || !ptype->callbacks.gro_receive)
			continue;

		skb_set_network_header(skb, skb_gro_offset(skb));
		skb_reset_mac_len(skb);
		/* Reset per-skb GRO state before the protocol layers run. */
		NAPI_GRO_CB(skb)->same_flow = 0;
		NAPI_GRO_CB(skb)->flush = 0;
		NAPI_GRO_CB(skb)->free = 0;
		NAPI_GRO_CB(skb)->encap_mark = 0;
		NAPI_GRO_CB(skb)->recursion_counter = 0;
		NAPI_GRO_CB(skb)->gro_remcsum_start = 0;

		/* Setup for GRO checksum validation */
		switch (skb->ip_summed) {
		case CHECKSUM_COMPLETE:
			NAPI_GRO_CB(skb)->csum = skb->csum;
			NAPI_GRO_CB(skb)->csum_valid = 1;
			NAPI_GRO_CB(skb)->csum_cnt = 0;
			break;
		case CHECKSUM_UNNECESSARY:
			NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1;
			NAPI_GRO_CB(skb)->csum_valid = 0;
			break;
		default:
			NAPI_GRO_CB(skb)->csum_cnt = 0;
			NAPI_GRO_CB(skb)->csum_valid = 0;
		}

		/* Call the matching L3 gro_receive (e.g. inet_gro_receive). */
		pp = ptype->callbacks.gro_receive(&napi->gro_list, skb);
		break;
	}
	rcu_read_unlock();

	/* No protocol implements gro_receive for this type: deliver. */
	if (&ptype->list == head)
		goto normal;

	/* gro_receive has run; pick up its results. */
	same_flow = NAPI_GRO_CB(skb)->same_flow;
	/* Decide whether the (possibly merged) skb can be freed. */
	ret = NAPI_GRO_CB(skb)->free ? GRO_MERGED_FREE : GRO_MERGED;

	/* Non-NULL pp: the protocol layer asked for this held flow to be
	 * flushed into the stack right away. */
	if (pp) {
		struct sk_buff *nskb = *pp;

		*pp = nskb->next;
		nskb->next = NULL;
		napi_gro_complete(nskb);
		napi->gro_count--;
	}

	/* Same flow found: skb was already merged into gro_list by the
	 * gro_receive path, nothing more to do here. */
	if (same_flow)
		goto ok;

	/* Protocol layers demand this skb go straight to the stack; it
	 * must not be added to gro_list. */
	if (NAPI_GRO_CB(skb)->flush)
		goto normal;

	/* Already holding MAX_GRO_SKBS flows: before adding a new one,
	 * flush the oldest held flow into the stack. */
	if (unlikely(napi->gro_count >= MAX_GRO_SKBS)) {
		struct sk_buff *nskb = napi->gro_list;

		/* locate the end of the list to select the 'oldest' flow */
		while (nskb->next) {
			pp = &nskb->next;
			nskb = *pp;
		}
		*pp = NULL;
		nskb->next = NULL;
		napi_gro_complete(nskb);
	} else {
		napi->gro_count++;
	}

	/* @skb is the first packet of a new flow: no held flow could
	 * absorb it, so link it in at the head of gro_list. */
	NAPI_GRO_CB(skb)->count = 1;
	NAPI_GRO_CB(skb)->age = jiffies;
	NAPI_GRO_CB(skb)->last = skb;
	skb_shinfo(skb)->gso_size = skb_gro_len(skb);
	skb->next = napi->gro_list;
	napi->gro_list = skb;
	ret = GRO_HELD;

pull:
	/* Some NICs place part of the header in skb->frags[0]; GRO pulled
	 * it into the linear area via skb_gro_header_slow(), so trim the
	 * duplicated header bytes out of frag0 here. */
	grow = skb_gro_offset(skb) - skb_headlen(skb);
	if (grow > 0)
		gro_pull_from_frag0(skb, grow);
ok:
	return ret;

normal:
	ret = GRO_NORMAL;
	goto pull;
}
inet_gro_receive 函数是网络层skb聚合处理函数:
/*
 * inet_gro_receive - the IP-layer gro_receive callback.
 *
 * Locates the IP header (copying it into the linear area from frag0 if
 * necessary), validates it, then walks the held flows to refine the
 * same_flow / flush decisions before handing off to the transport-layer
 * gro_receive (e.g. tcp4_gro_receive).
 */
static struct sk_buff **inet_gro_receive(struct sk_buff **head, struct sk_buff *skb)
{
	const struct net_offload *ops;
	struct sk_buff **pp = NULL;
	struct sk_buff *p;
	const struct iphdr *iph;
	unsigned int hlen;
	unsigned int off;
	unsigned int id;
	int flush = 1;
	int proto;

	off = skb_gro_offset(skb);         /* current parse offset */
	hlen = off + sizeof(*iph);         /* offset + IP header size */
	iph = skb_gro_header_fast(skb, off); /* IP header, fast path */
	if (skb_gro_header_hard(skb, hlen)) {
		/* Header not fully linear: copy it out of the frags. */
		iph = skb_gro_header_slow(skb, hlen, off);
		if (unlikely(!iph))
			goto out;
	}

	proto = iph->protocol;

	rcu_read_lock();
	ops = rcu_dereference(inet_offloads[proto]);
	/* L4 protocol must support gro_receive. */
	if (!ops || !ops->callbacks.gro_receive)
		goto out_unlock;

	/* Sanity-check the header: version == 4, ihl == 5 (no options). */
	if (*(u8 *)iph != 0x45)
		goto out_unlock;

	/* Verify the IP header checksum. */
	if (unlikely(ip_fast_csum((u8 *)iph, 5)))
		goto out_unlock;

	id = ntohl(*(__be32 *)&iph->id);
	/* flush becomes non-zero if tos/tot_len word differs from the GRO
	 * length, or the fragment bits other than DF are set. */
	flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (id & ~IP_DF));
	id >>= 16;

	/* Core pass: walk every held flow and refine same_flow/flush.
	 * The IP layer keeps same_flow only when all of these match:
	 *   1. L4 protocol;  2. tos;  3. source and destination address.
	 * A held flow is marked for flush when:
	 *   1. ttl differs;  2. IP IDs are out of sequence;
	 *   3. fragmentation-related bits differ. */
	for (p = *head; p; p = p->next) {
		struct iphdr *iph2;

		/* A lower layer already ruled this flow out: skip it. */
		if (!NAPI_GRO_CB(p)->same_flow)
			continue;

		/* Fetch the held flow's IP header. */
		iph2 = (struct iphdr *)(p->data + off);
		/* The above works because, with the exception of the top
		 * (inner most) layer, we only aggregate pkts with the same
		 * hdr length so all the hdrs we'll need to verify will start
		 * at the same offset.
		 */
		/* same_flow check: protocol + saddr + daddr must match. */
		if ((iph->protocol ^ iph2->protocol) |
		    ((__force u32)iph->saddr ^ (__force u32)iph2->saddr) |
		    ((__force u32)iph->daddr ^ (__force u32)iph2->daddr)) {
			NAPI_GRO_CB(p)->same_flow = 0;
			continue;
		}

		/* flush check — only meaningful for same-flow candidates. */
		/* All fields must match except length and checksum. */
		NAPI_GRO_CB(p)->flush |=
			(iph->ttl ^ iph2->ttl) |
			(iph->tos ^ iph2->tos) |
			((iph->frag_off ^ iph2->frag_off) & htons(IP_DF));

		/* Save the IP ID check to be included later when we get to
		 * the transport layer so only the inner most IP ID is checked.
		 * This is because some GSO/TSO implementations do not
		 * correctly increment the IP ID for the outer hdrs.
		 */
		NAPI_GRO_CB(p)->flush_id =
			((u16)(ntohs(iph2->id) + NAPI_GRO_CB(p)->count) ^ id);
		NAPI_GRO_CB(p)->flush |= flush;
	}

	NAPI_GRO_CB(skb)->flush |= flush;
	skb_set_network_header(skb, off);
	/* The above will be needed by the transport layer if there is one
	 * immediately following this IP hdr.
	 */

	/* Note : No need to call skb_gro_postpull_rcsum() here,
	 * as we already checked checksum over ipv4 header was 0
	 */
	skb_gro_pull(skb, sizeof(*iph)); /* advance past the IP header */
	skb_set_transport_header(skb, skb_gro_offset(skb));

	/* Hand off to the transport-layer gro_receive (for TCP this is
	 * tcp_gro_receive, which mirrors this function's structure and
	 * eventually calls the merge routine skb_gro_receive). */
	pp = call_gro_receive(ops->callbacks.gro_receive, head, skb);

out_unlock:
	rcu_read_unlock();

out:
	NAPI_GRO_CB(skb)->flush |= flush;

	return pp;
}
tcp4_gro_receive/tcp_gro_receive函数 是传输层skb聚合处理函数:其会调用 skb_gro_receive 用于合并同流的skb
2942 int skb_gro_receive(struct sk_buff **head, struct sk_buff *skb) 2943 { 2944 struct sk_buff *p = *head; 2945 struct sk_buff *nskb; 2946 struct skb_shared_info *skbinfo = skb_shinfo(skb); 2947 struct skb_shared_info *pinfo = skb_shinfo(p); 2948 unsigned int headroom; 2949 unsigned int len = skb_gro_len(skb); 2950 unsigned int offset = skb_gro_offset(skb); 2951 unsigned int headlen = skb_headlen(skb); 2952 unsigned int delta_truesize; 2953 2954 if (p->len + len >= 65536) 2955 return -E2BIG; 2956 2957 if (pinfo->frag_list) //frag_list中有skb,证明不支持分散-聚集IO 2958 goto merge; 2959 else if (headlen <= offset) {//有一部分头在page中 2960 skb_frag_t *frag; 2961 skb_frag_t *frag2; 2962 int i = skbinfo->nr_frags; 2963 int nr_frags = pinfo->nr_frags + i; 2964 2965 offset -= headlen; 2966 2967 if (nr_frags > MAX_SKB_FRAGS) 2968 return -E2BIG; 2969 2970 pinfo->nr_frags = nr_frags; 2971 skbinfo->nr_frags = 0; 2972 2973 frag = pinfo->frags + nr_frags; 2974 frag2 = skbinfo->frags + i; 2975 do {//遍历赋值,将skb的frag加到pinfo的frgas后面 2976 *--frag = *--frag2; 2977 } while (--i); 2978 2979 frag->page_offset += offset;//去除剩余的头,只保留数据部分 2980 skb_frag_size_sub(frag, offset); 2981 2982 /* all fragments truesize : remove (head size + sk_buff) */ 2983 delta_truesize = skb->truesize - 2984 SKB_TRUESIZE(skb_end_offset(skb)); 2985 2986 skb->truesize -= skb->data_len; 2987 skb->len -= skb->data_len; 2988 skb->data_len = 0; 2989 2990 NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE; 2991 goto done; 2992 } else if (skb->head_frag) {//支持分散-聚集IO 2993 int nr_frags = pinfo->nr_frags; 2994 skb_frag_t *frag = pinfo->frags + nr_frags; 2995 struct page *page = virt_to_head_page(skb->head); 2996 unsigned int first_size = headlen - offset; 2997 unsigned int first_offset; 2998 2999 if (nr_frags + 1 + skbinfo->nr_frags > MAX_SKB_FRAGS) 3000 return -E2BIG; 3001 3002 first_offset = skb->data - 3003 (unsigned char *)page_address(page) + 3004 offset; 3005 3006 pinfo->nr_frags = nr_frags + 1 + skbinfo->nr_frags; 3007 3008 frag->page.p = 
page; 3009 frag->page_offset = first_offset; 3010 skb_frag_size_set(frag, first_size); 3011 3012 memcpy(frag + 1, skbinfo->frags, sizeof(*frag) * skbinfo->nr_frags); 3013 /* We dont need to clear skbinfo->nr_frags here */ 3014 3015 delta_truesize = skb->truesize - SKB_DATA_ALIGN(sizeof(struct sk_buff)); 3016 NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE_STOLEN_HEAD; 3017 goto done; 3018 } else if (skb_gro_len(p) != pinfo->gso_size) 3019 return -E2BIG; 3020 //不支持分散-聚集IO,则网卡不会将数据放在skb的frags数组中 3021 headroom = skb_headroom(p); 3022 nskb = alloc_skb(headroom + skb_gro_offset(p), GFP_ATOMIC);//申请新的skb 3023 if (unlikely(!nskb)) 3024 return -ENOMEM; 3025 3026 __copy_skb_header(nskb, p); 3027 nskb->mac_len = p->mac_len; 3028 3029 skb_reserve(nskb, headroom); 3030 __skb_put(nskb, skb_gro_offset(p)); 3031 3032 skb_set_mac_header(nskb, skb_mac_header(p) - p->data); 3033 skb_set_network_header(nskb, skb_network_offset(p)); 3034 skb_set_transport_header(nskb, skb_transport_offset(p)); 3035 3036 __skb_pull(p, skb_gro_offset(p)); 3037 memcpy(skb_mac_header(nskb), skb_mac_header(p), 3038 p->data - skb_mac_header(p)); 3039 3040 skb_shinfo(nskb)->frag_list = p;//将旧GRO队列头放入frag_list队列中 3041 skb_shinfo(nskb)->gso_size = pinfo->gso_size; 3042 pinfo->gso_size = 0; 3043 skb_header_release(p); 3044 NAPI_GRO_CB(nskb)->last = p; 3045 3046 nskb->data_len += p->len; 3047 nskb->truesize += p->truesize; 3048 nskb->len += p->len; 3049 3050 *head = nskb; 3051 nskb->next = p->next; 3052 p->next = NULL; 3053 3054 p = nskb; 3055 3056 merge: 3057 delta_truesize = skb->truesize; 3058 if (offset > headlen) { 3059 unsigned int eat = offset - headlen; 3060 3061 skbinfo->frags[0].page_offset += eat; 3062 skb_frag_size_sub(&skbinfo->frags[0], eat); 3063 skb->data_len -= eat; 3064 skb->len -= eat; 3065 offset = headlen; 3066 } 3067 3068 __skb_pull(skb, offset); 3069 3070 NAPI_GRO_CB(p)->last->next = skb;//将包放入GRO队列中 3071 NAPI_GRO_CB(p)->last = skb; 3072 skb_header_release(skb); 3073 3074 done: 3075 
NAPI_GRO_CB(p)->count++; 3076 p->data_len += len; 3077 p->truesize += delta_truesize; 3078 p->len += len; 3079 3080 NAPI_GRO_CB(skb)->same_flow = 1; //标识当前skb已经找到同流的skb并进行了合并 3081 return 0; 3082 }
当网卡支持分散-聚集IO时,GRO会将多个skb合并到一个skb的frag page数组中,否则会合并到skb的frag_list中:
即使在上述流程中skb被放入GRO队列中保存而没有被立即送入协议栈,它们也不会在队列中滞留太长时间,因为在收包软中断中会调用napi_gro_flush函数将GRO队列中的包送入协议栈:
/* napi->gro_list contains packets ordered by age.
 * youngest packets at the head of it.
 * Complete skbs in reverse order to reduce latencies.
 */
/*
 * napi_gro_flush - push held GRO flows into the protocol stack.
 *
 * With @flush_old set, flows that were touched during the current
 * jiffy are kept; everything older is completed.  Called from the RX
 * softirq so held packets never linger for long.
 */
void napi_gro_flush(struct napi_struct *napi, bool flush_old)
{
	struct sk_buff *skb, *prev = NULL;

	/* scan list and build reverse chain */
	for (skb = napi->gro_list; skb != NULL; skb = skb->next) {
		/* Link prev pointers so the list can be walked oldest
		 * first (the list itself is youngest-first). */
		skb->prev = prev;
		prev = skb;
	}

	/* Walk from oldest to youngest, completing each flow. */
	for (skb = prev; skb; skb = prev) {
		skb->next = NULL;

		/* flush_old: stop as soon as we reach a flow updated in
		 * the current jiffy — it and everything younger stay. */
		if (flush_old && NAPI_GRO_CB(skb)->age == jiffies)
			return;

		prev = skb->prev;
		napi_gro_complete(skb);
		napi->gro_count--;
	}

	napi->gro_list = NULL;
}
包加入GRO队列的时间比当前仅晚1个jiffies,也会被视作旧包并交付协议栈处理。可见如果软中断每个jiffies都调用一次napi_gro_flush函数的话,开启GRO功能最多增加1个jiffies(1ms或10ms)的延迟。
GRO的基本原理是将MAC层、IP层和TCP层都能合并的包的头只留一个,数据部分在frag数组或frag_list中存储,这样大大提高了包携带数据的效率。
在完成GRO处理后,skb会被交付到Linux网络协议栈入口进行协议处理。聚合后的skb在被送入到网络协议栈后,在网络层协议、TCP协议处理函数中会调用pskb_may_pull函数将GRO skb的数据整合到线性空间:
pskb_may_pull的整合保证了TCP首部数据全部被放入线性空间,从而使GRO不影响TCP协议的处理
/* Excerpt (elided with dashes): tcp_v4_rcv() calls pskb_may_pull() to
 * make sure the whole TCP header (th->doff * 4 bytes) is in the linear
 * area before protocol processing; packets failing the pull are
 * discarded. */
int tcp_v4_rcv(struct sk_buff *skb) { const struct iphdr *iph; const struct tcphdr *th; struct sock *sk; int ret; struct net *net = dev_net(skb->dev); ------------------------------------------------------------ if (!pskb_may_pull(skb, th->doff * 4)) goto discard_it; --------------------------------------- }
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/** * __pskb_pull_tail - advance tail of skb header * @skb: buffer to reallocate * @delta: number of bytes to advance tail * * The function makes a sense only on a fragmented &sk_buff, * it expands header moving its tail forward and copying necessary * data from fragmented part. * * &sk_buff MUST have reference count of 1. * * Returns %NULL (and &sk_buff does not change) if pull failed * or value of new tail of skb in the case of success. * * All the pointers pointing into skb header may change and must be * reloaded after call to this function. */ /* Moves tail of skb head forward, copying data from fragmented part, * when it is necessary. * 1. It may fail due to malloc failure. * 2. It may change skb pointers. * * It is pretty complicated. Luckily, it is called only in exceptional cases. */ unsigned char *__pskb_pull_tail(struct sk_buff *skb, int delta) { /* If skb has not enough free space at tail, get new one * plus 128 bytes for future expansions. If we have enough * room at tail, reallocate without expansion only if skb is cloned. */ int i, k, eat = (skb->tail + delta) - skb->end; // eat 大于0 表示 tail如果移动len-head_len 就会超出 end区域 也就是为去除当前skb可用内存,还需要多少内存 // 如果skb 是已经被cloned multiple shared copies if (eat > 0 || skb_cloned(skb)) { if (pskb_expand_head(skb, 0, eat > 0 ? eat + 128 : 0, GFP_ATOMIC)) return NULL; } //当前skb可用内存 足够pull // head---data----tail----end----frag /* delta ==== pull len headlen== len--data_len === linerdata */ //end---tai > delta 可以直接copy 到 线性区内存块----从skb的offset(skb->tail),拷贝delta个字节到skb->tail之后 if (skb_copy_bits(skb, skb_headlen(skb), skb_tail_pointer(skb), delta)) BUG(); /* Optimization: no fragments, no reasons to preestimate * size of pulled pages. Superb. *///没有分段 if (!skb_has_frag_list(skb)) goto pull_pages; //由于数据已经拷贝到了skb->data中,因此需要释放frags,frag_list中被拷贝过的数据 //计算从frags数组中拷贝的数据量 /* Estimate size of pulled pages. 
*/ eat = delta; //寻找到满足eat这么多数据量的最后一个page for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) { int size = skb_frag_size(&skb_shinfo(skb)->frags[i]); if (size >= eat) goto pull_pages; eat -= size; } /* If we need update frag list, we are in troubles. * Certainly, it possible to add an offset to skb data, * but taking into account that pulling is expected to * be very rare operation, it is worth to fight against * further bloating skb head and crucify ourselves here instead. * Pure masohism, indeed. 8)8) *///eat仍不为0,说明从frag_list中进行了拷贝,释放frag_list if (eat) {skb_shared_info struct sk_buff *list = skb_shinfo(skb)->frag_list; struct sk_buff *clone = NULL; struct sk_buff *insp = NULL; do { BUG_ON(!list); if (list->len <= eat) { /* Eaten as whole. */ eat -= list->len; list = list->next; insp = list; } else { /* Eaten partially. */ if (skb_shared(list)) { /* Sucks! We need to fork list. :-( */ clone = skb_clone(list, GFP_ATOMIC); if (!clone) return NULL; insp = list->next; list = clone; } else { /* This may be pulled without * problems. */ insp = list; } if (!pskb_pull(list, eat)) { kfree_skb(clone); return NULL; } break; } } while (eat); //list指向frag_list头 //直到list遍历到数据量足够的最后一个skb /* Free pulled out fragments. */ while ((list = skb_shinfo(skb)->frag_list) != insp) { skb_shinfo(skb)->frag_list = list->next; kfree_skb(list); } /* And insert new clone at head. */ if (clone) { clone->next = list; skb_shinfo(skb)->frag_list = clone; } } /* Success! Now we may commit changes to skb data. 
*/ pull_pages: eat = delta; k = 0;//释放frags中的page for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) { int size = skb_frag_size(&skb_shinfo(skb)->frags[i]); if (size <= eat) { skb_frag_unref(skb, i); eat -= size; } else { skb_shinfo(skb)->frags[k] = skb_shinfo(skb)->frags[i]; if (eat) { skb_shinfo(skb)->frags[k].page_offset += eat; skb_frag_size_sub(&skb_shinfo(skb)->frags[k], eat); eat = 0; } k++; } } skb_shinfo(skb)->nr_frags = k; skb->tail += delta; skb->data_len -= delta; return skb_tail_pointer(skb); }