zoukankan      html  css  js  c++  java
  • python中线程和进程(二)

    线程同步

    线程同步,即线程之间协同工作,一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。

    不同的操作系统实现的技术有所不同,有临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphore)、事件(Event)等。

    Event

    Event是线程间通信的最简单的方法:通过一个线程发出信号,其他线程等待它。

    Event对象管理一个内部标志,该标志通过set()方法设置为true,也可以通过clear()方法重置为false,使用wait()方法时,将会被阻塞,直到该标志为真的时候,才放行。该标志一开始是false。

    名称 含义
    is_set() 当内部标志为真时,返回true
    set() 将内部标志设置为true,此时所有wait()线程将不会阻塞(唤醒)
    clear() 将内部标志重置为false,此时,线程将会调用wait()阻塞,直到set()被用。
    wait(timeout=None) 阻塞,直到内部标志为真,或者超时。
    from threading import Event,Thread
    import logging
    import time
    
    
    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)
    
    
    def boss(event:Event):
        logging.info("I'm boss,wait...")
        event.wait()  # 阻塞,直到为true
        logging.info("good,thanks!")
    
    
    def worker(event:Event,counts):
        logging.info("I'm working for boss.")
        cups = []
        while True:
            logging.info("make 1 cup")
            cups.append(1)
            time.sleep(0.5)
            logging.info(cups)
            if len(cups) >= counts:
                event.set()  # 完成,标记为true
                break
        logging.info("I finished my job,cups={}".format(cups))
    
    
    event = Event()
    b = Thread(target=boss,args=(event,))
    w = Thread(target=worker,args=(event,10))
    w.start()
    b.start()
    

    Lock

    锁,如果存在共享资源争抢的问题,就可以使用锁,从而保证一个使用者可以完全拥有这个资源。

    Lock拥有两种基本方法,acquire()release()acquire将状态改为锁定,此时将会阻塞,直到调用relase()解锁。relase()方法只能在锁定的时候调用,否则将会应发RuntimeError异常。锁还支持上下文管理协议,锁的所有方法都是原子方式进行。

    名称 含义
    acquire(blocking=True,timeout=-1) 默认阻塞,阻塞是可以设置超时时间,返回True,否则返回False
    release() 解锁,可以从任何线程调用,而不仅仅是获取锁的线程

    例子:车间生产1000个杯子,组织10人生产时,结果会超出数量(没有使用锁)

    from threading import Thread,Lock
    import logging
    import time
    
    
    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)
    
    
    cups = []
    def worker(counts=10):
        logging.info("I'm working for boss")
        while len(cups) < counts:
            time.sleep(0.0002)
            cups.append(1)
        logging.info("I'm finished,cups={}".format(len(cups)))
    
    for _ in range(10):
        Thread(target=worker,args=(1000,)).start()
    
    # 结果
    ''''''
    2019-03-06 07:31:50,884 Thread-10 11052 I'm working for boss
    2019-03-06 07:31:51,084 Thread-3 7108 I'm finished,cups=1000
    2019-03-06 07:31:51,084 Thread-7 21188 I'm finished,cups=1001
    2019-03-06 07:31:51,084 Thread-8 11592 I'm finished,cups=1002
    2019-03-06 07:31:51,085 Thread-5 18288 I'm finished,cups=1003
    2019-03-06 07:31:51,085 Thread-10 11052 I'm finished,cups=1004
    ''''''
    

    使用锁后。

    from threading import Thread,Lock
    import logging
    import time
    
    
    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)
    
    
    cups = []
    lock = Lock()
    
    
    def worker(counts=10):
        logging.info("I'm working for boss")
        flag = False
        while True:
            lock.acquire() # 加锁
            if len(cups) >= counts:
                flag = True
            time.sleep(0.0002)
            if not flag:
                cups.append(1)
            lock.release() #生产完之后,解锁
            if flag:
                break
    
        logging.info("I'm finished,cups={}".format(len(cups)))
    
    for _ in range(10):
        Thread(target=worker,args=(1000,)).start()
    

    RLock

    RLock:可重入锁,是线程相关的锁。
    线程A获得可重入锁,可以多次获取,不会阻塞,但是最后在线程A中做和acquire次数相同的release才会释放。当锁没有释放完,其他线程取锁就会阻塞。

    from threading import Thread,Lock,RLock
    import logging
    import time
    
    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)
    
    
    def say(l:RLock):
        l.acquire()  # 获取一次
        logging.info("acquire1")
        l.acquire()  # 获取2次
        logging.info("acquire2")
        l.release()  # 释放1次
        logging.info("release1")
        l.release()  # 释放2次
        logging.info("release2")
    
    rlock = RLock()
    t1 = Thread(target=say,args=(rlock,),name="t1")
    t2 = Thread(target=say,args=(rlock,),name="t2")
    t1.start()
    t2.start()
    # 结果
    2019-03-06 20:41:40,830 t1 2036 acquire1
    2019-03-06 20:41:40,830 t1 2036 acquire2
    2019-03-06 20:41:40,830 t1 2036 release1
    2019-03-06 20:41:40,830 t1 2036 release2
    2019-03-06 20:41:40,831 t2 31984 acquire1
    2019-03-06 20:41:40,831 t2 31984 acquire2
    2019-03-06 20:41:40,831 t2 31984 release1
    2019-03-06 20:41:40,831 t2 31984 release2
    

    从结果也可以看出来,只有t1线程完全释放了相同次数的锁,t2线程才能获得锁。

    Condition

    条件变量Condition,总是和某种锁(通常是RLock)相互关联。条件变量也支持上下文管理协议。

    名称 含义
    acquire() 获取锁
    release() 释放锁
    wait(timeout=None) 等待通知或者直到超时,调用次方法前必须先获得锁
    notify(n=1) 唤醒指定数目的等待线程,如果调用此方法时,线程没有锁定,将会引发异常。
    notify_all() 唤醒所有的线程

    生产者消费者模型:

    from threading import Event,Thread,Condition
    import logging
    import time
    import random
    
    
    class Dispatcher:
        def __init__(self):
            self.data = 0
            self.cond = Condition()
    
        def produce(self):
            print("开始生产包子")
            while True:
                with self.cond:
                    self.data += 1
                    print("生产了一个包子,现在有{}个。".format(self.data))
                    self.cond.notify()
                time.sleep(1)
    
        def consume(self):
            print("开始买包子了...")
            while True:
                with self.cond:
                    if self.data <= 5:
                        self.cond.wait()
                        print("包子太少了,等着了...")
                    else:
                        self.data -= 1
                        print("我买了1个包子,还有{}个。".format(self.data))
                        self.cond.notify()
                time.sleep(2)
    
    
    
    d = Dispatcher()
    for i in range(2):
        Thread(target=d.produce).start()
    for j in range(4):
        Thread(target=d.consume).start()
    
    

    结果

    开始生产包子
    生产了一个包子,现在有1个。
    开始生产包子
    生产了一个包子,现在有2个。
    开始买包子了...
    开始买包子了...
    开始买包子了...
    开始买包子了...
    生产了一个包子,现在有3个。
    包子太少了,等着了...
    生产了一个包子,现在有4个。
    包子太少了,等着了...
    生产了一个包子,现在有5个。
    包子太少了,等着了...
    生产了一个包子,现在有6个。
    包子太少了,等着了...
    生产了一个包子,现在有7个。
    我买了1个包子,还有6个。
    生产了一个包子,现在有7个。
    我买了1个包子,还有6个。
    生产了一个包子,现在有7个。
    我买了1个包子,还有6个。
    生产了一个包子,现在有7个。
    

    使用方法:
    使用Condition,必须先要获取锁,用完以后要释放,通常是使用with上下文,消费者如果不满足条件,wait等待通知,生产者生产好,对消费者发通知。

    Barrier

    barrier,提供了简单的同步给需要相互等待的固定数量的线程使用,每个线程都会试图通过wait()方法来传递屏障,并将阻塞直到满足条件的线程数量都进行了调用,然后,将这些线程同时释放。

    名称 含义
    Barrier(parties, action=None, timeout=None) 构建Barrier对象,指定参与的数目。
    n_waiting 当前屏障中等待的线程数
    parties 通过屏障需要等待线程数量
    wait(time=None) 等待通过屏障,返回0到线程-1的整数,每个线程返回不同。如果wiat方法设置了超时,并会超时发送,屏障将处于broken状态
    reset() 将屏障恢复为默认的空状态
    broken() 一个布尔值,True代表屏障处于断开状态。
    abort() 屏障处于破碎状态,会导致任何等待或者调用的等待的方法的线程失败,抛出BrokenBarrierError。
    from threading import Thread,Barrier,BrokenBarrierError,Event
    import logging
    import time
    
    
    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)
    
    
    def worker(barrier:Barrier):
        logging.info("waiting for {} threads.".format(barrier.n_waiting))
        try:
            barrier_id = barrier.wait()
            logging.info('after barrier {}'.format(barrier_id))
        except BrokenBarrierError:
            logging.info("Broken Barrier")
    
    barrier = Barrier(3)
    for x in range(6): #换成其他数
        Thread(target=worker,name="worker-{}".format(x),args=(barrier,)).start()
    
    

    结果:

    2019-03-07 06:38:40,697 worker-0 26024 waiting for 0 threads.
    2019-03-07 06:38:40,698 worker-1 4248 waiting for 1 threads.
    2019-03-07 06:38:40,698 worker-2 23484 waiting for 2 threads.
    2019-03-07 06:38:40,699 worker-2 23484 after barrier 2
    2019-03-07 06:38:40,699 worker-0 26024 after barrier 0
    2019-03-07 06:38:40,699 worker-1 4248 after barrier 1
    2019-03-07 06:38:40,701 worker-3 29924 waiting for 0 threads.
    2019-03-07 06:38:40,702 worker-4 29744 waiting for 1 threads.
    2019-03-07 06:38:40,703 worker-5 1156 waiting for 2 threads.
    2019-03-07 06:38:40,703 worker-5 1156 after barrier 2
    2019-03-07 06:38:40,704 worker-3 29924 after barrier 0
    2019-03-07 06:38:40,704 worker-4 29744 after barrier 1
    

    semaphore

    信号量(semaphore)和Lock很像,信号量内部维护一个倒数计数器,每一次acquire都会减一,当计数为0时就会阻塞请求的线程,直到其他线程对信号量release后,计数如果大于0,继续恢复阻塞的线程。

    名称 含义
    Semaphore(value=1) 构造方法,value小于0,抛出ValueError
    acquire(blocking=True,timeout=None) 获取信号量,计数器减1,获取成功返回True
    release() 释放信号量,计数器加1
    from threading import Thread,Semaphore
    import time
    import logging
    
    
    
    FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
    logging.basicConfig(format=FORMAT,level=logging.INFO)
    
    def task():
        with sm:
            logging.info("I get key...")
            time.sleep(4)
    
    
    sm = Semaphore(5)
    for _ in range(20):
        Thread(target=task).start()
    

    GIL

    CPython在解释器进程级别有一把锁,叫全局解释器锁(GIL)。GIL保证在Cpython进程中,只有一个线程执行字节码,在多核CPU的情况下,也是如此。

    在CPython中,如果运行IO密集型程序,由于线程阻塞,就会调用其他线程。如果运行CPU密集型的程序,当前的线程可能会连续获得GIL,导致其他线程几乎无法使用CPU。所以,对于IO密集型,尽量多使用多线程。对于CPU密集型,使用多线程时候,尽量绕开GIL。

  • 相关阅读:
    安卓开发_浅谈TimePicker(时间选择器)
    eclipse显示代码行数
    Java数据解析---JSON
    Java数据解析---PULL
    Java数据解析---SAX
    统计机器学习(目录)
    FP Tree算法原理总结
    梯度下降(Gradient Descent)小结
    用scikit-learn和pandas学习线性回归
    用scikit-learn学习BIRCH聚类
  • 原文地址:https://www.cnblogs.com/dianel/p/10487205.html
Copyright © 2011-2022 走看看