线程池
线程池各种方法的使用
#1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor():线程池,提供异步调用 ProcessPoolExecutor(): 进程池,提供异步调用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 异步提交任务 #map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 #shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 #result(timeout=None) 取得结果 #add_done_callback(fn) 回调函数
# Usage example.
# NOTE: despite the section title ("thread pool"), this example actually
# constructs a ProcessPoolExecutor; the commented-out alternative below shows
# the ThreadPoolExecutor version with the same structure.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
import time
import random


def task(n):
    """Worker: report our pid, sleep 1-3 seconds, and return n squared."""
    # Fixed message typo ("runing" -> "running").
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    # Cap the pool at 3 workers. (Unspecified, a process pool defaults to
    # os.cpu_count() workers; a thread pool defaults to cpu_count()*5.)
    executor = ProcessPoolExecutor(max_workers=3)
    futures = []
    for i in range(11):
        # submit() schedules task(i) asynchronously and returns a Future.
        future = executor.submit(task, i)
        futures.append(future)
    # Block until every submitted task finishes and resources are reclaimed
    # (equivalent to pool.close() + pool.join()).
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import time,random,os # import threading # # def work(n): # print('%s is running' %threading.current_thread().getName()) # time.sleep(random.randint(1,3)) # return n**2 # if __name__ == '__main__': # executor=ThreadPoolExecutor() # futures=[] # for i in range(30): # future=executor.submit(work,i) # futures.append(future) # executor.shutdown(wait=True) # print('主') # for obj in futures: # print(obj.result())
#map方法的使用 # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import time,random,os # # def work(n): # print('%s is running' %os.getpid()) # time.sleep(random.randint(1,3)) # return n**2 # if __name__ == '__main__': # executor=ProcessPoolExecutor() # # futures=[] # # for i in range(10): # # future=executor.submit(work,i) # # futures.append(future) # # executor.map(work,range(10)) # executor.shutdown(wait=True) # print('主')
#回调函数 # from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor # import requests # import os # import time,random # def get(url): # print('%s GET %s' %(os.getpid(),url)) # response=requests.get(url) # time.sleep(random.randint(1,3)) # if response.status_code == 200: # print('%s DONE %s' % (os.getpid(), url)) # return {'url':url,'text':response.text} # # def parse(future): # dic=future.result() # print('%s PARSE %s' %(os.getpid(),dic['url'])) # time.sleep(1) # res='%s:%s ' %(dic['url'],len(dic['text'])) # with open('db.txt','a') as f: # f.write(res) # # if __name__ == '__main__': # urls=[ # 'https://www.baidu.com', # 'https://www.python.org', # 'https://www.openstack.org', # 'https://help.github.com/', # 'http://www.sina.com.cn/' # ] # p=ProcessPoolExecutor() # start_time=time.time() # objs=[] # for url in urls: # # obj=p.apply_async(get,args=(url,),callback=parse) #主进程负责干回调函数的活 # p.submit(get,url).add_done_callback(parse) # parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果 # # # p.close() # # p.join() # # print('主',(time.time()-start_time))
死锁/递归锁
通过引用来进行多次计数,只要计数不为0(不释放锁),其他线程不能抢锁和运行
# Recursive-lock (RLock) demo.
#
# RLock properties:
#   1. the owning thread may acquire() it multiple times;
#   2. each acquire() bumps an internal counter; other threads cannot grab the
#      lock until the counter drops back to 0 (every acquire released).
# A plain mutex (Lock), by contrast, can be acquired only once: with two
# separate Locks this program would deadlock between f1 (A then B) and
# f2 (B then A). Binding BOTH names to a single RLock avoids the deadlock.
#
# FIX: the original snippet used Thread, RLock and time without importing
# them, so it raised NameError when run; imports added below.
from threading import Thread, RLock
import time

