zoukankan      html  css  js  c++  java
  • [Python 多线程] Condition (十)

    Condition常用于生产者、消费者模型,为了解决生产者消费者速度匹配问题。

    构造方法Condition(lock=None),可以传入一个Lock或RLock对象,默认RLock。

    方法:

    acquire(*args)  获取锁

    release()     释放锁

    wait(timeout=None)  等待通知或直到发生超时

    notify(n=1)  唤醒至多指定个数的等待的线程,没有等待的线程就没有任何操作

    notify_all()  唤醒所有等待的线程。wake up

    以下例子,不考虑线程安全问题:

    例1:

    #Condition
    import threading,random,logging
    logging.basicConfig(level=logging.INFO)
    
    class Dispatcher:
        def __init__(self):
            self.data = 0
            self.event = threading.Event()
    
        def produce(self):
            for i in range(100):
                self.event.wait(1) #每1秒生成一条数据
                data = random.randint(1,100)
                self.data = data
    
        def custom(self):
            while True:
                logging.info(self.data)
                self.event.wait(0.5) # 替换为1表示消费者每1秒取一次数据
    
    
    d = Dispatcher()
    p = threading.Thread(target=d.produce)
    c = threading.Thread(target=d.custom)
    
    c.start()
    p.start()
    
    以下结果:
    INFO:root:0
    INFO:root:0
    INFO:root:0
    INFO:root:77
    INFO:root:64
    INFO:root:64
    INFO:root:8
    INFO:root:8
    INFO:root:85
    

      生产者每1秒钟生成一条数据,消费者每0.5秒就来取一次数据。

    例2:

    #Condition 通知机制,解决重复
    import threading,random,logging
    logging.basicConfig(level=logging.INFO)
    
    class Dispatcher:
        def __init__(self):
            self.data = 0
            self.event = threading.Event()
            self.cond = threading.Condition()
    
        def produce(self):
            for i in range(100):
                data = random.randint(1,100)
                with self.cond:
                    self.data = data
                    self.cond.notify_all() #通知所有waiter
                self.event.wait(1) #1秒生产一次数据
    
        def custom(self):
            while True:
                with self.cond:
                    self.cond.wait() #无限等待
                    logging.info(self.data) #消费
    
                self.event.wait(0.5) #0.5秒消费一次数据
    
    
    d = Dispatcher()
    p = threading.Thread(target=d.produce)
    c = threading.Thread(target=d.custom)
    
    c.start()
    p.start()
    
    运行结果:
    INFO:root:64
    INFO:root:43
    INFO:root:11
    INFO:root:28
    INFO:root:30
    INFO:root:33
    INFO:root:93
    INFO:root:69
    INFO:root:4
    

      使用with来管理Condition的上下文(acquire/release),利用Condition通知机制解决消费者获取重复数据。

    例3:

    #Condition 先生成后消费,1对1
    import threading,random,logging
    logging.basicConfig(level=logging.INFO,format="%(thread)d %(threadName)s %(message)s")
    
    class Dispatcher:
        def __init__(self):
            self.data = 0
            self.event = threading.Event()
            self.cond = threading.Condition()
    
        def produce(self):
            for i in range(100):
                data = random.randint(1,100)
                logging.info(self.data)
                with self.cond:
                    self.data = data
                    self.cond.notify(1)
                    # self.cond.notify_all()
                self.event.wait(1)
    
        def custom(self):
            while True:
                with self.cond:
                    self.cond.wait()
                    logging.info(self.data)
    
                self.event.wait(0.5)
    
    
    d = Dispatcher()
    p = threading.Thread(target=d.produce,name='produce')
    c = threading.Thread(target=d.custom,name='c')
    c1 = threading.Thread(target=d.custom,name='c1')
    p.start()
    
    e = threading.Event()
    e.wait(3)
    
    c1.start()
    c.start()
    
    运行结果:
    7520 produce 0
    7520 produce 78
    7520 produce 88
    7520 produce 14
    7520 produce 83
    2508 c1 86
    7520 produce 86
    1136 c 79
    7520 produce 79
    2508 c1 77
    7520 produce 77
    1136 c 47
    7520 produce 47
    2508 c1 76
    7520 produce 76
    1136 c 69
    

      生产者先生产数据,2个消费者一个一个来消费数据。

    例4:

    #Condition 1对多,2个2个通知
    import threading,random,logging
    logging.basicConfig(level=logging.INFO,format="%(thread)d %(threadName)s %(message)s")
    
    class Dispatcher:
        def __init__(self):
            self.data = 0
            self.event = threading.Event()
            self.cond = threading.Condition()
    
        def produce(self):
            for i in range(100):
                data = random.randint(1,100)
                logging.info(self.data)
                with self.cond:
                    self.data = data
                    self.cond.notify(2)
                    # self.cond.notify_all()
                self.event.wait(1)
    
        def custom(self):
            while True:
                with self.cond:
                    self.cond.wait()
                    logging.info(self.data)
                # self.event.wait(0.5)
    
    
    d = Dispatcher()
    p = threading.Thread(target=d.produce,name='produce')
    
    for i in range(5):
        threading.Thread(target=d.custom,name='c-{}'.format(i)).start()
    
    p.start()
    
    以下结果:
    8688 produce 0
    10376 c-0 90
    7928 c-1 90
    8688 produce 90
    7640 c-2 61
    10748 c-3 61
    8688 produce 61
    10376 c-0 73
    1344 c-4 73
    8688 produce 73
    7928 c-1 57
    7640 c-2 57
    8688 produce 57
    10748 c-3 71
    10376 c-0 71
    

      1对多,2个2个通知来处理数据。

    以上例子中,程序本身不是线程安全的,程序逻辑有很多瑕疵,但是可以很好的帮助理解Condition的使用,和生产者消费者模型。

    Condition总结:

    Condition采用通知机制,常用于生产者消费者模型中,解决生产者消费者速度匹配的问题。

    使用方法:

    使用Condition,必须先acquire,用完之后要release,因为内部使用了锁,默认使用RLock,最好的方法是使用with上下文管理。

    生产者wait,会阻塞等待通知,被激活。

    生产者生产好消息,对消费者发通知,可以使用notidy_all() 通知所有消费者或者notify()。

  • 相关阅读:
    HDU 4865 Peter's Hobby --概率DP
    UVALive 6093 Emergency Room --优先队列实现的模拟
    UVALive 6665 Dragon’s Cruller --BFS,类八数码问题
    UVALive 6092 Catching Shade in Flatland --枚举+几何计算
    UVALive 6168 Fat Ninjas --二分小数+搜索
    九连环-递归解法
    一道题看bitset应用 --ZOJ 3642
    UVALive 6663 Count the Regions --离散化+DFS染色
    ZOJ 1111 Poker Hands --复杂模拟
    UVALive 6449 IQ Test --高斯消元?
  • 原文地址:https://www.cnblogs.com/i-honey/p/8068195.html
Copyright © 2011-2022 走看看