内容梗概:
1.管道
2.数据共享
3.进程池
4.回调函数
1.管道
进程间通信(IPC)方式二:管道(不推荐使用,了解即可),会导致数据不安全的情况出现
实例:
import time
from multiprocessing import Process,Pipe
def func(conn1,conn2):
conn1.close()
conn2.close()
# msg = conn2.recv()
msg = conn2.recv()
print(">>",msg)
if __name__ == '__main__':
conn1,conn2 = Pipe()
p1 = Process(target=func,args=(conn1,conn2,))
p1.start()
conn1.close()
conn2.close()
conn1.send("嘿嘿")
conn1.send("嘿嘿")
conn1.close()
p1.join()
EOFError:接收端两端都关闭的的时候
OSError:发送端关闭后,仍用该发送端发送
2.数据共享
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
import time
from multiprocessing import Process,Manager,Lock
def func(dic,lock):
with lock: #因为数据共享,必须加锁,不认有可能会争抢数据,导致出错
dic["num"] -= 1
if __name__ == '__main__':
m = Manager()
lock = Lock()
dic = m.dict({"num":100})
lis = []
for i in range(100):
p = Process(target=func,args=(dic,lock))
p.start()
lis.append(p)
for el in lis:
el.join()
print(dic["num"])
3.进程池
进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。
如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行
进程池实例:(创建多个进程和进程池运算的区别)
import time
from multiprocessing import Process,Manager,Lock,Pool
def func(i):
num = 0
for j in range(5):
num += i
if __name__ == '__main__':
pool = Pool(4)
#创建多个进程运算
lis = []
start_time = time.time()
for i in range(250):
p = Process(target=func,args=(i,))
p.start()
lis.append(p)
for el in lis:
el.join()
end_time = time.time()
dif_time = abs(start_time - end_time)
print(dif_time)
#进程池运算
s_time = time.time()
pool.map(func,range(250)) #此处map所使用的方法,与内置函数用法一致
e_time = time.time()
d_time = abs(s_time - e_time)
print(d_time)
结果:13.073268413543701
0.0017964839935302734
运算速度差距较大,因为创建进程是很浪费资源的
注意:1.,map是异步执行的,并且自带close和join
2.一般约定俗成的是进程池中的进程数量为CPU的数量,工作中要看具体情况来考量。
进程池中常用方法
1.同步方法
import time
from multiprocessing import Process,Pool
def func(i):
num = 0
for j in range(5):
num += i
time.sleep(0.5)
return num
if __name__ == '__main__':
pool = Pool(4)
lis = []
for i in range(10):
res = pool.apply(func,args=(i,)) #
print(res)
2.异步方法
import time
from multiprocessing import Process,Pool
def func(i):
num = 0
for j in range(5):
num += i
time.sleep(0.5)
print('>>>>>', num)
# return num
if __name__ == '__main__':
lis = []
pool = Pool(4)
for i in range(10):
res = pool.apply_async(func,args=(i,)) ## 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了
# lis.append(res)
pool.close() # 不是关闭进程池,只是锁定
pool.join()
for el in lis:
print(el.get())
需要注意的是,进程池中的三个进程不会同时开启或者同时结束
而是执行完一个就释放一个进程,这个进程就去接收新的任务。
4.回调函数
回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果
import time,os
from multiprocessing import Pool,Process
def func(n):
print("子进程pid",os.getpid())
return n*2,"约吗?"
def call_back_func(x):
print("call_back pid",os.getpid())
print(x) #回调函数在写的时候注意一点,回调函数的形参执行有一个,如果你的执行函数有多个返回值,那么也可以被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的所有返回值。
if __name__ == '__main__':
pool = Pool(4)
pool.apply_async(func,args=(2,),callback=call_back_func) #回调函数只能接受值,不能赋值
print('主进程pid', os.getpid())
pool.close()
pool.join()
from threading import Thread
def func(n):
print(">>>")
print(n)
if __name__ == '__main__':
t = Thread(target=func,args=(5,))
t.start()
t.join()
print("呵呵")
import time
from threading import Thread
class mythread(Thread):
def __init__(self,num):
super().__init__()
self.num = num
def run(self):
print(">>>heiehi")
if __name__ == '__main__':
t = mythread(58)
t.start()
time.sleep(0.5)
print("<<>>")
import threading,time
from threading import Thread,current_thread
def func(n):
time.sleep(2)
print('我是子线程,名字是', current_thread().getName())
print('我是子线程,id是', current_thread().ident)
if __name__ == '__main__':
t = Thread(target=func,args=(5,))
t.start()
print('我是主线程,id是', current_thread().ident)
print(threading.enumerate())
print(threading.active_count())