zoukankan      html  css  js  c++  java
  • 对列 、生产者与消费者

    一、对列的特点:先进先出   put()和get()方法

    from multiprocessing import Process,Queue
    q = Queue()
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    print(q.get())

    结果:

    1
    2
    3
    二、q.get_nowait()和q.put_nowait() 的用法
    2.1 当对列中有值,get()直接取值,对列没有值,get()就会阻塞,get_nowait()会直接报错。需要用到 try...except
    from multiprocessing import Process,Queue
    import queue
    q = Queue()
    q.put(1)
    try:
        print(q.get_nowait())  # 有值取值,没值则queue.Empty报错
    except queue.Empty:
        print('empty')

    结果:

    有值:
    1
    没值:
    empty  

    2.2 对列是有长度的,Queue(2)设置长度为2的对列。当持续put(),长度超多时,使用put()程序会阻塞,数据无法放进去,当使用put_nowait(),则直接报错,数据无法放进去。由于数据可能会丢失,所以很少用到这个put_nowait()。

    from multiprocessing import Process,Queue
    q = Queue(2)
    q.put(1)
    q.put(2)
    q.put_nowait(3)    # 直接报错

    三、判断对列是否为空、满

    from multiprocessing import Process,Queue
    import queue
    # 先进先出
    q = Queue(2)
    q.put(1)
    q.put(2)
    print(q.empty())
    print(q.full())
    
    结果:
    False
    True

    但是在多进程中,这个判断不准

     四、使用对列在进程间的相互通讯

    from multiprocessing import Process,Queue
    def consume(q):
        print('son:',q.get())
        q.put('asd')
    if __name__ == '__main__':
        q = Queue()
        p = Process(target= consume,args=(q,))
        p.start()
        q.put({'123':123})
        p.join()
        print('Foo:',q.get())
    
    结果:
    son: {'123': 123}
    Foo: asd

    五、生产者与消费者

    import time
    import random
    from multiprocessing import Process,Queue   # 默写
    def consumer(q,name):
      # 消费者
    while True: food = q.get() if food is None:break time.sleep(random.uniform(0.3,0.8)) print('%s吃了一个%s'%(name,food)) def producer(q,name,food):
      # 生产者
    for i in range(10): time.sleep(random.uniform(0.3,0.8)) print('%s生产了%s%s'%(name,food,i)) q.put(food + str(i)) if __name__ == '__main__': q = Queue() c1 = Process(target=consumer,args=(q,'alex')) c1.start() p1 = Process(target=producer, args=(q, '杨忠和','泔水')) p1.start() p1.join() q.put(None)

    结果为:

    杨忠和生产了泔水0
    杨忠和生产了泔水1
    alex吃了一个泔水0
    杨忠和生产了泔水2
    alex吃了一个泔水1
    杨忠和生产了泔水3
    alex吃了一个泔水2
    杨忠和生产了泔水4
    alex吃了一个泔水3
    杨忠和生产了泔水5
    alex吃了一个泔水4
    杨忠和生产了泔水6
    alex吃了一个泔水5
    杨忠和生产了泔水7
    alex吃了一个泔水6
    杨忠和生产了泔水8
    alex吃了一个泔水7
    杨忠和生产了泔水9
    alex吃了一个泔水8
    alex吃了一个泔水9

    5.2 JoinableQueue 类

    import time
    import  random
    from multiprocessing import Process,JoinableQueue
    def consumer(jq,name):
        # 处理数据
        while True:
            food = jq.get()
            time.sleep(random.uniform(0.3,0.8))
            print('%s吃了一个%s'%(name,food))
            jq.task_done()
    def producer(jq,name,food):
        # 获取数据
        for i in range(10):
            time.sleep(random.uniform(0.3,0.8))
            print('%s生产了%s%s'%(name,food,i))
            jq.put(food + str(i))
    if __name__ == '__main__':
        jq = JoinableQueue()
        c1 = Process(target=consumer,args=(jq,'alex'))
        c2 = Process(target=consumer,args=(jq,'wuser'))
        c1.daemon = True
        c2.daemon = True
        c1.start()
        c2.start()
        p1 = Process(target=producer, args=(jq, '杨宗和','泔水'))
        p2 = Process(target=producer, args=(jq, '何思浩','鱼刺鱼骨头'))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        jq.join()
    JoinableQueue
    put
    get
    task_done 通知对列已经有一个数据被处理了
    q.join() 阻塞直到放入对列中所有的数据都被处理掉(有多少个数据就接受多少个taskdone)


  • 相关阅读:
    闭包
    值类型和引用类型的区别?
    计算机基础知识和基本操作
    Git常用命令
    任务八
    CSS任务七
    CSS任务六
    MySql+EF6,不能选实体框架6.x或者闪退
    以一定的格式生成最新的数据库流水号
    layui父窗体获取弹出窗体元素
  • 原文地址:https://www.cnblogs.com/youhongliang/p/9681386.html
Copyright © 2011-2022 走看看