zoukankan      html  css  js  c++  java
  • python学习Day37--生产者消费者模型

    一、回顾

    1、并发与并行:

      并发:在同一个时间短内多个任务同时进行。

      并行:在用一个时间点上多个任务同时进行。

    2、进程的三大基本状态:

      就绪状态:所有进程需要的资源都获取到了,除了CPU。

      执行状态:获取到了所有资源包括CPU,进程处于运行状态。

      阻塞状态:进程停止不再运行,放弃了CPU,进程此时处于内存中。

    3、什么叫进程?

      正在运行的程序;由代码段,数据段,PCB(进程控制块)组成;

      进程是资源分配的基本单位;

    4、进程之间能不能直接通信?

      正常情况下,多进程之间是无法进行通信的。因为每一个进程都有自己独立的内存空间。

    5、锁机制

      为了多进程通信时,保护数据的安全性。

      一把锁醅一把钥匙。

      l = Lock( )

      l.acquire( )

      l.release( )

    6、信号量

      一把锁配多把钥匙。

      sem = Semaphore(num)

      num代表的是几把钥匙。

    7、事件

      e = Event( )

      e.is_set( )  返回一个bool值。

      e.wait( )     阻塞和非阻塞

      e.set( )       把is_set的bool值变为True

      e.clear( )    把is_set的bool变为False

    二、生产者消费者模型

      主要是为了解耦

      借助队列来实现生产者消费者模型。

    1、栈与队列介绍

      栈:先进后出(First In Last Out 简称FILO)

      队列:先进先出(First In First Out 简称FIFO),队列是安全的。

    import queue # 不能进程多进程之前的数据传输(不用)
    from multiprocessing import Queue # 用这个模块

    (1)队列的主要内容

    q = Queue(num)
    # num:队列的最大长度
    q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
    q.put() # 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待
    
    q.get_nowait() # 不阻塞,如果有数据直接获取,没有数据就报错
    q.put_nowait() # 不阻塞,如果可以继续网队列中放数据,就直接放,不能就报错

    (2)初始队列

     1 # *******************练习**********************开始
     2 from multiprocessing import Queue
     3 
     4 q = Queue(3) # 长度为3
     5 
     6 q.put(1)
     7 q.put('abc')
     8 q.put([4,5,6])
     9 print(123)
    10 # q.put('娃哈哈') # 阻塞,不报错
    11 try:
    12     q.put_nowait('娃哈哈') # 阻塞,并报错
    13 except:
    14     print("队列满了")
    15 print(111)
    16 
    17 print(q.get())
    18 print(q.get())
    19 print(q.get())
    20 # print(q.get()) # 阻塞,等待
    21 try:
    22     print(q.get_nowait())
    23 except:
    24     print("队列已经空了")
    25 
    26 # *******************练习**********************结束
    练习

    2、队列实现生产者消费模型

     1 # *******************队列实现生产者消费者模型**********************开始
     2 from multiprocessing import Queue,Process
     3 import time
     4 
     5 def consumer(q,name):
     6     while 1:
     7         info = q.get_nowait()
     8         if info:
     9             print('%s拿走了%s' % (name,info))
    10         else:
    11             print("没有娃娃了,等下一批吧")
    12             break
    13 
    14 
    15 # 消费者如何判断,生产者是没来得及生产,还是生产者不在生产了?
    16 
    17 def producer(q,product):
    18     for i in range(20):
    19         info = product+'娃娃%s号' % str(i)
    20         q.put(info)
    21     q.put(None)
    22 
    23 if __name__ == '__main__':
    24     q = Queue(10)
    25     p_pro = Process(target=producer,args=(q,'mini版本'))
    26     p_con = Process(target=consumer,args=(q,'xxx'))
    27     p_pro.start()
    28     p_con.start()
    29 # *******************队列实现生产者消费者模型**********************结束

    3、进阶生产者消费者模型

    (1)进阶:将生产者生产结束的标识,放到父进程中。

     1 # *******************进阶生产者消费者模型**********************开始
     2 from multiprocessing import Queue,Process
     3 import time
     4 
     5 def consumer(q,name,color):
     6     while 1:
     7         info = q.get()
     8         if info:
     9             print('%s%s拿走了%s33[0m' % (color,name,info))
    10         else:
    11             print("没有娃娃了,等下一批吧")
    12             break
    13 
    14 # 消费者如何判断,生产者是没来得及生产,还是生产者不在生产了?
    15 
    16 def producer(q,product):
    17     for i in range(20):
    18         info = product+'娃娃%s号' % str(i)
    19         q.put(info)
    20 
    21 if __name__ == '__main__':
    22     q = Queue(10)
    23     p_pro1 = Process(target=producer,args=(q,'mini版本'))
    24     p_pro2 = Process(target=producer,args=(q,'maxi版本'))
    25     p_pro3 = Process(target=producer,args=(q,'super版本'))
    26     p_con1 = Process(target=consumer,args=(q,'夏明','33[31m'))
    27     p_con2 = Process(target=consumer,args=(q,'小华','33[36m'))
    28     p_1 = [p_con1,p_con2,p_pro1,p_pro2,p_pro3]
    29     [i.start() for i in p_1]
    30 
    31     # 父进程如何感知生产者子进p_pro1程不在生产数据了?
    32     # 通过加入join来解决
    33     p_pro1.join()
    34     p_pro2.join()
    35     p_pro3.join()
    36     q.put(None) # 几个消费者就接收几个结束标识
    37     q.put(None)
    38 # *******************进阶生产者消费者模型**********************结束
    进阶版

    (2)新模块JoinableQueue(可连接队列)——改进生产者消费者模型

      继承Queue,所有可以使用Queue里面的方法。

     1 # *******************新模块joinableQueue**********************开始
     2 from multiprocessing import Process,JoinableQueue
     3 
     4 # q = JoinableQueue()
     5 
     6 # q.join() # 用于生产者。等待q.task_done返回结果,通过返回结果,生产和就能获得消费者当前消费了多少个数据
     7 # q.task_done() # 用于消费者,是指每消费队列中一个数据,就给join返回一个标识
     8 
     9 from multiprocessing import Queue,Process
    10 import time
    11 
    12 def consumer(q,name,color):
    13     while 1:
    14         info = q.get()
    15         print('%s%s拿走了%s33[0m' % (color,name,info))
    16         q.task_done()
    17 
    18 # 消费者如何判断,生产者是没来得及生产,还是生产者不在生产了?
    19 
    20 def producer(q,product):
    21     for i in range(20):
    22         info = product+'娃娃%s号' % str(i)
    23         q.put(info)
    24     q.join() #记录了生产20个数据在队列中,此时会阻塞等待消费者消费完队列中所有数据
    25 
    26 if __name__ == '__main__':
    27     q = JoinableQueue(10)
    28     p_pro1 = Process(target=producer,args=(q,'mini版本'))
    29     # p_pro2 = Process(target=producer,args=(q,'maxi版本'))
    30     # p_pro3 = Process(target=producer,args=(q,'super版本'))
    31     p_con1 = Process(target=consumer,args=(q,'夏明','33[31m'))
    32     # p_con2 = Process(target=consumer,args=(q,'小华','33[36m'))
    33     p_con1.daemon = True # 把消费者进程设为守护进程(这样做是:最后才能结束消费者进程,主进程结束,守护进程也就结束了)
    34     p_pro1.start()
    35     p_con1.start()
    36     p_pro1.join() # 主进程等待生产者进程结束
    37 
    38 '''
    39     程序有3个进程,主进程和生产者进程和消费者进程。 当主进程执行到p_pro1.join()代码时,主进程会等待生产进程结束
    40     而生产进程中会等待消费者进程把所有数据消费完,生产者进程才结束。
    41     现在的状态就是  主进程等待生产者进程结束,生产者进程等待消费者消费完所有数据
    42     所以,把消费者设置为守护进程。  当主进程执行完,就代表生产进程已经结束,也就代表消费者进程已经把队列中数据消费完
    43     此时,主进程一旦结束,守护进程也就是消费者进程也就跟着结束。    整个程序也就能正常结束了。
    44 '''
    45 # *******************新模块joinableQueue**********************结束

    4、管道

      管道是不安全的。

    (1)管道介绍、

     1 # *******************管道**********************开始
     2 from multiprocessing import Pipe
     3 
     4 con1,con2 = Pipe()
     5 
     6 con1.send('abc')
     7 print(con2.recv())
     8 con2.send('456')
     9 print(con1.recv())
    10 # *******************管道**********************结束

    (2)多进程下的管道

     1 # *******************多进程下的管道**********************开始
     2 from multiprocessing import Pipe,Process
     3 
     4 def func(con):
     5     con1,con2 = con
     6     con1.close()
     7     while 1:
     8         try:
     9             print(con2.recv())
    10         except EOFError:
    11             con2.close()
    12             break
    13 
    14 if __name__ == '__main__':
    15     con1,con2 = Pipe()
    16     p = Process(target=func,args=((con1,con2),))
    17     p.start()
    18     con2.close()
    19     for i in range(10):
    20         con1.send('abc')
    21     con1.close()
    22 # *******************多进程下的管道**********************结束

    【注意】一个小问题:你知道的IPC有哪些?

      管道、队列、(锁,信号量,事件)

    5、进程之间的共享内存Manager

     1 # *******************多进程之前的共享内存**********************开始
     2 from multiprocessing import Process,Manager
     3 
     4 def func(num):
     5     num[0] -= 1
     6     print("子进程中的num的值是",num) # [0, 2, 3]
     7 
     8 
     9 if __name__ == '__main__':
    10     m = Manager()
    11     num = m.list([1,2,3])
    12     p = Process(target=func,args=(num,))
    13     p.start()
    14     p.join()
    15     print("父进程中的num的值是",num) # [0, 2, 3]
    16 # *******************多进程之前的共享内存**********************结束
    Manager

    6、进程池

         进程池:一个池子,里边有固定数量的进程。这些进程抑制处于待命状态,一旦有任务来,马上就有进程去处理。

      因为在实际业务中,任务量是有多又有少的,如果任务量特别多,不可能要开对应那么多的进程数;

      开启那么多进程首先就需要消耗大量的时间让操作系统来管理它,其次还需要消耗大量时间让CPU帮你调度它。

      进程池还会帮程序员去管理池中的进程。

    进程池有三个方法

    (1)map(func, iterable)

      func:进程池中的进程执行的任务函数

      iterable:可迭代对象,是把可迭代对象中的每个元素依次传给任务函数的参数

    (2)apply(func, args=( )) ——同步的效率,也就是说池中的进程一个一个的去执行任务

      func:进程池中的进程执行的任务函数

      args:可迭代对象型的参数,是传给任务函数的参数

      同步处理任务时,不需要close和join

    (3)apply_async(func, args=( ),callback=None) ——异步的效率,也就是说池中的进程一次性都去执行任务

      func:进程池中的进程执行的任务函数

      args:可迭代对象型的参数,是传给任务函数的参数

      callback:回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步没有的。

      异步处理任务时,必须要加上close和join

     1 # *******************进程池**********************开始
     2 from multiprocessing import Pool,Process
     3 import os
     4 import time
     5 
     6 def func(num):
     7     num += 1
     8     print(num)
     9 
    10 if __name__ == '__main__':
    11     # p = Pool(os.cpu_count())
    12     # start = time.time()
    13     # p.map(func,[i for i in range(100)])
    14     # p.close() # 指不允许再向进程池中添加任务
    15     # p.join() # 等待进程池中所有进程执行完所有任务
    16     # print("进程池做任务的效率", time.time() - start)
    17 
    18     # p = Pool(os.cpu_count())
    19     # # start = time.time()
    20     # for i in range(100):
    21     #     res = p.apply(func,args=(i,)) # (进程池的同步处理)是指让进程池中的进程,同步的帮你做任务
    22     # print("进程池做任务的效率", time.time() - start)
    23 
    24     p = Pool(os.cpu_count())
    25     start = time.time()
    26     for i in range(100):
    27         p.apply_async(func, args=(i,)) # (进程池的异步处理)是指让进程池中的进程,异步的帮你做任务
    28     print("进程池做任务的效率", time.time() - start)
    29 
    30     start = time.time()
    31     p_1 = []
    32     for i in range(100):
    33         p1 = Process(target=func,args=(i,)) # (多线程的处理)
    34         p1.start()
    35         p_1.append(p1)
    36     [p1.join() for p1 in p_1]
    37     print("多进程做任务的效率",time.time()-start)
    38 # *******************进程池**********************结束
    进程池的操作

    回调函数的使用:

      进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的操作。

      回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数。

     1 from multiprocessing import Pool
     2 import requests
     3 import time,os
     4 
     5 def func(url):
     6     res = requests.get(url)
     7     print('子进程的pid:%s,父进程的pid:%s'%(os.getpid(),os.getppid()))
     8     # print(res.text)
     9     if res.status_code == 200:
    10         return url,res.text
    11 
    12 def cal_back(sta):
    13     url,text = sta
    14     print('回调函数的pid', os.getpid())
    15     with open('a.txt','a',encoding='utf-8') as f:
    16         f.write(url + text)
    17     print('回调函数中!',url)
    18 
    19 if __name__ == '__main__':
    20     p = Pool(5)
    21     l = ['https://www.baidu.com',
    22          'http://www.jd.com',
    23          'http://www.taobao.com',
    24          'http://www.mi.com',
    25          'http://www.cnblogs.com',
    26          'https://www.bilibili.com',
    27          ]
    28     print('主进程的pid',os.getpid())
    29     for i in l:
    30         p.apply_async(func, args=(i,),callback=cal_back) # 回调函数是由主进程调用的
    31         # 异步执行任务func,每有一个进程执行完任务后,在func中return一个结果,结果会自动的被callback指定的函数,当成形式参数来接收到
    32     p.close()
    33     p.join()
    回调函数的使用
  • 相关阅读:
    107. Binary Tree Level Order Traversal II
    103. Binary Tree Zigzag Level Order Traversal
    102. Binary Tree Level Order Traversal
    690. Employee Importance
    1723. Find Minimum Time to Finish All Jobs
    LeetCode 329 矩阵中最长增长路径
    7.2 物理内存管理
    LeetCode 面试题 特定深度节点链表
    LeetCode 100 相同的树
    npm安装包命令详解,dependencies与devDependencies实际区别
  • 原文地址:https://www.cnblogs.com/fengxb1213/p/12747450.html
Copyright © 2011-2022 走看看