1 GIL全局解释器锁定义
定义:在一个线程拥有了解释器的访问权后,其他的所有线程都必须等待他释放解释器的访问权,即这些线程的下一条指令并不会互相影响。
缺点:多处理器退化为单处理器
优点:避免大量的加锁解锁操作
无论你启多少个线程,你有多少个cpu,python在执行一个进程的时候会淡定的在同一时刻只允许一个线程运行。
Python是无法利用多核cpu实现多线程的
总结:
对于计算密集型任务,python的多线程并没有用
对于IO密集型任务,python的多线程是有意义的
python使用多核:开进程,弊端:开销大而且切换复杂
着重点:协程+多进程
方向:IO多路复用
终极思路:换C模块实现多线程
2 同步锁
锁通常被用来实现对共享资源的同步访问,为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其他线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完,再调用release方法释放锁:
import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 #num-=1 #获取的值为0 与temp=num num=temp-1是同一个意思 唯一在于time.sleep() lock.acquire()#添加锁 temp=num time.sleep(0.001)#IO操作 如果时间在0.1时,足够让一条线程完成后再走另一条线程 取值从100开始,99 #如果时间为0.001时,100条线程运行时出现不确定性,有可能走到第五条的时候就完成IO操作,前五条基数为100,第六条基数就是99 num =temp-1 # 对此公共变量进行-1操作 lock.release()#解锁 num = 100 #设定一个共享变量 thread_list = [] lock=threading.Lock()#获取一把锁 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print('Result: ', num)
3 死锁和递归锁
死锁就是指两个或者两个以上进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,他们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁这些永远在互相等待的进程被称为死锁进程。
import threading import time mutexA = threading.Lock() mutexB = threading.Lock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): self.fun1() self.fun2() def fun1(self): mutexA.acquire() # 如果锁被占用,则阻塞在这里,等待锁的释放 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) mutexB.release() mutexA.release() def fun2(self): mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) time.sleep(0.2) mutexA.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexA.release() mutexB.release() if __name__ == "__main__": print("start---------------------------%s"%time.time()) for i in range(0, 10): my_thread = MyThread() my_thread.start()
递归锁
Python中为了支持在同一线程中多次请求统一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acpuire都被release,其他的线程才能获得资源。
递归锁用来解决死锁
import threading import time Rlock=threading.RLock()#生成一个递归锁 class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): self.fun1() self.fun2() def fun1(self): Rlock.acquire() # 一条线程运行,其他线程等待 count=1 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) Rlock.acquire()#同一条线程运行,其他线程等待中。。。count=2,递归锁 print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) Rlock.release()#同一线程运行完成后,count=1 Rlock.release()#同一线程运行完成后,count=0 ,其他线程才能获得资源 def fun2(self): Rlock.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) Rlock.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) Rlock.release() Rlock.release() if __name__ == "__main__": print("start---------------------------%s"%time.time()) for i in range(0, 10): my_thread = MyThread() my_thread.start()
4 Event对象
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得棘手,为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,他允许线程等待某些事件的发生。
在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么他将忽略这个事件,继续执行
event对象 线程之间的简单通信 event.set()更改标志位为True, 所有阻塞池的线程激活进入就绪状态, 等待操作系统调度 event.wait()等待;还可以接收一个超时参数,默认情况下超过这个参数设定的值之后,wait方法会返回 event.clear()恢复event的状态值为False event.isSet():返回event的状态值
右边的线程需要左边线程运行的时候,用Event.set来更改状态,为True;
如果右边的线程状态一直为默认为假,左边的线程则一直等待,不会执行。
应用场景:
我们有多个线程从Redis队列中读取数据来处理,这些线程都要尝试去连接Redis的服务,一般情况下,如果Redis连接不成功,在各个线程的代码中,都会去尝试重新连接。如果我们想要在启动时确保Redis服务正常,才让那些工作线程去连接Redis服务器,那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作:主线程中会去尝试连接Redis服务,如果正常的话,触发事件,各工作线程会尝试连接Redis服务。
import threading import time import logging logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',) def worker(event): logging.debug('Waiting for redis ready...') event.wait()#由于状态默认为假,需等待 logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1) def main():#主函数 readis_ready = threading.Event() t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')#产生一个对象 t1.start()#开始运行 t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2') t2.start()#产生一个对象 logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event') time.sleep(3) # simulate the check progress readis_ready.set()#更改状态为True,默认为False if __name__=="__main__": main()
5 Semaphore(信号量)
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
import threading import time semaphore = threading.Semaphore(5) def func(): semaphore.acquire() print (threading.currentThread().getName() + ' get semaphore') time.sleep(2) semaphore.release() for i in range(20):#总数为20,最大连接为5,分为4次 t1 = threading.Thread(target=func) t1.start()