zoukankan      html  css  js  c++  java
  • [Python 多线程] Condition (十)

    Condition常用于生产者、消费者模型,为了解决生产者消费者速度匹配问题。

    构造方法Condition(lock=None),可以传入一个Lock或RLock对象,默认RLock。

    方法:

    acquire(*args)  获取锁

    release()     释放锁

    wait(timeout=None)  等待通知或直到发生超时

    notify(n=1)  唤醒至多指定个数的等待的线程,没有等待的线程就没有任何操作

    notify_all()  唤醒所有等待的线程。wake up

    以下例子,不考虑线程安全问题:

    例1:

    #Condition
    import threading,random,logging
    logging.basicConfig(level=logging.INFO)
    
    class Dispatcher:
        def __init__(self):
            self.data = 0
            self.event = threading.Event()
    
        def produce(self):
            for i in range(100):
                self.event.wait(1) #每1秒生成一条数据
                data = random.randint(1,100)
                self.data = data
    
        def custom(self):
            while True:
                logging.info(self.data)
                self.event.wait(0.5) # 替换为1表示消费者每1秒取一次数据
    
    
    d = Dispatcher()
    p = threading.Thread(target=d.produce)
    c = threading.Thread(target=d.custom)
    
    c.start()
    p.start()
    
    以下结果:
    INFO:root:0
    INFO:root:0
    INFO:root:0
    INFO:root:77
    INFO:root:64
    INFO:root:64
    INFO:root:8
    INFO:root:8
    INFO:root:85
    

      生产者每1秒钟生成一条数据,消费者每0.5秒就来取一次数据。

    例2:

    #Condition 通知机制,解决重复
    import threading,random,logging
    logging.basicConfig(level=logging.INFO)
    
    class Dispatcher:
        def __init__(self):
            self.data = 0
            self.event = threading.Event()
            self.cond = threading.Condition()
    
        def produce(self):
            for i in range(100):
                data = random.randint(1,100)
                with self.cond:
                    self.data = data
                    self.cond.notify_all() #通知所有waiter
                self.event.wait(1) #1秒生产一次数据
    
        def custom(self):
            while True:
                with self.cond:
                    self.cond.wait() #无限等待
                    logging.info(self.data) #消费
    
                self.event.wait(0.5) #0.5秒消费一次数据
    
    
    d = Dispatcher()
    p = threading.Thread(target=d.produce)
    c = threading.Thread(target=d.custom)
    
    c.start()
    p.start()
    
    运行结果:
    INFO:root:64
    INFO:root:43
    INFO:root:11
    INFO:root:28
    INFO:root:30
    INFO:root:33
    INFO:root:93
    INFO:root:69
    INFO:root:4
    

      使用with来管理Condition的上下文(acquire/release),利用Condition通知机制解决消费者获取重复数据。

    例3:

    #Condition 先生成后消费,1对1
    import threading,random,logging
    logging.basicConfig(level=logging.INFO,format="%(thread)d %(threadName)s %(message)s")
    
    class Dispatcher:
        def __init__(self):
            self.data = 0
            self.event = threading.Event()
            self.cond = threading.Condition()
    
        def produce(self):
            for i in range(100):
                data = random.randint(1,100)
                logging.info(self.data)
                with self.cond:
                    self.data = data
                    self.cond.notify(1)
                    # self.cond.notify_all()
                self.event.wait(1)
    
        def custom(self):
            while True:
                with self.cond:
                    self.cond.wait()
                    logging.info(self.data)
    
                self.event.wait(0.5)
    
    
    d = Dispatcher()
    p = threading.Thread(target=d.produce,name='produce')
    c = threading.Thread(target=d.custom,name='c')
    c1 = threading.Thread(target=d.custom,name='c1')
    p.start()
    
    e = threading.Event()
    e.wait(3)
    
    c1.start()
    c.start()
    
    运行结果:
    7520 produce 0
    7520 produce 78
    7520 produce 88
    7520 produce 14
    7520 produce 83
    2508 c1 86
    7520 produce 86
    1136 c 79
    7520 produce 79
    2508 c1 77
    7520 produce 77
    1136 c 47
    7520 produce 47
    2508 c1 76
    7520 produce 76
    1136 c 69
    

      生产者先生产数据,2个消费者一个一个来消费数据。

    例4:

    #Condition 1对多,2个2个通知
    import threading,random,logging
    logging.basicConfig(level=logging.INFO,format="%(thread)d %(threadName)s %(message)s")
    
    class Dispatcher:
        def __init__(self):
            self.data = 0
            self.event = threading.Event()
            self.cond = threading.Condition()
    
        def produce(self):
            for i in range(100):
                data = random.randint(1,100)
                logging.info(self.data)
                with self.cond:
                    self.data = data
                    self.cond.notify(2)
                    # self.cond.notify_all()
                self.event.wait(1)
    
        def custom(self):
            while True:
                with self.cond:
                    self.cond.wait()
                    logging.info(self.data)
                # self.event.wait(0.5)
    
    
    d = Dispatcher()
    p = threading.Thread(target=d.produce,name='produce')
    
    for i in range(5):
        threading.Thread(target=d.custom,name='c-{}'.format(i)).start()
    
    p.start()
    
    以下结果:
    8688 produce 0
    10376 c-0 90
    7928 c-1 90
    8688 produce 90
    7640 c-2 61
    10748 c-3 61
    8688 produce 61
    10376 c-0 73
    1344 c-4 73
    8688 produce 73
    7928 c-1 57
    7640 c-2 57
    8688 produce 57
    10748 c-3 71
    10376 c-0 71
    

      1对多,2个2个通知来处理数据。

    以上例子中,程序本身不是线程安全的,程序逻辑有很多瑕疵,但是可以很好的帮助理解Condition的使用,和生产者消费者模型。

    Condition总结:

    Condition采用通知机制,常用于生产者消费者模型中,解决生产者消费者速度匹配的问题。

    使用方法:

    使用Condition,必须先acquire,用完之后要release,因为内部使用了锁,默认使用RLock,最好的方法是使用with上下文管理。

    生产者wait,会阻塞等待通知,被激活。

    生产者生产好消息,对消费者发通知,可以使用notidy_all() 通知所有消费者或者notify()。

  • 相关阅读:
    C#深入浅出 修饰符(二)
    HDU 5785 Interesting
    HDU 5783 Divide the Sequence
    HDU 5781 ATM Mechine
    UVA 714 Copying Books
    uva 1471 Defense Lines
    UVA 11134 Fabled Rooks
    UVA 11572 Unique Snowflakes
    UVA 11093 Just Finish it up
    UVA 10954 Add All
  • 原文地址:https://www.cnblogs.com/i-honey/p/8068195.html
Copyright © 2011-2022 走看看