本文主要内容:网络数据包接收的上半部实现,主要分析内核接口相关部分。
内核版本:2.6.37
Author:zhangskd @ csdn blog
上半部的实现
接收数据包的上半部处理流程为:
el_interrupt() // 网卡驱动
    |--> el_receive() // 网卡驱动
            |--> netif_rx() // 内核接口
                    |--> enqueue_to_backlog() // 内核接口
我们已经分析了网卡驱动相关部分,现在来看下内核接口相关部分:)
netif_rx
netif_rx()是内核接收网络数据包的入口(目前多数网卡支持新的接口NAPI,后续文章会分析)。
netif_rx()主要调用enqueue_to_backlog()进行后续处理。
/**
 * netif_rx - post buffer to the network code
 * @skb: buffer to post
 *
 * This function receives a packet from a device and queues it for the upper
 * (protocol) levels to process. It always succeeds. The buffer may be dropped
 * during processing for congestion control or by the protocol layers.
 *
 * return values:
 * NET_RX_SUCCESS (no congestion)
 * NET_RX_DROP (packet was dropped)
 */
int netif_rx(struct sk_buff *skb)
{
	int ret;

	/* if netpoll wants it, pretend we never saw it */
	if (netpoll_rx(skb))
		return NET_RX_DROP;

	/* Record the receive time in skb->tstamp when prequeue timestamping is on. */
	if (netdev_tstamp_prequeue)
		net_timestamp_check(skb);

	trace_netif_rx(skb);

#ifdef CONFIG_RPS
	/* The RPS path is left out of this excerpt; it is analysed separately. */
	...
#else
	{
		unsigned int qtail;

		/*
		 * Queue the skb on the backlog of the CPU returned by get_cpu();
		 * put_cpu() pairs with that get_cpu() once the enqueue is done.
		 */
		ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
		put_cpu();
	}
#endif
	return ret;
}
softnet_data
每个cpu都有一个softnet_data实例,用于收发数据包。
/*
 * Incoming packets are placed on per-cpu queues.
 * One softnet_data instance exists per CPU and is used for both receiving
 * and sending packets.
 */
struct softnet_data {
	struct Qdisc *output_queue;		/* output packet queue */
	struct Qdisc **output_queue_tailp;
	/*
	 * Devices that are in polling state, i.e. whose ingress queue has
	 * new frames waiting to be processed.
	 */
	struct list_head poll_list;
	struct sk_buff *completion_queue;	/* queue of successfully transmitted packets */
	/* Processing queue; input_pkt_queue is spliced onto it (per the original note). */
	struct sk_buff_head process_queue;

	/* stats */
	unsigned int processed;			/* number of packets processed */
	unsigned int time_squeeze;		/* times poll was cut short by the time or packet budget */
	unsigned int cpu_collision;
	unsigned int received_rps;

#ifdef CONFIG_RPS
	/* RPS-only fields are left out of this excerpt. */
	...
#endif
	unsigned dropped;			/* packets dropped because the input queue was full */
	/*
	 * Input queue holding received packets.
	 * Used by non-NAPI drivers; NAPI-capable drivers keep their own
	 * private queues.
	 */
	struct sk_buff_head input_pkt_queue;
	struct napi_struct backlog;		/* virtual device shared by all non-NAPI devices */
};
定义
/*
 * Device drivers call our routines to queue packets here.
 * We empty the queue in the local softnet handler.
 *
 * Defines the per-CPU softnet_data variable and exports the per-CPU symbol.
 */
DEFINE_PER_CPU_ALIGNED(struct softnet_data, softnet_data);
EXPORT_PER_CPU_SYMBOL(softnet_data);
初始化
/*
 * Initialize the DEV module. At boot time this walks the device list and
 * unhooks any devices that fail to initialise (normally hardware not present)
 * and leaves us with a valid list of present and active devices.
 *
 * This is called single threaded during boot, so no need to take the rtnl
 * semaphore.
 *
 * Only the parts relevant to packet reception are shown; "..." marks code
 * left out of this excerpt.
 */
