zoukankan      html  css  js  c++  java
  • Python多线程1:threading

    threading模块提供了高级别的线程接口,基于低级别的_thread模块实现。

    模块基本方法

    该模块定了的方法例如以下:
    threading.active_count()
            返回当前活跃的Thread对象数量。

    返回值和通过enumerate()返回的列表长度是相等的。
    threading.current_thread()
            返回当前线程对象,相应调用者的控制线程。

    假设调用者的控制线程不是通过threading模块创建,一个功能受限的虚拟线程被返回。
    threading.get_ident()
            返回当前线程的“线程标识符”。

    这是一个非0整数,没有特定含义,通经常使用于索引线程特定数据的字典。

    线程标识符能够被循环使用。


    threading.enumerate()
            返回当前活跃的全部线程对象的列表。该列表包含精灵线程、被current_thread()创建的虚拟线程对象、和主线程。

    它不包含终止的线程和还没有启动的线程。


    threading.main_thread()
            返回主线程对象。

    在正常情况下,主线程就是Python解释器启动的线程。
    threading.settrace(func)
            为全部从threading模块启动的线程设置一个trace函数。

    在每一个线程的run()方法被调用前。函数将为每一个线程被传递到sys.settrace()。


    threading.setprofile(func)
            为全部从threading模块启动的线程设置一个profile函数。在每一个线程的run()方法被调用前,函数将为每一个线程被传递到sys.setprofile() 。
    threading.stack_size([size])
            返回当创建一个新线程是使用的线程栈大小,0表示使用平台或配置的默认值。
    平台顶一个常量例如以下:
    threading.TIMEOUT_MAX 
            堵塞函数(Lock.acquire()、RLock.acquire()、Condition.wait()等)的超时參数同意的最大值。指定的值超过该值将抛出OverflowError。


    该模块也定义了一些类,在以下会讲到。
    该模块的设计是仿照Java的线程模型。然而。Java使lock和condition变量成为每一个对象的基本行为。在Python中则是分离的对象。Python的Thread类支持Java的线程类的行为的一个子集;当前,没有优先级,没有线程组,而且线程不能被销毁、停止、暂停、恢复、或者中断。当实现时,Java的线程类的静态方法被相应到模块级函数。
    形容在以下的全部方法都被原子地运行。

    线程本地数据

    线程本地数据是那些值和特定线程相关的数据。为了管理线程本地数据,创建一个local类(或者一个子类)的实例,然后存储属性在它里面:
    mydata = threading.local()
    mydata.x = 1
    为不同线程实例的值将是不同的。




    class threading.local 
    表示线程本地数据的类。


    很多其它的细节參考_threading_local的文档字符串。


    线程对象

    Thread类表示一个执行在一个独立的控制线程中的行为。有两个方法指定这个行为:通过传递一个callable对象给构造函数,或者通过在子类中重载run()方法,在子类中没有其它方法(除了构造函数)应该被重载,换句话说,仅重载这个类的__init__()和run()方法。


    一旦一个线程对象被创建,他的行为必须通过线程的start()方法启动。这将在一个独立的控制线程中调用run()方法。
    一旦线程的行为被启动,这个线程被觉得是'活跃的'。

    正常情况下,当它的run()终止时它推出活跃状态。或者出现为处理的异常。is_alive()方法可用于測试线程是否活跃。
    其他线程能调用一个线程的join()方法。

    这将堵塞调用线程直到join()方法被调用的线程终止。


    一个线程有一个名称,名称能被传递给构造器,并能够通过name属性读取或者改变。


    一个线程能被标注为“精灵线程”。

    这个标志的意义是当今有精灵线程遗留时,Python程序将退出。初始值从创建的线程继承,这个标志能够通过daemon属性设置,或者通过构造器的daemon參数传入。
    注意,精灵线程在关闭时会突然地停止。他们的资源(比如打开的文件、数据库事务等)不能被正确的释放。假设你想你的线程优雅地停止,应该使它们是非精灵线程而且使用一个适当的信号机制,比如Event(后面解说)。
    有一个“主线程”对象,这相应到Python程序的初始控制线程,它不是精灵线程。
    有可能“虚拟线程对象”被创建,则会存在线程对象相应到“外星人线程”,其控制线程在threading模块之外启动,比如直接从C代码启动。虚拟线程对象的功能是受限的,它们总是被觉得是活跃的精灵线程,不能被join()。

    他们不能被删除,因为探測外星人线程的终止是不可能的。


    以下是Thread类的构造方法:


    class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 
    1)group:应该是None,保留为未来的扩展,当一个ThreadGroup类被实现的时候须要;
    2)target:一个callable对象,被run()方法调用。默觉得None,意味着什么都不做。
    3)name:线程名,默认情况下,一个形式为“Thread-N”的唯一名被构造,N是一个小的十进制数;
    4)args:调用target的參数元组,默觉得();
    5)kwargs:为target调用的參数字典,默觉得{};
    6)daemon:假设不为None。则设置该线程是否为精灵线程。假设为None,则从当前线程继承;
    假设子类覆盖了构造器。在做其他不论什么事情之前。它必须确保先调用基类的构造器(Thread.__init__())。
    线程的经常用法例如以下:
    1)start()
    启动线程。
    每一个线程最多仅仅能调用一次。它会导致对象的run()方法在独立的控制线程中被调用。


    假设在同一个线程对象上调用超过一次将抛出RuntimeError。
    2)run()
    表示线程行为的方法。


    你能够在子类中重载该方法。

    标准的run()方法调用传入到构造器中callable对象(相应target參数),使用相应的args或者kwargs參数。
    3)join(timeout=None)
    等待直到线程结束。这将堵塞当前线程直到join()方法被调用的线程终止或者抛出一个未处理的异常,或者设置的溢出时间到达。
    假设timeout參数指定且为非None,则它应该是一个浮点数,用于指定操作的溢出时间,单位为秒。因为join()总是返回None,因此在join()结束后你必须调用is_alive()来推断线程是否结束。假设线程任然是活跃的,join()调用则是时间溢出。


    当timeout不被指定,或者指定为None时。操作将堵塞直到线程终止。
    一个线程能被join()多次。
    假设一个join当前线程的尝试将导致一个死锁,join()将抛出RuntimeError。

    在一个线程启动之前对该线程做join()操作也将导致相同的异常。
    4)name
    线程名,仅用于标识一个线程,没有语义,多个线程能够被给相同的名字,初始的名称被构造器设置。


    5)getName() 
      setName()
    name的旧的getter/setter API。如今直接使用name属性取代。


    6)ident
    这个线程的“线程标识符”,假设线程没有启动,则为None。

    这是一个非0整数。

    线程标识符能够被循环使用,一个线程退出后它的线程标识符能够被其他线程使用。
    7)is_alive()
    返回线程是否活跃。


    该方法仅仅有在run()启动后而且在终止之前才返回True。模块函数enumerate()返回全部活跃线程的一个列表。
    8)daemon
    一个布尔值。用于表示该线程是否精灵线程。

    该值必须在start()方法调用之前设置,否则RuntimeError被抛出。它的初始值从创建的线程继承。主线程不是一个精灵线程,因此全部在主线程中创建的线程daemon默觉得False。
    当没有活跃的非精灵线程执行时。整个Python程序退出。
    9)isDaemon() 
           setDaemon()
    老的getter/setter API。如今直接使用daemon属性取代。


    Lock对象

    基元锁是不被特定线程拥有的同步基元。

    在Python中。它是当前可用的最低级别的同步基元,通过_thread扩展模块直接实现。
    一个基元锁存在两种状态,“锁”或者“未锁”,初始创建时处于未锁状态。

    他有两个基本方法:acquire()和release()。当状态是未锁时,acquire()将改变其状态到锁而且马上返回;当状态是锁时,acquire()堵塞直到还有一个线程调用release()释放了锁,然后acquire()获取锁并重设锁的状态到锁而且返回。

    release()应该仅仅在锁处理锁状态时才调用,它改变锁的状态到未锁而且马上返回。假设尝试释放一个未锁的锁,一个RuntimeError将被抛出。
    锁也支持上下文管理协议。
    当超过一个线程被锁堵塞,当锁被释放后仅有一个线程能获取到锁,获取到锁的线程不确定。依赖详细的实现。
    相关类例如以下:


    class threading.Lock 
    该类实现了基元锁对象。线程能够通过acquire请求该锁。假设已经存在其他线程获取了锁。则线程堵塞。直到其他线程释放锁。
    1)acquire(blocking=True, timeout=-1)
    请求一个锁,堵塞或者非堵塞。
    当blocking參数为True(默认),将堵塞直到锁被释放。然后获取锁并返回True。
    当blocking參数为False,将不堵塞。假设已经存在线程获取了锁,调用将马上返回False。否则。将获取锁并返回True。


    当timeout參数大于0时。最多堵塞timeout指定的秒值。

    timeout为-1(默认)表示一直等待。当blocking为False时不同意指定timeout參数。


    假设锁请求成功。则返回True,否则返回False(比如超时)。


    2)release()
    释放一个锁。这能在不论什么线程中调用,不仅在获取锁的线程中。


    当锁处于锁状态时,重设它为未锁,并返回。其他堵塞等待该锁的线程中将有一个线程能获取到锁。
    在一个未锁的锁上调用该方法。将抛出RuntimeError。


    无返回值。

    RLock对象

    一个可重入锁能够被同一个线程请求多次。在内部,它在基元锁的基础上使用了“拥有者线程”和“递归级别”的概念。在锁状态,一些线程拥有锁;在未锁状态。没有线程拥有锁。
    为了获取锁,一个线程调用acquire()方法。获取锁后返回;为了释放锁,一个线程调用release()方法。acquire()/release()的调用能够是嵌套的,仅仅有最后的release()重设锁到未锁。


    可重入锁也支持上下文管理协议。


    class threading.RLock 
    该类实现了可重入锁对象。一个可重入锁必须被请求它的线程释放,一旦一个线程拥有了一个可重入锁,该线程能够再次请求它,注意请求锁的次数必须和释放锁的次数相应。


    注意RLock实际上是一个工厂函数,返回当前平台支持的效率最高的RLock类版本号的一个实例。


    1)acquire(blocking=True, timeout=-1)
    请求一个锁,堵塞或者非堵塞方式。


    当參数为空时:假设这个线程已经拥有锁,递归级别加一,然后返回。否则,假设还有一个线程拥有锁,堵塞直到锁被释放。

    假设锁处于未锁状态(不被不论什么线程拥有),则设置拥有者线程。并设置递归级别为1,然后返回。假设超过一个线程处于堵塞等待队列中,一次仅有一个线程能获取锁。

    该场景没有返回值。
    当blocking为True时,和没有參数的场景同样,并返回True。


    当blocking为False时。将不堵塞。假设锁处于锁状态,则马上返回False;否则。和没有參数的场景同样,并返回True。
    当timeout參数大于0时。最多堵塞timeout秒。假设在timeout秒内获取了锁,则返回True,否则超时返回False。
    2)release()
    释放一个锁,降低递归级别。假设递归级别降低到0,则重设锁的状态到未锁(不被不论什么线程拥有)。假设降低后递归级别任然大于0。则锁任然被调用者线程保持。
    仅当调用者线程拥有锁时才调用该方法。否则抛出RuntimeError。
    没有返回值。

    Condition对象

    condition变量总是和锁相关联,这能被传入或者通过默认创建。

    当几个condition变量必须共享同一个锁时传入是实用的。锁是condition对象的一部分:你不必分别跟踪它。
    condition变量遵守上下文管理协议:在代码块中用with语句获取相关的锁。acquire()和release()方法也调用相关锁的相应方法。
    其他方法必须被相关锁的持有者调用。

    wait()方法释放锁,然后堵塞直到还有一个线程调用notify()或者notify_all()唤醒它。唤醒后,wait()又一次获取锁并返回。它也能够指定一个超时时间。
    notify()方法唤醒等待线程中的一个;notify_all()方法唤醒全部等待线程。
    注意:notify()和notify_all()方法不释放锁;这意味着唤醒的线程或者线程组将不会从wait()调用中马上返回。
    使用condition变量的一个典型的应用就是用锁同步对一些共享状态的进入;对某个特定状态感兴趣的线程重复调用wait()。直到出现他们感兴趣的状态。改动这个状态的线程则调用notify()或者notify_all()来通知等待的线程状态已经改变。比如。以下是一个典型的使用了无限缓存的生产者-消费者模式:

    # 消费一个条目
    with cv:
        while not an_item_is_available():
            cv.wait()
        get_an_available_item()
    
    # 产生一个条目
    with cv:
        make_an_item_available()
        cv.notify()
    while循环检查是否有条目可用。由于wait()能够在等待随意时间后返回,也有可能调用notify()的线程并没有使条件为真。在多线程编程中这个问题始终存在。wait_for()方法能被用于自己主动的条件检測,简化超时的计算:
    # 消费一个条目
    with cv:
        cv.wait_for(an_item_is_available)
        get_an_available_item()
    对于notify()和notify_all()。使用哪个在于应用场景中有一个还是多个等待线程。

    比如。在一个典型的生产者-消费者场景中。添加一个条目到缓存仅须要唤醒一个消费者线程。


    class threading.Condition(lock=None)
    该类实现条件变量对象。一个条件变量同意一个或多个线程等待。直到他们被还有一个线程通知。


    假设lock參数被指定且不为None。它必须是Lock或者RLock对象,被用做隐含锁。

    否则。一个新的RLock对象被创建并作为隐含锁。


    1)acquire(*args)
    请求一个隐含锁,这种方法会调用隐含锁的相应方法。返回值即为隐含锁的方法的返回值。


    2)release()
    释放隐含锁。

    这种方法调用隐含锁的相应方法。没有返回值。
    3)wait(timeout=None)
    等待直到被唤醒。或者超时。

    假设调用线程没有请求锁。RuntimeError被抛出。
    该方法会释放隐含锁,然后堵塞直到它被还有一个线程调用同一个condition对象的notify()或者notify_all()方法唤醒,或者直到指定的timeout时间溢出。一旦唤醒或者超时,它又一次请求锁并返回。
    当timeout參数被指定并不为None,则指定了一个秒级的超时时间。
    当隐含锁是一个RLock锁,它不通过release()方法释放锁,由于假设线程请求了多次锁,使用release()方法不能解锁(必须调用和lock方法同样的次数才干解锁)。

    一个RLock类的内部接口被使用,该接口能释放锁,无论锁请求了多少次。

    当锁被请求时,还有一个内部接口被用于还原锁的递归层级。


    方法返回True,假设超时则返回False。
    4)wait_for(predicate, timeout=None)
    等待直到条件为True。

    predicate应该是一个callable,返回值为布尔值。

    timeout用于指定超时时间。


    该方法相当于重复调用wait()直到条件为真,或者超时。返回值是最后的predicate的返回值。或者超时返回Flase。


    忽略超时特性,调用这种方法相当于:

    while not predicate():
        cv.wait()
    因此。调用该方法和调用wait()具有相同的规则:调用是或者从堵塞中返回时必须先获取锁。
    5)notify(n=1)
    默认情况下,唤醒一个等待线程。假设调用该方法的线程没有获取锁,则RuntimeError被抛出。


    这种方法子多唤醒n(默觉得1)个等待线程;假设没有线程等待。则没有操作。
    假设至少n个线程正在等待。当前的实现是刚好唤醒n个线程。

    然而,依赖这个行为是不安全的,由于,未来某些优化后的实现可能会唤醒超过n个线程。
    注意:一个唤醒的线程仅仅有当请求到锁后才会从wait()调用中返回。因为notify()不释放锁,所以它的调用者应该释放锁。


    6)notify_all()
    唤醒全部等待线程。这种方法的行为类似于notify(),可是唤醒全部等待线程。假设调用线程未获取锁,则RuntimeError被抛出。

    Semaphore对象

    这是计算机科学历史上最早的同步基元中的一个,被荷兰的计算机科学家Edsger W. Dijkstra发明(他使用P()和V()而不是acquire()和release())。
    semaphore管理一个内部计数,每次调用acquire()时该计数减一,每次调用release()时计数加一。

    计数不会小于0。当acquire()发现计数为0时,则堵塞。等待直到其他线程调用release()。


    semaphore也支持上下文管理协议。


    class threading.Semaphore(value=1) 
    该类实现semaphore对象。一个semaphore管理一个表示计数表示能并行进入的线程数量。假设计数为0。则acquire()堵塞直到计数大于0。

    value默觉得1.
    value给出了内部计数的初始值,默觉得1。假设传入的value小于0,则抛出ValueError。
    1)acquire(blocking=True, timeout=None)
    请求semaphore。


    当没有參数时:假设内部计数大于0。将计数减1并马上返回。假设计数为0。堵塞。等待直到还有一个线程调用了release()。这使用互锁机制实现,保证了假设有多个线程调用acquire()堵塞。则release()将仅仅会唤醒一个线程。唤醒的线程是随机选择一个,不依赖堵塞的顺序。

    成功返回True(或者无限堵塞)。


    假设blocking为False,将不堵塞。假设无法获取semaphore。则马上返回False;否则。同没有參数时的操作,并返回True。
    当设置了timeout而且不是None,它将堵塞最多timeout秒。假设在该时间内没有成功获取semaphore,则返回False;否则返回True。
    2)release()
    释放一个semaphore。计数加1。假设计数初始为0,则须要唤醒等待队列中的一个线程。



    class threading.BoundedSemaphore(value=1)
    该类实现了有界的semaphore对象。一个有界的semaphore会确保它的当前值没有溢出他的初始值。假设溢出。则ValueError被抛出。

    在大部分场景下。semaphore被用于限制资源的使用。

    假设semaphore被释放太多次,往往表示出现了bug。

    Semaphore使用实例

    Semaphore通常被用于控制资源的使用。比如。一个数据库server。在一些情况下。资源的大小被固定,你应该使用一个有界的Semaphore。

    在启动其它工作线程之前。你的主线程首先初始化Semaphore:

    maxconnections = 5
    # ...
    pool_sema = BoundedSemaphore(value=maxconnections)
    其他工作线程则在须要连接到server时调用Semaphore的请求和释放方法:
    with pool_sema:
        conn = connectdb()
        try:
            # ... use connection ...
        finally:
            conn.close()
    有个有界的Semaphore能够降低程序出错的机会,防止semaphore释放的次数大于请求次数引发的问题。

    Event对象

    这是在线程间最简单的通信机制之中的一个:一个线程通知一个事件,还有一个线程等待通知并作出处理。


    一个Event对象管理一个内部标志,使用set()方法能够将其设置为True,使用clear()方法能够将其重设为False。wait()方法将堵塞直到标志为True。


    class threading.Event
    该类实现事件对象。

    一个事件管理一个标志,能够通过set()方法将其设置为True,通过clear()方法将其重设为False。wait()方法堵塞直到标志为True。

    标志初始为False。
    1)is_set()
    当且仅当内部标志为True时返回True。


    2)set()
    设置标志为True。

    全部等待的线程都将被唤醒。一旦标志为True。调用wait()的线程将不再堵塞。
    3)clear()
    重设标志为False。

    接下来,全部调用wait()的线程将堵塞直到set()被调用。
    4)wait(timeout=None)
    堵塞直到标志被设置为True。假设标志已经为True,则马上返回;否则,堵塞直到还有一个线程调用set(),或者直到超时。


    当timeout參数被指定且不为None,线程将仅等待timeout的秒数。
    该方法当标志为True时返回True,当超时时返回False。

    Timer对象

    该类表示了一个定时器,表示一个行为在多少时间后被运行。Timer是Thread的子类,能够作为创建自己定义线程的一个实例。
    Timer通过调用start()方法启动,通过调用cancel()方法停止(必须在行为被运行之前)。Timer运行指定行为之间的等待时间并非精确的,也就是说可能与用户指定的间隔存在差异。
    比如:
    def hello():
        print("hello, world")
    
    t = Timer(30.0, hello)
    t.start() # 30秒后,"hello, world"将被打印
    class threading.Timer(interval, function, args=None, kwargs=None) 
    创建一个Timer,在interval秒之后,将使用參数args和kwargs作为參数运行function。假设args为None(默认),将使用空list。假设kwargs是None(默认)。则使用空字典。
    1)cancel()
    停止定时器。而且取消定时器的行为的运行。这仅当定时器任然处理等待状态时才有效。

    Barrier对象

    栅栏提供了一个简单的同步基元,用于固定数量的线程须要等待彼此的场景。

    尝试通过栅栏的每一个线程都会调用wait()方法,然后堵塞直到全部的线程都调用了该方法。然后,全部线程同一时候被释放。
    栅栏能被反复使用随意多次,但必须是同等数量的线程。


    以下是一个样例,一个同步client和服务端线程的简单方法:

    b = Barrier(2, timeout=5)
    
    def server():
        start_server()
        b.wait()
        while True:
            connection = accept_connection()
            process_server_connection(connection)
    
    def client():
        b.wait()
        while True:
            connection = make_connection()
            process_client_connection(connection)
    class threading.Barrier(parties, action=None, timeout=None) 
    为parties个线程创建一个栅栏对象,假设提供了action,则当线程被释放时。它将被线程中的一个调用。timeout表示wait()方法的默认超时时间值。
    1)wait(timeout=None)
    通过栅栏。当全部使用栅栏的线程都调用了该方法后,他们将被同一时候释放。假设timeout被提供,他优先于类构造器提供的timeout參数。
    返回值是0到parties的整数,每一个线程都不同。这能用于选择某个特定的线程做一些特定的操作,比如:
    i = barrier.wait()
    if i == 0:
        # Only one thread needs to print this
        print("passed the barrier")
    假设一个action被提供给了构造器,线程中的当中一个将在被释放时调用它。假设调用抛出一个异常。则栅栏进入损坏状态。


    假设调用超时。则栅栏进入损坏状态。
    假设栅栏处于损坏状态,或者有线程在等待时被重设了。则该方法会抛出BrokenBarrierError异常。
    2)reset()
    恢复栅栏到默认状态。

    不论什么处于等待中的线程将收到BrokenBarrierError异常。
    注意这里须要额外的同步。假设一个栅栏被损坏,创建一个新的栅栏或许是更好的选择。
    3)abort()
    放置栅栏到损坏状态。

    这导致当前处于等待的线程和未来对wait()的调用都会抛出BrokenBarrierError。通常使用该方法是为了避免死锁。
    使用一个超时时间应该是更好的选择。
    4)parties
    要求通过栅栏的线程的数量。
    5)n_waiting
    当前处于等待中的线程数量。
    6)broken
    栅栏是否处于损坏状态,假设是则为True。



    exception threading.BrokenBarrierError
    该异常是RuntimeError的子类,当栅栏对象被重设或者损坏时被抛出。

    在with语句中使用locks、conditions和semaphores

    全部被该模块提供的具有acquire()和release()方法的对象都能使用with语句管理。

    当进入堵塞状态时acquire()方法将被调用,而当推出堵塞状态时release()方法将被调用。以下是详细的语法:

    with some_lock:
        # do something...
    等价于:
    some_lock.acquire()
    try:
        # do something...
    finally:
        some_lock.release()
    当前,Lock、RLock、Condition、Semaphore和BoundedSemaphore都能够在with语句中管理。


  • 相关阅读:
    PythonのTkinter基本原理
    使用 Word (VBA) 分割长图到多页
    如何使用 Shebang Line (Python 虚拟环境)
    将常用的 VBScript 脚本放到任务栏 (Pin VBScript to Taskbar)
    关于 VBScript 中的 CreateObject
    Windows Scripting Host (WSH) 是什么?
    Component Object Model (COM) 是什么?
    IOS 打开中文 html 文件,显示乱码的问题
    科技发展时间线(Technology Timeline)
    列置换密码
  • 原文地址:https://www.cnblogs.com/lytwajue/p/6754231.html
Copyright © 2011-2022 走看看