zoukankan      html  css  js  c++  java
  • 上海 day31--进程间通信IPC机制、生产者与消费者模型

    目  录

    一、进程间通信--引出队列

    队列

    创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

    相关方法:

      .put()  向队列中添加值

      .get()  向队列取值

      .full()  判断队里是否已满

      .empty()  判断队列是否已经为空

      .get_nowait()  若队列中没有值可取时,直接报错不等待!

    from multiprocessing import Process,Queue
    
    q = Queue(5)   # 参数5表示该队列的容量只能装5个元素
    q.put(1)  # 点put()方法向队列中添加元素
    q.put(2)
    q.put(3)
    # print(q.full())  # False  点full() 方法判断队列是否已经装满
    q.put(4)
    q.put(5)
    # print(q.full())  # True
    # q.put(6)   # 当超过队列容量的时候程序会一直等待直到队列中有元素被取走
    # print(q.full())  # 阻塞!!!  不会执行该行代码,因为q.put(6)还在停滞状态
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())  # 点empty() 方法判断队列是否为空  False
    print(q.get())
    print(q.get())
    print(q.empty())  # True
    # print(q.get())  # 阻塞态!!  当队列取值超出范围没有值可取时,会出现阻塞 直到队列中有值可取
    # print(q.empty())  # 该行代码不会被执行,因为程序已经阻塞在上一行
    print(q.get_nowait())  # 若队列中没有值可取的话,不等待直接报错
    '''
    报错结果:
         raise Empty
    queue.Empty
    '''

    注意:

    点full() 方法    点empty() 方法   点get_nowait() 方法都不适用与多进程情况!

    因为多进程情况下,如果某一时刻队列为空,但是判断的同时有一个进程向队列中添加值,情况就不好判断

    """
    full
    get_nowait
    empty
    都不适用于多进程的情况
    原因:多进程的时候‘空’和‘满’的状况不好判断,也许队列刚被取空就有其他进程向队列中添加值。。。
    """

    二、进程间通信IPC机制

    进程间通信

    IPC(Inter-Process Communication)

    队列

    创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

    子进程放数据,主进程获取数据

    def producer(q):
        q.put('hello baby!')
    
    # def consumer(q):
    #     q.get()
    
    if __name__ == '__main__':
        q = Queue(5)
        p = Process(target=producer,args=(q,))
        p.start()
        print(q.get()) # 该步骤不需要time.sleep()模拟延迟,因为get() 会知道子进程向队列中放入值才能取,否则进入阻塞

    两个子进程相互放、取数据

    from multiprocessing import Process,Queue
    
    def producer(q):
        q.put('hello baby!')
    
    def consumer(q):
        print(q.get())  # 取数据
    
    if __name__ == '__main__':
        q = Queue(5)
        p = Process(target=producer,args=(q,))
        c = Process(target=consumer,args=(q,))
        p.start()
        c.start()
        print('') 
    '''
    执行结果:
    主
    hello baby!
    '''

    三、生产者与消费者模型

    """
    生产者:生产/制造数据的
    消费者:消费/处理数据的
    例子:做包子的,买包子的
            1.做包子远比买包子的多
            2.做包子的远比包子的少
            供需不平衡的问题
    """

    引出问题:

    from multiprocessing import Process,JoinableQueue,Queue
    import time
    import os
    import random
    
    def producer(name,food,q):
        for i in range(10):
            data = '%s 生产了%s%s'%(name,food,i)
            time.sleep(random.random())  # random.random()随机产生0~1之间的小数
            q.put(data)
    
    def consumer(name,q):
        while True:
            data = q.get()
            # if data == None: break
            print('%s 吃了%s'%(name,data))
            time.sleep(random.random())
    
    
    if __name__ == '__main__':
        q = Queue()
    
        p1 = Process(target=producer,args=('egon','馒头',q))
        p2 = Process(target=producer,args=('jason','生蚝',q))
        c1 = Process(target=consumer,args=('',q))
        c2 = Process(target=consumer,args=('',q))
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        # p1.join()
        # p2.join()
    
        # q.put(None)
        # q.put(None)
    引出问题

    问题:消费者进程出现阻塞,无值可取!

    解决思想:

      首先要确定生产者已经生产完毕,然后确定消费者已经取值完毕!

    方法1:

    在生产者执行完毕后,主进程添加队列 None值,当消费者在队列中取值 == None时,退出
        应用方法:join()
    from multiprocessing import Process,JoinableQueue,Queue
    import time
    import os
    import random
    
    def producer(name,food,q):
        for i in range(10):
            data = '%s 生产了%s%s'%(name,food,i)
            time.sleep(random.random())  # random.random()随机产生0~1之间的小数
            q.put(data)
    
    def consumer(name,q):
        while True:
            data = q.get()
            # if data == None: break
            print('%s 吃了%s'%(name,data))
            time.sleep(random.random())
    
    
    if __name__ == '__main__':
        q = Queue()
    
        p1 = Process(target=producer,args=('egon','馒头',q))
        p2 = Process(target=producer,args=('jason','生蚝',q))
        c1 = Process(target=consumer,args=('',q))
        c2 = Process(target=consumer,args=('',q))
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        # p1.join()
        # p2.join()
    
        # q.put(None)
        # q.put(None)
    
    '''
    出现问题:消费者进程阻塞,无值可取
    解决方法:
        首先要确定生产者已经生产完毕,然后确定消费者已经取值完毕!
        在生产者执行完毕后,主进程添加队列 None值,当消费者在队列中取值 == None时,退出
        应用方法:join()
    
    '''
    方法一

    方法2:

    JoinableQueue   能够被等待的队列

    from multiprocessing import Process,JoinableQueue
    import time
    import os
    import random
    
    def producer(name,food,q):
        for i in range(10):
            data = '%s 生产了%s%s'%(name,food,i)
            time.sleep(random.random())  # random.random()随机产生0~1之间的小数
            q.put(data)
    
    def consumer(name,q):
        while True:
            data = q.get()
            # if data == None: break
            print('%s 吃了%s'%(name,data))
            time.sleep(random.random())
            # q.task_done() 告诉队列你已经从队列中取出了一个数据 并且处理完毕了
            q.task_done()
    
    
    if __name__ == '__main__':
        q = JoinableQueue()
    
        p1 = Process(target=producer,args=('egon','馒头',q))
        p2 = Process(target=producer,args=('jason','生蚝',q))
        c1 = Process(target=consumer,args=('',q))
        c2 = Process(target=consumer,args=('',q))
        p1.start()
        p2.start()
        c1.daemon = True  # 给两个消费者添加守护进程,等待主进程执行完 q.join() 之后自动结束
        c2.daemon = True
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        # q.join() 等到队列中数据全部取出
        q.join()
    方法二

      

  • 相关阅读:
    <译>Spark Sreaming 编程指南
    <译>Zookeeper官方文档
    <译>Flink官方文档-Flink概述
    <译>流计算容错
    <译>Flink编程指南
    <续>调度算法补充
    storm源码阅读笔记之任务调度算法
    海量数据处理方法归类
    storm中worker、executor、task之间的关系
    javax.swing.jFrame
  • 原文地址:https://www.cnblogs.com/qinsungui921112/p/11340845.html
Copyright © 2011-2022 走看看