zoukankan      html  css  js  c++  java
  • 使用 Condition 实现简单的队列

    使用 Condition 实现简单的队列

    队列特点

    队列有以下特点:

    • 先进先出

    如果要在多线程中使用,还要满足:

    • 从空队列中获取元素会阻塞当前线程,直到队列不为空(其它线程向队列中添加了元素);
    • 向已满的队列添加元素会阻塞当前线程,直到队列不满(其它线程从队列中取出了元素);

    使用 Condition 实现简单的队列

    from threading import Condition, Thread, Lock
    
    class MyQueue:
        def __init__(self, cap):
            self.cap = cap
            lock = Lock()
            self.not_full = Condition(lock)
            self.not_empty = Condition(lock)
            self.container = []
    
        def put(self, item):
            with self.not_full:
                if len(self.container) >= self.cap:
                    # 阻塞直到其它线程调用self.not_full.notify
                    self.not_full.wait()
                print('put', item)
                self.container.append(item)
                self.not_empty.notify()
    
        def get(self):
            with self.not_empty:
                if len(self.container) <= 0:
                    # 阻塞直到其它线程调用self.not_empty.notify
                    self.not_empty.wait()
                item = self.container.pop(0)
                print('get', item)
                self.not_full.notify()
                return item
    
    

    测试

    import time
    
    q = MyQueue(5)
    
    
    def consumer():
        while True:
            time.sleep(1)
            q.get()
    
    
    def producer(name):
        num = 0
        while True:
            num += 1
            item = f"{name}_{num}"
            q.put(item)
            time.sleep(1)
    
    
    if __name__ == "__main__":
        c1 = Thread(target=consumer)
        p1 = Thread(target=producer, args=('p1',))
        p2 = Thread(target=producer, args=('p2',))
        for t in (c1, p1, p2):
            t.start()
    

    Conditon 实现

    Conditon 是用 Lock 实现的.

    关键方法如下:

    • wait
      创建一个锁,连续 acquire 两次,这是线程就会被阻塞;
    • notify
      在另外的线程中释放 wait 时创建的锁,这时第一个线程就可以获取到锁继续执行;

    Demo 如下:

    from threading import Lock
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    class Condition:
        def __init__(self):
            self.lock = None
    
        def wait(self):
            self.lock = Lock()
            self.lock.acquire()
            self.lock.acquire()
    
        def notify(self):
            self.lock.release()
    
    
    cond = Condition()
    q = []
    
    
    def t1():
        while True:
            if len(q) == 0:
                cond.wait()
            item = q.pop(0)
            print(f'get {item} from queue')
    
    
    def t2():
        for i in range(5):
            print(f'put {i} to queue')
            q.append(i)
            cond.notify()
            time.sleep(1)
    
    
    if __name__ == '__main__':
        with ThreadPoolExecutor() as e:
            e.submit(t1)
            e.submit(t2)
    
    

    结果:

    put 0 to queue
    get 0 from queue
    put 1 to queue
    get 1 from queue
    put 2 to queue
    get 2 from queue
    put 3 to queue
    get 3 from queue
    put 4 to queue
    get 4 from queue
    

    Condition 的实际实现比 Demo 中要复杂的多,但基本原理确是相同的,其中一个关键点就是release 锁和 acquire 锁不一定是同一个线程,所以在下面的例子中是不会造成死锁的.

    from threading import Lock
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    l = Lock()
    
    
    def t1():
        l.acquire()
        l.acquire()
        print('t1')
    
    
    def t2():
        time.sleep(1)
        l.release()
    
    
    if __name__ == '__main__':
        with ThreadPoolExecutor() as e:
            e.submit(t1)
            e.submit(t2)
    
    
  • 相关阅读:
    公输盘
    电脑机器刷BIOS
    八皇后问题的实现
    安装msdn出现的问题及解决
    加密推荐书籍
    C++待解
    atan()与atan2()
    Win32/MFC/COM学习推荐书籍
    C++问题
    windows2000 sp4下载
  • 原文地址:https://www.cnblogs.com/aloe-n/p/13660953.html
Copyright © 2011-2022 走看看