zoukankan      html  css  js  c++  java
  • 多线程与线程锁

    Thread基本方法

    #! /usr/bin/env python3
    # encoding: utf-8
    
    """
    @Author: zengchunyun
    @Date: 2017/7/12
    """
    
    import threading
    import sys
    import time
    
    
    def read(event):
        print('get a task ...')
        print('read is_set', event.is_set())  # 获取event状态,如果event状态为False,调用event.wait()就会进入阻塞状态,
        # 直到event状态为True,才会继续执行event.wait()后面的代码
        event.wait()  # 此处会进入等待状态,不会继续执行下面的代码,直到event对象被设置为True,则会通知此wait进入非阻塞状态,即,会继续执行下面的代码
        print('read is_set', event.is_set())  # 获取event状态,如果在wait后,通常该状态为True,如果wait后,又立即对event调用clear()方法,则该状态又为False
        event.clear()  # 将event状态设置为False
        print('read is_set', event.is_set())  # 获取event状态,如果在wait后,通常该状态为True,如果wait后,又立即对event调用clear()方法,则该状态又为False
        current_thread = threading.current_thread()  # 获取当前线程对象
        print('current_thread', current_thread)
        # current_thread._tstate_lock=None
        # current_thread._stop()
        print('is_alive', current_thread.is_alive())  # 通过获取到当前线程对象后,可以对当前线程做一些操作,比如判断当前线程状态,这里对状态肯定是为True,因为如果为False,那就不可能执行到这里了.也可以通过一些特殊方式,在此处将该结果得到的值为False,不过这种方式对程序而言就没有太大意义了。
        current_thread.name = 'read_thread'  # 可以对当前线程修改名字
        print(current_thread.name)
        print('get_ident', threading.get_ident())  # 返回当前的线程唯一标识符,为非0的整数
        print('main_thread', threading.main_thread())  # 获取主线程对象,通常这个主线程是有python解释器启动的
        print("doing now")
        time.sleep(50)
        print('doing done.')
    
    
    def write(frame, event, arg):
        print('write', frame, event, arg, frame.f_lineno)
    
    
    def worker():
        # threading.settrace(write)  # 通过调用sys.settrace方法去获取执行过程的栈信息,通过这些栈信息可以用于记录日志分析,也能方便理解程序的运行过程,该函数在run()之前调用,它有call,line,return,exception,c_call,c_return,c_exception这些事件类型
        # sys.settrace(write)
        # threading.setprofile(write) # 通过调用sys.setprofile()获取栈信息,它不是每次都会调用。仅仅是在call一个方法时和方法return时调用。所以它的事件信息只有call,c_call,c_return, return,当一个异常已经发生,这个return事件也会返回
        print('stack_size', threading.stack_size(36864))  # 设置栈大小,最少为32KiB,即32768,每次增加的大小必须是4KiB,即4096大小,
        print('get_ident', threading.get_ident())  # 返回当前的线程唯一标识符
        print('active_count', threading.active_count())  # 当前有多少个正在运行的线程,等同于len(threading.enumerate())
        print('TIMEOUT_MAX', threading.TIMEOUT_MAX)  # 最大超时参数,用于Lock.acquire(),Rlock.acquire(),Condition.wait(),等使用超时参数等方法,如果设置一个比这个值还大等值,会出现OverflowError异常
        print('enumerate', threading.enumerate())
        event = threading.Event()
        t1 = threading.Thread(target=read, args=(event,))
        t1.daemon = False  # 设置子线程是否为守护线程,当为守护线程时,该线程将在后台运行,主线程执行完成就不会等待子线程执行是否完成,而是直接退出,如果不是守护线程,主线程执行完成,如果子线程没有完成,会继续等待
        # 创建一个线程对象有两种方式,一种是调用线程的构造方法,或者子类继承Thread,重写run方法,要启动这个线程对象,还需要调用线程的start()方法,然后会在独立的线程里管理调用run里面的代码,当线程start().
        # 我们为认为这个线程就是活动的状态,直到这个线程的run方法执行完成,或者run方法内部出现异常,则这个线程就是不活跃状态,可以通过is_alive()去获取线程状态
    
        t1.start()
        print('stack_size', threading.stack_size())
        print('worker is_set', event.is_set())
        event.clear()
        print('worker is_set', event.is_set())
        timer = threading.Timer(interval=5, function=event.set)
        timer.start()
        print('active_count', threading.active_count())
        print('enumerate', threading.enumerate())
        # 调用线程的join方法会阻塞其它线程的调用,直到调用join的那个线程方法调用终止,即如果有多个线程,都调用里join,那么会依次按谁先调用join,谁就先执行,直到调用join的那个线程执行完成,才会继续调用下一个线程,这样就等于把线程变成串行方式执行了,而不是并行
        # join实际上就是使用了线程锁,让同一时刻只能有一个线程在运行。
        # 如果线程不是daemon线程,分两张情况,
        # 一:timeout为None,主线程就会等待子线程执行完毕,才继续执行主线程后面的代码,直到主线程执行完,程序才退出
        # 二:timeout设置了大于0的浮点值,就会在该超时时间内等待子线程的返回,如果这个时间内,子线程没有执行完成,那么主线程不会继续等子线程,而是继续执行主线程后面的代码,最后如果主线程代码执行完了,如果子线程还没有执行完,会继续等待子线程,直到子线程完全返回,主线程才退出
        # 如果子线程是daemon线程,也分两种情况
        # 一: 如果timeout为None, 主线程会等待子线程执行完毕,才会继续执行主线程后面的代码,直到主线程执行完,程序退出
        # 二:如果timeout设置为大于0的浮点值,就会在该超时时间内等待子线程的返回,如果这个时间内,子线程没有执行完成,那么主线程不会继续等待子线程,而是继续执行主线程后面的代码,最后如果主线程代码执行完了,如果子线程还没有执行完,不会继续等待子线程,而是直接退出程序
    
        # 总结,在没有timeout时,不管是不是daemon线程,主线程都会等待子线程执行完成后才会继续执行主线程后面的代码,直到主线程代码执行完成,程序退出
        # 设置了timeout时,子线程超时后,主线程不会继续等待子线程返回结果,而是继续执行主线程代码,最终,主线程执行完成后,在非daemon状态时,如果子线程还没执行完成,会继续等待,如果是daemon状态,那么这时主线程是不会等待子线程完成,而是直接退出程序
        # t1.join(timeout=10.1)
        print('begin ...')
    
        print('all task done.')
    
    
    if __name__ == '__main__':
        worker()
    

    线程锁Lock基本概念

    #! /usr/bin/env python3
    # encoding: utf-8
    
    """
    @Author: zengchunyun
    @Date: 2017/7/12
    """
    import threading
    import time
    
    
    """
    lock有两种状态,一种是locked,一种是unlocked
    创建lock时。状态为unlocked,它有两个基本的方法,acquire()和release(),当状态为unlocked,acquire()改变这个状态为locked,
    并立即返回,当状态是locked时,调用acquire()方法时会阻塞当前线程,直到另一个线程调用它的release()方法,将状态改为unlocked。
    然后这个acquire()调用重置lock为locked状态,并立即返回,这个release()方法应该在lock状态为locked时调用,它会改变lock状态为unlocked,并立即返回,
    如果对一个已经是unlocked状态对lock调用release()时,会抛出RuntimeError错误
    
    
    当多个线程调用acquire()时进入阻塞状态,等待这个lock状态变为unlocked,只有一个线程会在release()被调用后,lock状态会变为unlocked后执行,
    具体是哪些线程会执行,并没有一个明确规定条件,且在代码实现上差异也很大
    
    """
    
    
    """
    lock支持使用上下文管理
    例如:
    with lock:
        # do something...
        
        
    它等效于下面这种写法
    lock.acquire()
    try:
        # do something...
    finally:
        lock.release()
    """
    
    lock = threading.Lock()
    
    
    def read(lock_obj):
        print('entry read function...')
        lock_status = lock_obj.acquire()  # acquire接受两个参数,blocking=True,timeout=-1,两个默认参数值
        # 当lock_status状态为True时,说明获取到锁了,当为False说明没有获得锁
        print(lock_status)
        time.sleep(5)
        print('read do something...')
        lock_obj.release()
        print('read done.')
    
    
    def write(lock_obj):
        print('entry write function...')
        # 当blocking设置为True,默认为True,线程会阻塞,直到lock状态变为unlocked
        # 当blocking设置为False,线程不会阻塞,如果一个调用使用blocking为True会被阻塞,并立即返回False,其它情况设置这个lock为locked,并返回True
        # 如果设置timeout值时,blocking值必须是True,默认值为True,所以可以不用指定blocking,也就是说,通常这两个参数最好不要同时存在
        # 当timeout为非-1时。线程会阻塞这个timeout值当秒数,如果超时了,依然没有获得锁,则不继续阻塞,但是返回值为False,也就是说lock状态为unlocked,所以当返回状态为False时,是不能调用lock的release()方法,否则抛异常
        # 当timeout为-1时,线程会进入无限当的等待状态,不允许在blocking为False时为timeout指定值
        lock_status = lock_obj.acquire(blocking=True, timeout=2)  # acquire接受两个参数,blocking=True,timeout=-1,两个默认参数值,当把blocking设置为False时,即不会阻塞该线程,会继续执行后面的代码,
        print(lock_status)
        print('write do something...')
        if lock_status:  # 需要判断是否获得锁,如果获得,状态为True,则需要释放锁
            lock_obj.release()
        print('write done.')
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=read, args=(lock,))
        t1.start()
        t2 = threading.Thread(target=write, args=(lock,))
        t2.start()
    
    
    # 首先要明白,获取锁时,默认是阻塞状态,直到获取到锁,可以给阻塞状态获取锁时设置等待超时时间,超过时间不管有没有获取到都会继续执行后面的代码
    # 对于一个新的锁对象,也就是刚创建的锁,第一次调用acquire()方法去获取时,不管是不是阻塞,都能获取到锁,也就是这个方法会返回True,但是
    # 当你再次使用这个锁对象acquire(False)不阻塞方式去获取锁,其结果如果还是True,那其结果只有一种,那就是之前肯定释放了一次锁,否则,对于
    # 已经拿到了锁,再次调用acquire(False)其结果将为False
    
    #  第一种情况
    import threading
    lock = threading.Lock()
    print(lock.acquire(True))  # 返回True
    print(lock.acquire(False))  # 返回False
    
    
    lock2 = threading.Lock()
    print(lock2.acquire(False))  # 返回True
    print(lock2.acquire(False))  # 返回False
    
    # 第二种情况
    lock3 = threading.Lock()
    print(lock3.acquire(True))  # 返回True
    print(lock3.release())
    print(lock3.acquire(False))  # 返回True
    
    
    lock4 = threading.Lock()
    print(lock4.acquire(False))  # 返回True
    print(lock4.release())
    print(lock4.acquire(False))  # 返回True
    

    多把锁正确使用方式

    import threading
    import time
    import random
    
    """
    2把锁对两个不同资源进行多线程操作
    """
    
    read_lock = threading.Lock()
    write_lock = threading.Lock()
    
    num = 0
    num2 = 0
    
    
    def read(r_lock, w_lock):
        r_lock.acquire()  # 先获取一把锁,保证同一时刻只能有一个线程操作共享数据
        global num  # 准备对共享数据进行操作
        time.sleep(random.randint(0, 3))  # 修改该资源比较费时,需要大约0-3秒
        num += 1  # 对共享数据加1
        # 这时还想对一个比较耗费时间对资源进行操作,所以又开启一个新线程,该资源也是共享资源,所以也需要锁定资源
        new_task = threading.Thread(target=write, args=(w_lock,))
        new_task.start()
        # 然后需要释放共享资源
        r_lock.release()
    
    
    def write(w_lock):
        current_thread = threading.current_thread()  # 获取当前线程
        print("write %s do something..." % current_thread.name)
        w_lock.acquire()  # 先锁定资源
        global num2  # 准备对共享资源进行操作
        time.sleep(random.randint(0, 2))  # 修改该资源比较费时,需要大约0-5秒
        num2 += 1  # 花费数秒秒才把资源修改完成
        w_lock.release()  # 资源修改完了
    
    
    if __name__ == '__main__':
        for task in range(10):  # 开启10个任务
            t1 = threading.Thread(target=read, args=(read_lock, write_lock))  # 传入2把锁,1把用于锁子线程锁定共享资源,1把用于给子线程开启的子线程锁定共享资源
            t1.start()
        print('task done...')
        print(num)
        print(num2)
        while True:
            # 如果当前剩下1个活动的线程,说明其它子线程任务都完成了,只剩下主线程了
            if threading.active_count() == 1:
                print(num)
                print(num2)
                break
    

    错误使用锁方式

    """
    错误的加锁方式
    以下代码加锁方式不可取,一个线程内,应该加锁和解锁是成对的,且加完锁后,在一个线程内,不该再去加锁,必须先解锁后,再加锁,否则非常容易造成死锁
    """
    
    read_lock = threading.Lock()
    write_lock = threading.Lock()
    
    num = 0
    num2 = 0
    
    
    def read(r_lock, w_lock):
        r_lock.acquire()  # 先获取一把锁,保证同一时刻只能有一个线程操作共享数据
        global num  # 准备对共享数据进行操作
        time.sleep(random.randint(0, 3))  # 修改该资源比较费时,需要大约0-3秒
        num += 1  # 对共享数据加1
        # 这时还想对一个比较耗费时间对资源进行操作,所以又开启一个新线程,该资源也是共享资源,所以也需要锁定资源
        new_task = threading.Thread(target=write, args=(w_lock,))  # 注意,子线程如果使用也需要锁时,解锁操作必须在子线程内把锁释放完成
        new_task.start()
        # 然后需要释放共享资源
        w_lock.release()  # 不能在此处释放锁,这种代码设计很容易造成死锁现象,
        r_lock.release()
    
    
    def write(w_lock):
        current_thread = threading.current_thread()  # 获取当前线程
        print("write %s do something..." % current_thread.name)
        w_lock.acquire()  # 先锁定资源
        global num2  # 准备对共享资源进行操作
        time.sleep(random.randint(0, 2))  # 修改该资源比较费时,需要大约0-5秒
        num2 += 1  # 花费数秒秒才把资源修改完成
        w_lock.release()  # 此处注释后,很可能会有锁没有得到释放,这种写法容易造成死锁,就是递归锁也不能这样写,
    
    
    if __name__ == '__main__':
        for task in range(10):  # 开启10个任务
            t1 = threading.Thread(target=read, args=(read_lock, write_lock))  # 传入2把锁,1把用于锁子线程锁定共享资源,1把用于给子线程开启的子线程锁定共享资源
            t1.start()
        print('task done...')
        print(num)
        print(num2)
        while True:
            # 如果当前剩下1个活动的线程,说明其它子线程任务都完成了,只剩下主线程了
            if threading.active_count() == 1:
                print(num)
                print(num2)
                break
    

    常见的死锁

    """
    使用同一把锁对不同资源加锁,没有及时释放,造成死锁
    """
    
    read_lock = threading.Lock()
    
    num = 0
    num2 = 0
    
    
    def read(r_lock):
        r_lock.acquire()  # 先获取一把锁,保证同一时刻只能有一个线程操作共享数据
        global num  # 准备对共享数据进行操作
        time.sleep(random.randint(0, 1))  # 修改该资源比较费时,需要大约0-3秒
        num += 1  # 对共享数据加1
        write(r_lock)  # 使用了同一把锁,且未先释放锁,会造成死锁
        # 然后需要释放共享资源
        r_lock.release()
    
    
    def write(w_lock):
        current_thread = threading.current_thread()  # 获取当前线程
        print("write %s do something..." % current_thread.name)
        w_lock.acquire()  # 先锁定资源
        global num2  # 准备对共享资源进行操作
        time.sleep(random.randint(0, 1))  # 修改该资源比较费时,需要大约0-5秒
        num2 += 1  # 花费数秒秒才把资源修改完成
        w_lock.release()  # 资源修改完了
    
    
    if __name__ == '__main__':
        for task in range(10):  # 开启10个任务
            t1 = threading.Thread(target=read, args=(read_lock,))  # 传入2把锁,1把用于锁子线程锁定共享资源,1把用于给子线程开启的子线程锁定共享资源
            t1.start()
        print('task done...')
        print(num)
        print(num2)
        while True:
            # 如果当前剩下1个活动的线程,说明其它子线程任务都完成了,只剩下主线程了
            if threading.active_count() == 1:
                print(num)
                print(num2)
                break
    

    递归锁

    上面的这个死锁解决方式

    """
    使用1把递归锁可以解决上面对问题,但是上面对问题是可以避免的,只需先释放锁,在调用write(r_lock)方法,即可
    """
    
    read_lock = threading.RLock()
    
    num = 0
    num2 = 0
    
    
    def read(r_lock):
        r_lock.acquire()  # 先获取一把锁,保证同一时刻只能有一个线程操作共享数据
        global num  # 准备对共享数据进行操作
        time.sleep(random.randint(0, 3))  # 修改该资源比较费时,需要大约0-3秒
        num += 1  # 对共享数据加1
        # 这时还想对一个比较耗费时间对资源进行操作,所以又开启一个新线程,该资源也是共享资源,所以也需要锁定资源
        # new_task = threading.Thread(target=write, args=(w_lock,))
        # new_task.start()
        write(r_lock)
        # 然后需要释放共享资源
        r_lock.release()
    
    
    def write(w_lock):
        current_thread = threading.current_thread()  # 获取当前线程
        print("write %s do something..." % current_thread.name)
        w_lock.acquire()  # 先锁定资源
        global num2  # 准备对共享资源进行操作
        time.sleep(random.randint(0, 2))  # 修改该资源比较费时,需要大约0-5秒
        num2 += 1  # 花费数秒秒才把资源修改完成
        w_lock.release()  # 资源修改完了
    
    
    if __name__ == '__main__':
        for task in range(10):  # 开启10个任务
            t1 = threading.Thread(target=read, args=(read_lock,))  # 传入2把锁,1把用于锁子线程锁定共享资源,1把用于给子线程开启的子线程锁定共享资源
            t1.start()
        print('task done...')
        print(num)
        print(num2)
        while True:
            # 如果当前剩下1个活动的线程,说明其它子线程任务都完成了,只剩下主线程了
            if threading.active_count() == 1:
                print(num)
                print(num2)
                break
    
  • 相关阅读:
    Uploader 文件上传
    filters过滤器的使用
    Calendar中遇到的问题
    中科院之旅
    Python基础教程:列表推导式详解
    不会也要知道的,Python四种实现排序的方法
    2021字节跳动校招秋招算法面试真题解题报告--leetcode19 删除链表的倒数第 n 个结点,内含7种语言答案
    2021字节跳动校招秋招算法面试真题解题报告--leetcode206 反转链表,内含7种语言答案
    求协方差
    国外卡组织的 交换费-interchangefee(发卡行服务费) 和 银联对比
  • 原文地址:https://www.cnblogs.com/zengchunyun/p/7155819.html
Copyright © 2011-2022 走看看