生产者+消费者模型
从网上爬取数据
从网页上获取数据的过程 --》 生产数据的过程,爬取网页,是生产者行为
把数据取回来进行分析得出结果 --》数据消费过程,是消费者行为
使用队列来完成生产、消费的过程
生产者,是进程
消费者,是进程
生产者与消费者之间,传递数据,需要一个盘子(IPC)
# 没设置时间延迟的情况下
from multiprocessing import Queue, Process
def consumer(name, q):
while 1:
food = q.get() # 会阻塞,直到队列里有数据就取出来
print("%s把包子%s号给吃了" % (name, food))
def producer(q, food_name):
for i in range(10):
food = "%s%s" % (food_name, i)
print("我做了%s号" % food)
q.put(food)
if __name__ == "__main__":
q = Queue()
p = Process(target=consumer, args=("alex", q))
p1 = Process(target=producer, args=(q, "包子"))
p.start()
p1.start()
# 我做了包子0号
# 我做了包子1号
# 我做了包子2号
# 我做了包子3号
# 我做了包子4号
# alex把包子包子0号给吃了
# 我做了包子5号
# 我做了包子6号
# 我做了包子7号
# 我做了包子8号
# 我做了包子9号
# alex把包子包子1号给吃了
# alex把包子包子2号给吃了
# alex把包子包子3号给吃了
# alex把包子包子4号给吃了
# alex把包子包子5号给吃了
# alex把包子包子6号给吃了
# alex把包子包子7号给吃了
# alex把包子包子8号给吃了
# alex把包子包子9号给吃了
# 运行发现明显生产比消费更快,供大于求,注意程序并没有停止运行
# 设置时间延迟后
from multiprocessing import Queue, Process
import time
def consumer(name, q):
while 1:
food = q.get() # 会阻塞,直到队列里有数据就取出来
time.sleep(1)
print("%s把包子%s号给吃了" % (name, food))
def producer(q, food_name):
for i in range(10):
time.sleep(0.5)
food = "%s%s" % (food_name, i)
print("我做了%s号" % food)
q.put(food)
if __name__ == "__main__":
q = Queue()
p = Process(target=consumer, args=("alex", q))
p1 = Process(target=producer, args=(q, "包子"))
p.start()
p1.start()
# 我做了包子0号
# 我做了包子1号
# 我做了包子2号
# alex把包子包子0号给吃了
# 我做了包子3号
# alex把包子包子1号给吃了
# 我做了包子4号
# 我做了包子5号
# alex把包子包子2号给吃了
# 我做了包子6号
# 我做了包子7号
# alex把包子包子3号给吃了
# 我做了包子8号
# 我做了包子9号
# alex把包子包子4号给吃了
# alex把包子包子5号给吃了
# alex把包子包子6号给吃了
# alex把包子包子7号给吃了
# alex把包子包子8号给吃了
# alex把包子包子9号给吃了
# 运行发现,虽然设置了延迟,但是生产(producer)的延迟比消费(consumer)的延迟更短
# 所以还是供大于求
# 修改生产者与消费者的延迟设置
from multiprocessing import Queue, Process
import time
def consumer(name, q):
while 1:
food = q.get() # 会阻塞,直到队列里有数据就取出来
time.sleep(0.5)
print("%s把包子%s号给吃了" % (name, food))
def producer(q, food_name):
for i in range(10):
time.sleep(1)
food = "%s%s" % (food_name, i)
print("我做了%s号" % food)
q.put(food)
if __name__ == "__main__":
q = Queue()
p = Process(target=consumer, args=("alex", q))
p1 = Process(target=producer, args=(q, "包子"))
p.start()
p1.start()
# 我做了包子0号
# alex把包子包子0号给吃了
# 我做了包子1号
# alex把包子包子1号给吃了
# 我做了包子2号
# alex把包子包子2号给吃了
# 我做了包子3号
# alex把包子包子3号给吃了
# 我做了包子4号
# alex把包子包子4号给吃了
# 我做了包子5号
# alex把包子包子5号给吃了
# 我做了包子6号
# alex把包子包子6号给吃了
# 我做了包子7号
# alex把包子包子7号给吃了
# 我做了包子8号
# alex把包子包子8号给吃了
# 我做了包子9号
# alex把包子包子9号给吃了
# 很明显,在只有一个消费者和一个生产者的前提下,如果生产的时间要比消费的时间更长
# 那么每生产一个包子,消费者都能立刻吃掉,并且等着下一个生产好的包子
# 基于队列实现生产者消费者模型
from multiprocessing import Queue, Process
import time
def consumer(name, q):
while 1:
food = q.get() # 会阻塞,直到队列里有数据就取出来
time.sleep(2)
print("%s拿到了包子%s号后并把它吃掉" % (name, food))
def producer(q, food_name):
for i in range(20): # 生产20个包子
time.sleep(0.1)
food = "%s%s" % (food_name, i)
print("我做了%s号" % food)
q.put(food) # 如果队列满了就在这里阻塞,直到队列里有空位置
if __name__ == "__main__":
q = Queue(5) # 这个队列的容量,这里是指 6个的容量
p = Process(target=consumer, args=("alex", q)) # 消费者1号
p1 = Process(target=consumer, args=("太白", q)) # 消费者2号
p2 = Process(target=producer, args=(q, "包子")) # 生产者1号
p.start()
p1.start()
p2.start()
# 我做了包子0号
# 我做了包子1号
# 我做了包子2号
# 我做了包子3号
# 我做了包子4号
# 我做了包子5号
# 我做了包子6号
# 我做了包子7号
# alex拿到包子包子0号并把它吃掉
# 太白拿到包子包子1号并把它吃掉
# 我做了包子8号
# 我做了包子9号
# alex拿到包子包子2号并把它吃掉
# 我做了包子10号
# 太白拿到包子包子3号并把它吃掉
# 我做了包子11号
# alex拿到包子包子4号并把它吃掉
# 太白拿到包子包子5号并把它吃掉
# 我做了包子12号
# 我做了包子13号
# alex拿到包子包子6号并把它吃掉
# 我做了包子14号
# 太白拿到包子包子7号并把它吃掉
# 我做了包子15号
# alex拿到包子包子8号并把它吃掉
# 我做了包子16号
# 太白拿到包子包子9号并把它吃掉
# 我做了包子17号
# alex拿到包子包子10号并把它吃掉
# 我做了包子18号
# 太白拿到包子包子11号并把它吃掉
# 我做了包子19号
# alex拿到包子包子12号并把它吃掉
# 太白拿到包子包子13号并把它吃掉
# alex拿到包子包子14号并把它吃掉
# 太白拿到包子包子15号并把它吃掉
# alex拿到包子包子16号并把它吃掉
# 太白拿到包子包子17号并把它吃掉
# alex拿到包子包子18号并把它吃掉
# 太白拿到包子包子19号并把它吃掉
# alex拿到包子None号并把它吃掉
# 太白拿到包子None号并把它吃掉
# 注意一开始为什么要生产8个包子才开始吃?
# 因为是本来这个队列q = Queue(5)限定了只能放6个包子进去
# 但是生产者一旦把6个包子放进去,两个消费者就分别从里面取出一个包子
# 只不过,重点是消费者要2s后才能继续拿包子吃,但他们已经把两个包子拿出来了
# 所以生产者还得生产2个包子放进去,把队列排满
# 运行发现,中间之所以吃了一个才会生产一个,是因为队列已经满了
# 前面所有的示例, 都是主进程永远不会结束
# 原因是:生产者p在生产完后就结束了
# 但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步
# 解决办法: 主进程里发结束信号,但主进程需要等生产者结束后再发送该信号
from multiprocessing import Queue, Process
import time
def consumer(name, q):
while 1:
food = q.get()
if not food: break
time.sleep(2)
print("%s拿到了%s号并开始吃它" % (name, food))
def producer(q, food_name):
for i in range(20): # 生产20个包子
time.sleep(0.1)
food = "%s%s" % (food_name, i)
print("我做了%s号" % food)
q.put(food) # 如果队列满了就在这里阻塞,直到队列里有空位置
if __name__ == "__main__":
q = Queue(5) # 这个队列的容量,这里是指 6个的容量
p = Process(target=consumer, args=("alex", q)) # 消费者1号
p1 = Process(target=consumer, args=("太白", q)) # 消费者2号
p2 = Process(target=producer, args=(q, "包子")) # 生产者1号
p.start()
p2.start()
p1.start()
p2.join() # 确认生产者把所需生产数量都生产完毕
q.put(None)
q.put(None)
# 运行结果和上面一样,不同的是程序会结束
# 吃的人是消费数据的
# 制造吃的人是生产数据的
# 生产快,消费慢,供过于求
# 增加消费者
# 生产慢,消费快,供不应求
# 增加生产者
# 控制内存
import queue
q = queue.Queue(5) # 设置一个容量值,保护内存
q.put(1)
print("1 over")
q.put(2)
print("2 over")
q.put(3)
print("3 over")
q.put(4)
print("4 over")
q.put(5)
print("5 over")
q.put(6)
print("6 over")
# 1 over
# 2 over
# 3 over
# 4 over
# 5 over
# 运行发现程序不会停,因为它还在等,等设置的内存里减少东西再执行q.put(6)