一、concurrent.futures模块
此模块提供了高度封装的异步调用接口,支持进程池异步调用(ProcessPoolExecutor)和线程池异步调用(ThreadPoolExecutor),使用方式类似于进程池pool()中的异步调用。
1、进程池异步调用
异步调用实例:
from concurrent.futures import ProcessPoolExecutor import os,time,random def work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(4) #数字4代表限制的进程数量,不写,默认为cpu的个数 futures=[] for i in range(10): future=executor.submit(work,i) #submit为异步提交任务,返回结果对象 futures.append(future) executor.shutdown(wait=True) for obj in futures: print(obj.result()) #result()为获取结果,相当于pool中的get() print("主")
注:上例中,executor.shutdown(wait=True)相当于进程池的pool.close()+pool.join()操作,wait=True,等待池内所有任务执行完毕回收完资源后才继续;wait=False,立即返回,并不会等待池内的任务执行完毕,但不管wait参数为何值,整个程序都会等到所有任务执行完毕。
模拟同步调用实例:
from concurrent.futures import ProcessPoolExecutor import os,time,random def work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor() for i in range(10): future=executor.submit(work,i).result() print(future) executor.shutdown(wait=True) print("主")
2、线程池异步调用
线程池异步调用实例:
from concurrent.futures import ThreadPoolExecutor import os,time,random def work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(4) #4代表开启的线程的个数,不写,默认为cpu的个数乘5 futures=[] for i in range(10): future=executor.submit(work,i) futures.append(future) executor.shutdown(wait=True) for obj in futures: print(obj.result()) print('主')
3、map方法
map(func, *iterables, timeout=None, chunksize=1) ,取代for循环submit的操作,结果为包含函数结果的一个生成器。
实例:
from concurrent.futures import ThreadPoolExecutor import os,time,random def work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(4) futures=executor.map(work,range(10)) #可迭代对象的每一个值即为函数的参数 executor.shutdown(wait=True) print(type(futures)) #<class 'generator'> print(list(futures)) #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] print('主')
4、回调函数
与pool中的回调函数不同的是获得函数执行完返回给解析函数的是一个结果对象,所以解析函数在执行时需要通过result()方法取到结果。具体回调格式如下代码所示:
from concurrent.futures import ProcessPoolExecutor import os,random,time import requests def get(url): print('%s GET %s' %(os.getpid(),url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: print('%s DONE %s' % (os.getpid(), url)) return {'url':url,'text':response.text} def parse(future): dic=future.result() #返回的是一个对象 print('%s PARSE %s' %(os.getpid(),dic['url'])) time.sleep(1) res='%s:%s ' %(dic['url'],len(dic['text'])) with open('db.txt','a') as f: f.write(res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=ProcessPoolExecutor(4) for url in urls: p.submit(get,url).add_done_callback(parse) p.shutdown(wait=True) print('主')
二、多线程中的同步锁
在python的多线程中GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock。所有线程抢的是GIL锁,或者说所有线程抢的是执行权限。线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock。这就导致了串行运行的效果。
1、多线程无互斥锁情况
开启的多线程会在极短的时间内完成,所有线程有都有可能同时拿到n=100,然后抢到GIL锁的线程在遇到阻塞后会释放锁,其他线程再去竞争GIL锁。
from threading import Thread import os,time n=100 def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果可能为99
2、多线程增加互斥锁情况
from threading import Thread,Lock import os,time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全
分析:100个线程去抢GIL锁,即抢执行权限,肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire(),极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL,直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复以上的过程。
三、死锁现象与递归锁
1、死锁现象
是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁:
from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('