static int __init net_dev_init(void)
{
	...
	/*
	 * Initialise the packet receive queues:
	 * set up the softnet_data instance of every possible CPU.
	 */
	for_each_possible_cpu(i) {
		struct softnet_data *sd = &per_cpu(softnet_data, i);

		memset(sd, 0, sizeof(*sd));
		skb_queue_head_init(&sd->input_pkt_queue);
		skb_queue_head_init(&sd->process_queue);
		sd->completion_queue = NULL;
		INIT_LIST_HEAD(&sd->poll_list);
		sd->output_queue = NULL;
		/* The tail pointer starts out pointing at the (empty) queue head. */
		sd->output_queue_tailp = &sd->output_queue;

#ifdef CONFIG_RPS
		...
#endif
		sd->backlog.poll = process_backlog;	/* default poll function for non-NAPI devices */
		sd->backlog.weight = weight_p;		/* 64 per the original note: max packets handled per poll */
		sd->backlog.gro_list = NULL;
		sd->backlog.gro_count = 0;
	}
	...
	/* Register the softirq handlers for transmit and receive. */
	open_softirq(NET_TX_SOFTIRQ, net_tx_action);
	open_softirq(NET_RX_SOFTIRQ, net_rx_action);
	...
}
enqueue_to_backlog
netif_rx()调用enqueue_to_backlog()来处理。
首先根据参数cpu获取对应的softnet_data实例sd(不考虑RPS时,即当前cpu的实例),然后:
1. 如果接收队列sd->input_pkt_queue不为空,说明已经有软中断在处理数据包了,
则不需要再次触发软中断,直接将数据包添加到接收队列尾部即可。
2. 如果接收队列sd->input_pkt_queue为空,说明当前没有软中断在处理数据包,
则把虚拟设备backlog添加到sd->poll_list中以便进行轮询,最后设置NET_RX_SOFTIRQ
标志触发软中断。
3. 如果接收队列sd->input_pkt_queue满了(队列长度超过netdev_max_backlog,默认为1000),则直接丢弃数据包。
/* queue an skb to a per CPU backlog queue (may be a remote CPU queue). */ static int enqueue_to_backlog(struct sk_buff *skb, int cpu, unsigned int *qtail) { struct softnet_data *sd; unsigned long flags; sd = &per_cpu(softnet_data, cpu); /* 获取当前cpu上的softnet_data实例 */ local_irq_save(flags); /* 禁止本地中断 */ rps_lock(sd); if (skb_queue_len(&sd->input_pkt_queue) <= netdev_max_backlog) { /* 如果接收队列不为空,则说明已经有软中断在处理数据包了, * 则不需要再次触发软中断,直接将数据包添加到接收队列尾部即可。 */ if (skb_queue_len(&sd->input_pkt_queue)) { enqueue: __skb_queue_tail(&sd->input_pkt_queue, skb); /* 添加到接收队列尾部 */ input_queue_tail_incr_save(sd, qtail); rps_unlock(sd); local_irq_restore(flags); /* 恢复本地中断 */ return NET_RX_SUCCESS; } /* Schedule NAPI for backlog device. * 如果接收队列为空,说明当前没有软中断在处理数据包, * 把虚拟设备backlog添加到sd->poll_list中以便进行轮询, * 最后设置NET_RX_SOFTIRQ标志触发软中断。 */ if (! __test_and_set_bit(NAPT_STATE_SCHED, &sd->backlog.state)) { if (! rps_ipi_queued(sd)) ____napi_schedule(sd, &sd->backlog); } goto enqueue; } sd->dropped++; /* 如果接收队列满了就直接丢弃 */ rps_unlock(sd); local_irq_restore(flags); /* 恢复本地中断 */ atomic_long_inc(&skb->dev->rx_dropped); kfree_skb(skb); /* 释放数据包 */ return NET_RX_DROP; } int netdev_tstamp_prequeue = 1; /* 记录接收时间 */ int netdev_max_backlog = 1000; /* 接收队列的最大长度 */
napi_struct是NAPI的调度结构;softnet_data中的backlog成员是它的一个实例,作为虚拟设备用于兼容非NAPI的驱动。
/* Structure for NAPI scheduling similar to tasklet but with weighting */
struct napi_struct {
	/*
	 * The poll_list must only be managed by the entity which changes the
	 * state of the NAPI_STATE_SCHED bit. This means whoever atomically
	 * sets that bit can add this napi_struct to the per-cpu poll_list, and
	 * whoever clears that bit can remove from the list right before
	 * clearing the bit.
	 */
	struct list_head poll_list;	/* links this entry into the list of devices being polled */
	unsigned long state;		/* state bits of the (virtual) device */
	int weight;			/* max packets handled per poll; weight_p for the non-NAPI backlog (64 per the original note) */
	int (*poll) (struct napi_struct *, int);	/* poll method; process_backlog() for the backlog device */

#ifdef CONFIG_NETPOLL
	/* netpoll-only fields are left out of this excerpt. */
	...
#endif
	unsigned int gro_count;
	struct net_device *dev;
	struct list_head dev_list;
	struct sk_buff *gro_list;
	struct sk_buff *skb;
};
/*
 * Queue @napi for polling on @sd and flag the receive softirq.
 * @sd:   per-CPU softnet_data whose poll_list receives the entry
 * @napi: NAPI instance to schedule
 *
 * NOTE(review): the _irqoff variant suggests callers must have local
 * interrupts disabled; the caller shown in this article does so via
 * local_irq_save() -- confirm for other callers.
 */
static inline void ____napi_schedule(struct softnet_data *sd, struct napi_struct *napi)
{
	/* Append the napi_struct to the tail of softnet_data's poll_list. */
	list_add_tail(&napi->poll_list, &sd->poll_list);
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);	/* set the softirq pending flag */
}