队列 :是进程之间安全的 自带锁
from queue import Queue
q = Queue(2)
给参数控制队列的大小如果超过大小且没人取值会一直阻塞
1.
q.put(1) 添加数据 添加一个多一个
2.
q.put_nowait(2) 超过队列大小 将引发Queue.Empty异常
3.
print(q.get()) 取一个少一个 取值没值的时候阻塞
4.
print(q.get_nowait()) 取值没有值的值的时候 报错 queue.Empty
5.
print(q.full()) 看队列满不满 在多进程中是不准确的
6.
print(q.empty()) 看列表空不空 在多进程中是不准确的
7.
print(q.qsize()) 返回队列中目前项目正确数量
from queue import Queue
q = Queue(2)
给参数控制队列的大小如果超过大小且没人取值会一直阻塞
1.
q.put(1) 添加数据 添加一个多一个
2.
q.put_nowait(2) 超过队列大小 将引发Queue.Empty异常
3.
print(q.get()) 取一个少一个 取值没值的时候阻塞
4.
print(q.get_nowait()) 取值没有值的值的时候 报错 queue.Empty
5.
print(q.full()) 看队列满不满 在多进程中是不准确的
6.
print(q.empty()) 看列表空不空 在多进程中是不准确的
7.
print(q.qsize()) 返回队列中目前项目正确数量
1队列 先进先出FIFO 维护秩序的时候用的较多, 买票 秒杀
from multiprocessing import Queue q=Queue() q.put(1) q.put(2) q.put(3) print(q.qsize()) print(q.get()) print(q.qsize()) print(q.get()) print(q.get()) print(q.get()) #没有值的时候回堵塞 print(q.get_nowait()) #有值的时候会取值 print(q.get_nowait()) #没有值的时候会报错 print(q.full()) #在实例化的时候加一个值 当队列里的值满了会报True print(q.empty()) #当队列里没有值了 会报True print(q.qsize()) #获取管道里数据的个数
子程序进 子程序出
# from multiprocessing import Queue,Process
# def func(q):
# q.put(22)
#
# def func1(q):
# print(q.get())
#
# if __name__ == '__main__':
# q=Queue()
# p=Process(target=func,args=(q,))
# p.start()
# p = Process(target=func1, args=(q,))
# p.start()
# 生产者消费者模型 - 解决创造(生产)数据和处理(消费)数据的效率不平衡问题
# 把创造数据 和 处理数据放在不同的进程中,
# 根据他们的效率来调整进程的个数
# 生产数据快 消费数据慢 内存空间的浪费
# 消费数据快 生产数据慢 效率低下
# 消费数据快 生产数据慢 效率低下
解决办法
第一版本
View Code
View Code
View Code
View Code
View Code
View Code
from multiprocessing import Queue,Process def func1(q,name): while True: food=q.get() print("%s吃了%s" %(name,food)) def func2(q,name,food,n=10): for i in range(n): fd=food+str(i) print('%s 生产了 %s' % (name, fd)) q.put(fd) if __name__ == '__main__': q=Queue() p=Process(target=func1,args=(q,"姚志强")) p.start() p1 = Process(target=func2, args=(q,"戴恩","泔水")) p1.start()
第二版本 采用from multiprocessing import JoinableQueue 这个模块
import time import random from multiprocessing import JoinableQueue,Process def func1(q,name): while True: food=q.get() print("%s吃了%s" % (name,food)) time.sleep(1) q.task_done() def func2(q,name,food,n=10): for i in range(n): time.sleep(random.random()) fd=food+str(i) print("%s制造了%s" % (name,fd)) q.put(fd) q.join() if __name__ == '__main__': q=JoinableQueue(1) c=Process(target=func1,args=(q,"姚志强")) c.daemon = True c.start() c1=Process(target=func1,args=(q,"姚志强")) c1.daemon = True c1.start() z=Process(target=func2,args=(q,"戴恩","泔水")) z.start() z1 = Process(target=func2, args=(q, "戴恩", "清泉")) z1.start() z.join() z1.join()
# 只有multiprocessing中的队列 才能帮助你 实现 IPC
# 永远不可能出现数据不安全的情况,多个进程不会同时取走同一个数据
# 提供给你的方法
# 永远不可能出现数据不安全的情况,多个进程不会同时取走同一个数据
# 提供给你的方法
# 由于先进先出的特点+进程通信的功能+数据进程安全,经常用它来完成进程之间的通信
# 生产者消费者模型
# 生产者和消费者的效率平衡的问题
# 内存的控制 - 队列的长度限制
# 让消费者自动停下来
# 生产者消费者模型
# 生产者和消费者的效率平衡的问题
# 内存的控制 - 队列的长度限制
# 让消费者自动停下来
# JoinableQueue
# 在消费数据的时候 task_done
# 在生产端主进程 join
# 在消费数据的时候 task_done
# 在生产端主进程 join
二 管道
# 管道
# 队列就是基于管道实现的
# 队列 数据安全的
# 管道 数据不安全的
# 队列 = 管道 + 锁
# from multiprocessing import Pipe # left,right = Pipe() # left.send('aaa') # print(right.recv()) # from multiprocessing import Pipe,Process # def consumer(left,right): # left.close() # while True: # try: # print(right.recv()) # except EOFError: # break # # if __name__ == '__main__': # left,right = Pipe() # p = Process(target=consumer,args=(left,right)) # p.start() # right.close() # for i in range(10): # left.send('hello') # left.close()
EOF异常的触发
# 在这一个进程中 如果不在用这个端点了,应该close
# 这一在recv的时候,如果其他端点都被关闭了,就能够知道不会在有新的消息传进来
# 此时就不会在这里阻塞等待,而是抛出一个EOFError
# * close并不是关闭了整个管道,而是修改了操作系统对管道端点的引用计数的处理
# from multiprocessing import Process,Pipe # # def consumer(p,name): # produce, consume=p # produce.close() # while True: # try: # baozi=consume.recv() # print('%s 收到包子:%s' %(name,baozi)) # except EOFError: # break # # def producer(p,seq=10): # produce, consume=p # consume.close() # for i in range(seq): # produce.send(i) # # if __name__ == '__main__': # produce,consume=Pipe() # for i in range(5): # c=Process(target=consumer,args=((produce,consume),'c1')) # c.start() # for i in range(5): # p = Process(target=producer, args=((produce, consume))) # p.start() # producer((produce,consume)) # produce.close() # consume.close()
三 数据共享
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: d['count']-=1 if __name__ == '__main__': lock = Lock() m = Manager() dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) # with as 的机制 # __enter__ # __exit__
四 数据池
# 进程不能无限开 会给操作系统调度增加负担
# 且真正能被同时执行的进程最多也就和CPU个数相同等
# 进程的开启和销毁都要消耗资源和时间
import os import time from multiprocessing import Pool def func(i): time.sleep(0.1) print(os.getpid(),i) if __name__ == '__main__': p = Pool(5) for i in range(20): p.apply_async(func,args=(i,)) p.close() p.join()