啊,终于要把这一个系列写完整了,好高兴啊
在前面的三篇文章中介绍了Python的Python的Generator和coroutine(协程)相关的编程技术,接下来这篇文章会用Python的coroutine技术实现一个简单的多任务的操作系统
代码如下,可看注释
1 #-*-coding:utf-8 -*- 2 ''' 3 用Python和coroutine实现一个简单的多任务系统 4 ''' 5 # ##Step 1:Define Tasks################################### 6 import select 7 class Task(object): 8 taskid = 0 9 10 def __init__(self,target): 11 Task.taskid += 1 12 self.tid = Task.taskid # Task id 13 self.target = target # Target coroutine 14 self.sendval = None # Value to send 15 16 def run(self): 17 return self.target.send(self.sendval) 18 # ############################################### 19 20 # ##Step 2:The Scheduler######################### 21 import Queue 22 class Scheduler(object): 23 def __init__(self): 24 self.ready = Queue.Queue() 25 self.taskmap = {} 26 27 # 正在等待的Tasks,key是taskid 28 self.exit_waiting = {} 29 30 # 异步IO 31 # Holding areas for tasks blocking on I/O.These are 32 # dictionaries mapping file descriptions to tasks 33 # 键值为文件描述符 34 self.read_waiting = {} 35 self.write_waiting = {} 36 37 38 def iotask(self): 39 while True: 40 if self.ready.empty(): 41 # 如果ready为空,表示没有正在等待执行的队列 42 # timeout 为None,表示不关心任何文件描述符的变化 43 self.iopool(None) 44 else: 45 # ready不为空,则设置select函数不管文件描述符是否发生变化都立即返回 46 self.iopool(0) 47 yield 48 49 50 def new(self,target): 51 newtask = Task(target) 52 self.taskmap[newtask.tid] = newtask 53 self.schedule(newtask) 54 return newtask.tid 55 56 def schedule(self,task): 57 # 把task放到任务队列中去 58 self.ready.put(task) 59 60 def exit(self,task): 61 print "Task %d terminated" %task.tid 62 del self.taskmap[task.tid] 63 # Notify other tasks waiting for exit 64 # 把正在等待的任务加入到正在执行的队列中去 65 for task in self.exit_waiting.pop(task.tid,[]): 66 self.schedule(task) 67 68 def waitforexit(self,task,waittid): 69 ''' 70 让一个任务等待另外一个任务,把这个任务加入到exit_waiting中去 71 返回True表示这个task正在等待队列中 72 ''' 73 if waittid in self.taskmap: 74 self.exit_waiting.setdefault(waittid,[]).append(task) 75 return True 76 else: 77 return False 78 79 80 def waitforread(self,task,fd): 81 ''' 82 functions that simply put a task into to 83 one of the above dictionaries 84 ''' 85 self.read_waiting[fd] = task 86 87 def waitforwrite(self,task,fd): 88 self.write_waiting[fd] = task 89 90 def iopool(self,timeout): 91 ''' 92 I/O Polling.Use select() to determine which file 93 descriptors can be used.Unblock any associated task 94 ''' 95 if self.read_waiting or self.write_waiting: 96 # 获取I/O事件,一旦获取到,就放入到执行队列中取,等待执行 97 r,w,e = select.select(self.read_waiting, 98 self.write_waiting,[],timeout) 99 for fd in r: 100 self.schedule(self.read_waiting.pop(fd)) 101 102 for fd in w: 103 self.schedule(self.write_waiting.pop(fd)) 104 105 def mainloop(self): 106 self.new(self.iotask()) # Launch I/O polls 107 while self.taskmap: 108 task = self.ready.get() 109 try: 110 result = task.run() 111 # 如果task执行的是System call,则对当前环境进行保存 112 # 然后在执行System Call 113 if isinstance(result,SystemCall): 114 # 把当前的环境保存,即保存当前运行的task和sched 115 result.task = task 116 result.sched = self 117 result.handle() 118 continue 119 except StopIteration: 120 self.exit(task) 121 # print("task is over") 122 continue 123 self.schedule(task) 124 # ##Step 2:The Scheduler######################### 125 126 127 # ##SystemCall######################### 128 class SystemCall(object): 129 ''' 130 所有系统调用的基类,继承自该类的类要重写handle函数 131 ''' 132 def handle(self): 133 pass 134 135 136 class GetTid(SystemCall): 137 ''' 138 获取任务ID 139 ''' 140 def handle(self): 141 self.task.sendval = self.task.tid 142 self.sched.schedule(self.task) 143 144 145 class NewTask(SystemCall): 146 ''' 147 新建一个Task 148 ''' 149 def __init__(self,target): 150 self.target = target 151 152 def handle(self): 153 # 在这里把target封装成Task 154 # 是在这里把新生成的task加入到执行队列当中去 155 tid = self.sched.new(self.target) 156 self.task.sendval = tid 157 # 把执行这个系统调用的父task重新加入到执行队列中去 158 # 这一点很关键,因为判断一个task是否结束是通过taskmap的 159 # 这个task只是暂时被挂起,要重新放到queue中去 160 self.sched.schedule(self.task) 161 162 class KillTask(SystemCall): 163 ''' 164 杀死一个Task 165 ''' 166 def __init__(self,tid): 167 self.tid = tid 168 169 def handle(self): 170 task = self.sched.taskmap.get(self.tid,None) 171 # task指的是要被kill掉的那个task 172 # self.task指的是发起KillTask这个系统调用task 173 if task: 174 task.target.close() 175 self.task.sendval = None 176 else: 177 self.task.sendval = False 178 # target.close()只是产生一个StopIteration异常 179 self.sched.schedule(self.task) 180 181 182 class WaitTask(SystemCall): 183 ''' 184 让任务进行等待 系统调用 185 ''' 186 def __init__(self,tid): 187 self.tid = tid 188 189 def handle(self): 190 result = self.sched.waitforexit(self.task,self.tid) 191 self.task.sendval = result 192 # 如果等待的是一个不存在的task,则立即返回 193 if not result: 194 self.sched.schedule(self.task) 195 196 197 198 199 class ReadWait(SystemCall): 200 ''' 201 异步读 系统调用 202 ''' 203 def __init__(self,f): 204 self.f = f 205 206 def handle(self): 207 fd = self.f.fileno() 208 self.sched.waitforread(self.task,fd) 209 210 class WriteWait(SystemCall): 211 ''' 212 异步写 系统调用 213 ''' 214 def _init__(self,f): 215 self.f = f 216 217 def handle(self): 218 fd = self.f.fileno() 219 self.sched.waitforwrite(self.task,fd)