进程池:
1.进程池初识,2.效率比较,3.同步和异步,4.进程池的返回值和回调函数,5.socket并发的服务端
为什么要有进程池?进程池的概念。
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?
在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
multiprocess.Pool模块:
Pool([numprocess [,initializer [, initargs]]]):创建进程池
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None 3 initargs:是要传给initializer的参数组
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。''' p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用 主要方法
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready():如果调用完成,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
进程池初识:
#进程池 #开辟一个空间,固定放着固定数目的进程,比如5个。现在有50个任务,每次任务一执行,就去池子里找这5个进程,一次执行五个任务, #执行完的任务,把进程放回进程池,供其他任务继续使用。从而大大减少了进程的开启和销毁的内存占用,提高了效率 #更高级的进程池 #···python里没有 #假设 进程池最少的时候有n个进程,最多是m个 #当我们访问网站,用户量多的时候,进程池的进程,就会根据算法,慢慢增加,增加到m个 #当访问量,或者任务下降,进程池就会 减减减 减到最少 n个 #信号量:是同一段代码,只能同一时间由几个进程执行,但是这些进程数都是要悉数开启和销毁的,如果有200个进程,这些进程都会占用内存 #进程池:进程都是固定的,不用每次都开启和销毁
进程池和普通多进程的效率比较:
from multiprocessing import Pool,Process import time def func(n): for i in range(10): print(n+1) if __name__ == '__main__': pool = Pool(5) #一般进程数是 cpu核数+1 start = time.time() pool.map(func,range(100)) #开启了一百个任务,map是自带join,close的 #map的功能最多就只能这样了,如果想传更多参数,可以iterable里可以传tuple,list.. t1 = time.time() - start start2 = time.time() p_lst = [] for i in range(100): p = Process(target=func,args=(i,)) p.start() p_lst.append(p) [p.join() for p in p_lst] t2 = time.time() - start2 print(t1,t2) # 0.19 VS 2.61 进程池完胜
同步: apply 异步: apply_async(func, args=(args1,args2,...),callback=func2):
import os import time from multiprocessing import Pool,Process def func(n): print('%s 开始了~~~'%n,os.getpid()) time.sleep(1) print('%s 结束了'%n,os.getpid()) if __name__ == '__main__': pool = Pool(5) for i in range(10): # pool.apply(func,args=(i,)) #apply 进程池的同步调用(自然不需要close,join),要等待进程中的任务完结,才会执行下一个任务.基本上不会用 pool.apply_async(func,args=(i,))#apply_async异步, 一但有任务结束了,它所使用的进程(pid)马上会被其他任务使用 #需要close和join,不然会导致主代码的结束,从而结束进程中的任务 pool.close() #结束进程池接收任务 pool.join() #感知所有任务的结束,然后在执行主程序接下来的代码
#对于Proces开启的进程,是不能接收子进程的返回值的,想获取,只能通过 IPC 进程间通信:队列(子进程put,主进程get),管道
#但是进程池 是可以直接接收子进程的返回值的
#apply的结果就是func的返回值
#通过get方法,获取 apply_async的返回值
#对于Proces开启的进程,是不能接收子进程的返回值的,想获取,只能通过 IPC 进程间通信:队列(子进程put,主进程get),管道 #但是进程池 是可以直接接收子进程的返回值的 #apply的结果就是func的返回值 from multiprocessing import Pool import time def func(n): time.sleep(0.2) return n*n if __name__ == '__main__': pool = Pool(5) ret_lst=[] for i in range(10): # ret = pool.apply(func, args=(i,)) ret = pool.apply_async(func,args=(i,)) #print(ret.get()) #get会阻塞着,直到拿到ret的结果。每个ret马上跟着一个get,所以会按顺序一个个print #一旦异步提交了一个任务,返回一个对象,这个对象获得一个get方法,获取这个任务的返回值,所以get可以替代close和join ret_lst.append(ret) for ret in ret_lst:print(ret.get()) # pool.close() # pool.join() # ret = pool.map(func,range(10)) # print(ret) #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] #因为map自带close和join,它会把所有的值都计算好了,再打印出来。 #所以虽然map简单,但是apply_async可以实时提交,还是要用后者
回调函数不能传参数,唯一的参数就是注册函数的返回值
回调函数 import os from multiprocessing import Pool def func1(n): print('in func1',os.getpid()) return n*n def func2(nn): print('in func2',os.getpid()) print(nn) if __name__ == '__main__': print('主进程 :',os.getpid()) p = Pool(5) for i in range(10): p.apply_async(func1,args=(10,),callback=func2) #回调函数不能传参数,唯一的参数就是注册函数的返回值 p.close() #回调函数是在主进程里执行的 p.join()
import requests from multiprocessing import Pool def get(url): #让并发进程去获取url,把网络延时交给并发,把数据处理返回给主进程 res = requests.get(url) if res.status_code == 200: return url,res.content.decode('utf8') def call_back(args): url,content = args print(url,len(content)) if __name__ == '__main__': url_lst = [ 'https://www.cnblogs.com/', 'http://www.baidu.com', 'https://www.sogou.com/', 'http://www.sohu.com/', ] p = Pool(5) for url in url_lst: p.apply_async(get,args=(url,),callback=call_back) p.close() p.join()
url = r'https://movie.douban.com/subject/26281899/?from=subject-page' import requests res = requests.get(url) print(res) #返回为一个 <Response [200]> print(res.status_code) #状态码:200 print(res.content) #二进制的网页源码 (无格式版的) print(res.text) #网页源码 # from urllib.request import urlopen # ret = urlopen(url) # print(ret.read().decode('utf8')) #获得一个跟原网页源码一样格式的内容
# apply # 同步的:只有当func执行完之后,才会继续向下执行其他代码 # ret = apply(func,args=()) # 返回值就是func的return # apply_async # 异步的:当func被注册进入一个进程之后,程序就继续向下执行 # apply_async(func,args=()) # 返回值 : apply_async返回的对象obj # 为了用户能从中获取func的返回值obj.get() # get会阻塞直到对应的func执行完毕拿到结果 # 使用apply_async给进程池分配任务, # 需要先close后join来保持多进程和主进程代码的同步性
使用进程池来实现socket服务端的并发:
import socket from multiprocessing import Pool def server(conn): conn.send(b'hi') ret = conn.recv(1024).decode('utf8') print(ret) conn.send(b'hello') conn.close() if __name__ == '__main__': pool = Pool(5) sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() pool.apply_async(server,args=(conn,)) sk.close() # pool.close() # pool.join()
import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) print(sk.recv(1024)) msg = input('>>>> ') sk.send(msg.encode('utf8')) msg = sk.recv(1024).decode('utf8') print(msg) # sk.close()