一、 管道
创建管道的类
Pipe([duplex]):在进程之间创建一条管道,并返回元祖(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生process对象之前产生管道
参数介绍
dumplex
默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送
主要方法:
Conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另一端已经关闭,那么recv方法会抛出EOFError。
Conn1.send(obj):通过连接发送对象。Obj是与序列化兼容的任意对象
其他方法:
Conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
Conn1.fileno():返回连接使用的整数文件描述符
Conn1.poll(【timeout】):如果连接上的数据可用,返回True。Timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达
Conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。Maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发ioError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发eoferror异常
Conn.send_bytes(buffer[,offset[,size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收
Conn1.recv_bytes_info(buffer[,offset])接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。Offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发bufferTooShort异常
from multiprocessing import Pipe,Process def func(conn1,): # conn2.close() msg = conn1.recv() print('>>>>>',msg) if __name__ == '__main__': conn1,conn2 = Pipe() p = Process(target=func,args=(conn1,)) p.start() # conn1.close() # conn2.close() conn2.send('小鬼!') print('主进程结束')
应该特别注意管道端点的正确管理问题。如果生产者或消费者都没有使用管道的某个端点,就应将它关闭。这也说明了为何生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记要执行这些步骤,程序可能在消费者中的recv()操作上挂起(就是阻塞)。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道的相同一端就会能生成EOFError异常。因此,在生产者中关闭通道不会有任何效果,除非消费者也关闭了相同的管道端点。
主进程将管道的两端都传给子进程,子进程和主进程共用管道的两种报错情况,都是在recv接收的时候报错的:
1, 主进程和子进程的管道的相同一端都关闭了,出现EOFError
2, 如果你管道的一端在主进程和子进程中都关闭了,但是你还用这个关闭的一端去接收消息,那么就会出现OSError
所以你关闭管道的时候,就会容易出现问题,需要将所有只有这个管道的进程中的两端全部关闭才行,当然也可以通过异常捕获(try:except EOFerror)来处理。
虽然我们在主进程和子进程中都打印了conn1一端的对象,发现两个不再同一个地址,但是子进程中的管道和主进程的管道还是可以通信的,因为管道是同一套,系统能够记录
我们的目的就是关闭所有的通道,那么主进程和子进程进行通信的时候,可以给子进程传管道的一端就够了,并且用我们之前学到的,信息发送完后,再发送一个结束信号None,那么你收到的消息为None的时候直接结束接收或者说结束循环,就不用每次都关闭各个进程中管道了。
from multiprocessing import Process,Pipe def consumer(p,name): produce,consume = p produce.close() while 1: try: baozi = consume.recv() print('%s 收到包子:%s'%(name,baozi)) except EOFError: break def producer(seq,p): produce,consume = p consume.close() for i in seq: produce.send(i) if __name__ == '__main__': produce,consume = Pipe() c1 = Process(target=consumer,args=((produce,consume),'c1')) c1.start() seq = (i for i in range(10)) producer(seq,(produce,consume)) produce.close() consume.close() c1.join() print('主进程')
关于管道会造成数据不安全问题的官方解释:
由Pipe方法返回的两个连接对象表示管道的两端。每个连接对象都有send和recv方法(除其他之外)。注意,如果两个进程(或线程)试图同时从管道的同一端读取或写入数据,那么管道中的数据可能会损坏。当然,在使用管道的不同端部的过程中不存在损坏风险
from multiprocessing import Process,Pipe,Lock def consumer(p,name,lock): produce, consume=p produce.close() while True: lock.acquire() baozi=consume.recv() lock.release() if baozi: print('%s 收到包子:%s' %(name,baozi)) else: consume.close() break def producer(p,n): produce, consume=p consume.close() for i in range(n): produce.send(i) produce.send(None) produce.send(None) produce.close() if __name__ == '__main__': produce,consume=Pipe() lock = Lock() c1=Process(target=consumer,args=((produce,consume),'c1',lock)) c2=Process(target=consumer,args=((produce,consume),'c2',lock)) p1=Process(target=producer,args=((produce,consume),10)) c1.start() c2.start() p1.start() produce.close() consume.close() c1.join() c2.join() p1.join() print('主进程')
from multiprocessing import Manager,Process,Lock
def work(d,lock):
with lock:
d['count'] -=1
if __name__ == '__main__':
lock = Lock()
with Manager() as m:
dic = m.dict({'count':100})
p_1 = []
for i in range(100):
p = Process(target=work,args=(dic,lock))
p_1.append(p)
p.start()
for p in p_1:
p.join()
print(dic)
一、 数据共享
进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题,应该尽量避免共享数据的方式,以后会使用数据库来解决进程之间的数据共享问题
总结一下,进程之间的通信:队列,管道,数据共享也算
一、 进程池和mutiprocess.poll
为什么要有进程池?进程池的概念
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万割任务需要被执行的时候,我们就需要去船舰成千上万个进程吗?首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等内容)也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行切换并且记录每个进程的执行节点,也就是记录上下文,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者开启程序。
定义一个池子,有需求来了,就拿一个池中的进程来处理任务,等处理完毕,进程并不关闭,而是将进程再放回池中继续等待任务。如果有很多任务需要执行,池中的数量不够,任务就要等待之前的进程执行完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程再运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果
from multiprocessing import Process,Pool
def func(i):
num = 0
for j in range(5):
num += i
if __name__ == '__main__':
pool = Pool(4) #
p_list = []
start_time = time.time()
for i in range(500):
p = Process(target=func,args=(i,))
p_list.append(p)
p.start()
[pp.join() for pp in p_list]
end_time = time.time()
print(end_time - start_time)
pool.map(func,range(500)) #map
e_time = time.time()
print(e_time - s_time)