Thread基本方法
#! /usr/bin/env python3
# encoding: utf-8
"""
@Author: zengchunyun
@Date: 2017/7/12
"""
import threading
import sys
import time
def read(event):
print('get a task ...')
print('read is_set', event.is_set()) # 获取event状态,如果event状态为False,调用event.wait()就会进入阻塞状态,
# 直到event状态为True,才会继续执行event.wait()后面的代码
event.wait() # 此处会进入等待状态,不会继续执行下面的代码,直到event对象被设置为True,则会通知此wait进入非阻塞状态,即,会继续执行下面的代码
print('read is_set', event.is_set()) # 获取event状态,如果在wait后,通常该状态为True,如果wait后,又立即对event调用clear()方法,则该状态又为False
event.clear() # 将event状态设置为False
print('read is_set', event.is_set()) # 获取event状态,如果在wait后,通常该状态为True,如果wait后,又立即对event调用clear()方法,则该状态又为False
current_thread = threading.current_thread() # 获取当前线程对象
print('current_thread', current_thread)
# current_thread._tstate_lock=None
# current_thread._stop()
print('is_alive', current_thread.is_alive()) # 通过获取到当前线程对象后,可以对当前线程做一些操作,比如判断当前线程状态,这里对状态肯定是为True,因为如果为False,那就不可能执行到这里了.也可以通过一些特殊方式,在此处将该结果得到的值为False,不过这种方式对程序而言就没有太大意义了。
current_thread.name = 'read_thread' # 可以对当前线程修改名字
print(current_thread.name)
print('get_ident', threading.get_ident()) # 返回当前的线程唯一标识符,为非0的整数
print('main_thread', threading.main_thread()) # 获取主线程对象,通常这个主线程是有python解释器启动的
print("doing now")
time.sleep(50)
print('doing done.')
def write(frame, event, arg):
print('write', frame, event, arg, frame.f_lineno)
def worker():
# threading.settrace(write) # 通过调用sys.settrace方法去获取执行过程的栈信息,通过这些栈信息可以用于记录日志分析,也能方便理解程序的运行过程,该函数在run()之前调用,它有call,line,return,exception,c_call,c_return,c_exception这些事件类型
# sys.settrace(write)
# threading.setprofile(write) # 通过调用sys.setprofile()获取栈信息,它不是每次都会调用。仅仅是在call一个方法时和方法return时调用。所以它的事件信息只有call,c_call,c_return, return,当一个异常已经发生,这个return事件也会返回
print('stack_size', threading.stack_size(36864)) # 设置栈大小,最少为32KiB,即32768,每次增加的大小必须是4KiB,即4096大小,
print('get_ident', threading.get_ident()) # 返回当前的线程唯一标识符
print('active_count', threading.active_count()) # 当前有多少个正在运行的线程,等同于len(threading.enumerate())
print('TIMEOUT_MAX', threading.TIMEOUT_MAX) # 最大超时参数,用于Lock.acquire(),Rlock.acquire(),Condition.wait(),等使用超时参数等方法,如果设置一个比这个值还大等值,会出现OverflowError异常
print('enumerate', threading.enumerate())
event = threading.Event()
t1 = threading.Thread(target=read, args=(event,))
t1.daemon = False # 设置子线程是否为守护线程,当为守护线程时,该线程将在后台运行,主线程执行完成就不会等待子线程执行是否完成,而是直接退出,如果不是守护线程,主线程执行完成,如果子线程没有完成,会继续等待
# 创建一个线程对象有两种方式,一种是调用线程的构造方法,或者子类继承Thread,重写run方法,要启动这个线程对象,还需要调用线程的start()方法,然后会在独立的线程里管理调用run里面的代码,当线程start().
# 我们为认为这个线程就是活动的状态,直到这个线程的run方法执行完成,或者run方法内部出现异常,则这个线程就是不活跃状态,可以通过is_alive()去获取线程状态
t1.start()
print('stack_size', threading.stack_size())
print('worker is_set', event.is_set())
event.clear()
print('worker is_set', event.is_set())
timer = threading.Timer(interval=5, function=event.set)
timer.start()
print('active_count', threading.active_count())
print('enumerate', threading.enumerate())
# 调用线程的join方法会阻塞其它线程的调用,直到调用join的那个线程方法调用终止,即如果有多个线程,都调用里join,那么会依次按谁先调用join,谁就先执行,直到调用join的那个线程执行完成,才会继续调用下一个线程,这样就等于把线程变成串行方式执行了,而不是并行
# join实际上就是使用了线程锁,让同一时刻只能有一个线程在运行。
# 如果线程不是daemon线程,分两张情况,
# 一:timeout为None,主线程就会等待子线程执行完毕,才继续执行主线程后面的代码,直到主线程执行完,程序才退出
# 二:timeout设置了大于0的浮点值,就会在该超时时间内等待子线程的返回,如果这个时间内,子线程没有执行完成,那么主线程不会继续等子线程,而是继续执行主线程后面的代码,最后如果主线程代码执行完了,如果子线程还没有执行完,会继续等待子线程,直到子线程完全返回,主线程才退出
# 如果子线程是daemon线程,也分两种情况
# 一: 如果timeout为None, 主线程会等待子线程执行完毕,才会继续执行主线程后面的代码,直到主线程执行完,程序退出
# 二:如果timeout设置为大于0的浮点值,就会在该超时时间内等待子线程的返回,如果这个时间内,子线程没有执行完成,那么主线程不会继续等待子线程,而是继续执行主线程后面的代码,最后如果主线程代码执行完了,如果子线程还没有执行完,不会继续等待子线程,而是直接退出程序
# 总结,在没有timeout时,不管是不是daemon线程,主线程都会等待子线程执行完成后才会继续执行主线程后面的代码,直到主线程代码执行完成,程序退出
# 设置了timeout时,子线程超时后,主线程不会继续等待子线程返回结果,而是继续执行主线程代码,最终,主线程执行完成后,在非daemon状态时,如果子线程还没执行完成,会继续等待,如果是daemon状态,那么这时主线程是不会等待子线程完成,而是直接退出程序
# t1.join(timeout=10.1)
print('begin ...')
print('all task done.')
if __name__ == '__main__':
worker()
线程锁Lock基本概念
#! /usr/bin/env python3
# encoding: utf-8
"""
@Author: zengchunyun
@Date: 2017/7/12
"""
import threading
import time
"""
lock有两种状态,一种是locked,一种是unlocked
创建lock时。状态为unlocked,它有两个基本的方法,acquire()和release(),当状态为unlocked,acquire()改变这个状态为locked,
并立即返回,当状态是locked时,调用acquire()方法时会阻塞当前线程,直到另一个线程调用它的release()方法,将状态改为unlocked。
然后这个acquire()调用重置lock为locked状态,并立即返回,这个release()方法应该在lock状态为locked时调用,它会改变lock状态为unlocked,并立即返回,
如果对一个已经是unlocked状态对lock调用release()时,会抛出RuntimeError错误
当多个线程调用acquire()时进入阻塞状态,等待这个lock状态变为unlocked,只有一个线程会在release()被调用后,lock状态会变为unlocked后执行,
具体是哪些线程会执行,并没有一个明确规定条件,且在代码实现上差异也很大
"""
"""
lock支持使用上下文管理
例如:
with lock:
# do something...
它等效于下面这种写法
lock.acquire()
try:
# do something...
finally:
lock.release()
"""
lock = threading.Lock()
def read(lock_obj):
print('entry read function...')
lock_status = lock_obj.acquire() # acquire接受两个参数,blocking=True,timeout=-1,两个默认参数值
# 当lock_status状态为True时,说明获取到锁了,当为False说明没有获得锁
print(lock_status)
time.sleep(5)
print('read do something...')
lock_obj.release()
print('read done.')
def write(lock_obj):
print('entry write function...')
# 当blocking设置为True,默认为True,线程会阻塞,直到lock状态变为unlocked
# 当blocking设置为False,线程不会阻塞,如果一个调用使用blocking为True会被阻塞,并立即返回False,其它情况设置这个lock为locked,并返回True
# 如果设置timeout值时,blocking值必须是True,默认值为True,所以可以不用指定blocking,也就是说,通常这两个参数最好不要同时存在
# 当timeout为非-1时。线程会阻塞这个timeout值当秒数,如果超时了,依然没有获得锁,则不继续阻塞,但是返回值为False,也就是说lock状态为unlocked,所以当返回状态为False时,是不能调用lock的release()方法,否则抛异常
# 当timeout为-1时,线程会进入无限当的等待状态,不允许在blocking为False时为timeout指定值
lock_status = lock_obj.acquire(blocking=True, timeout=2) # acquire接受两个参数,blocking=True,timeout=-1,两个默认参数值,当把blocking设置为False时,即不会阻塞该线程,会继续执行后面的代码,
print(lock_status)
print('write do something...')
if lock_status: # 需要判断是否获得锁,如果获得,状态为True,则需要释放锁
lock_obj.release()
print('write done.')
if __name__ == '__main__':
t1 = threading.Thread(target=read, args=(lock,))
t1.start()
t2 = threading.Thread(target=write, args=(lock,))
t2.start()
# 首先要明白,获取锁时,默认是阻塞状态,直到获取到锁,可以给阻塞状态获取锁时设置等待超时时间,超过时间不管有没有获取到都会继续执行后面的代码
# 对于一个新的锁对象,也就是刚创建的锁,第一次调用acquire()方法去获取时,不管是不是阻塞,都能获取到锁,也就是这个方法会返回True,但是
# 当你再次使用这个锁对象acquire(False)不阻塞方式去获取锁,其结果如果还是True,那其结果只有一种,那就是之前肯定释放了一次锁,否则,对于
# 已经拿到了锁,再次调用acquire(False)其结果将为False
# 第一种情况
import threading
lock = threading.Lock()
print(lock.acquire(True)) # 返回True
print(lock.acquire(False)) # 返回False
lock2 = threading.Lock()
print(lock2.acquire(False)) # 返回True
print(lock2.acquire(False)) # 返回False
# 第二种情况
lock3 = threading.Lock()
print(lock3.acquire(True)) # 返回True
print(lock3.release())
print(lock3.acquire(False)) # 返回True
lock4 = threading.Lock()
print(lock4.acquire(False)) # 返回True
print(lock4.release())
print(lock4.acquire(False)) # 返回True
多把锁正确使用方式
import threading
import time
import random
"""
2把锁对两个不同资源进行多线程操作
"""
read_lock = threading.Lock()
write_lock = threading.Lock()
num = 0
num2 = 0
def read(r_lock, w_lock):
r_lock.acquire() # 先获取一把锁,保证同一时刻只能有一个线程操作共享数据
global num # 准备对共享数据进行操作
time.sleep(random.randint(0, 3)) # 修改该资源比较费时,需要大约0-3秒
num += 1 # 对共享数据加1
# 这时还想对一个比较耗费时间对资源进行操作,所以又开启一个新线程,该资源也是共享资源,所以也需要锁定资源
new_task = threading.Thread(target=write, args=(w_lock,))
new_task.start()
# 然后需要释放共享资源
r_lock.release()
def write(w_lock):
current_thread = threading.current_thread() # 获取当前线程
print("write %s do something..." % current_thread.name)
w_lock.acquire() # 先锁定资源
global num2 # 准备对共享资源进行操作
time.sleep(random.randint(0, 2)) # 修改该资源比较费时,需要大约0-5秒
num2 += 1 # 花费数秒秒才把资源修改完成
w_lock.release() # 资源修改完了
if __name__ == '__main__':
for task in range(10): # 开启10个任务
t1 = threading.Thread(target=read, args=(read_lock, write_lock)) # 传入2把锁,1把用于锁子线程锁定共享资源,1把用于给子线程开启的子线程锁定共享资源
t1.start()
print('task done...')
print(num)
print(num2)
while True:
# 如果当前剩下1个活动的线程,说明其它子线程任务都完成了,只剩下主线程了
if threading.active_count() == 1:
print(num)
print(num2)
break
错误使用锁方式
"""
错误的加锁方式
以下代码加锁方式不可取,一个线程内,应该加锁和解锁是成对的,且加完锁后,在一个线程内,不该再去加锁,必须先解锁后,再加锁,否则非常容易造成死锁
"""
read_lock = threading.Lock()
write_lock = threading.Lock()
num = 0
num2 = 0
def read(r_lock, w_lock):
r_lock.acquire() # 先获取一把锁,保证同一时刻只能有一个线程操作共享数据
global num # 准备对共享数据进行操作
time.sleep(random.randint(0, 3)) # 修改该资源比较费时,需要大约0-3秒
num += 1 # 对共享数据加1
# 这时还想对一个比较耗费时间对资源进行操作,所以又开启一个新线程,该资源也是共享资源,所以也需要锁定资源
new_task = threading.Thread(target=write, args=(w_lock,)) # 注意,子线程如果使用也需要锁时,解锁操作必须在子线程内把锁释放完成
new_task.start()
# 然后需要释放共享资源
w_lock.release() # 不能在此处释放锁,这种代码设计很容易造成死锁现象,
r_lock.release()
def write(w_lock):
current_thread = threading.current_thread() # 获取当前线程
print("write %s do something..." % current_thread.name)
w_lock.acquire() # 先锁定资源
global num2 # 准备对共享资源进行操作
time.sleep(random.randint(0, 2)) # 修改该资源比较费时,需要大约0-5秒
num2 += 1 # 花费数秒秒才把资源修改完成
w_lock.release() # 此处注释后,很可能会有锁没有得到释放,这种写法容易造成死锁,就是递归锁也不能这样写,
if __name__ == '__main__':
for task in range(10): # 开启10个任务
t1 = threading.Thread(target=read, args=(read_lock, write_lock)) # 传入2把锁,1把用于锁子线程锁定共享资源,1把用于给子线程开启的子线程锁定共享资源
t1.start()
print('task done...')
print(num)
print(num2)
while True:
# 如果当前剩下1个活动的线程,说明其它子线程任务都完成了,只剩下主线程了
if threading.active_count() == 1:
print(num)
print(num2)
break
常见的死锁
"""
使用同一把锁对不同资源加锁,没有及时释放,造成死锁
"""
read_lock = threading.Lock()
num = 0
num2 = 0
def read(r_lock):
r_lock.acquire() # 先获取一把锁,保证同一时刻只能有一个线程操作共享数据
global num # 准备对共享数据进行操作
time.sleep(random.randint(0, 1)) # 修改该资源比较费时,需要大约0-3秒
num += 1 # 对共享数据加1
write(r_lock) # 使用了同一把锁,且未先释放锁,会造成死锁
# 然后需要释放共享资源
r_lock.release()
def write(w_lock):
current_thread = threading.current_thread() # 获取当前线程
print("write %s do something..." % current_thread.name)
w_lock.acquire() # 先锁定资源
global num2 # 准备对共享资源进行操作
time.sleep(random.randint(0, 1)) # 修改该资源比较费时,需要大约0-5秒
num2 += 1 # 花费数秒秒才把资源修改完成
w_lock.release() # 资源修改完了
if __name__ == '__main__':
for task in range(10): # 开启10个任务
t1 = threading.Thread(target=read, args=(read_lock,)) # 传入2把锁,1把用于锁子线程锁定共享资源,1把用于给子线程开启的子线程锁定共享资源
t1.start()
print('task done...')
print(num)
print(num2)
while True:
# 如果当前剩下1个活动的线程,说明其它子线程任务都完成了,只剩下主线程了
if threading.active_count() == 1:
print(num)
print(num2)
break
递归锁
上面的这个死锁解决方式
"""
使用1把递归锁可以解决上面对问题,但是上面对问题是可以避免的,只需先释放锁,在调用write(r_lock)方法,即可
"""
read_lock = threading.RLock()
num = 0
num2 = 0
def read(r_lock):
r_lock.acquire() # 先获取一把锁,保证同一时刻只能有一个线程操作共享数据
global num # 准备对共享数据进行操作
time.sleep(random.randint(0, 3)) # 修改该资源比较费时,需要大约0-3秒
num += 1 # 对共享数据加1
# 这时还想对一个比较耗费时间对资源进行操作,所以又开启一个新线程,该资源也是共享资源,所以也需要锁定资源
# new_task = threading.Thread(target=write, args=(w_lock,))
# new_task.start()
write(r_lock)
# 然后需要释放共享资源
r_lock.release()
def write(w_lock):
current_thread = threading.current_thread() # 获取当前线程
print("write %s do something..." % current_thread.name)
w_lock.acquire() # 先锁定资源
global num2 # 准备对共享资源进行操作
time.sleep(random.randint(0, 2)) # 修改该资源比较费时,需要大约0-5秒
num2 += 1 # 花费数秒秒才把资源修改完成
w_lock.release() # 资源修改完了
if __name__ == '__main__':
for task in range(10): # 开启10个任务
t1 = threading.Thread(target=read, args=(read_lock,)) # 传入2把锁,1把用于锁子线程锁定共享资源,1把用于给子线程开启的子线程锁定共享资源
t1.start()
print('task done...')
print(num)
print(num2)
while True:
# 如果当前剩下1个活动的线程,说明其它子线程任务都完成了,只剩下主线程了
if threading.active_count() == 1:
print(num)
print(num2)
break