zoukankan      html  css  js  c++  java
  • 进程间通信IPC---队列、生产者消费者模型、生产者消费者模型_joinableQueue、总结(五)

    #  队列

    # 队列 先进先出
    # IPC
    # from multiprocessing import Queue
    # q = Queue(5)
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # q.put(4)
    # q.put(5)
    # print(q.full()) # 队列是否满了,已满话再次放入会阻塞
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.empty()) # 队列是否空了,空了再去会阻塞
    # while True: #不阻塞处理
    # try:
    # q.get_nowait()
    # except:
    # print('队列已空')
    # time.sleep(0.5)
    # for i in range(6):
    # q.put(i)

    from multiprocessing import Queue,Process
    def produce(q):
    q.put('hello')

    def consume(q):
    print(q.get())

    if __name__ == '__main__': #在win下才需要这段代码
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    c = Process(target=consume, args=(q,))
    c.start()

    生产者消费者模型

    # 队列
    # 生产者消费者模型

    # 生产者 进程
    # 消费者 进程
    import time
    import random
    from multiprocessing import Process,Queue
    def consumer(q,name):
    while True:
    food = q.get()
    if food is None: #用q.empty()) 不可靠,也许在上报空后,另外又有生产者放入东西
    print('%s获取到了一个空'%name)
    break
    print('33[31m%s消费了%s33[0m' % (name,food))
    time.sleep(random.randint(1,3))

    def producer(name,food,q):
    for i in range(4):
    time.sleep(random.randint(1,3))
    f = '%s生产了%s%s'%(name,food,i)
    print(f)
    q.put(f)

    if __name__ == '__main__':
    q = Queue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start() 异步
    p2.start()
    c1.start()
    c2.start() 异步
    p1.join() # 这里非异步转同步,而是判断生产者是否结束
    p2.join()
    q.put(None)
    q.put(None)

    当取到None时为什么有阻塞情况,未显示程序执行完,因为两个人其中一个人拿到None,
    另一个人就取不到值出现等待情况,有几个人就put几个None就解决了

    #  生产者消费者模型_joinableQueue(解决一个None,多人get阻塞问题)

    import time
    import random
    from multiprocessing import Process,
    JoinableQueue
    def consumer(q,name):
    while True:
    food = q.get()
    print('33[31m%s消费了%s33[0m' % (name,food))
    time.sleep(random.randint(1,3))
    q.task_done() # count - 1

    def producer(name,food,q):
    for i in range(4):
    time.sleep(random.randint(1,3))
    f = '%s生产了%s%s'%(name,food,i)
    print(f)
    q.put(f)
    q.join() # 阻塞 直到一个队列中的所有数据 全部被处理完毕

    if __name__ == '__main__':
    q = JoinableQueue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.daemon = True # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join() # 感知一个进程的结束

    # 在消费者这一端:
    # 每次获取一个数据
    # 处理一个数据
    # 发送一个记号 : 标志一个数据被处理成功

    # 在生产者这一端:
    # 每一次生产一个数据,
    # 且每一次生产的数据都放在队列中
    # 在队列中刻上一个记号
    # 当生产者全部生产完毕之后,
    # join信号 : 已经停止生产数据了
    # 且要等待之前被刻上的记号都被消费完
    # 当数据都被处理完时,join阻塞结束

    # consumer 中把所有的任务消耗完
    # producer 端 的 join感知到,停止阻塞
    # 所有的producer进程结束
    # 主进程中的p.join结束
    # 主进程中代码结束
    # 守护进程(消费者的进程)结束

    总结

    # 信号量 Semaphore
    from multiprocessing import Semaphore
    # 用锁的原理实现的,内置了一个计数器
    # 在同一时间 只能有指定数量的进程执行某一段被控制住的代码

    # 事件
    # wait阻塞收到事件状态控制的同步组件
    # 状态 True False is_set
    # true -> false clear()
    # false -> true set()
    # wait 状态为True不阻塞 状态为False的时候阻塞

    # 队列
    # Queue
    # put 当队列满的时候阻塞等待队列有空位置
    # get 当队列空的时候阻塞等待队列有数据
    # full empty 不完全准确

    # JoinableQueue 和Queue比较,多两个方法
    # task_done 一般和get(消费者)连用
    # join 一般和put(生产者)连用



  • 相关阅读:
    重载与重写的区别
    类加载过程(clinit()),对象实例化过程(init())
    [c++]内联函数
    [c++]默认参数
    【一套C语言控制台的输出框代码】
    [windows]部分前缀以及其意义
    后缀名“.dll .obj .lib”和“ .so .o .a”文件的区别含义
    为什么变量一定要声明?
    wndows程序设计之书籍知识与代码摘录-封装一个类似printf的messagebox
    wndows程序设计之书籍知识与代码摘录-获取视屏显示器像素等参数GetsystemMetrics
  • 原文地址:https://www.cnblogs.com/mys6/p/10848835.html
Copyright © 2011-2022 走看看