一、管道---Pipe
这是一个单向流动的管道,一次产生一对。看代码:
from multiprocessing import Pipe,Process def f(c): print(c.recv())#接收不需要表明个数 if __name__ == '__main__': conn1, conn2 = Pipe()#一次产生两个通道 p = Process(target=f, args=(conn2,)) p.start() conn1.send('哈哈哈哈哈akfnaslknf')#发送不需要使用字节流 看看源码: def Pipe(duplex=True):#duplex:有两部分的 return Connection(), Connection()#这里返回了两个
二、事件--Event
def f(e): print('hello,Python!!') e.set()#把状态修改为True,如果不修改,程序会停不下来 if __name__ == '__main__': ev = Event()#初始状态是False print(ev.is_set())#查看事件当前的状态 p = Process(target=f, args=(ev,)) p.start() # ev.clear()#设置事件状态为False
三、信号量---Semaphore
先看代码吧:
import random
From fultiprocessing import Process,Semaphore
def f(s):
s.acquire()#这里会-1
# time.sleep(2)#(1)
time.sleep(random.randint(1, 5))#(2)
print('hello')
s.release()#这里会+1
if __name__ == '__main__':
s = Semaphore(4)#设置一个数量
for i in range(10):
p = Process(target=f, args=(s,))
p.start()
说明一下:信号量就像是创建了4个通道,进去一个进程会把们锁上,别人来了没有钥匙进不去,只有等某个进程完成了,其他的进程才可以进来。
所以,使用(1)处的方式打印出来时,就是四个一组的被打印出来,而使用(2)处的方式打印时,就有点随机了。这是因为,每次随机出来的时间不一样,所以等待的时间也不一样,某个进程完成后其他的就可以抢夺这个通道。
四、进程池---Pool
- map()
def f(n): time.sleep(1) print(n) if __name__ == '__main__': po = Pool(4) po.map(f, range(10))#使用方式类似前面的方式,第一个参数是函数名,第二个是可迭代的参数
使用线程池,做操作比直接使用进程,要节省时间,看下代码:
# def fn(n): # for i in range(5): # n += i # # if __name__ == '__main__': # st = time.time() # po = Pool(os.cpu_count()) # po.map(fn, range(100)) # et = time.time() # # # pst = time.time() # lst = [] # for i in range(100): # p = Process(target=fn, args=(i,)) # p.start() # lst.append(p) # [pro.join() for pro in lst] # pet = time.time() # print(et - st) # print(pet-pst)
2.进程池---apply()
def f(n): # print(n*n) return n*n if __name__ == '__main__': po = Pool(4) for i in range(10): res = po.apply(f, args=(i,))#还可以接收返回值 print(res)
这个执行顺序是同步的,那么和下面的代码有什么区别吗?
def f(n): pass for i in range(10): p = Process(target=f,args=(i,)) p.start()
上面两段代码都是串行,但是还是有很大区别的:(1)使用进程池,其实只是创建了4个进程,而且这四个进程是一直被复用的,直到任务完成才销毁,而下面这种却创建了10个进程。(2)
3.进程池---apply_async()
看代码:
def f(n): # print(n) time.sleep(1) return n**2 if __name__ == '__main__': po = Pool(4) res_lst = []#收集结果集 for i in range(10): res = po.apply_async(f, args=(i,))#也可以接收返回值,但是接收到的是结果对象,不是直接的值。 #res.get()#这个get()与队列Queue的get()类似,不拿到值,誓不罢休,所以就会等着进程处理完返回了值后再继续往下走, # 但是这样循环就没有意义了。所以把这个结果集收集起来,然后在统一处理。 res_lst.append(res) #集中处理结果集 for r in res_lst: print(r.get()) #使用close()为的是锁住进程池,不让别的程序继续往这个进程池丢任务,这样做完自己的任务后就可以结束了。 po.close()#锁住进程池 po.join()#等待所有任务被执行完,不然主进程结束,子进程也跟着结束了
4.进程池---apply_async(),回调函数
看代码:
def f(i): return i**2 def callBackFunc(n): print(n) if __name__ == '__main__': po = Pool(4) for i in range(10): #callback=callBackFunc这里就是一个函数名 po.apply_async(f, args=(i,), callback=callBackFunc) po.close() po.join()