zoukankan      html  css  js  c++  java
  • Python线程同步(1)

    概念

    线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。

    不同操作系统实现拘束有所不同,有临界区(Critical Section),互斥量(Mutex)、信号量(Semaphore)、事件event等。

    Event(重要)

    event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的true或false的变化来进行操作。

    set():标记设置为true

    clear():标记设置为false

    is_set():标记是否为true

    wait(timeout = none):设置等待标记为true的时长,none为无限等待,等到返回true,未等到超时了返回false.

    需求

    老板雇佣了一个工人,让他生产一个杯子,老板一直等着这个工人,知道生产了10个杯子。

    from threading import Event,Thread
    import logging
    import time
    
    FORMAT = "%(asctime)s %(threadNname)s %(thread)d %(message)s"
    logging.basicConfig(format = FORMAT,level = logging.INFO)
    
    def boss(event:Event):
        logging.info("i'm boss,waiting for u")
        #等待
        event.wait()
        logging.info("good job")
        
    def worker(event:Event,count = 10):
        logging.info("i'm working for u")
        cups = []
        while True:
            logging.info("make 1")
            time.sleep(0.5)
            cups.append(1)
            if len(cups)>= count:
                #通知
                event.set()
                break
        logging.info("i finished my job. cups = {}".format(cups))
        
    event = Event()
    w = Thread(target = worker,args = (event,))
    b = Thread(target = boss,args = (event,))
    w.start()
    b.start()

    结果为:

    2019-11-25 17:13:57,577 Thread-1 9760 i'm working for u
    2019-11-25 17:13:57,577 Thread-1 9760 make 1
    2019-11-25 17:13:57,577 Thread-2 10592 i'm boss,waiting for u.
    2019-11-25 17:13:58,077 Thread-1 9760 make 1
    2019-11-25 17:13:58,577 Thread-1 9760 make 1
    2019-11-25 17:13:59,077 Thread-1 9760 make 1
    2019-11-25 17:13:59,577 Thread-1 9760 make 1
    2019-11-25 17:14:00,077 Thread-1 9760 make 1
    2019-11-25 17:14:00,577 Thread-1 9760 make 1
    2019-11-25 17:14:01,077 Thread-1 9760 make 1
    2019-11-25 17:14:01,577 Thread-1 9760 make 1
    2019-11-25 17:14:02,077 Thread-1 9760 make 1
    2019-11-25 17:14:02,577 Thread-1 9760 i finished my job. cups = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    2019-11-25 17:14:02,577 Thread-2 10592 good job

     

    总结

    使用同一个Event对象的标记flag,谁wait就是等到flag变为true,或等到超时返回false,不限制等待的个数。

    wait的作用

    from threading import Event,Thread
    import logging
    logging.basicConfig(level = logging.INFO)
    
    def do(event:Event,interval:int):
        while not event.wait(interval):#条件中使用,返回true或false
            logging.info("do sth")
            
    e = Event()
    Thread(target = do,args = (e,3)).start()
    
    e.wait(10)#也可以使用time.sleep(10)
    e.set()
    print("main exit")

    Event的wait优于time.sleep,它会更快的切换到其他线程,提高并发效率。

    Event练习

    实现Timer,延时执行的线程

    延时计算add(x,y)

    思路

    Timer的构造函数中参数得有哪些?

    如何实现start启动一个线程执行函数

    如何cancel取消待执行任务

    思路实现

    from threading import Event,Thread
    import logging
    logging.basicConfig(level = logging.INFO)
    
    def add(x:int,y:int):
        logging.info(x+y)
        
    class Timer():
        def __init__(self,interval,fn,*args,**kwargs):
            pass
        
        def start(self):
            pass
        
        def cancel(self):
            pass
        

    完整实现

    from threading import Event,Thread
    import logging
    import datetime
    logging.basicConfig(level = logging.INFO)
    
    def add(x:int,y:int):
        logging.info(x+y)
        
    class Timer():
        def __init__(self,interval,fn,*args,**kwargs):
            self.interval = interval
            self.fn = fn
            self.args = args
            self.kwargs = kwargs
            self.event = Event()
        
        def start(self):
            Thread(target = self.__run).start()
        
        def cancel(self):
            self.event.set()
            
        def __run(self):
            start = datetime.datetime.now()
            logging.info("waiting")
            
            self.event.wait(self.interval)
            if not self.event.is_set():
                self.fn(*self.args,**self.kwargs)
            delta = (datetime.datetime.now()  - start).total_seconds()
            logging.info("finished {} ".format(delta))
            self.event.set()
            
    t = Timer(10,add,4,50)
    t.start()
    e = Event()
    e.wait(4)
    t.cancel()
    print("============")
        

    或者

    from threading import Event,Thread
    import logging
    import datetime
    logging.basicConfig(level = logging.INFO)
    
    def add(x:int,y:int):
        logging.info(x+y)
        
    class Timer():
        def __init__(self,interval,fn,*args,**kwargs):
            self.interval = interval
            self.fn = fn
            self.args = args
            self.kwargs = kwargs
            self.event = Event()
        
        def start(self):
            Thread(target = self.__run).start()
        
        def cancel(self):
            self.event.set()
            
        def __run(self):
            start = datetime.datetime.now()
            logging.info("waiting")
            
            
            if not self.event.wait(self.interval):
                self.fn(*self.args,**self.kwargs)
            delta = (datetime.datetime.now()  - start).total_seconds()
            logging.info("finished {} ".format(delta))
            self.event.set()
            
    t = Timer(10,add,4,50)
    t.start()
    e = Event()
    e.wait(4)
    t.cancel()
    print("============")
        

    lock

    锁,凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。

    需求:

    订单要求生产1000个杯子, 组织10个工人生产。

    import logging
    import threading
    
    logging.basicConfig(level=logging.INFO)
    
    #10个人生产100个杯子
    
    cups = []
    
    def worker(task = 100):
        while True:
            count = len(cups)
            logging.info(count)
            if count>=task:
                break
            cups.append(1)
            logging.info("{} make 1".format(threading.current_thread().name))
        logging.info("{}".format(cups))
    
    for _ in range(10):
        threading.Thread(target = worker,args=(100,)).start()

     从结果截图可以看出,最后的结果是104,并不是100,因为线程都在访问cups,也就是都在访问cups,这就导致了多生产。也就是接近临界点的时候,都在生产。

    import threading
    from threading import Thread,Lock
    import logging
    import time
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format = FORMAT,level = logging.INFO)
    
    cups = []
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format = FORMAT,level = logging.INFO)
    
    cups = []
    
    def worker(count = 10):
        logging.info("i'm working for u")
        while len(cups)<count:
            time.sleep(0.0001)#为了看出线程切换效果
            cups.append(1)
        logging.info("i finished . cups = {}".format(len(cups)))
        
    for _ in range(10):
        Thread(target = worker,args = (1000,)).start()

    从上例的运行结果来看,多线程调度,导致了判断失效,多生产了杯子。如何修改?加锁

    lock

    锁,一旦线程获得锁,其他视图获取锁的线程将被阻塞。

    import threading
    
    lock = threading.Lock()
    
    lock.acquire()#拿到锁
    
    print("get locker")
    lock.release()
    print("release locker")
    
    结果为:
    get locker
    release locker

    上面拿到锁了,然后打印,再然后释放锁,再打印。

    import threading
    
    lock = threading.Lock()
    
    lock.acquire()#拿到锁
    
    print("get locker1")
    lock.acquire()
    print("get locker2")
    lock.release()
    print("release locker")
    
    结果为:
    get locker1

    上面拿到锁了,没有释放,又在继续拿锁,就被阻塞咯。等待释放。不能在往后面执行。

    import logging
    import threading
    
    
    logging.basicConfig(level=logging.INFO)
    
    #10个人生产100个杯子
    
    cups = []
    lock = threading.Lock()
    
    def worker(lock:threading.Lock ,task =100):
        while True:
            lock.acquire()
            count = len(cups)
            lock.release()
            logging.info(count)
            if count>=task:
                break
            lock.acquire()
            cups.append(1)
            lock.release()
            logging.info("{} make 1".format(threading.current_thread().name))
        logging.info("{}".format(cups))
    
    for _ in range(10):
        threading.Thread(target = worker,args=(lock,100)).start()

    上面这样的代码,最后的执行结果还是不正确,因为从业务上来说是不正确的。

    import logging
    import threading
    
    
    logging.basicConfig(level=logging.INFO)
    
    #10个人生产100个杯子
    
    cups = []
    lock = threading.Lock()
    
    def worker(lock:threading.Lock ,task =100):
        while True:
            lock.acquire()
            count = len(cups)
    
            logging.info(count)
            if count>=task:
                break
            
            cups.append(1)
            lock.release()
            logging.info("{} make 1".format(threading.current_thread().name))
        logging.info("{}".format(cups))
    
    for _ in range(10):
        threading.Thread(target = worker,args=(lock,100)).start()

    这样写才是正确的。但是效率还是不太高。还有点问题!因为到中间的位置,被break了,这个时候根本没有释放锁。

     看结果,程序根本就没有结束。直接阻塞咯。

    import logging
    import threading
    
    
    logging.basicConfig(level=logging.INFO)
    
    #10个人生产100个杯子
    
    cups = []
    lock = threading.Lock()
    
    def worker(lock:threading.Lock ,task =100):
        while True:
            lock.acquire()
            count = len(cups)
    
            logging.info(count)
            if count>=task:
                lock.release()
                break
    
            cups.append(1)
            lock.release()
            logging.info("{} make 1".format(threading.current_thread().name))
        logging.info("{}".format(cups))
    
    for _ in range(10):
        threading.Thread(target = worker,args=(lock,100)).start()

    这样才可以咯。

    acquire(blocking = True,timeout = -1):默认阻塞,阻塞可以设置超时时间,非阻塞时,timeout禁止设置。成功获取锁,返回true,否则返回false。

    release():释放锁。可以从任何线程调用释放,已上锁的锁,会被重置为unlocked未上锁的锁上调用,抛runtimeerror异常。

    上例的锁的实现

    import threading
    from threading import Thread,Lock
    import logging
    import time
    
    FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
    logging.basicConfig(format = FORMAT,level = logging.INFO)
    
    cups = []
    lock = Lock()
    
    def worker(count = 10):
        logging.info("i'm working for u")
        flag = False
        while True:
            lock.acquire()#获取锁
            
            if len(cups)>=count:
                flag = True
            #lock.release()#1 这里释放锁?
            time.sleep(0.0001)#为了看出线程切换效果
            
            if not flag:
                cups.append(1)
            #lock.release()#2 这里释放锁?
            
            if flag:
                break
                
        logging.info("i finished . cups = {}".format(len(cups)))
        
    for _ in range(10):
        Thread(target = worker,args = (1000,)).start()

    思考,上面的代码中,共有两处可以释放锁,放在何处合适?

    假设位置1的lock.release()合适,分析如下:

    有一个时刻,在某一个线程中len(cups)正好是999,flag= true,释放锁,正好线程被打断。另一个线程判断发现也是999,flag= true,可能线程被打断。可能另外一个线程也判断是999,flag也设置为true。这3个线程只要继续执行到cups.append(1),一定会导致cups的长度超过1000.

    假设位置2的lock.release()合适,分析如下:

    在某一个时刻,len(cups)正好是999,flag= true,其他线程试图访问这段代码的线程都阻塞获取不到锁,直到当前线程安全的增加了一个数据,然后释放锁,其他线程有一个抢到锁,但发现已经1000了,只好break打印退出。再其他线程 都一样,发现已经1000咯。都退出了。

    所以位置2释放锁是正确的。

    但是我们发现锁保证了数据完整性,但是性能下降很多。

    上例中if flag:break是为了保证release方法被执行,否则,就出现了死锁,得到的锁永远没有释放锁。

    计数器类,可以加,可以减。

    import threading
    from threading import Thread,Lock
    import time
    
    class Counter:
        def __init__(self):
            self._val = 0
    
        @property
        def value(self):
            return self._val
    
        def inc(self):
            self._val+=1
    
        def dec(self):
            self._val-=1
    
    def run(c:Counter,count = 100):#重复了100次,50次加,50次减,最后结果应该0.
        for _ in range(count):
            for i in range(-50,50):
                if i<0:
                    c.dec()
                else:
                    c.inc()
    
    
    c = Counter()
    c1 = 10 #线程数
    c2 = 1000
    
    for i in range(c1):
        Thread(target=run,args =(c,c2)).start()
    
    print(c.value)#这一句有问题,可能线程还没有启动,这一句就完了。可以让主线程睡一会。

    c1 取10、100、1000看看

    c2取10、100、1000看看。当是1000的时候,结果就不对了。而且应该注意,最后一句打印的结果不一定是最终结果,可能线程还在运行。

    self._val+=1或self._val-=1在线程执行的时候,有可能被打断。

    要加锁,怎么加?

    加锁,解锁

    一般来说,加锁就需要解锁,但是加锁后,解锁前,还要有一些代码执行,就有可能会抛异常,一旦出现异常,锁是无法释放,但是当前线程可能因为这个异常被终止了,这就产生了死锁。

    加锁,解锁常用语句:

    1. 使用try……finally语句保证锁的释放。
    2. with上下文管理,锁对象支持上下文管理。

    改造counter类,如下:

    import threading
    from threading import Thread,Lock
    import time
    
    class Counter:
        def __init__(self):
            self._val = 0
            self.__lock = Lock()
    
        @property
        def value(self):
            with self.__lock:
                return self._val
    
        def inc(self):
            try:
                self.__lock.acquire()
                self._val+=1
            finally:
                self.__lock.release()
    
        def dec(self):
            with self.__lock:
                self._val -= 1
    
    
    def run(c:Counter,count = 100):
        for _ in range(count):
            for i in range(-50,50):
                if i<0:
                    c.dec()
                else:
                    c.inc()
    
    
    c = Counter()
    c1 = 10 #线程数
    c2 = 1000
    
    for i in range(c1):
        Thread(target=run,args =(c,c2)).start()
    
    print(c.value)#这一句合适吗?
    

    最后一句修改如下:

    import threading
    from threading import Thread,Lock
    import time
    
    class Counter:
        def __init__(self):
            self._val = 0
            self.__lock = Lock()
    
        @property
        def value(self):
            with self.__lock:#__enter__和__exit__
                return self._val
    
        def inc(self):
            try:
                self.__lock.acquire()
                self._val+=1
            finally:
                self.__lock.release()
    
        def dec(self):
            with self.__lock:
                self._val -= 1
    
    
    def run(c:Counter,count = 100):
        for _ in range(count):
            for i in range(-50,50):
                if i<0:
                    c.dec()
                else:
                    c.inc()
    
    
    c = Counter()
    c1 = 10 #线程数
    c2 = 1000
    
    for i in range(c1):
        Thread(target=run,args =(c,c2)).start()
    
    while True:
        time.sleep(1)
        if threading.active_count() ==1:
            print(threading.enumerate())
            print(c.value)
            break
        else:
            print(threading.enumerate())

    print(v.value)这一句在主线程中,很早就执行完了。退出条件是,只剩下主线程的时候。

    锁的应用场景

    锁使用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

    如果全部都是读取同一个共享资源需要锁吗?

    不需要,因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁。

    使用锁的注意事项:

    • 少用锁,必要时用锁,使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行。
    • 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。
    • 加锁时间越短越好,不需要就立即释放锁。
    • 一定要避免死锁。

    不使用锁,有了效率,但是结果是错的。

    使用了锁,效率低下,但是结果是正确的。

    所以,我们是为了效率要错误的结果呢?还是为了正确的结果,让计算机去计算?

    非阻塞锁使用

    import  threading
    import time
    import logging
    
    FORMAT = "%(asctime)-15s	 [%(threadName)s,%(thread)8d] %(message)s"
    logging.basicConfig(level=logging.INFO,format = FORMAT)
    
    def worker(tasks):
        for task in tasks:
            time.sleep(0.001)
            if task.lock.acquire(False):#获取锁则返回true
                logging.info("{} {} begin to start".format(threading.current_thread(),task.name))
            #适当的时机释放锁
            else:
                logging.info(" {} {} is working".format(threading.current_thread(),task.name))
    
    class Task:
        def __init__(self,name):
            self.name = name
            self.lock = threading.Lock()
    
    #构造10个任务
    tasks  = [Task("task - {}".format(x)) for x in range(10)]
    
    #启动5个线程
    for i in range(5):
        threading.Thread(target=worker,name="worker - {}".format(i),args=(tasks,)).start()

    可重入锁RLock

    可重入锁,是线程相关的锁。

    线程A获得可重复锁,并可以多次成功获取,不会阻塞,最后要在线程A中做和asquire次数相同的release.

    import threading
    
    lock = threading.RLock()
    
    ret = lock.acquire()
    print(ret)
    
    ret = lock.acquire()
    print(ret)

    结果为:

    True
    True

    没有释放锁,后面还是拿到了锁。

    import threading
    import time
    
    lock = threading.RLock()
    print(lock.acquire())
    print("________________________")
    print(lock.acquire(blocking=False))
    print(lock.acquire())
    print(lock.acquire(timeout = 3.55))
    print(lock.acquire(blocking=False))
    #print(lock.acquire(blocking=False,timeout=10))#异常
    lock.release()
    lock.release()
    lock.release()
    lock.release()
    print("main thread {}".format(threading.current_thread().ident))
    print("locked in main thread {}".format(lock))#注意观察lock对象的信息
    lock.release()
    #lock.release()#多了一次
    print("========================")
    print()
    
    print(lock.acquire(blocking=False))#1次
    #threading.Timer(3,lambda  x:x.release(),args=(lock,)).start()#跨线程了,异常
    lock.release()
    print("```````````````````````````````````")
    print()
    
    #测试多线程
    print(lock.acquire())
    def sub(l):
        print("{}:{}".format(threading.current_thread(),l.acquire()))#阻塞
        print("{}:{}".format(threading.current_thread(), l.acquire(False)))
        print("lock in sub thread {}".format(lock))
        l.release()
        print("sub 1")
        l.release()
        print("sub 2")
        #l.release()#多了一次
    
    threading.Timer(2,sub, args=(lock,)).start()#传入同一个lock对象
    print("+++++++++++++++++++++++++++++")
    print()
    
    print(lock.acquire())
    
    lock.release()
    time.sleep(5)
    print("释放主线程锁")
    lock.release()

    结果为:

    True
    ________________________
    True
    True
    True
    True
    main thread 12652
    locked in main thread <locked _thread.RLock object owner=12652 count=1 at 0x000000000217B418>
    ========================

    
    

    True
    ```````````````````````````````````

    
    

    True
    +++++++++++++++++++++++++++++

    
    

    True
    释放主线程锁
    <Timer(Thread-1, started 15028)>:True
    <Timer(Thread-1, started 15028)>:True
    lock in sub thread <locked _thread.RLock object owner=15028 count=2 at 0x000000000217B418>
    sub 1
    sub 2

     

    可重入锁,与线程相关,可在一个线程中获取锁,并可继续在同一个线程中不阻塞获取锁。当锁未释放完,其他线程获取锁就会阻塞,直到当前持有锁的线程释放完锁。

    Condition

    构造方法Condition(lock = None),可以传入一个lock或rlock对象,默认是rlock。

    acquire(*args):获取锁

    wait(self,timeout = None):等待或超时

    notify(n = 1):唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作。

    notify_all():唤醒所有等待的线程

    Conditin用于生产者,消费者模型,为了解决生产者消费者速度匹配问题。

    先看一个例子,消费者消费速度大于生产者生产速度。

    from threading import Thread,Event
    import logging
    import random
    
    FORMAT = "%(asctime)s %(threadName)s (thread)d %(message)s"
    logging.basicConfig(format = FORMAT,level=logging.INFO)
    
    #此例只是为了演示,不考虑线程安全问题。
    
    class Dispatcher():
        def __init__(self):
            self.data = None
            self.event = Event()#event只是为了使用方便,与逻辑无关
    
        def produce(self,total):
            for _  in range(total):
                data = random.randint(0,100)
                logging.info(data)
                self.data = data
                self.event.wait(1)
            self.event.set()
    
        def consum(self):
            while not self.event.is_set():
                data = self.data
                logging.info("recieved {}".format(data))
                self.data = None
                self.event.wait(0.5)
    
    d = Dispatcher()
    p = Thread(target=d.produce,args=(10,),name="producer")
    c = Thread(target=d.consum,name="consumer")
    c.start()
    p.start()

    结果为:

    2019-11-26 16:39:02,354 consumer (thread)d recieved None
    2019-11-26 16:39:02,355 producer (thread)d 100
    2019-11-26 16:39:02,855 consumer (thread)d recieved 100
    2019-11-26 16:39:03,369 producer (thread)d 37
    2019-11-26 16:39:03,369 consumer (thread)d recieved 37
    2019-11-26 16:39:03,869 consumer (thread)d recieved None
    2019-11-26 16:39:04,383 producer (thread)d 20
    2019-11-26 16:39:04,383 consumer (thread)d recieved None
    2019-11-26 16:39:04,884 consumer (thread)d recieved None
    2019-11-26 16:39:05,397 producer (thread)d 65
    2019-11-26 16:39:05,397 consumer (thread)d recieved 65
    2019-11-26 16:39:05,897 consumer (thread)d recieved None
    2019-11-26 16:39:06,411 consumer (thread)d recieved None
    2019-11-26 16:39:06,412 producer (thread)d 9
    2019-11-26 16:39:06,912 consumer (thread)d recieved 9
    2019-11-26 16:39:07,425 producer (thread)d 66
    2019-11-26 16:39:07,425 consumer (thread)d recieved 66
    2019-11-26 16:39:07,925 consumer (thread)d recieved None
    2019-11-26 16:39:08,439 consumer (thread)d recieved None
    2019-11-26 16:39:08,439 producer (thread)d 67
    2019-11-26 16:39:08,939 consumer (thread)d recieved 67
    2019-11-26 16:39:09,453 consumer (thread)d recieved None
    2019-11-26 16:39:09,453 producer (thread)d 58
    2019-11-26 16:39:09,953 consumer (thread)d recieved 58
    2019-11-26 16:39:10,467 producer (thread)d 24
    2019-11-26 16:39:10,467 consumer (thread)d recieved 24
    2019-11-26 16:39:10,967 consumer (thread)d recieved None
    2019-11-26 16:39:11,481 consumer (thread)d recieved None
    2019-11-26 16:39:11,481 producer (thread)d 61
    2019-11-26 16:39:11,981 consumer (thread)d recieved 61

     

    这个例子采用了消费者主动消费,消费者浪费了大量时间,主动来查看有没有数据。

    能否换成一种通知机制,有数据通知消费者来消费呢?

    使用Condition对象

    from threading import Thread, Event, Condition
    import logging
    import random
    
    FORMAT = "%(asctime)s %(threadName)s (thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    
    # 此例只是为了演示,不考虑线程安全问题。
    
    class Dispatcher():
        def __init__(self):
            self.data = None
            self.event = Event()  # event只是为了使用方便,与逻辑无关
            self.cond = Condition()
    
        def produce(self, total):
            for _ in range(total):
                data = random.randint(0, 100)
                with self.cond:
                    logging.info(data)
                    self.data = data
                    self.cond.notify_all()
                self.event.wait(1)#模拟产生数据速度
            self.event.set()
    
        def consum(self):
            while not self.event.is_set():
                with self.cond:
                    self.cond.wait()#阻塞等通知
                    logging.info("recieved {}".format(self.data))
                    self.data = None
                self.event.wait(0.5)
    
    
    d = Dispatcher()
    p = Thread(target=d.produce, args=(10,), name="producer")
    c = Thread(target=d.consum, name="consumer")
    c.start()
    p.start()

    结果为:


    2019-11-26 16:40:26,151 producer (thread)d 23
    2019-11-26 16:40:26,151 consumer (thread)d recieved 23
    2019-11-26 16:40:27,158 producer (thread)d 71
    2019-11-26 16:40:27,158 consumer (thread)d recieved 71
    2019-11-26 16:40:28,172 producer (thread)d 35
    2019-11-26 16:40:28,172 consumer (thread)d recieved 35
    2019-11-26 16:40:29,186 producer (thread)d 15
    2019-11-26 16:40:29,186 consumer (thread)d recieved 15
    2019-11-26 16:40:30,200 producer (thread)d 26
    2019-11-26 16:40:30,201 consumer (thread)d recieved 26
    2019-11-26 16:40:31,215 producer (thread)d 42
    2019-11-26 16:40:31,215 consumer (thread)d recieved 42
    2019-11-26 16:40:32,229 producer (thread)d 63
    2019-11-26 16:40:32,229 consumer (thread)d recieved 63
    2019-11-26 16:40:33,243 producer (thread)d 50
    2019-11-26 16:40:33,243 consumer (thread)d recieved 50
    2019-11-26 16:40:34,257 producer (thread)d 50
    2019-11-26 16:40:34,257 consumer (thread)d recieved 50
    2019-11-26 16:40:35,271 producer (thread)d 50
    2019-11-26 16:40:35,271 consumer (thread)d recieved 50

     

    上例中,消费者等待数据等待,如果生产者准备好了会通知消费者消费,省得消费者反复来查看数据是否就绪。

    如果是1个生产者,多个消费者怎么改?

    from threading import Thread, Event, Condition
    import logging
    import random
    
    FORMAT = "%(asctime)s %(threadName)s (thread)d %(message)s"
    logging.basicConfig(format=FORMAT, level=logging.INFO)
    
    
    # 此例只是为了演示,不考虑线程安全问题。
    
    class Dispatcher():
        def __init__(self):
            self.data = None
            self.event = Event()  # event只是为了使用方便,与逻辑无关
            self.cond = Condition()
    
        def produce(self, total):
            for _ in range(total):
                data = random.randint(0, 100)
                with self.cond:
                    logging.info(data)
                    self.data = data
                    self.cond.notify_all()
                self.event.wait(1)#模拟产生数据速度
            self.event.set()
    
        def consum(self):
            while not self.event.is_set():
                with self.cond:
                    self.cond.wait()#阻塞等通知
                    logging.info("recieved {}".format(self.data))
                self.event.wait(0.5)#模拟消费速度
    
    
    d = Dispatcher()
    p = Thread(target=d.produce, args=(10,), name="producer")
    for i in range(5):
        c = Thread(target=d.consum,name="consumer-{}".format(i))
        c.start()
    p.start()

    2019-11-26 16:41:38,536 producer (thread)d 26
    2019-11-26 16:41:38,536 consumer-4 (thread)d recieved 26
    2019-11-26 16:41:38,536 consumer-3 (thread)d recieved 26
    2019-11-26 16:41:38,536 consumer-0 (thread)d recieved 26
    2019-11-26 16:41:38,537 consumer-2 (thread)d recieved 26
    2019-11-26 16:41:38,537 consumer-1 (thread)d recieved 26
    2019-11-26 16:41:39,543 producer (thread)d 80
    2019-11-26 16:41:39,543 consumer-1 (thread)d recieved 80
    2019-11-26 16:41:39,544 consumer-4 (thread)d recieved 80
    2019-11-26 16:41:39,544 consumer-2 (thread)d recieved 80
    2019-11-26 16:41:39,544 consumer-3 (thread)d recieved 80
    2019-11-26 16:41:39,544 consumer-0 (thread)d recieved 80
    2019-11-26 16:41:40,557 producer (thread)d 17
    2019-11-26 16:41:40,557 consumer-4 (thread)d recieved 17
    2019-11-26 16:41:40,557 consumer-2 (thread)d recieved 17
    2019-11-26 16:41:40,557 consumer-0 (thread)d recieved 17
    2019-11-26 16:41:40,558 consumer-1 (thread)d recieved 17
    2019-11-26 16:41:40,558 consumer-3 (thread)d recieved 17
    2019-11-26 16:41:41,571 producer (thread)d 73
    2019-11-26 16:41:41,571 consumer-1 (thread)d recieved 73
    2019-11-26 16:41:41,572 consumer-3 (thread)d recieved 73
    2019-11-26 16:41:41,572 consumer-0 (thread)d recieved 73
    2019-11-26 16:41:41,573 consumer-2 (thread)d recieved 73
    2019-11-26 16:41:41,573 consumer-4 (thread)d recieved 73
    2019-11-26 16:41:42,585 producer (thread)d 4
    2019-11-26 16:41:42,585 consumer-3 (thread)d recieved 4
    2019-11-26 16:41:42,585 consumer-2 (thread)d recieved 4
    2019-11-26 16:41:42,586 consumer-0 (thread)d recieved 4
    2019-11-26 16:41:42,586 consumer-4 (thread)d recieved 4
    2019-11-26 16:41:42,586 consumer-1 (thread)d recieved 4
    2019-11-26 16:41:43,599 producer (thread)d 100
    2019-11-26 16:41:43,599 consumer-0 (thread)d recieved 100
    2019-11-26 16:41:43,599 consumer-4 (thread)d recieved 100
    2019-11-26 16:41:43,599 consumer-2 (thread)d recieved 100
    2019-11-26 16:41:43,600 consumer-1 (thread)d recieved 100
    2019-11-26 16:41:43,600 consumer-3 (thread)d recieved 100
    2019-11-26 16:41:44,613 producer (thread)d 68
    2019-11-26 16:41:44,614 consumer-1 (thread)d recieved 68
    2019-11-26 16:41:44,614 consumer-2 (thread)d recieved 68
    2019-11-26 16:41:44,615 consumer-0 (thread)d recieved 68
    2019-11-26 16:41:44,615 consumer-4 (thread)d recieved 68
    2019-11-26 16:41:44,616 consumer-3 (thread)d recieved 68
    2019-11-26 16:41:45,626 producer (thread)d 8
    2019-11-26 16:41:45,626 consumer-0 (thread)d recieved 8
    2019-11-26 16:41:45,626 consumer-1 (thread)d recieved 8
    2019-11-26 16:41:45,626 consumer-2 (thread)d recieved 8
    2019-11-26 16:41:45,627 consumer-3 (thread)d recieved 8
    2019-11-26 16:41:45,627 consumer-4 (thread)d recieved 8
    2019-11-26 16:41:46,640 producer (thread)d 15
    2019-11-26 16:41:46,640 consumer-0 (thread)d recieved 15
    2019-11-26 16:41:46,641 consumer-4 (thread)d recieved 15
    2019-11-26 16:41:46,641 consumer-3 (thread)d recieved 15
    2019-11-26 16:41:46,641 consumer-2 (thread)d recieved 15
    2019-11-26 16:41:46,641 consumer-1 (thread)d recieved 15
    2019-11-26 16:41:47,661 producer (thread)d 48
    2019-11-26 16:41:47,662 consumer-2 (thread)d recieved 48
    2019-11-26 16:41:47,662 consumer-1 (thread)d recieved 48
    2019-11-26 16:41:47,663 consumer-3 (thread)d recieved 48
    2019-11-26 16:41:47,663 consumer-4 (thread)d recieved 48
    2019-11-26 16:41:47,663 consumer-0 (thread)d recieved 48

     

    self.cond.notify_all()#发通知

    修改为self.cond.notify(2)

    试一试看看效果?

    这个例子,可以看到实现了消息的一对多,这其实就是广播模式。

    注:上例中,程序本身不是线程安全的,程序逻辑有很多瑕疵,但是可以很好的帮助理解condition的使用,和生产者消费者模型。

    Condition总结

    Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题。

    采用了通知机制,非常有效率。

    使用方式

    使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock锁,最好的方式是使用with上下文。

    消费者wait,等待通知。

    生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法。

     
  • 相关阅读:
    Power of Cryptography
    Radar Installation
    Emag eht htiw Em Pleh
    Help Me with the Game
    89. Gray Code
    87. Scramble String
    86. Partition List
    85. Maximal Rectangle
    84. Largest Rectangle in Histogram
    82. Remove Duplicates from Sorted List II
  • 原文地址:https://www.cnblogs.com/xpc51/p/11925053.html
Copyright © 2011-2022 走看看