zoukankan      html  css  js  c++  java
  • Python学习线程间同步之一

    1、Event

    # Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化进行操作;

    # 总结:使用同一个Event对象的标记flag,默认flag=False;谁wait就是等到flag变为True,或等到超时返回False,不限制等待的个数;

    ##  模拟实现老板雇佣一个工人生产5个杯子,老板一直等待工人的工作完成 ######
    import
    threading import logging,time from threading import Event logging.basicConfig(level=10,format="%(asctime)s %(threadName)s %(thread)d %(message)s",datefmt="%Y/%m/%d-%H:%M:%S") def work(event:Event,n): logging.info("I'm working") cups = [] while True: logging.info('make 1') time.sleep(0.5) cups.append(1) if len(cups) >= n: event.set() # 完成任务后将event对象设置为True break logging.info("I' finished my job, cups={}".format(cups)) def boss(event:Event): logging.info("I'M boss waiting for you") event.wait() # boss线程阻塞,等待event对象变为True logging.info("GOOD job")

    e=Event() b
    =threading.Thread(target=boss,args=(e,),name='boss') w=threading.Thread(target=work,args=(e,5),name='worker') b.start() w.start() ''' 运行结果: 2018/10/30-21:35:23 boss 14388 I'M boss waiting for you 2018/10/30-21:35:23 worker 15244 I'm working 2018/10/30-21:35:23 worker 15244 make 1 2018/10/30-21:35:23 worker 15244 make 1 2018/10/30-21:35:24 worker 15244 make 1 2018/10/30-21:35:24 worker 15244 make 1 2018/10/30-21:35:25 worker 15244 make 1 2018/10/30-21:35:25 worker 15244 I' finished my job, cups=[1, 1, 1, 1, 1] 2018/10/30-21:35:25 boss 14388 GOOD job '''
    from threading import Thread,Event
    import logging
    
    logging.basicConfig(level=10,format="%(asctime)s %(threadName)s %(thread)d %(message)s",
                        datefmt="%Y/%m/%d-%H:%M:%S")
    
    def do(e:Event,interval:int):
        while not e.wait(interval):
            logging.info('do smthing')
    
    e=Event()
    Thread(target=do,args=(e,3),name='do-thread').start()
    e.wait(10) #  e.wait() 阻塞当前线程(当前线程为主线程),阻塞操作10秒,如果没有等到flag变为True,则当前线程继续往后执行
    
    # e.set() # 如果e不设置为True,则do线程继续执行
    print('MainThread is end')

    2、Lock

    # 锁,凡是存在资源争抢的地方都可以使用锁,从而保证只有一个使用者可以独享此资源;Lock为互斥锁;

    2.1、为什么要加锁

    # 由于线程之间的任务执行是cpu进行随机调度的,这就有可能一个线程执行了n条指令之后就被切换到别的线程,开始执行别的线程指令;当个多线程同时操作一个对象,如果没有对该对象上锁,会造成程序的执行结果不可预期,这就为“线程不安全”;

    # 为了保证多个线程操作同一个对资源象时候,对象的数据安全与完整,则需要将资源对象家锁;

    2.2、加锁与解锁

    # Lock 锁,一旦线程获得锁,其他试图获取锁的线程将被阻塞;

    # 加锁与解锁的方法:

    ①:使用try...finally语句保证锁的释放;

    ②:with上下文管理,锁对象支持上下文管理;

    l=Lock()
    cups=[]
    e=Event()
    
    def boss():
        logging.info("I'm boss,waiting for you")
        e.wait()
        print('good job,cups lens={}'.format(len(cups)))
    
    def worker():
        logging.info("I'm working")
        e.wait(1)
        with l:
            while True:
                if len(cups) >= 100:
                    logging.info('Finish JOB')
                    e.set()
                    break
                else:
                    cups.append(1)
    
    b=Thread(target=boss,name='boss')
    b.start()
    
    for i in range(10):
        w=Thread(target=worker,name='worker-{}'.format(i))
        w.start()

    2.3、非阻塞锁

    # 阻塞锁与非阻塞锁取决于在获取锁的时候是blocking=False还是True;

    # 什么时候阻塞? 当一个线程成功获取锁之后,无论是阻塞或是非阻塞都不影响该线程的继续执行;但是如果此时另外一个线程也要获取该锁,该线程的阻塞与非阻塞就要看该线程获取锁的方式了;

    def test():
        print('2,----')
        l.acquire()
        print('3,~~~~~~~~~')
    
    l=Lock()
    l.acquire()
    print('1,=======')
    Thread(target=test).start()
    
    print('===MainThreadEnd====')
    
    '''
    主线程获得锁l.acquire()对象,主线程继续执行,打印1,=======;
    继续执行到Thread的时候会启动一个工作线程,执行test函数;
    test函数内执行到l.acquire()的时候,由于主线程并没有释放锁,并且工作线程是l.acquire() 阻塞的方式获取锁,所以工作线程被阻塞;
    如果以l.acquire(False) 非阻塞的方式获取锁,则工作线程继续往下执行打印print('3,~~~~~~~~~')
    '''
    def worker(tasks):
        for task in tasks:
            time.sleep(0.001)
            if task.lock.acquire(False):
            # 获取锁则返回True, 此时如果acquire(True) 阻塞的方式,则只有1个工作线程工作,其他的5个线程都被阻塞,因为没有释放锁;
                logging.info('{} {} begin to start'.format(threading.current_thread(),task.name))
            else:
                logging.info('{} {} is working'.format(threading.current_thread(),task.name))
    
    class Task:
        def __init__(self,name):
            self.name=name
            self.lock=Lock()
    
    tasks=[Task('task-{}'.format(x)) for x in range(5)] # 构造5个任务,每个任务对象都有自己的一把锁
    # 启动5个线程
    for i in range(5):
    
        Thread(target=worker,args=(tasks,),name='worker-{}'.format(i)).start()
    
    '''
    阻塞锁acquire()运行结果:
        2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-0 begin to start
        2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-1 begin to start
        2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-2 begin to start
        2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-3 begin to start
        2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-4 begin to start
    非阻塞锁acquire(False)运行结果:
        2018/11/01-15:34:31 worker-2 1704 <Thread(worker-2, started 1704)> task-0 begin to start
        2018/11/01-15:34:31 worker-4 10856 <Thread(worker-4, started 10856)> task-0 is working
        2018/11/01-15:34:31 worker-3 2608 <Thread(worker-3, started 2608)> task-0 is working
        2018/11/01-15:34:31 worker-1 5560 <Thread(worker-1, started 5560)> task-0 is working
        2018/11/01-15:34:31 worker-0 9468 <Thread(worker-0, started 9468)> task-0 is working
        2018/11/01-15:34:31 worker-4 10856 <Thread(worker-4, started 10856)> task-1 begin to start
        2018/11/01-15:34:31 worker-2 1704 <Thread(worker-2, started 1704)> task-1 is working
        2018/11/01-15:34:31 worker-3 2608 <Thread(worker-3, started 2608)> task-1 is working
        2018/11/01-15:34:31 worker-0 9468 <Thread(worker-0, started 9468)> task-1 is working
        2018/11/01-15:34:31 worker-1 5560 <Thread(worker-1, started 5560)> task-1 is working
        2018/11/01-15:34:31 worker-4 10856 <Thread(worker-4, started 10856)> task-2 begin to start
        ....
    '''

    3、RLock

    # RLock,可重入锁,是线程相关的锁;

    # RLock本质上与Lock类似,也有阻塞和非阻塞的模式,但是RLock锁对象内部维护一个计数器count,在一个线程内可以多次成功获取,acquire()一次就count+=1;release()一次count-=1,release次数不能大于acquire次数

    # 其他线程要成功获取该锁,则需要该锁对象内部计数器count=0;

    4、Condition

    # 构造方法Condition(Lock=None),可以传入一个Lock或者RLock对象,默认是RLock;

    # Condition用于生产者消费者模型中,解决了生产者消费者速度匹配的问题;采用了通知机制,非常有效率;

    # 使用方式:

      使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock,最好的方式是使用with进行上下文管理(自动加锁和解锁);

      消费者wait,等待通知;

      生产者生产完成,对消费者发送通知,可以使用notify或者notify_all 方法;

    # 生产者消费者模型代码实现

    class Dispather:
        def __init__(self):
            self.data=None
            self.event=Event()
            self.cond=Condition()
        def produce(self,total):
            for _ in range(total):
                data=random.randint(0,100)
                with self.cond:
                    logging.info(data)
                    self.data=data
                    self.cond.notify_all()
                self.event.wait(1) # 模拟生产数据速度
            self.event.set() # 数据生产完成将event设置为True
    
        def consume(self):
            while not self.event.is_set(): # 默认event.is_set()为False,循环不进入,等待event.set后开始进入循环;
                with self.cond:
                    self.cond.wait() # 阻塞等待通知
                    logging.info('received {}'.format(self.data))
    
                self.event.wait(0.5) # 模拟消费速度
    
    d=Dispather()
    p=Thread(target=d.produce,args=(10,),name='producer')
    
    for i in range(2):
    
        c= Thread(target=d.consume,name='consumer-{}'.format(i))
        c.start()
    
    p.start()
  • 相关阅读:
    Python中的sorted函数以及operator.itemgetter函数
    a=a+(a++);b=b+(++b);计算顺序,反汇编
    带基虚类的构造函数执行顺序
    开源系统管理资源大合辑
    linux的LNMP架构介绍、MySQL安装、PHP安装
    lamp下mysql安全加固
    ITSS相关的名词解释
    从苦逼到牛逼,详解Linux运维工程师的打怪升级之路
    Linux 文件系统概览
    Exchange2010批量删除邮件
  • 原文地址:https://www.cnblogs.com/soulgou123/p/9879761.html
Copyright © 2011-2022 走看看