mutexA = mutexB = RLock()
# mutexA=mutexB=Lock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        # Lock order: A then B.
        mutexA.acquire()
        print('%s 抢 A锁' % self.name)
        mutexB.acquire()
        print('%s 抢 B锁' % self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        # Opposite lock order: B then A — safe only because it is one RLock.
        mutexB.acquire()
        print('%s 抢 B锁' % self.name)
        time.sleep(0.1)
        mutexA.acquire()
        print('%s 抢 A锁' % self.name)
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
信号量
限制最大连接数:与线程池/进程池不一样,信号量是由一堆线程/进程去抢定义好的信号量
而池是规定好只能有多少进程或者线程, 通俗点就是一个定义有几个茅坑,一个定义同时只能有几个人排队而且不能有新人加入
# Semaphore demo: at most 5 threads may be inside the `with sm` block at once.
from threading import Semaphore, Thread, current_thread
import time
import random

# FIX: the original defined `sm` only inside the __main__ guard, but task()
# reads it as a global — importing the module and calling task() raised
# NameError. Hoisted to module level; script behavior is unchanged.
sm = Semaphore(5)


def task():
    """Occupy one of the 5 semaphore slots for 1-3 seconds."""
    # `with sm` acquires a slot on entry and releases it on exit.
    with sm:
        print('%s 正在上厕所' % current_thread().getName())
        time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    # 11 threads compete for the 5 slots.
    for i in range(11):
        t = Thread(target=task)
        t.start()
事件Event
线程之间的协同工作,一个检测,合格之后运行,共享数据间的运行,当一个线程运行到需要另一个线程开始执行时就需要用到事件,无非就是发送一个信号,False 或者 True
多个线程间干一个活才能用到,独立的线程间用不到
# Event demo: one "checker" thread performs a check and signals completion;
# several "connector" threads poll the event and connect once it is set.
from threading import Event, current_thread, Thread
import time

e = Event()


def check():
    """Simulate a 5-second check, then wake all waiters via e.set()."""
    print('%s 正在检测' % current_thread().getName())
    time.sleep(5)
    e.set()


def conn():
    """Poll the event once per second; give up after three attempts."""
    attempt = 1
    while not e.is_set():
        # Three strikes and we abort the connection attempt.
        if attempt > 3:
            raise TimeoutError('连接超时')
        print('%s 正在等待%s连接' % (current_thread().getName(), attempt))
        e.wait(timeout=1)
        attempt += 1
    print('%s 开始连接' % current_thread().getName())


if __name__ == '__main__':
    checker = Thread(target=check)
    checker.start()
    for _ in range(3):
        Thread(target=conn).start()
定时器Timer
定时器,指定n秒后执行某操作
# Timer demo: schedule a callable to run once after a delay (in seconds).
from threading import Timer


def hello():
    print("hello, world")


t = Timer(1, hello)
t.start()  # "hello, world" is printed after a 1-second delay
线程queue
队列:先进先出 堆栈:后进先出
class queue.Queue(maxsize=0)  # 先进先出 (FIFO)
# queue.Queue is FIFO: items come out in the order they were put in.
import queue

q = queue.Queue()
for item in ('first', 'second', 'third'):
    q.put(item)
for _ in range(3):
    print(q.get())
'''
结果(先进先出):
first
second
third
'''
class queue.LifoQueue(maxsize=0)  # last in first out (后进先出)
# queue.LifoQueue behaves like a stack: last in, first out.
import queue

q = queue.LifoQueue()
for item in ('first', 'second', 'third'):
    q.put(item)
for _ in range(3):
    print(q.get())
'''
结果(后进先出):
third
second
first
'''
class queue.PriorityQueue(maxsize=0)  # 存储数据时可设置优先级的队列
#VIP用户设置优先级连接
# queue.PriorityQueue dequeues the entry with the SMALLEST priority first.
import queue

q = queue.PriorityQueue()
# Each put() takes a tuple whose first element is the priority (usually a
# number, but any mutually comparable values work); lower sorts out earlier.
for entry in ((20, 'a'), (10, 'b'), (30, 'c')):
    q.put(entry)
for _ in range(3):
    print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''