zoukankan      html  css  js  c++  java
  • Python中的多线程编程,线程安全与锁(二)

    在我的上篇博文Python中的多线程编程,线程安全与锁(一)中,我们熟悉了多线程编程与线程安全相关重要概念, Threading.Lock实现互斥锁的简单示例两种死锁(迭代死锁和互相等待死锁)情况及处理。今天我们将聚焦于Python的Threading模块总结线程同步问题。

    1. Threading模块总结

    1.1 Threading模块概览

    threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。python当前版本的多线程库没有实现优先级、线程组,线程也不能被停止、暂停、恢复、中断。

    threading模块提供的类:  
      Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。

    threading 模块提供的常用方法: 
      threading.currentThread(): 返回当前的线程变量。 
      threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 
      threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    threading 模块提供的常量:

      threading.TIMEOUT_MAX 设置threading全局超时时间。

    1.2 Thread类

    Thread是线程类,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖run()。推荐使用方法一,将目标函数作为target参数传入,非常简单实用

    # coding:utf-8
    import threading
    import time
    #方法一:将要执行的方法作为参数传给Thread的构造方法
    def action(arg):
        time.sleep(1)
        print 'the arg is:%s
    ' %arg
    
    for i in xrange(4):
        t =threading.Thread(target=action,args=(i,))
        t.start()
    
    print 'main thread end!'
    
    #方法二:从Thread继承,并重写run()
    class MyThread(threading.Thread):
        def __init__(self,arg):
            super(MyThread, self).__init__()#注意:一定要显式的调用父类的初始化函数。
            self.arg=arg
        def run(self):#定义每个线程要运行的函数
            time.sleep(1)
            print 'the arg is:%s
    ' % self.arg
    
    for i in xrange(4):
        t =MyThread(i)
        t.start()
    
    print 'main thread end!'

    相关方法:

    构造方法: 
    Thread(group=None, target=None, name=None, args=(), kwargs={}) 

      group: 线程组,目前还没有实现,库引用中提示必须是None; 
      target: 要执行的方法; 
      name: 线程名; 
      args/kwargs: 要传入方法的参数。

    实例方法: 
      isAlive(): 返回线程是否在运行。正在运行指启动后、终止前。 
      get/setName(name): 获取/设置线程名。 

      start():  线程准备就绪,等待CPU调度
      is/setDaemon(bool): 将该子线程设置为父线程的守护线程(默认为非守护线程(False))。(需要在线程start之前设置)

        关于“守护”的含义,我们可以这样理解,子线程为父线程的守护线程,意思是说子线程要守着父线程,一旦父线程执行完毕,也就不需要“守护”了,所以此时子线程就要结束。

        True: 设置该子进程为父进程的守护进程,即后台线程。主线程执行过程中,子线程也在进行,主线程执行完毕后,子线程不论成功与否,主线程和子线程均停止。
             False:设置该子进程为父进程的非守护进程,即前台进程。主线程执行过程中,子线程也在进行,主线程代码执行完毕后,仍需要等待子线程也执行完成后,主线程才会停止。
      start(): 启动线程。 
      join([timeout]): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)。

    1.2.1 关键参数setDaemon

    该参数是设置线程属性,规定当前线程是否属于守护线程(默认为非守护线程(False))。(需要在线程start之前设置)

    • True: 设置该子进程为父进程的守护进程,即后台线程。主线程执行过程中,子线程也在进行,主线程执行完毕后,子线程不论成功与否,主线程和子线程均停止。
    • False:设置该子进程为父进程的非守护进程,即前台进程。主线程执行过程中,子线程也在进行,主线程代码执行完毕后,仍需要等待子线程也执行完成后,主线程才会停止。

    关于setDaemon,默认子线程属于非守护线程,即主线程要等待所有子线程执行完之后,才停止程序。

    # coding:utf-8
    import threading
    import time
    
    def action(arg):
        time.sleep(1)
        print  'sub thread start!the thread name is:%s
    ' % threading.currentThread().getName()
        print 'the arg is:%s
    ' %arg
        time.sleep(1)
    
    for i in xrange(4):
        t =threading.Thread(target=action,args=(i,))
        t.start()
    
    print 'main_thread end!'
    
    main_thread end!
    sub thread start!the thread name is:Thread-2
    the arg is:1
    the arg is:0
    sub thread start!the thread name is:Thread-4
    the arg is:2
    the arg is:3
    Process finished with exit code 0
    可以看出,创建的4个“前台”线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

    该例子验证了setDeamon(False)(默认)非守护线程,主线程执行过程中,子线程也在进行,主线程执行完毕后,等待子线程也执行完成后,主线程停止。

    设置setDeamon=True时:

    # coding:utf-8
    import threading
    import time
    
    def action(arg):
        time.sleep(1)
        print  'sub thread start!the thread name is:%s
    ' % threading.currentThread().getName()
        print 'the arg is:%s
    ' %arg
        time.sleep(1)
    
    for i in xrange(4):
        t =threading.Thread(target=action,args=(i,))
        t.setDaemon(True)#设置线程为后台线程
        t.start()
    
    print 'main_thread end!'
    
    
    
    main_thread end!
    
    可以看出,主线程执行完毕后,后台线程不管是成功与否,主线程均停止

    验证了setDeamon(True)守护线程,主线程执行过程中,守护线程也在进行,主线程执行完毕后,子线程不论成功与否,均与主线程一起停止。

    1.2.2 关键参数join

    阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)。即当子进程的join()函数被调用时,主线程就被阻塞住了,意思为不再继续往下执行。

    值得注意的是,由于join()会阻塞其他函数,如果我们要用for循环触发多个线程的执行,start()要和join()分开(用两个for循环,先用第一个for循环将全部子线程start(),再用第二个for循环将全部子线程join),不然会让多线程并行执行,变成多线程依次执行:

    • 因为如果其他线程还没有start(),那么由于start()操作属于主线程的调用,那么start()会被阻塞,我们原本想要的多线程并行执行会变成多线程依次执行。
    • 如果此时其他线程已经start()了,那么join()函数由于是对子线程的操作,不属于主线程,则不会被阻塞。
    #coding:utf-8
    import threading
    import time
    
    def action(arg):
        time.sleep(1)
        print  'sub thread start!the thread name is:%s    ' % threading.currentThread().getName()
        print 'the arg is:%s   ' %arg
        time.sleep(1)
    
    thread_list = []    #线程存放列表
    for i in xrange(4):
        t =threading.Thread(target=action,args=(i,))
        t.setDaemon(True)
        thread_list.append(t)
    
    for t in thread_list:
        t.start()
    
    for t in thread_list:
        t.join()
    print("main_thread end!")

    #Output: sub thread start!the thread name
    is:Thread-2 the arg is:1 sub thread start!the thread name is:Thread-3 the arg is:2 sub thread start!the thread name is:Thread-1 the arg is:0 sub thread start!the thread name is:Thread-4 the arg is:3 main_thread end! Process finished with exit code 0 设置join之后,主线程等待子线程全部执行完成后或者子线程超时后,主线程才会从被阻塞的地方继续执行。

    验证了 join()阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout,即使设置了setDeamon(True)主线程依然要等待子线程结束

    使用例子(join不妥当的用法,使多线程编程顺序执行)

    #coding:utf-8
    import threading
    import time
    
    def action(arg):
        time.sleep(1)
        print  'sub thread start!the thread name is:%s    ' % threading.currentThread().getName()
        print 'the arg is:%s   ' %arg
        time.sleep(1)
    
    
    for i in xrange(4):
        t =threading.Thread(target=action,args=(i,))
        t.setDaemon(True)
        t.start()
        t.join()
    
    print 'main_thread end!'
    
    join不妥当用法
    
    sub thread start!the thread name is:Thread-1    
    the arg is:0   
    sub thread start!the thread name is:Thread-2    
    the arg is:1   
    sub thread start!the thread name is:Thread-3    
    the arg is:2   
    sub thread start!the thread name is:Thread-4    
    the arg is:3   
    main_thread end!
    
    Process finished with exit code 0
    可以看出此时,程序只能顺序执行,每个线程都被上一个线程的join阻塞,使得“多线程”失去了多线程意义。
    
    运行结果

    1.3 Timer类

    Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。

    构造方法: 
    Timer(interval, function, args=[], kwargs={}) 
      interval: 指定的时间 
      function: 要执行的方法 
      args/kwargs: 方法的参数

    实例方法: 
    Timer从Thread派生,没有增加实例方法。

    # encoding: UTF-8
    import threading
    
    
    def func():
        print (hello timer!)
    
    if __name__ == "__main__"
        timer = threading.Timer(5, func)
        timer.start()

    该例子效果为延迟5秒执行

    2 线程同步

    线程同步是借由threading模块的以下类实现的:Condition,Event,Local.

    2.1 Condition类

    我们先关注一下Condition类一般用于什么场景:

    线程A需要等某个条件成立才能继续往下执行,现在这个条件不成立,线程A就阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执行。在pthread库中通过条件变量(Condition Variable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。

    通俗的讲,Condition类适合于生产者,消费者模型。 即Condition很适合那种主动休眠,被动唤醒的场景。 Condition使用难度要高于mutex,一不注意就会被死锁,所以很考验对condition的理解。

         首先我们知道python下的线程是真实的线程,底层用的是pthread。pthread内部Condition条件变量有两个关键函数, await和signal方法,对应python threading Condition是wait和notify方法。 

         一个Condition实例的内部实际上维护了两个队列一个是等待锁队列(实际上mutex内部其实就是维护这个等待锁队列) ,另一个队列可以叫等待条件队列,在这队列中的节点都是由于(某些条件不满足而)线程自身调用wait方法阻塞的线程(记住是自身阻塞)。最重要的Condition方法是wait和 notify方法。另外condition还需要lock的支持, 如果你构造函数没有指定lock,condition会默认给你配一个rlock。   

    构造方法: 
    Condition([lock/rlock])

    实例方法: 
    acquire([timeout])/release(): 调用关联的锁的相应方法。 
    wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。 
    notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 
    notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

    下面是这两个方法的执行流程。
              wait方法:
                                1. 入列到条件队列(注意这里不是等待锁的队列)
                                2. 释放锁
                                3. 阻塞自身线程
                                 ————被唤醒后执行————-
                                4. 尝试去获取锁(执行到这里时线程已不在条件队列中,而是位于等待(锁的)队列中,参见signal方法)
                                    4.1 成功,从await方法中返回,执行线程后面的代码
                                    4.2 失败,阻塞自己(等待前一个节点释放锁时将它唤醒)
             注意: 调用wait可以让当前线程休眠,等待其他线程的唤醒,也就是等待signal,这个过程是阻塞的。 当队列首线程被唤醒后,会继续执行await方法中后面的代码。

             notify(signal)方法:

                               1. 将条件队列的队首节点取出,放入等待锁队列的队尾
                               2. 唤醒节点对应的线程.

    注: notify ( signal ) 可以把wait队列的那些线程给唤醒起来。  

    下面给一个生产者-消费者模型的例子:

    #/usr/bin/python3
    # encoding: UTF-8
    import threading
    import time
    
    # 商品
    product = None
    # 条件变量
    con = threading.Condition()
    
    
    # 生产者方法
    def produce():
        global product
    
        if con.acquire():
            while True:
                if product is None:
                    print("produce the product...")
                    product = 'anything'
    
                    # 通知消费者,商品已经生产
                    con.notify()
                else:
                    print("Producer: product is not None")
                # 等待通知
                con.wait()
                print("Producer: Resume from wait...")
                time.sleep(2)
    
    
    # 消费者方法
    def consume():
        global product
    
        if con.acquire():
            while True:
                if product is not None:
                    print("consume the product...")
                    product = None
    
                    # 通知生产者,商品已经没了
                    con.notify()
                else:
                    print("Cosumer: product is None")
                # 等待通知
                con.wait()
                print("Cosumer: Resume from wait...")
                time.sleep(2)
    
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=produce)
        t2 = threading.Thread(target=consume)
        t2.start()
        t1.start()

    结果为

    Cosumer: product is None
    produce the product Cosumer: Resume from wait...
    cosume the product
    Producuer: Resume from wait...
    produce the product
    Cosumer: Resume from wait...
    cosume the product

     注意:con.wait()一定要在if判断的外面,因为一开始的时候,两个子线程都获得了锁。如果没有con.wait()释放锁并等待通知,则另一个暂时没有获得锁权限的线程,会一直被阻塞。整个生产者-消费者模型也就跑不起来了。下面是跑步起来的例子:

    #/usr/bin/python3
    # encoding: UTF-8
    import threading
    import time
    
    # 商品
    product = None
    # 条件变量
    con = threading.Condition()
    
    
    # 生产者方法
    def produce():
        global product
    
        if con.acquire():
            while True:
                if product is None:
                    print("produce the product...")
                    product = 'anything'
    
                    # 通知消费者,商品已经生产
                    con.notify()
                    con.wait()
                    print("Producer: Resume from wait...")
                    time.sleep(2)
                else:
                    print("Producer: product is not None")
                # 等待通知
                #con.wait()
                #print("Producer: Resume from wait...")
               # time.sleep(2)
    
    
    # 消费者方法
    def consume():
        global product
    
        if con.acquire():
            while True:
                if product is not None:
                    print("consume the product...")
                    product = None
    
                    # 通知生产者,商品已经没了
                    con.notify()
                    con.wait()
                    print("Cosumer: Resume from wait...")
                    time.sleep(2)
                else:
                    print("Cosumer: product is None")
                # 等待通知
                #con.wait()
               # print("Cosumer: Resume from wait...")
               # time.sleep(2)
    
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=produce)
        t2 = threading.Thread(target=consume)
        t2.start()
        t1.start()

    结果为:

    Cosumer: Product is None
    Cosumer: Product is None
    Cosumer: Product is None
    Cosumer: Product is None
    Cosumer: Product is None
    Cosumer: Product is None
    Cosumer: Product is None
    Cosumer: Product is None
    Cosumer: Product is None
    Cosumer: Product is None

    2.2 Event类

    Event(事件)是最简单的线程通信机制之一一个线程通知事件,其他线程等待事件Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。

      Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态

    构造方法: 
    Event()

    实例方法: 
      isSet(): 当内置标志为True时返回True。 
      set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。 
      clear(): 将标志设为False。 
      wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。

    下面是例子:

    # encoding: UTF-8
    import threading
    import time
    
    event = threading.Event()
    
    
    def func():
        # 等待事件,进入等待阻塞状态
        print '%s wait for event...' % threading.currentThread().getName()
        event.wait()
    
        # 收到事件后进入运行状态
        print '%s recv event.' % threading.currentThread().getName()

    if __name__ == "__main__":
    t1 = threading.Thread(target=func)
    t2 = threading.Thread(target=func)
    t1.start()
    t2.start()
    time.sleep(2)
    print 'MainThread set event.' 
    event.set()

    结果为

    Thread-1 wait for event...
    Thread-2 wait for event...
    
    #2秒后。。。
    MainThread set event.
    Thread-1 recv event.
    Thread-2 recv event.
    

    2.3 Local类

    local是一个小写字母开头的类,用于管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换

      可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。

    # encoding: UTF-8
    import threading
     
    local = threading.local()
    local.tname = 'main'
     
    def func():
        local.tname = 'notmain'
        print local.tname
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=func)
        t1.start()
        t1.join()
        print(local.tname)
  • 相关阅读:
    C++ STL list
    1159 Palindrome
    3070 Fibonacci
    1458 Common Subsequence
    git工具之重写历史
    git工具之修订版本(revision)选择
    git工具之使用git调试
    程序员也要听歌啊 写写歌词~(毛不易)《借》《消愁》《像我这样的人》
    git基本命令讲解
    git工作原理
  • 原文地址:https://www.cnblogs.com/ArsenalfanInECNU/p/10134591.html
Copyright © 2011-2022 走看看