GIL是用来锁线程的,保证了数据的安全,那么有了GIL为什么还要用锁呢?因为有了GIL还是会出现数据不安全的现象,所以还要用锁
先看以下代码
import time from threading import Thread,Lock n = 100 def func(): global n # n -=1 在执行时会被拆分为 # tmp = n -1 # n = tnp tmp = n - 1 n = tmp if __name__ == '__main__': l = [] for i in range(100): t = Thread(target=func) t.start() l.append(t) for t in l: t.join() print(n) 结果: 0 import time from threading import Thread n = 100 def func(): global n # n -=1 tmp = n - 1 time.sleep(0.1) #强制陷入阻塞,移交全局解释器锁 n = tmp if __name__ == '__main__': l = [] for i in range(100): t = Thread(target=func) t.start() l.append(t) for t in l: t.join() print(n) 结果: 99
我们只是在第二段代码中阻塞了0.1秒,但却出现了不同的结果,这是为什么呢
在代码的角度看,计算机在执行n-=1时,会拆分为两步执行 1. tmp = n - 1 2. n = tmp 而在计算机时间片的轮转当中,每700条指令全局解释性锁会让出一次 比如现有a和b两条线程轮流去cpu当中执行指令 a线程中有两条指令 : tmp = n - 1 n = tmp b线程中也有俩条指令: tmp = n -1 n = tmp 因为有全局解释器锁,当a线程去cpu当中执行指令的时候,执行到 tmp = n-1 时,
可能会发生全局解释器锁让出,此时tmp=99,但是并没有执行 n=tmp 进行赋值,
此时cpu开始执行b线程中的指令,执行完赋值后此时 n=99,b线程执行完再去执行a线程,
此时进行赋值 n = 99 执行了两次 - 1操作 ,但是最后得到的值却是 99 即便是有了全局解释器锁,仍然有可能出现数据的不安全
眼见为实,看看指令是怎样走的
import dis n = 1 def func(): global n n -1 dis.dis(func) 结果: 168 0 LOAD_GLOBAL 0 (n) 2 LOAD_CONST 1 (1) 4 BINARY_SUBTRACT 6 POP_TOP 8 LOAD_CONST 0 (None) 10 RETURN_VALUE import dis n = 1 def func(): global n n -= 1 dis.dis(func) 结果: 168 0 LOAD_GLOBAL 0 (n) 2 LOAD_CONST 1 (1) 4 INPLACE_SUBTRACT 6 STORE_GLOBAL 0 (n) #其他的不看,有存储这步就是不安全的 8 LOAD_CONST 0 (None) 10 RETURN_VALUE
只要是两个或两个以上的线程对公共数据进行赋值操作,就有可能会发生数据的不安全
列表/字典等中的方法 l.append l.insert dic.update等都是线程安全的
而像 l[0]+=1 或 d[k] +=1 是不安全的
死锁
死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s开始吃面'%name) time.sleep(0.2) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s开始吃面' % name) time.sleep(0.2) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('赵教授',)).start() Thread(target=eat2,args=('钱教授',)).start() Thread(target=eat1,args=('孙教授',)).start() Thread(target=eat2,args=('李教授',)).start() 结果: 赵教授拿到面条了 赵教授拿到叉子了 赵教授开始吃面 赵教授放下叉子了 钱教授拿到叉子了 赵教授放下面了 孙教授拿到面条了 赵教授执行的是eat1,很顺利的吃到了面,先放下的叉子,而钱教授执行的是eat2,并且在赵教授放下叉子时就获取到了叉子,但是钱教授执行了一段话,在钱教授执行这段话的时候,面条被放下了,面条别执行eat1的孙教授得到了,面条无法吃下去了,出现了死锁
死锁不上必然发生的,有偶然的情况整个程序都崩溃了
出现的原因:每个线程之中不止一把锁,并且套着使用
解决方法:如果某一件事情需要两个资源同时出现,那么不应该将这两个资源通过两把锁控制,而应看做一个资源,使用一把锁
解决
import time from threading import Thread,Lock lock = Lock() def eat1(name): lock.acquire() print('%s拿到面条了'%name) print('%s拿到叉子了'%name) print('%s开始吃面'%name) time.sleep(0.2) lock.release() print('%s放下叉子了' % name) print('%s放下面了' % name) def eat2(name): lock.acquire() print('%s拿到叉子了' % name) print('%s拿到面条了' % name) print('%s开始吃面' % name) time.sleep(0.2) lock.release() print('%s放下面了' % name) print('%s放下叉子了' % name) Thread(target=eat1,args=('赵教授',)).start() Thread(target=eat2,args=('钱教授',)).start() Thread(target=eat1,args=('孙教授',)).start() Thread(target=eat2,args=('李教授',)).start()
递归锁 rlock
当然在实际当中,肯定是你不知不觉的写的,不会摆在明面上,解决的时间会很长,而此时又需要立马解决,此时就要用到递归锁了,临时解决问题.
互斥锁和递归锁的区别
互斥锁:无论在相同的线程还是不同的线程,都只能连续acpuice一次,要想在acquire,必须先release
递归锁:在同一线程中,可以无限次的acquice,但是想要在其他线程中也acquiece,必须实现在自己的线程中添加和acquire次数相同的release,在同一线程内不会出现死锁现象
from threading import RLock rlock = RLock() rlock.acquire() rlock.acquire() rlock.acquire() print('锁不住')
在多线程中
from threading import RLock,Thread rlock = RLock() def func(num): rlock.acquire() print('a',num) rlock.acquire() print('b',num) rlock.release() rlock.release() Thread(target=func,args=(1,)).start() Thread(target=func,args=(2,)).start()
使用递归锁解决教授吃面的问题
import time from threading import RLock,Thread noodle_lock = fork_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s开始吃面'%name) time.sleep(0.2) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s开始吃面' % name) time.sleep(0.2) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('赵教授',)).start() Thread(target=eat2,args=('钱教授',)).start() Thread(target=eat1,args=('孙教授',)).start() Thread(target=eat2,args=('李教授',)).start()
线程池
concurrent.futures:模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
基本方法
submit(fn, *args, **kwargs) 异步提交任务 map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 result(timeout=None) 取得结果 add_done_callback(fn) 回调函数
线程池示例
import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor def func(num): print('in %s func'%num,currentThread()) time.sleep(random.random()) return num**2 tp = ThreadPoolExecutor(5) ret_l = [] for i in range(30): ret = tp.submit(func,i) ret_l.append(ret) for ret in ret_l: print(ret.result())
进程池示例
import os import time import random from threading import currentThread from concurrent.futures import ProcessPoolExecutor def func(num): print('in %s func '%num,os.getpid()) time.sleep(random.random()) return num**2 if __name__ == '__main__': tp = ProcessPoolExecutor(5) ret_l = [] for i in range(30): ret = tp.submit(func,i) ret_l.append(ret) for ret in ret_l: print(ret.result())
多线程/多进程 秒切换
import os import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor as Pool # from concurrent.futures import ProcessPoolExecutor as Pool def func(num): print('in %s func'%num,currentThread()) # print('in %s func' %num,os.getpid()) time.sleep(random.random()) return num**2 if __name__ == '__main__': tp = Pool(5) ret_l = [] for i in range(30): ret = tp.submit(func,i) ret_l.append(ret) tp.shutdown() for ret in ret_l: print(ret.result())
简便用法 map
import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor as Pool def func(num): print('in %s func'%num,currentThread()) time.sleep(random.random()) return num**2 if __name__ == '__main__': tp = Pool(5) ret = tp.map(func,range(30)) for i in ret: print(i)
回调函数 add_done_callback
from threading import currentThread from concurrent.futures import ThreadPoolExecutor as Pool def func1(num): print('in func1 ',num,currentThread()) return num*'*' def func2(ret): print('--->',ret.result(),currentThread()) tp = Pool(5) print('主 : ',currentThread()) for i in range(10): tp.submit(func1,i).add_done_callback(func2
回调函数收到的参数是需要使用result()获取的
回调函数是由主线程执行的
回调函数例子
from concurrent.futures import ThreadPoolExecutor as Pool from urllib.request import urlopen def func(name,url): content = urlopen(url).read() return name,content def parserpage(ret): name,content = ret.result() with open(name,'wb') as f: f.write(content) urls = { 'baidu.html':'https://www.baidu.com', 'python.html':'https://www.python.org', 'openstack.html':'https://www.openstack.org', 'github.html':'https://help.github.com/', 'sina.html':'http://www.sina.com.cn/' } tp = Pool(2) for k in urls: tp.submit(func,k,urls[k]).add_done_callback(parserpage)
信号量
import time from threading import Semaphore,Thread def func(name,sem): sem.acquire() print(name,'start') time.sleep(1) print(name,'stop') sem.release() sem = Semaphore(5) #同时执行5个线程 for i in range(20): Thread(target=func,args=(i,sem)).start()
信号量和池 进程池 有1000个任务 一个进程池中有5个进程 所有的1000个任务会多次利用这五个进程来完成任务 信号量 有1000个任务 有1000个进程/线程 所有的1000个任务由于信号量的控制,只能5个5个的执行
事件
同进程的一样
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作
示例
import time import random from threading import Thread,Event def connect_sql(e): count = 0 while count < 3: e.wait(0.5) if e.is_set(): print('连接数据库成功') break else: print('数据库未连接成功') count += 1 def test(e): time.sleep(random.randint(0,3)) e.set() e = Event() Thread(target=test,args=(e,)).start() Thread(target=connect_sql,args=(e,)).start()
定时器
from threading import Timer def func(): print('执行我啦') t = Timer(3,func) # 现在这个时间点我不想让它执行,而是预估一下大概多久之后它执行比较合适 t.start() print('主线程的逻辑')
条件