zoukankan      html  css  js  c++  java
  • asyncio系列之Queue实现

import collections
import functools
import heapq
import select
import socket
import time
import traceback
import types
      7 
      8 
      9 class Future:
     10     def __init__(self, *, loop=None):
     11         self._result = None
     12         self._callbacks = []
     13         self._loop = loop
     14 
     15     def set_result(self, result):
     16         self._result = result
     17         callbacks = self._callbacks[:]
     18         self._callbacks = []
     19         for callback in callbacks:
     20             self._loop._ready.append(callback)
     21 
     22     def add_callback(self, callback):
     23         self._callbacks.append(callback)
     24 
     25     def __iter__(self):
     26         print("挂起在yield处")
     27         yield self
     28         print("恢复执行")
     29         return "future"
     30 
     31     __await__ = __iter__
     32 
     33 
     34 class Task:
     35     def __init__(self, cor, *, loop=None):
     36         self.cor = cor
     37         self._loop = loop
     38 
     39     def _step(self):
     40         cor = self.cor
     41         try:
     42             result = cor.send(None)
     43         except StopIteration as e:
     44             self._loop._task_count -= 1
     45             if self._loop._task_count == 0:
     46                 self._loop.close()
     47         except Exception as e:
     48             pass
     49         else:
     50             if isinstance(result, Future):
     51                 result.add_callback(self._wakeup)
     52 
     53     def _wakeup(self):
     54         self._step()
     55 
     56 
     57 class Loop:
     58     def __init__(self):
     59         self._stop = False
     60         self._ready = []
     61         self._scheduled = []
     62         self._time = lambda: time.time()
     63         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     64         sock.setblocking(False)
     65         self._select = functools.partial(select.select, [sock], [], [])
     66         self._task_count = 0
     67 
     68     def create_task(self, cor):
     69         task = Task(cor, loop=self)
     70         self._ready.append(task._step)
     71         self._task_count += 1
     72         return task
     73 
     74     def call_later(self, delay, callback, *args):
     75         callback._when = delay
     76         self._scheduled.append((callback, *args))
     77 
     78     def run_until_complete(self, task):
     79         assert isinstance(task, Task)
     80         timeout = None
     81         while not self._stop:
     82             if self._ready:
     83                 timeout = 0
     84             if self._scheduled:
     85                 callback, *args = self._scheduled.pop()
     86                 timeout = callback._when
     87                 self._ready.append(functools.partial(callback, *args))
     88 
     89                 self._select(timeout)
     90             n = len(self._ready)
     91             for i in range(n):
     92                 step = self._ready.pop()
     93                 step()
     94 
     95     def close(self):
     96         self._stop = True
     97 
     98 
     99 @types.coroutine
    100 def _sleep():
    101     yield
    102 
    103 
    104 async def sleep(s, result=None):
    105     if s <= 0:
    106         await _sleep()
    107         return result
    108     else:
    109         future = Future(loop=loop)
    110         future._loop.call_later(s, unless_cancelled, future)
    111         await future
    112         return result
    113 
    114 
    115 def unless_cancelled(future):
    116     future.set_result(None)
    117 
    118 
    119 class Queue:
    120     def __init__(self, maxsize=0, *, loop=None):
    121         self._loop = loop
    122         self._maxsize = maxsize
    123 
    124         # Futures.
    125         self._getters = collections.deque()
    126         # Futures.
    127         self._putters = collections.deque()
    128         self._init(maxsize)
    129 
    130     def _init(self, maxsize):
    131         self._queue = collections.deque()
    132 
    133     def _get(self):
    134         return self._queue.popleft()
    135 
    136     def _put(self, item):
    137         self._queue.append(item)
    138 
    139     def _wakeup_next(self, waiters):
    140         while waiters:
    141             waiter = waiters.popleft()
    142             try:
    143                 waiter.set_result(None)
    144             except Exception as e:
    145                 pass
    146             break
    147 
    148     def qsize(self):
    149         return len(self._queue)
    150 
    151     @property
    152     def maxsize(self):
    153         return self._maxsize
    154 
    155     def empty(self):
    156         return not self._queue
    157 
    158     def full(self):
    159         if self._maxsize <= 0:
    160             return False
    161         else:
    162             return self.qsize() >= self._maxsize
    163 
    164     @types.coroutine
    165     def put(self, item):
    166         while self.full():
    167             putter = Future(loop=self._loop)
    168             self._putters.append(putter)
    169             try:
    170                 yield from putter
    171             except:
    172                 if not self.full():
    173                     self._wakeup_next(self._putters)
    174                 raise
    175         return self.put_nowait(item)
    176 
    177     def put_nowait(self, item):
    178         if self.full():
    179             raise
    180         self._put(item)
    181         self._wakeup_next(self._getters)
    182 
    183     @types.coroutine
    184     def get(self):
    185         while self.empty():
    186             getter = Future(loop=self._loop)
    187             self._getters.append(getter)
    188             try:
    189                 yield from getter
    190             except:
    191                 try:
    192                     self._getters.remove(getter)
    193                 except ValueError:
    194                     pass
    195 
    196                 if not self.empty():
    197                     self._wakeup_next(self._getters)
    198                 raise
    199         return self.get_nowait()
    200 
    201     def get_nowait(self):
    202         if self.empty():
    203             raise
    204         item = self._get()
    205         self._wakeup_next(self._putters)
    206         return item
    207 
    208 
    209 async def foo(queue):
    210     print(f'enter foo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
    211     item = await queue.get()
    212     print('foo get ', item)
    213     await sleep(2)
    214     item = await queue.get()
    215     print('foo get ', item)
    216     print(f'exit foo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
    217 
    218 
    219 async def goo(queue):
    220     print(f'enter goo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
    221     print('goo put a')
    222     await queue.put('a')
    223     # await sleep(2)
    224     print('goo put b')
    225     await queue.put('b')
    226     print(f'exit goo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
    227 
    228 if __name__ == '__main__':
    229     loop = Loop()
    230     queue = Queue(maxsize=1, loop=loop)
    231     f = foo(queue)
    232     g = goo(queue)
    233     task1 = loop.create_task(f)
    234     task2 = loop.create_task(g)
    235     loop.run_until_complete(task1)
  • 相关阅读:
    LeetCode 25 Reverse Nodes in k-Group
    圆桌派:家世背景对人的影响有多大
    BibTex 学习笔记
    R parallel包实现多线程1
    IIS学习笔记
    高效完成R代码
    圆桌派 :我们,朋友一生一起走
    高文欣个人简介
    R语言函数话学习笔记5
    git学习笔记1
  • 原文地址:https://www.cnblogs.com/yejing-snake/p/13560958.html
Copyright © 2011-2022 走看看