五、队列层
1、软中断与下半部
当用中断处理的时候,为了减少中断处理的工作量,比如,一般中断处理时,需要屏蔽其它中断,如果中断处理时间过长,那么其它中断
有可能得不到及时处理,也以,有一种机制,就是把“不必马上处理”的工作,推迟一点,让它在中断处理后的某一个时刻得到处理。这就
是下半部。
下半部只是一个机制,它在Linux中,有多种实现方式,其中一种对时间要求最严格的实现方式,叫“软中断”,可以使用:
open_softirq()
来向内核注册一个软中断,
然后,在合适的时候,调用
raise_softirq_irqoff()
触发它。
如果采用中断方式接收数据(这一节就是在说中断方式接收,后面,就不用这种假设了),同样也需要软中断,可以调用
open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);
向内核注册一个名为NET_RX_SOFTIR的软中断,net_rx_action是软中断的处理函数。
然后,在驱动中断处理完后的某一个时刻,调用
raise_softirq_irqoff(NET_RX_SOFTIRQ);
触发它,这样net_rx_action将得到执行。
2、队列层
什么是队列层?通常,在网卡收发数据的时候,需要维护一个缓冲区队列,来缓存可能存在的突发数据,类似于前面的DMA环形缓冲区。
队列层中,包含了一个叫做struct softnet_#:
[cpp] view plain copy
1. struct softnet_#
2. {
3.
4. int throttle;
5.
6. int cng_level;
7. int avg_blog;
8.
9. struct sk_buff_head input_pkt_queue;
10.
11. struct list_head poll_list;
12. struct net_device *output_queue;
13. struct sk_buff *completion_queue;
14.
15. struct net_device backlog_dev;
16.};
内核使用了一个同名的变量softnet_#,它是一个Per-CPU变量,每个CPU都有一个。
net/core/dev.c
CODE:
DECLARE_PER_CPU(struct softnet_#,softnet_#);
[cpp] view plain copy
1. static int __init net_dev_init(void)
2. {
3. int i, rc = -ENOMEM;
4.
5. BUG_ON(!dev_boot_phase);
6.
7. net_random_init();
8.
9. if (dev_proc_init())
10. goto out;
11.
12. if (netdev_sysfs_init())
13. goto out;
14.
15.
16. INIT_LIST_HEAD(&ptype_all);
17. for (i = 0; i < 16; i++)
18. INIT_LIST_HEAD(&ptype_base[i]);
19.
20. for (i = 0; i < ARRAY_SIZE(dev_name_head); i++)
21. INIT_HLIST_HEAD(&dev_name_head[i]);
22.
23. for (i = 0; i < ARRAY_SIZE(dev_index_head); i++)
24. INIT_HLIST_HEAD(&dev_index_head[i]);
25.
26.
27.
28.
29. for (i = 0; i < NR_CPUS; i++) {
30. struct softnet_# *queue;
31.
32.
33. queue = &per_cpu(softnet_#, i);
34.
35. skb_queue_head_init(&queue->input_pkt_queue);
36. queue->throttle = 0;
37. queue->cng_level = 0;
38. queue->avg_blog = 10;
39. queue->completion_queue = NULL;
40. INIT_LIST_HEAD(&queue->poll_list);
41. set_bit(__LINK_STATE_START, &queue->backlog_dev.state);
42. queue->backlog_dev.weight = weight_p;
43.
44. queue->backlog_dev.poll = process_backlog;
45. atomic_set(&queue->backlog_dev.refcnt, 1);
46. }
47.
48.#ifdef OFFLINE_SAMPLE
49. samp_timer.expires = jiffies + (10 * HZ);
50. add_timer(&samp_timer);
51.#endif
52.
53. dev_boot_phase = 0;
54.
55.
56. open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL);
57. open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);
58.
59. hotcpu_notifier(dev_cpu_callback, 0);
60. dst_init();
61. dev_mcast_init();
62. rc = 0;
63.out:
64. return rc;
65.}
这样,初始化完成后,在驱动程序中,在中断处理函数中,会调用netif_rx将数据交上来,这与采用轮询技术,有本质的不同:
[cpp] view plain copy
1. int netif_rx(struct sk_buff *skb)
2. {
3. int this_cpu;
4. struct softnet_# *queue;
5. unsigned long flags;
6.
7.
8. if (netpoll_rx(skb))
9. return NET_RX_DROP;
10.
11.
12. if (!skb->stamp.tv_sec)
13. net_timestamp(&skb->stamp);
14.
15.
16. local_irq_save(flags);
17.
18. this_cpu = smp_processor_id();
19. queue = &__get_cpu_var(softnet_#);
20.
21.
22. __get_cpu_var(netdev_rx_stat).total++;
23.
24.
25. if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
26. if (queue->input_pkt_queue.qlen) {
27. if (queue->throttle)
28. goto drop;
29.
30.
31.enqueue:
32. dev_hold(skb->dev);
33. __skb_queue_tail(&queue->input_pkt_queue, skb);
34.#ifndef OFFLINE_SAMPLE
35. get_sample_stats(this_cpu);
36.#endif
37. local_irq_restore(flags);
38. return queue->cng_level;
39. }
40.
41.
42. if (queue->throttle)
43. queue->throttle = 0;
44.
45.
46. netif_rx_schedule(&queue->backlog_dev);
47. goto enqueue;
48. }
49.
50.
51. if (!queue->throttle) {
52. queue->throttle = 1;
53. __get_cpu_var(netdev_rx_stat).throttled++;
54. }
55.
56.
57.drop:
58. __get_cpu_var(netdev_rx_stat).dropped++;
59. local_irq_restore(flags);
60.
61. kfree_skb(skb);
62. return NET_RX_DROP;
63.}
从这段代码的分析中,我们可以看到,当数据被接收后,netif_rx的工作,就是取得当前CPU的队列,然后入队,然后返回,然后中断函数
现调用它,它再把数据包入队……
当队列接收完成后,netif_rx就调用netif_rx_schedule进一步处理数据包,我们注意到:
1、前面讨论过,采用轮询技术时,同样地,也是调用netif_rx_schedule,把设备自己传递了过去;
2、这里,采用中断方式,传递的是队列中的一个“伪设备”,并且,这个伪设备的poll函数指针,指向了一个叫做process_backlog的函数;
netif_rx_schedule函数完成两件重要的工作:
1、将bakclog_dev设备加入“处理数据包的设备”的链表当中;
2、触发软中断函数,进行数据包接收处理;
这样,我们可以猜想,在软中断函数中,不论是伪设备bakclog_dev,还是真实的设备(如前面讨论过的e100),都会被软中断函数以:
dev->poll()
的形式调用,对于e100来说,poll函数的接收过程已经分析了,而对于其它所有没有采用轮询技术的网络设备来说,它们将统统调用
process_backlog函数(我觉得把它改名为pseudo-poll是否更合适一些^o^)。
OK,我想分析到这里,关于中断处理与轮询技术的差异,已经基本分析开了……
继续来看,netif_rx_schedule进一步调用__netif_rx_schedule:
[cpp] view plain copy
1. static inline void netif_rx_schedule(struct net_device *dev)
2. {
3. if (netif_rx_schedule_prep(dev))
4. __netif_rx_schedule(dev);
5. }
[cpp] view plain copy
1. static inline void __netif_rx_schedule(struct net_device *dev)
2. {
3. unsigned long flags;
4.
5. local_irq_save(flags);
6. dev_hold(dev);
7.
8. list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_#).poll_list);
9. if (dev->quota < 0)
10. dev->quota += dev->weight;
11. else
12. dev->quota = dev->weight;
13.
14. __raise_softirq_irqoff(NET_RX_SOFTIRQ);
15. local_irq_restore(flags);
16.}
软中断被触发,注册的net_rx_action函数将被调用
[cpp] view plain copy
1. static void net_rx_action(struct softirq_action *h)
2. {
3. struct softnet_# *queue = &__get_cpu_var(softnet_#);
4. unsigned long start_time = jiffies;
5. int budget = netdev_max_backlog;
6.
7.
8. local_irq_disable();
9.
10.
11. while (!list_empty(&queue->poll_list)) {
12. struct net_device *dev;
13.
14. if (budget <= 0 || jiffies - start_time > 1)
15. goto softnet_break;
16.
17. local_irq_enable();
18.
19.
20. dev = list_entry(queue->poll_list.next,
21. struct net_device, poll_list);
22. netpoll_poll_lock(dev);
23.
24.
25. if (dev->quota <= 0 || dev->poll(dev, &budget)) {
26. netpoll_poll_unlock(dev);
27.
28.
29. local_irq_disable();
30. list_del(&dev->poll_list);
31. list_add_tail(&dev->poll_list, &queue->poll_list);
32. if (dev->quota < 0)
33. dev->quota += dev->weight;
34. else
35. dev->quota = dev->weight;
36. } else {
37. netpoll_poll_unlock(dev);
38. dev_put(dev);
39. local_irq_disable();
40. }
41. }
42.out:
43. local_irq_enable();
44. return;
45.
46.softnet_break:
47. __get_cpu_var(netdev_rx_stat).time_squeeze++;
48. __raise_softirq_irqoff(NET_RX_SOFTIRQ);
49. goto out;
50.}
对于dev->poll(dev,&budget)的调用,一个真实的poll函数的例子,我们已经分析过了,现在来看process_backlog
[cpp] view plain copy
1. static int process_backlog(struct net_device *backlog_dev, int *budget)
2. {
3. int work = 0;
4. int quota = min(backlog_dev->quota, *budget);
5. struct softnet_# *queue = &__get_cpu_var(softnet_#);
6. unsigned long start_time = jiffies;
7.
8. backlog_dev->weight = weight_p;
9.
10.
11. for (;;) {
12. struct sk_buff *skb;
13. struct net_device *dev;
14.
15. local_irq_disable();
16. skb = __skb_dequeue(&queue->input_pkt_queue);
17. if (!skb)
18. goto job_done;
19. local_irq_enable();
20.
21. dev = skb->dev;
22.
23. netif_receive_skb(skb);
24.
25. dev_put(dev);
26.
27. work++;
28.
29. if (work >= quota || jiffies - start_time > 1)
30. break;
31.
32. }
33.
34. backlog_dev->quota -= work;
35. *budget -= work;
36. return -1;
37.
38.
39.job_done:
40. backlog_dev->quota -= work;
41. *budget -= work;
42.
43. list_del(&backlog_dev->poll_list);
44. smp_mb__before_clear_bit();
45. netif_poll_enable(backlog_dev);
46.
47. if (queue->throttle)
48. queue->throttle = 0;
49. local_irq_enable();
50. return 0;
51.}
这个函数重要的工作,就是出队,然后调用netif_receive_skb()将数据包交给上层,这与上一节讨论的poll是一样的。这也是为什么,
在网卡驱动的编写中,采用中断技术,要调用netif_rx,而采用轮询技术,要调用netif_receive_skb啦!
到了这里,就处理完数据包与设备相关的部分了,数据包将进入上层协议栈……