concurrent.futures 模块
#1 介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.
两者都实现了相同的接口,该接口由抽象执行器类定义。
#2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务
#map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作,只能单纯的提交,并且没有返回的结果
#shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
#result(timeout=None)
取得结果
#add_done_callback(fn)
回调函数
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time,random,os,threading # 进程池,线程池 def work(n): # print('%s is working' %os.getpid()) # 当前进程名 print('%s is working' %threading.current_thread().getName()) # 当前线程名 time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': # executor=ProcessPoolExecutor() executor=ThreadPoolExecutor() # futrues=[] # for i in range(10): # #future=executor.submit(work,i) 1 # ## print(executor.submit(work,i).result()) # 模拟同步的效果 # #futrues.append(future) 2 # futrues.append(executor.submit(work,i)) # 1,2合并 futures = [executor.submit(work, i) for i in range(10)] # 列表推导式 executor.shutdown(wait=True) print('主') for obj in futures: print(obj.result())
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time,random,os,requests def get(url): print('%s GET %s'%(os.getpid(),url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code==200: print('%s DONE %s' %(os.getpid(),url)) return {'url':url,'text':response.text} def parse(future): dic=future.result() print('%s PARSE %s'%(os.getpid(),dic['url'])) time.sleep(1) res='%s:%s ' %(dic['url'],len(dic['text'])) with open('db.txt','a') as f: f.write(res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=ProcessPoolExecutor() start_time=time.time() objs=[] for url in urls: p.submit(get,url).add_done_callback(parse) #主进程负责干回调函数的活 # parse拿到的是一个future对象obj,需要用obj.result()拿到结果 print('主',(time.time()-start_time))
import queue # q=queue.Queue(3) #队列 # q.put(1) # q.put(2) # q.put(3) # q.put(4) # print(q.get()) # print(q.get()) # print(q.get()) q=queue.PriorityQueue(4) #数字越小优先级越高,谁小谁级别高 q.put((10,'aaa')) q.put((7,'bbb')) q.put((30,'ddd')) q.put((30,'ccccccccc')) # print(q.get()) # print(q.get()) # print(q.get()) # print(q.get()) # 序列类型 s1='abc' s2='b' print(s1 > s2) # False # l1=[1,'a',3] # l2=[1,'b'] # print(l2 > l1) # True c1=[1,3,5] c2=[5,3,1] print(c2 > c1) # True l1=[1,'a',3] l2=[1,3,4] print(l2 > l1) # 不同类型报错 q=queue.LifoQueue(3) #后进先出,堆栈 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get())
信号量
同进程的一样
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
意思是当计数器为0时,某个线程想acquire这个锁时,获取不到,阻塞了。等其它的已经获取的锁的线程调用release后,计数器就变为1,然后才能调用acquire
from threading import Semaphore,Thread,current_thread import time,random def task(): with sm: print('%s tasking' %current_thread().getName()) time.sleep(random.randint(1,3)) if __name__ == '__main__': sm=Semaphore(5) for i in range(11): t=Thread(target=task) t.start()
与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程
from threading import Timer def hello(n): print("hello, world",n) t = Timer(2, hello,args=(1,)) t.start() # after 1 seconds, "hello, world" will be printed # 2秒后,“hello,world”将被打印。
Event对象
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。
为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。
如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。
如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行。
event的四种方法
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
event示例:
from threading import Event,current_thread,Thread import time e=Event() def check(): print('%s 正在检测' %current_thread().getName()) time.sleep(5) e.set() def conn(): count=1 while not e.is_set(): if count > 3: raise TimeoutError('连接超时') print('%s 正在等待%s连接' % (current_thread().getName(),count)) e.wait(timeout=1) # 指定时间到了就不管了 count+=1 print('%s 开始连接' % current_thread().getName()) if __name__ == '__main__': t1=Thread(target=check) t2=Thread(target=conn) t3=Thread(target=conn) t4=Thread(target=conn) t1.start() t2.start() t3.start() t4.start()
死锁与递归锁
from threading import Thread,Lock,RLock import time # mutexA=Lock() # mutexB=Lock() #递归锁的特点: #1:可以acquire多次 #2:每acquire一次,计数加1,只要计数不为0,就不能被其他线程抢到 #互斥锁的特点: #1:只能acquire一次 mutexA=mutexB=RLock() # mutexA=mutexB=Lock() 死锁 # 所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象, # 若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。 class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 抢 A锁' %self.name) mutexB.acquire() print('%s 抢 B锁' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 抢 B锁' %self.name) time.sleep(0.1) mutexA.acquire() print('%s 抢 A锁' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start()