生产者消费者模型

import time import random from multiprocessing import Process,Queue def consumer(q): while True: obj = q.get() print('消费了一个数据%s'%obj) time.sleep(random.randint(1,3)) if __name__ == '__main__': q =Queue() p = Process(target=consumer,args=(q,)) p.daemon = True p.start() for i in range(10): time.sleep(random.randint(1,5)) q.put('food%s'%i) print('生产了一个数据food%s'%i)

import time import random from multiprocessing import Process, Queue def consumer(q): while True: obj = q.get() if obj == None:break print('消费了一个数据%s' % obj) time.sleep(random.randint(1, 3)) def producer(q): for i in range(10): time.sleep(random.randint(1, 5)) q.put('food%s' % i) print('生产了一个数据food%s' % i) q.put(None) if __name__ == '__main__': q = Queue() for i in range(2): p_consumer = Process(target=consumer, args=(q,)) p_consumer.start() for i in range(2): p_producer = Process(target=producer, args=(q,)) p_producer.start()

import time import random from multiprocessing import Process, Queue def consumer(q): while True: obj = q.get() if obj == None:break print('消费了一个数据%s' % obj) time.sleep(random.randint(1, 3)) def producer(q): for i in range(10): time.sleep(random.randint(1, 5)) q.put('food%s' % i) print('生产了一个数据food%s' % i) if __name__ == '__main__': q = Queue() p1_consumer = Process(target=consumer, args=(q,)) p2_consumer = Process(target=consumer, args=(q,)) p1_consumer.start() p2_consumer.start() p1_producer = Process(target=producer, args=(q,)) p2_producer = Process(target=producer, args=(q,)) p1_producer.start() p2_producer.start() p1_producer.join() q.put(None) p2_producer.join() q.put(None)
每个进程的生产者在生产完后向队列中put一个None,消费者get时判断,收到None则退出
multiprocessing.JoinableQueue()
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
q = JoinableQueue()
q.join()
q.task_done

from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('