zoukankan      html  css  js  c++  java
  • python学习Day37--生产者消费者模型

    一、回顾

    1、并发与并行:

      并发:在同一个时间短内多个任务同时进行。

      并行:在用一个时间点上多个任务同时进行。

    2、进程的三大基本状态:

      就绪状态:所有进程需要的资源都获取到了,除了CPU。

      执行状态:获取到了所有资源包括CPU,进程处于运行状态。

      阻塞状态:进程停止不再运行,放弃了CPU,进程此时处于内存中。

    3、什么叫进程?

      正在运行的程序;由代码段,数据段,PCB(进程控制块)组成;

      进程是资源分配的基本单位;

    4、进程之间能不能直接通信?

      正常情况下,多进程之间是无法进行通信的。因为每一个进程都有自己独立的内存空间。

    5、锁机制

      为了多进程通信时,保护数据的安全性。

      一把锁醅一把钥匙。

      l = Lock( )

      l.acquire( )

      l.release( )

    6、信号量

      一把锁配多把钥匙。

      sem = Semaphore(num)

      num代表的是几把钥匙。

    7、事件

      e = Event( )

      e.is_set( )  返回一个bool值。

      e.wait( )     阻塞和非阻塞

      e.set( )       把is_set的bool值变为True

      e.clear( )    把is_set的bool变为False

    二、生产者消费者模型

      主要是为了解耦

      借助队列来实现生产者消费者模型。

    1、栈与队列介绍

      栈:先进后出(First In Last Out 简称FILO)

      队列:先进先出(First In First Out 简称FIFO),队列是安全的。

    import queue # 不能进程多进程之前的数据传输(不用)
    from multiprocessing import Queue # 用这个模块

    (1)队列的主要内容

    q = Queue(num)
    # num:队列的最大长度
    q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
    q.put() # 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待
    
    q.get_nowait() # 不阻塞,如果有数据直接获取,没有数据就报错
    q.put_nowait() # 不阻塞,如果可以继续网队列中放数据,就直接放,不能就报错

    (2)初始队列

     1 # *******************练习**********************开始
     2 from multiprocessing import Queue
     3 
     4 q = Queue(3) # 长度为3
     5 
     6 q.put(1)
     7 q.put('abc')
     8 q.put([4,5,6])
     9 print(123)
    10 # q.put('娃哈哈') # 阻塞,不报错
    11 try:
    12     q.put_nowait('娃哈哈') # 阻塞,并报错
    13 except:
    14     print("队列满了")
    15 print(111)
    16 
    17 print(q.get())
    18 print(q.get())
    19 print(q.get())
    20 # print(q.get()) # 阻塞,等待
    21 try:
    22     print(q.get_nowait())
    23 except:
    24     print("队列已经空了")
    25 
    26 # *******************练习**********************结束
    练习

    2、队列实现生产者消费模型

     1 # *******************队列实现生产者消费者模型**********************开始
     2 from multiprocessing import Queue,Process
     3 import time
     4 
     5 def consumer(q,name):
     6     while 1:
     7         info = q.get_nowait()
     8         if info:
     9             print('%s拿走了%s' % (name,info))
    10         else:
    11             print("没有娃娃了,等下一批吧")
    12             break
    13 
    14 
    15 # 消费者如何判断,生产者是没来得及生产,还是生产者不在生产了?
    16 
    17 def producer(q,product):
    18     for i in range(20):
    19         info = product+'娃娃%s号' % str(i)
    20         q.put(info)
    21     q.put(None)
    22 
    23 if __name__ == '__main__':
    24     q = Queue(10)
    25     p_pro = Process(target=producer,args=(q,'mini版本'))
    26     p_con = Process(target=consumer,args=(q,'xxx'))
    27     p_pro.start()
    28     p_con.start()
    29 # *******************队列实现生产者消费者模型**********************结束

    3、进阶生产者消费者模型

    (1)进阶:将生产者生产结束的标识,放到父进程中。

     1 # *******************进阶生产者消费者模型**********************开始
     2 from multiprocessing import Queue,Process
     3 import time
     4 
     5 def consumer(q,name,color):
     6     while 1:
     7         info = q.get()
     8         if info:
     9             print('%s%s拿走了%s33[0m' % (color,name,info))
    10         else:
    11             print("没有娃娃了,等下一批吧")
    12             break
    13 
    14 # 消费者如何判断,生产者是没来得及生产,还是生产者不在生产了?
    15 
    16 def producer(q,product):
    17     for i in range(20):
    18         info = product+'娃娃%s号' % str(i)
    19         q.put(info)
    20 
    21 if __name__ == '__main__':
    22     q = Queue(10)
    23     p_pro1 = Process(target=producer,args=(q,'mini版本'))
    24     p_pro2 = Process(target=producer,args=(q,'maxi版本'))
    25     p_pro3 = Process(target=producer,args=(q,'super版本'))
    26     p_con1 = Process(target=consumer,args=(q,'夏明','33[31m'))
    27     p_con2 = Process(target=consumer,args=(q,'小华','33[36m'))
    28     p_1 = [p_con1,p_con2,p_pro1,p_pro2,p_pro3]
    29     [i.start() for i in p_1]
    30 
    31     # 父进程如何感知生产者子进p_pro1程不在生产数据了?
    32     # 通过加入join来解决
    33     p_pro1.join()
    34     p_pro2.join()
    35     p_pro3.join()
    36     q.put(None) # 几个消费者就接收几个结束标识
    37     q.put(None)
    38 # *******************进阶生产者消费者模型**********************结束
    进阶版

    (2)新模块JoinableQueue(可连接队列)——改进生产者消费者模型

      继承Queue,所有可以使用Queue里面的方法。

     1 # *******************新模块joinableQueue**********************开始
     2 from multiprocessing import Process,JoinableQueue
     3 
     4 # q = JoinableQueue()
     5 
     6 # q.join() # 用于生产者。等待q.task_done返回结果,通过返回结果,生产和就能获得消费者当前消费了多少个数据
     7 # q.task_done() # 用于消费者,是指每消费队列中一个数据,就给join返回一个标识
     8 
     9 from multiprocessing import Queue,Process
    10 import time
    11 
    12 def consumer(q,name,color):
    13     while 1:
    14         info = q.get()
    15         print('%s%s拿走了%s33[0m' % (color,name,info))
    16         q.task_done()
    17 
    18 # 消费者如何判断,生产者是没来得及生产,还是生产者不在生产了?
    19 
    20 def producer(q,product):
    21     for i in range(20):
    22         info = product+'娃娃%s号' % str(i)
    23         q.put(info)
    24     q.join() #记录了生产20个数据在队列中,此时会阻塞等待消费者消费完队列中所有数据
    25 
    26 if __name__ == '__main__':
    27     q = JoinableQueue(10)
    28     p_pro1 = Process(target=producer,args=(q,'mini版本'))
    29     # p_pro2 = Process(target=producer,args=(q,'maxi版本'))
    30     # p_pro3 = Process(target=producer,args=(q,'super版本'))
    31     p_con1 = Process(target=consumer,args=(q,'夏明','33[31m'))
    32     # p_con2 = Process(target=consumer,args=(q,'小华','33[36m'))
    33     p_con1.daemon = True # 把消费者进程设为守护进程(这样做是:最后才能结束消费者进程,主进程结束,守护进程也就结束了)
    34     p_pro1.start()
    35     p_con1.start()
    36     p_pro1.join() # 主进程等待生产者进程结束
    37 
    38 '''
    39     程序有3个进程,主进程和生产者进程和消费者进程。 当主进程执行到p_pro1.join()代码时,主进程会等待生产进程结束
    40     而生产进程中会等待消费者进程把所有数据消费完,生产者进程才结束。
    41     现在的状态就是  主进程等待生产者进程结束,生产者进程等待消费者消费完所有数据
    42     所以,把消费者设置为守护进程。  当主进程执行完,就代表生产进程已经结束,也就代表消费者进程已经把队列中数据消费完
    43     此时,主进程一旦结束,守护进程也就是消费者进程也就跟着结束。    整个程序也就能正常结束了。
    44 '''
    45 # *******************新模块joinableQueue**********************结束

    4、管道

      管道是不安全的。

    (1)管道介绍、

     1 # *******************管道**********************开始
     2 from multiprocessing import Pipe
     3 
     4 con1,con2 = Pipe()
     5 
     6 con1.send('abc')
     7 print(con2.recv())
     8 con2.send('456')
     9 print(con1.recv())
    10 # *******************管道**********************结束

    (2)多进程下的管道

     1 # *******************多进程下的管道**********************开始
     2 from multiprocessing import Pipe,Process
     3 
     4 def func(con):
     5     con1,con2 = con
     6     con1.close()
     7     while 1:
     8         try:
     9             print(con2.recv())
    10         except EOFError:
    11             con2.close()
    12             break
    13 
    14 if __name__ == '__main__':
    15     con1,con2 = Pipe()
    16     p = Process(target=func,args=((con1,con2),))
    17     p.start()
    18     con2.close()
    19     for i in range(10):
    20         con1.send('abc')
    21     con1.close()
    22 # *******************多进程下的管道**********************结束

    【注意】一个小问题:你知道的IPC有哪些?

      管道、队列、(锁,信号量,事件)

    5、进程之间的共享内存Manager

     1 # *******************多进程之前的共享内存**********************开始
     2 from multiprocessing import Process,Manager
     3 
     4 def func(num):
     5     num[0] -= 1
     6     print("子进程中的num的值是",num) # [0, 2, 3]
     7 
     8 
     9 if __name__ == '__main__':
    10     m = Manager()
    11     num = m.list([1,2,3])
    12     p = Process(target=func,args=(num,))
    13     p.start()
    14     p.join()
    15     print("父进程中的num的值是",num) # [0, 2, 3]
    16 # *******************多进程之前的共享内存**********************结束
    Manager

    6、进程池

         进程池:一个池子,里边有固定数量的进程。这些进程抑制处于待命状态,一旦有任务来,马上就有进程去处理。

      因为在实际业务中,任务量是有多又有少的,如果任务量特别多,不可能要开对应那么多的进程数;

      开启那么多进程首先就需要消耗大量的时间让操作系统来管理它,其次还需要消耗大量时间让CPU帮你调度它。

      进程池还会帮程序员去管理池中的进程。

    进程池有三个方法

    (1)map(func, iterable)

      func:进程池中的进程执行的任务函数

      iterable:可迭代对象,是把可迭代对象中的每个元素依次传给任务函数的参数

    (2)apply(func, args=( )) ——同步的效率,也就是说池中的进程一个一个的去执行任务

      func:进程池中的进程执行的任务函数

      args:可迭代对象型的参数,是传给任务函数的参数

      同步处理任务时,不需要close和join

    (3)apply_async(func, args=( ),callback=None) ——异步的效率,也就是说池中的进程一次性都去执行任务

      func:进程池中的进程执行的任务函数

      args:可迭代对象型的参数,是传给任务函数的参数

      callback:回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步没有的。

      异步处理任务时,必须要加上close和join

     1 # *******************进程池**********************开始
     2 from multiprocessing import Pool,Process
     3 import os
     4 import time
     5 
     6 def func(num):
     7     num += 1
     8     print(num)
     9 
    10 if __name__ == '__main__':
    11     # p = Pool(os.cpu_count())
    12     # start = time.time()
    13     # p.map(func,[i for i in range(100)])
    14     # p.close() # 指不允许再向进程池中添加任务
    15     # p.join() # 等待进程池中所有进程执行完所有任务
    16     # print("进程池做任务的效率", time.time() - start)
    17 
    18     # p = Pool(os.cpu_count())
    19     # # start = time.time()
    20     # for i in range(100):
    21     #     res = p.apply(func,args=(i,)) # (进程池的同步处理)是指让进程池中的进程,同步的帮你做任务
    22     # print("进程池做任务的效率", time.time() - start)
    23 
    24     p = Pool(os.cpu_count())
    25     start = time.time()
    26     for i in range(100):
    27         p.apply_async(func, args=(i,)) # (进程池的异步处理)是指让进程池中的进程,异步的帮你做任务
    28     print("进程池做任务的效率", time.time() - start)
    29 
    30     start = time.time()
    31     p_1 = []
    32     for i in range(100):
    33         p1 = Process(target=func,args=(i,)) # (多线程的处理)
    34         p1.start()
    35         p_1.append(p1)
    36     [p1.join() for p1 in p_1]
    37     print("多进程做任务的效率",time.time()-start)
    38 # *******************进程池**********************结束
    进程池的操作

    回调函数的使用:

      进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的操作。

      回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数。

     1 from multiprocessing import Pool
     2 import requests
     3 import time,os
     4 
     5 def func(url):
     6     res = requests.get(url)
     7     print('子进程的pid:%s,父进程的pid:%s'%(os.getpid(),os.getppid()))
     8     # print(res.text)
     9     if res.status_code == 200:
    10         return url,res.text
    11 
    12 def cal_back(sta):
    13     url,text = sta
    14     print('回调函数的pid', os.getpid())
    15     with open('a.txt','a',encoding='utf-8') as f:
    16         f.write(url + text)
    17     print('回调函数中!',url)
    18 
    19 if __name__ == '__main__':
    20     p = Pool(5)
    21     l = ['https://www.baidu.com',
    22          'http://www.jd.com',
    23          'http://www.taobao.com',
    24          'http://www.mi.com',
    25          'http://www.cnblogs.com',
    26          'https://www.bilibili.com',
    27          ]
    28     print('主进程的pid',os.getpid())
    29     for i in l:
    30         p.apply_async(func, args=(i,),callback=cal_back) # 回调函数是由主进程调用的
    31         # 异步执行任务func,每有一个进程执行完任务后,在func中return一个结果,结果会自动的被callback指定的函数,当成形式参数来接收到
    32     p.close()
    33     p.join()
    回调函数的使用
  • 相关阅读:
    CSS3的box-sizing属性
    html5 --基础笔记2
    html5--基础笔记
    CSS3--阴影,渐变,背景图片
    响应式布局--流式布局
    angular中的this指向问题
    angular中控制器之间的通讯方式
    angular中的$http配置和参数
    console
    h5表单验证的css和js方法
  • 原文地址:https://www.cnblogs.com/fengxb1213/p/12747450.html
Copyright © 2011-2022 走看看