Linux驱动:使用workqueue、tasklet处理中断
背景
中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时会到来,如果关中断的时间太长,CPU就不能及时响应其他的中断请求,从而造成中断的丢失。
因此,Linux内核的目标就是尽可能快的处理完中断请求,尽其所能把更多的处理向后推迟。
例如,假设一个数据块已经达到了网线,当中断控制器接受到这个中断请求信号时,Linux内核只是简单地标志数据到来了,然后让处理器恢复到它以前运行的状态,其余的处理稍后再进行(如把数据移入一个缓冲区,接受数据的进程就可以在缓冲区找到数据)。
因此,内核把中断处理分为两部分:上半部(tophalf)和下半部(bottomhalf),上半部(就是中断服务程序)内核立即执行,而下半部(就是一些内核函数)留着稍后处理。
首先,一个快速的“上半部”来处理硬件发出的请求,它必须在一个新的中断产生之前终止。通常,除了在设备和一些内存缓冲区(如果你的设备用到了DMA,就不止这些)之间移动或传送数据,确定硬件是否处于健全的状态之外,这一部分做的工作很少。
下半部运行时是允许中断请求的,而上半部运行时是关中断的,这是二者之间的主要区别。
但是,内核到底什时候执行下半部,以何种方式组织下半部?这就是我们要讨论的下半部实现机制,这种机制在内核的演变过程中不断得到改进,在以前的内核中,这个机制叫做bottomhalf(简称bh),在2.4以后的版本中有了新的发展和改进,改进的目标使下半部可以在多处理机上并行执行,并有助于驱动程序的开发者进行驱动程序的开发。
对于在上半部和下半部之间划分工作,尽管不存在某种严格的规则,但还是有一些提示可供借鉴:
- 如果一个任务对时问非常敏感,将其放在中断处理程序中执行。
- 如果一个任务和硬件相关,将其放在中断处理程序中执行。
- 如果一个任务要保证不被其他中断(特别是相同的中断)打断,将其放在中断处理程执行。
- 其他所有任务,考虑放置在下半部执行。
下面主要介绍常用的小任务(Tasklet
)机制及2.6内核中的工作队列(workqueue
)机制。
在2.6版本的内核中,内核提供了三种不同形式的下半部实现机制:软中断、tasklets和workqueue。
参考:
- https://blog.csdn.net/qq_31505483/article/details/78534609
- http://seen.blog.chinaunix.net/uid-25695950-id-4471821.html
- http://www.ibm.com/developerworks/cn/linux/l-cn-cncrrc-mngd-wkq
workqueue与tasklet的区别
从表面上看,workqueue
类似于tasklet
;允许内核在将来的某个时间调用一个函数。但是,两者还是存在着显著的差异,两个机制有各自适合的情形:
1、运行的环境不同:
tasklet
运行在软中断上下文中,所以其代码必须是原子操作。相反,workqueue运行在内核进程上下文中;结果就是workqueue具有更大的灵活性。特别是,workqueue可以休眠。tasklet
始终运行在最初提交它们的处理器上。默认情况下,workqueue以相同方式工作。
2、调度策略不同:可以将workqueue中的函数延迟一段时间后再执行(适用于长周期且不需要是原子的处理)
所以,workqueue
和tasklet
最大的不同就是tasklet
执行的更快,因为其是原子的;但因为workqueue不必是原子的,所以workqueue具有更高的延迟。
前面比较了workqueue与其他基于中断上下文的延迟机制之间的优势,但workqueue并非没有缺点。首先是公共的共享workqueue不能提供更多的好处,因为如果其中的任一工作项阻塞,则其他工作项将不能被执行,因此在实际的使用中,使用者多会自己创建workqueue,而这又导致下面的一些问题:
-
MT的workqueue导致了内核的线程数增加得非常的快,这样带来一些问题:一个是占用了 pid 数目,这对于服务器可不是一个好消息,因为 pid实际上是一种全局资源;而大量的工作线程对于资源的竞争也导致了无效的调度,而这些调度其实是不需要的,对调度器也带来了压力。
-
现有的workqueue机制某些情况下有导致死锁的倾向,特别是在两个工作项之间存在依赖时。如果你曾经调试过这种偶尔出现的死锁,会知道这种问题让人非常的沮丧。
关于MT的部分详见
并发可管理workqueue
。
什么情况下使用workqueue
,什么情况下使用tasklet
?
- 如果推后执行的任务需要睡眠(调用引起阻塞的函数),那么就选择
workqueue
。 - 如果推后执行的任务不需要睡眠(调用不涉及引起阻塞的函数),那么就选择
tasklet
。 - 另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用
workqueue
。
tasklet
在内核中的中断机制中,为了防止解决中断嵌套(防止一个中断打断另一个中断)的问题,引进小任务(tasklet)机制,在中断处理中tasklet机制被广泛应用,对于中断处理的实时性响应特别有帮助。
思想:硬件中断必须尽快处理, 但大部分的数据管理可以延后到以后安全的时间执行
tasklet用于减少硬中断处理的时间,将本来是在硬中断服务程序中完成的任务转化成软中断完成,即是:将一些非紧急的任务留到tasklet中完成,而紧急的任务则在硬中断服务程序中完成。
使用步骤:
- 1、定义一个tasklet的执行任务;
- 2、初始化taskelet,将处理任务的函数和takslet任务通过
DECLARE_TASKLET
捆绑; - 3、调度tasklet :
tasklet_schedule(&tasklet);
特点:
- tasklet不能引起休眠,同一个tasklet不能在两个CPU上同时运行。
- 但是不同tasklet可能在不同CPU上同时运行,则需要注意共享数据的保护。
声明、初始化
Tasklet的使用比较简单,关键在于定义tasklet及其处理函数并将两者关联。
#include <linux/interrupt.h>
#if 0 // 一个宏完成 关联
DECLARE_TASKLET(tasklet_name, tasklet_func, func_data);
#else //相当于:
struct t asklet_struct tasklet_name;
tasklet_init(&tasklet_name, tasklet_func, func_data);
#endif
// 结构体原型
struct tasklet_struct
{
struct tasklet_struct *next;
unsigned long state;
atomic_t count;
void (*func)(unsigned long);
unsigned long data;
};
#define DECLARE_TASKLET(name, func, data)
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
void tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data);
描述:将函数和一个tasklet绑定在一起。
参数解析:
- tasklet_name : tasklet的名字
- tasklet_func : 执行的tasklet的函数名
- func_data : 执行tasklet所附带的参数,
unsigned long
,可以像ioctl一样传递指针值进去。
执行
#include <linux/interrupt.h>
// 执行tasklet
tasklet_schedule(struct tasklet_struct* t);
描述:调用后,让系统在适当的时候进行调度之前绑定好的tasklet。
参数解析:t: 某一个 tasklet 。
接口
tasklet 以一个数据结构形式存在,使用前必须被初始化。初始化能够通过调用一个特定函数或者通过使用某些宏定义声明结构:
void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data);
#define DECLARE_TASKLET(name, func, data)
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
#define DECLARE_TASKLET_DISABLED(name, func, data)
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }
void tasklet_disable(struct tasklet_struct *t);
/*函数暂时禁止给定的tasklet被tasklet_schedule调度,直到这个tasklet再次被enable;若这个tasklet当前在运行, 这个函数忙等待直到这个tasklet退出*/
void tasklet_disable_nosync(struct tasklet_struct *t);
/*和tasklet_disable类似,但是tasklet可能仍然运行在另一个 CPU */
void tasklet_enable(struct tasklet_struct *t);
/*使能一个之前被disable的tasklet;若这个tasklet已经被调度, 它会很快运行。tasklet_enable和tasklet_disable 必须匹配调用, 因为内核跟踪每个tasklet的"禁止次数"*/
void tasklet_schedule(struct tasklet_struct *t);
/*调度 tasklet 执行,如果tasklet在运行中被调度, 它在完成后会再次运行; 这保证了在其他事件被处理当中发生的事件受到应有的注意. 这个做法也允许一个 tasklet 重新调度它自己*/
void tasklet_hi_schedule(struct tasklet_struct *t);
/*和tasklet_schedule类似,只是在更高优先级执行。当软中断处理运行时, 它处理高优先级 tasklet 在其他软中断之前,只有具有低响应周期要求的驱动才应使用这个函数, 可避免其他软件中断处理引入的附加周期*/
void tasklet_kill(struct tasklet_struct *t);
/*确保了 tasklet 不会被再次调度来运行,通常当一个设备正被关闭或者模块卸载时被调用。如果 tasklet 正在运行, 这个函数等待直到它执行完毕。若 tasklet 重新调度它自己,则必须阻止在调用 tasklet_kill 前它重新调度它自己,如同使用 del_timer_sync*/
例子
// 有关函数声明
void xxx_do_tasklet(unsigned long);
irqreturn_t xxx_interrupt(int irq,void *dev_id,struct pt_regs *regs);
// 声明一个小任务,并绑定某个函数
DECLARE_TASKLET(xxx_tasklet, xxx_do_tasklet, 0);
// 构造这个模块
int _init xxx_init(void)
{
// …… ;
// 注册中断
result = request_irq(xxx_irq,xxx_interrupt,SA_INTERRUPT,”xxx”,NULL);
// …… ;
}
// 析构这个模块
void _exit xxx_exit(void)
{
// …… ;
// 释放中断
free_irq(xxx_irq,xxx_irq_interrupt);
// 取消某个小任务
tasklet_kill(&xxx_tasklet);
// …… ;
}
// 写好中断处理函数
irqreturn_t xxx_interrupt(int irq,void *dev_id,struct pt_regs *regs)
{
// …… ;
// 在某个中断中调度小任务
tasklet_schedule(&xxx_tasklet);
// …… ;
}
// 执行tasklet
void xxx_do_tasklet(unsigned long)
{
// …… ;
}
workqueue
工作队列(work queue)是另外一种将工作推后执行的形式,它和前面讨论的tasklet有所不同。
workqueue其实相当于内核线程(因此可以睡眠,可以等待),创建一个workqueue后,如果有新的event发生(常常是中断),可以将你的work提交到workqueue里后,内核会在合适的时间点去调用workqueue
来执行提交的work。
一个CPU上的所有worker线程共同构成了一个worker pool(此概念由内核v3.8引入)。
我们可能比较熟悉memory pool,当需要内存时,就从空余的memory pool中去获取。
同样地,当workqueue上有work item待处理时,我们就从worker pool里挑选一个空闲的worker线程来服务这个work item。
也就是说,这个下半部分可以在进程上下文中执行。这样,通过workqueue执行的代码能占尽进程上下文的所有优势。最重要的就是workqueue允许被重新调度甚至是睡眠。
它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。
如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct
,这些工作以队列结构组织成workqueue(workqueue),其数据结构为workqueue_struct
,而工作线程就是负责执行workqueue中的工作。
在讨论之前,先定义几个内核中使用workqueue时用到的术语方便后面描述。
- work queues:所有工作项被 ( 需要被执行的工作 ) 排列于该队列,因此称作workqueue (workqueues) 。
- worker thread:工作者线程 (worker thread) 是一个用于执行workqueue中各个工作项的内核线程,当workqueue中没有工作项时,该线程将变为 idle 状态。系统默认的工作者线程为events,也可以创建自己的工作者线程(每一条
workqueue
对应一条worker thread
)。
workqueue之所以成为使用最多的延迟执行机制,得益于它的实现中的一些有意思的地方:
- 使用的接口简单明了
- 执行在进程上下文中,这样使得它可以睡眠,被调度及被抢占
对于使用者,基本上只需要做 3 件事情,依次为:
- 创建workqueue(workqueue) ( 如果使用内核默认的workqueue,连这一步都可以省略掉 )
- 创建工作项(work item)
- 向workqueue中提交工作项
由于workqueue的实现中,已有默认的共享workqueue,因此在选择接口时,就出现了2种选择:要么使用内核已经提供的共享workqueue;要么自己创建workqueue。
使用步骤:
0、创建workqueue(可省略):create_workqueue
1、定义一个workqueue:struct work_struct my_wq;
2、声明、实现处理任务:void my_wq_func(void* data);
3、通过INIT_WORK初始化这个workqueue并将workqueue与处理函数绑定:INIT_WORK(&my_wq, my_wq_func)
4、提交工作、适时调度workqueue:schedule_work(&my_wq)
使用workqueue
/* 原型*/
struct work_struct {
atomic_long_t data; /*工作处理函数func的参数*/
struct list_head entry; /*连接工作的链表结点*/
work_func_t func; /*工作处理函数*/
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
/*
"data"表示的意义就比较丰富了。这种在一个C语言变量里塞入不同的类型的数据的方法在Linux的代码实现中还是不难见到的:
- 最后的4个bits是作为"flags"标志位使用的;
- 中间的4个bits是用于flush功能的"color",flush功能简单地说就是:等待workqueue队列上的任务都处理完,并清空workqueue队列
- 剩下的bits在不同的场景下有不同的含义(相当于C语言里的"union"),它可以指向work item所在的workqueue队列的地址,由于低8位被挪作他用,因此要求workqueue队列的地址是按照256字节对齐的。它还可以表示处理work item的worker线程所在的pool的ID(关于pool将在本文的后半部分介绍)。
*/
};
创建workqueue(可选)
如果因为某些原因,如需要执行的是个阻塞性质的任务而不愿或不能使用内核提供的共享workqueue,这时需要自己创建workqueue。
创建workqueue时,有 2 个选择,可选择系统范围内的 ST,也可选择每 CPU 一个内核线程的 MT。
- single threaded(ST)::工作者线程的表现形式之一,在系统范围内,只有一个工作者线程为workqueue服务
- multi threaded(MT):工作者线程的表现形式之一,在多 CPU 系统上每个 CPU 上都有一个工作者线程为workqueue服务
#include <linux/workqueue.h>
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq;
struct list_head list;
const char *name; /*workqueue name*/
int singlethread; /*是不是单线程 - 单线程我们首选第一个CPU -0表示采用默认的工作者线程event*/
int freezeable; /* Freeze threads during suspend */
int rt;
};
struct workqueue_struct *create_workqueue(const char *name);
// MT形式,用于创建一个workqueue队列,为系统中的每个CPU都创建一个内核线程。
struct workqueue_struct *create_singlethread_workqueue(const char *name);
// 用于创建workqueue,只创建一个内核线程。
//--------------
void destroy_workqueue(struct workqueue_struct *queue);
// 释放workqueue队列。
描述:创建workqueue线程,可用来指定work执行时的内核线程。
相对于create_singlethread_workqueue,create_workqueue 同样会分配一个 wq的workqueue。
不同之处在于,对于多 CPU 系统而言,
create_workqueue
对每一个 active 的 CPU,都会为之创建一个 per-CPU 的 cwq结构,对应每一个 cwq,都会生成一个新的 worker_thread。
返回值:workqueue线程对象。
声明、初始化一个工作项目
#if 0
/* 静态创建工作项 */
typedef void (*work_func_t)(struct work_struct *work);
DECLARE_WORK(work, func);
DECLARE_DELAYED_WORK(work, func);
#else
/* 动态创建工作项 */
struct work_struct work;
INIT_WORK(struct work_struct *work, work_func_t func);
PREPARE_WORK(struct work_struct *work, work_func_t func);
INIT_DELAYED_WORK(struct delayed_work *work, work_func_t func);
PREPARE_DELAYED_WORK(struct delayed_work *work, work_func_t func);
#endif
该系列宏最终都会创建一个以 work
命名的工作项,并设置了回调函数 func
,例如:
void workqueue_func(void * arg);
#if 0
DECLARE_WORK(work, workqueue_func);
#else
// 相当于
struct work_struct work;
INIT_WORK(&work, workqueue_func);
#endif
描述:声明一个工作,绑定对应的执行函数。
调度work
// 使用内核默认的 workqueue
int schedule_work(struct work_struct *work);
// 使用指定的 workqueue
int queue_work(struct workqueue_struct *wq, struct work_struct *work);
描述:将工作项添加到某条wq,内核会适时执行。
参数解析:
- wq:workqueue
- work:需要执行的任务。
取消work
如果需要取消一个挂起的workqueue中的工作项 , 调用:
int cancel_delayed_work(struct work_struct *work);
描述:取消workqueue中挂起的工作项。
返回值:
- 如果这个工作项在它开始执行前被取消,返回值是非零。内核保证给定工作项的执行不会在调用 cancel_delay_work 成功后被执行。
- 如果 cancel_delay_work 返回0,则这个工作项可能已经运行在一个不同的处理器,并且仍然可能在调用 cancel_delayed_work之后被执行。要绝对确保工作函数没有在 cancel_delayed_work 返回 0 后在任何地方运行,你必须跟随这个调用之后接着调用flush_workqueue。在 flush_workqueue 返回后。任何在改调用之前提交的工作函数都不会在系统任何地方运行。
例子
这个例子展示了如何创建workqueue并在其中执行work。
struct my_work_struct{
int test;
struct work_struct save;
};
struct my_work_struct test_work;
struct workqueue_struct *test_workqueue;
void do_save(struct work_struct *p_work)
{
struct my_work_struct *p_test_work = container_of(p_work, struct my_work_struct, save);
printk("%d
",p_test_work->test);
}
void test_init()
{
INIT_WORK(&(test_work.save), do_save);
test_work.test = 1;
#if 0
test_workqueue = create_workqueue("test_workqueue");
if (!test_workqueue)
panic("Failed to create test_workqueue
");
queue_work(test_workqueue, &(test_work.save));
#else
schedule_work(&(test_work.save));
#endif
}
void test_destory(void)
{
if(test_workqueue)
destroy_workqueue(test_workqueue);
}
这个是常规中使用默认wq的例子。
delaywork
在内核中,除了work_struct外还有一个结构体delayed_work
该工作队列里拥有一个timer定时器结构体,从而实现延时工作。
struct delayed_work {
struct work_struct work;
struct timer_list timer;
};
使用delaywork
定义
struct delayed_work my_delaywork;
初始化
绑定delaywork对应的函数
INIT_DELAYED_WORK(&my_delaywork , my_delaywork_func);
实现对应的回调函数:
static void my_delaywork_func(struct work_struct *work)
{
printk("delaywork running
");
}
调度delaywork
// 使用内核默认的 workqueue
static inline bool schedule_delayed_work(struct delayed_work *dwork,
unsigned long delay);
// 提交到指定的 wq 上执行。
int queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay);
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay);
使用delayed workqueue最主要的是调用queue_delayed_work。
描述:提交work到某条wq,工作项随后在某个合适时机将被执行。
参数解析:
- wq:workqueue
- work:需要执行的任务。
- dalay:延迟时间,以ms为单位。(内部通过注册定时器实现延迟)最少延迟 delay jiffies 之后该工作才会被执行
例子:
schedule_delayed_work(&my_delaywork , msecs_to_jiffies(200));
取消delaywork
cancel_delayed_work_sync(&my_delaywork);
例子
#include <linux/workqueue.h>
// ...
struct my_work_struct{
int test;
struct delayed_work my_delay_work;
};
static struct my_work_struct test_work;
static struct workqueue_struct *test_workqueue;
void do_my_delay_work(struct work_struct *p_work)
{
// 注意,这里的container_of要求 `.work`。
struct my_work_struct *p_test_work = container_of(p_work, struct my_work_struct, my_delay_work.work);
printk("%d
",p_test_work->test);
// 再次运行
schedule_delayed_work(&(test_work.my_delay_work), msecs_to_jiffies(200));
}
void test_init()
{
unsigned long delay_ms;
INIT_DELAYED_WORK(&(test_work.my_delay_work), do_my_delay_work);
test_work.test = 1;
// 延迟 200ms
delay_ms = msecs_to_jiffies(200);
#if 1
// 以 独立的 wq 运行work
test_workqueue = create_workqueue("test_workqueue");
if (!test_workqueue)
panic("Failed to create test_workqueue
");
queue_delayed_work(test_workqueue, &(test_work.my_delay_work), delay_ms);
#else
// 以内核中默认的wq运行work
schedule_delayed_work(&(test_work.my_delay_work), delay_ms);
#endif
}
void test_destory(void)
{
if(test_workqueue)
destroy_workqueue(test_workqueue);
}
并发可管理workqueue
介绍
并发可管理工作队列:Concurrency-managed workqueues,CMWQ。
在2.6.36 之前的workqueue,其核心是每个workqueue都有专有的内核线程为其服务——系统范围内的 ST 或每个 CPU 都有一个内核线程的MT。
新的 cmwq 在实现上摒弃了这一点:不再有专有的线程与每个workqueue关联。事实上,现在变成了 Online CPU number + 1个线程池来为workqueue服务,这样将线程的管理权实际上从workqueue的使用者交还给了内核。
当一个工作项被创建以及排队,将在合适的时机被传递给其中一个线程,而 cmwq 最有意思的改变是:被提交到相同workqueue,相同 CPU 的工作项可能并发执行,这也是命名为并发可管理workqueue的原因。
兼容性
cmwq 的实现遵循了以下几个原则:
- 与原有的workqueue接口保持兼容,cmwq 只是更改了创建workqueue的接口,很容易移植到新的接口。
- workqueue共享 per-CPU 的线程池,提供灵活的并发级别而不再浪费大量的资源。
- 自动平衡工作者线程池和并发级别,这样workqueue的用户不再需要关注如此多的细节。
在workqueue的用户眼中,cmwq 与之前的workqueue相比,创建workqueue的接口实现的后端有所改变,现在的新接口为:alloc_workqueue
。其他用法与workqueue相同。
对于 MT 的情况,当用 queue_work 向cwq 上提交工作项节点时, 是哪个 active CPU 正在调用该函数,那么便向该 CPU 对应的 cwq 上的 worklist上增加工作项节点。
alloc_workqueue
struct workqueue_struct *alloc_workqueue(char *name, unsigned int flags, int max_active);
描述:创建cmwq。
参数解析:
name:为workqueue的名字,而不像 2.6.36 之前实际是为workqueue服务的内核线程的名字。
flag 指明workqueue的属性,可以设定的标记如下:
- WQ_NON_REENTRANT:默认情况下,workqueue只是确保在同一 CPU 上不可重入,即工作项不能在同一 CPU上被多个工作者线程并发执行,但容许在多个 CPU 上并发执行。但该标志标明在多个 CPU上也是不可重入的,工作项将在一个不可重入workqueue中排队,并确保至多在一个系统范围内的工作者线程被执行。
- WQ_UNBOUND:工作项被放入一个由特定 gcwq 服务的未限定workqueue,该客户工作者线程没有被限定到特定的 CPU,这样,未限定工作者队列就像简单的执行上下文一般,没有并发管理。未限定的 gcwq 试图尽可能快的执行工作项。
- WQ_FREEZEABLE:可冻结 wq 参与系统的暂停操作。该workqueue的工作项将被暂停,除非被唤醒,否者没有新的工作项被执行。
- WQ_MEM_RECLAIM:所有的workqueue可能在内存回收路径上被使用。使用该标志则保证至少有一个执行上下文而不管在任何内存压力之下。
- WQ_HIGHPRI:高优先级的工作项将被排练在队列头上,并且执行时不考虑并发级别;换句话说,只要资源可用,高优先级的工作项将尽可能快的执行。高优先工作项之间依据提交的顺序被执行。
- WQ_CPU_INTENSIVE:CPU 密集的工作项对并发级别并无贡献,换句话说,可运行的 CPU 密集型工作项将不阻止其它工作项。这对于限定得工作项非常有用,因为它期望更多的 CPU 时钟周期,所以将它们的执行调度交给系统调度器。
返回值:成功时返回一个workqueue_struct
实例。