zoukankan      html  css  js  c++  java
  • Python高级编程之生成器(Generator)与coroutine(四):一个简单的多任务系统

    啊,终于要把这一个系列写完整了,好高兴啊

    在前面的三篇文章中介绍了Python的Python的Generator和coroutine(协程)相关的编程技术,接下来这篇文章会用Python的coroutine技术实现一个简单的多任务的操作系统

    代码如下,可看注释

      1 #-*-coding:utf-8 -*-
      2 '''
      3 用Python和coroutine实现一个简单的多任务系统
      4 '''
      5 # ##Step 1:Define Tasks###################################
      6 import select
      7 class Task(object):
      8     taskid = 0
      9 
     10     def __init__(self,target):
     11         Task.taskid += 1
     12         self.tid = Task.taskid    # Task id
     13         self.target = target      # Target coroutine
     14         self.sendval = None       # Value to send
     15 
     16     def run(self):
     17         return self.target.send(self.sendval)
     18 # ###############################################
     19 
     20 # ##Step 2:The Scheduler#########################
     21 import Queue
     22 class Scheduler(object):
     23     def __init__(self):
     24         self.ready = Queue.Queue()
     25         self.taskmap = {}
     26 
     27         # 正在等待的Tasks,key是taskid
     28         self.exit_waiting = {}
     29 
     30         # 异步IO
     31         # Holding areas for tasks blocking on I/O.These are
     32         # dictionaries mapping file descriptions to tasks
     33         # 键值为文件描述符
     34         self.read_waiting = {}
     35         self.write_waiting = {}
     36 
     37 
     38     def iotask(self):
     39         while True:
     40             if self.ready.empty():
     41                 # 如果ready为空,表示没有正在等待执行的队列
     42                 # timeout 为None,表示不关心任何文件描述符的变化
     43                 self.iopool(None)
     44             else:
     45                 # ready不为空,则设置select函数不管文件描述符是否发生变化都立即返回
     46                 self.iopool(0)
     47             yield
     48 
     49 
     50     def new(self,target):
     51         newtask = Task(target)
     52         self.taskmap[newtask.tid] = newtask
     53         self.schedule(newtask)
     54         return newtask.tid
     55 
     56     def schedule(self,task):
     57         # 把task放到任务队列中去
     58         self.ready.put(task)
     59 
     60     def exit(self,task):
     61         print "Task %d terminated" %task.tid
     62         del self.taskmap[task.tid]
     63         # Notify other tasks waiting for exit
     64         # 把正在等待的任务加入到正在执行的队列中去
     65         for task in self.exit_waiting.pop(task.tid,[]):
     66             self.schedule(task)
     67 
     68     def waitforexit(self,task,waittid):
     69         '''
     70         让一个任务等待另外一个任务,把这个任务加入到exit_waiting中去
     71         返回True表示这个task正在等待队列中
     72         '''
     73         if waittid in self.taskmap:
     74             self.exit_waiting.setdefault(waittid,[]).append(task)
     75             return True
     76         else:
     77             return False
     78 
     79 
     80     def waitforread(self,task,fd):
     81         '''
     82         functions that simply put a task into to
     83         one of the above dictionaries
     84         '''
     85         self.read_waiting[fd] = task
     86 
     87     def waitforwrite(self,task,fd):
     88         self.write_waiting[fd] = task
     89 
     90     def iopool(self,timeout):
     91         '''
     92         I/O Polling.Use select() to determine which file
     93         descriptors can be used.Unblock any associated task
     94         '''
     95         if self.read_waiting or self.write_waiting:
     96             # 获取I/O事件,一旦获取到,就放入到执行队列中取,等待执行
     97             r,w,e = select.select(self.read_waiting,
     98                                   self.write_waiting,[],timeout)
     99             for fd in r:
    100                 self.schedule(self.read_waiting.pop(fd))
    101 
    102             for fd in w:
    103                 self.schedule(self.write_waiting.pop(fd))
    104 
    105     def mainloop(self):
    106         self.new(self.iotask())  # Launch I/O polls
    107         while self.taskmap:
    108             task = self.ready.get()
    109             try:
    110                 result = task.run()
    111                 # 如果task执行的是System call,则对当前环境进行保存
    112                 # 然后在执行System Call
    113                 if isinstance(result,SystemCall):
    114                     # 把当前的环境保存,即保存当前运行的task和sched
    115                     result.task = task
    116                     result.sched = self
    117                     result.handle()
    118                     continue
    119             except StopIteration:
    120                 self.exit(task)
    121                 # print("task is over")
    122                 continue
    123             self.schedule(task)
    124 # ##Step 2:The Scheduler#########################
    125 
    126 
    127 # ##SystemCall#########################
    128 class SystemCall(object):
    129     '''
    130     所有系统调用的基类,继承自该类的类要重写handle函数
    131     '''
    132     def handle(self):
    133         pass
    134 
    135 
    136 class GetTid(SystemCall):
    137     '''
    138     获取任务ID
    139     '''
    140     def handle(self):
    141         self.task.sendval = self.task.tid
    142         self.sched.schedule(self.task)
    143 
    144 
    145 class NewTask(SystemCall):
    146     '''
    147     新建一个Task
    148     '''
    149     def __init__(self,target):
    150         self.target = target
    151 
    152     def handle(self):
    153         # 在这里把target封装成Task
    154         # 是在这里把新生成的task加入到执行队列当中去
    155         tid = self.sched.new(self.target)
    156         self.task.sendval = tid
    157         # 把执行这个系统调用的父task重新加入到执行队列中去
    158         # 这一点很关键,因为判断一个task是否结束是通过taskmap的
    159         # 这个task只是暂时被挂起,要重新放到queue中去
    160         self.sched.schedule(self.task)
    161 
    162 class KillTask(SystemCall):
    163     '''
    164     杀死一个Task
    165     '''
    166     def __init__(self,tid):
    167         self.tid = tid
    168 
    169     def handle(self):
    170         task = self.sched.taskmap.get(self.tid,None)
    171         # task指的是要被kill掉的那个task
    172         # self.task指的是发起KillTask这个系统调用task
    173         if task:
    174             task.target.close()
    175             self.task.sendval = None
    176         else:
    177             self.task.sendval = False
    178         # target.close()只是产生一个StopIteration异常
    179         self.sched.schedule(self.task)
    180 
    181 
    182 class WaitTask(SystemCall):
    183     '''
    184     让任务进行等待 系统调用
    185     '''
    186     def __init__(self,tid):
    187         self.tid = tid
    188 
    189     def handle(self):
    190         result = self.sched.waitforexit(self.task,self.tid)
    191         self.task.sendval = result
    192         # 如果等待的是一个不存在的task,则立即返回
    193         if not  result:
    194             self.sched.schedule(self.task)
    195 
    196 
    197 
    198 
    199 class ReadWait(SystemCall):
    200     '''
    201     异步读 系统调用
    202     '''
    203     def __init__(self,f):
    204         self.f = f
    205 
    206     def handle(self):
    207         fd = self.f.fileno()
    208         self.sched.waitforread(self.task,fd)
    209 
    210 class WriteWait(SystemCall):
    211     '''
    212     异步写 系统调用
    213     '''
    214     def _init__(self,f):
    215         self.f = f
    216 
    217     def handle(self):
    218         fd = self.f.fileno()
    219         self.sched.waitforwrite(self.task,fd)
  • 相关阅读:
    Day 25 网络基础2
    Day 25 网络基础
    Day 24 定时任务
    Day 23 系统服务之救援模式
    Day4 总结
    Day 22 进程管理2之系统的平均负载
    【Distributed】分布式Session一致性问题
    【Distributed】分布式系统中遇到的问题
    【Redis】分布式Session
    【Zookeeper】应用场景概述
  • 原文地址:https://www.cnblogs.com/rio2607/p/4570353.html
Copyright © 2011-2022 走看看