zoukankan      html  css  js  c++  java
  • 042.Python进程队列介绍


    进程队列介绍

    1  基本语法及过程

    先进先出,后进后出,q = Queue()

    过程

    (1)把数据放到q队列中 put

    (2)把书局从队列中拿出来 get

    复制代码
    from multiprocessing import Process,Queue
    
    q = Queue()
    
    q.put(111)
    
    res = q.get()
    
    print (res)
    复制代码

    执行

    [root@node10 python]# python3 test.py
    111

    (3)当队列里面的值都拿出来了,已经没有数据的时候,在获取会阻塞

    复制代码
    from multiprocessing import Process,Queue
    q = Queue()
    q.put(111)
    res = q.get()
    print (res)
    res = q.get()
    复制代码

    执行

    复制代码
    [root@node10 python]# python3 test.py
    111
    Traceback (most recent call last):   #这里阻塞住,使用ctrl+c退出
      File "test.py", line 6, in <module>
        res = q.get()
      File "/usr/lib64/python3.6/multiprocessing/queues.py", line 94, in get
        res = self._recv_bytes()
      File "/usr/lib64/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
        buf = self._recv_bytes(maxlength)
      File "/usr/lib64/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
        buf = self._recv(4)
      File "/usr/lib64/python3.6/multiprocessing/connection.py", line 379, in _recv
        chunk = read(handle, remaining)
    KeyboardInterrupt
    复制代码

    (4)get_nowait 无论有没有都拿,如果拿不到,直接报错

    复制代码
    from multiprocessing import Process,Queue
    q = Queue()
    q.put(111)
    res = q.get()
    print (res)
    res = q.get_nowait()
    复制代码

    执行

    (5)异常处理,抑制错误

    语法:

    复制代码
    try:
        code1
        code2
    except:
        code1
        code2
    复制代码

    把可能出现异常的代码扔到try代码块中
    如果发生异常,直接执行except中的代码块,抑制错误

    复制代码
    q = Queue()
    q.put(111)
    res = q.get()
    print (res)
    try:
            res = q.get_nowait()
    except:
            pass
    复制代码

    执行

    [root@node10 python]# python3 test.py
    111

    2 可以使用queue 指定队列长度

    当指定队列的长度,例如对垒长度为3,则最多放3个,超过最大长度再放,会直接阻塞

    复制代码
    from multiprocessing import Process,Queue
    q = Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put(4) 阻塞 如果队列已经满了,在放值,直接阻塞
    
    q.put_nowait(4) #如果队列已经满了,再放值,直接报错
    复制代码

    执行

    异常处理

    复制代码
    from multiprocessing import Process,Queue
    q = Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put(4) 阻塞 如果队列已经满了,在放值,直接阻塞
    
    #q.put_nowait(4) #如果队列已经满了,再放值,直接报错
    try:
            q.put_nowait(4)
    except:
            pass
    复制代码

    不会报错,但是没有输出

    1.2 full empty介绍 

    如果队列满了,返回真,反之亦然

    复制代码
    from multiprocessing import Process,Queue
    q = Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put(4) 阻塞 如果队列已经满了,在放值,直接阻塞
    
    #q.put_nowait(4) #如果队列已经满了,再放值,直接报错
    try:
            q.put_nowait(4)
    except:
            pass
    res = q.full()
    print(res)
    复制代码

    执行

    [root@node10 python]# python3 test.py vi test.py
    True

    empty如果队列空了,返回真,反之亦然

    复制代码
    from multiprocessing import Process,Queue
    q = Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())
    复制代码

    执行

    [root@node10 python]# python3 test.py vi test.py
    1
    2
    3
    True

    加如队列不为空

    复制代码
    from multiprocessing import Process,Queue
    q = Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    #print(q.get())
    print(q.empty())
    复制代码

    执行

    [root@node10 python]# python3 test.py vi test.py
    1
    2
    False

    1.3 进程之间的通讯

    复制代码
    from multiprocessing import Process,Queue
    def func(q):
            #主进程添加值,子进程可以通过队列拿到
            res  = q.get()
            print ("I'm the subprocess",res)
            q.put("from subprocess")
    q = Queue()
    p = Process(target=func,args=(q,))
    p.start()
    #主进程添加数据
    q.put("I'm the main process")
    复制代码

    执行

    [root@node10 python]# python3 test.py vi test.py
    I'm the subprocess I'm the main process

    产生问题

    复制代码
    def func(q):
            #主进程添加值,子进程可以通过队列拿到
            res  = q.get()
            print ("I'm the subprocess",res)
            q.put("from subprocess")
    q = Queue()
    p = Process(target=func,args=(q,))
    p.start()
    #主进程添加数据
    q.put("I'm the main process")
    res = q.get()
    print(res)
    复制代码

    执行

    [root@node10 python]# python3 test.py vi test.py
    I'm the main process
    
    #这里发生阻塞

    原因,是因为再主进程执行完成输入消息和消费消息之后,就结束了,子进程还没有执行完成,这时候子进程直接被干掉

    使用join

    子进程添加的值,主进程通过队列拿到

    复制代码
    from multiprocessing import Process,Queue
    def func(q):
            #主进程添加值,子进程可以通过队列拿到
            res  = q.get()
            print ("I'm the subprocess",res)
            q.put("from subprocess")
    q = Queue()
    p = Process(target=func,args=(q,))
    p.start()
    #主进程添加数据
    q.put("I'm the main process")
    p.join()
    #子进程添加的值,主进程通过队列拿到 res = q.get() print(res)
    复制代码

    执行

    [root@node10 python]# python3 test.py vi test.py
    I'm the subprocess I'm the main process
    from subprocess

    二 生产者和消费者模型

    2.1 介绍

    爬虫例子

    1号进程负责爬取网页中所有内容,可以看成一个生产者
    2号进程负责匹配提取网页中的关键字,可以看成一个消费者

    有时可能生产者必消费者块,反之亦然,所以为了减少生产者和消费者速度上的差异化,加了一个中间的缓冲队列

    生产者和消费者模型从程序上看就是一个存放数据和拿取数据的过程,最为理想的生产者消费者模型,两者之间的速度相对平均.

    2.2 建立模型

    模拟汽车生产和消费

    复制代码
    from multiprocessing import Process,Queue
    import random,time
    #消费者模型
    def consumer(q,name):
            while True:
                    car = q.get()
                    print("%s buy a %s"%(name,car))
    #生产者模型
    def producer(q,name,car):
            for i in range(3):
                    print("%s produce %s,%s"%(name,car,i))
                    q.put(car+str(i))
    
    q = Queue()
    #消费者
    c1 = Process(target=consumer,args=(q,"Marketting"))
    c1.start()
    
    #生产者
    p1 = Process(target=producer,args=(q,"Factory","BYD"))
    p1.start()
    复制代码

    执行

    复制代码
    [root@node10 python]# vi test.py
    [root@node10 python]# python3 test.py vi test.py
    Factory produce BYD,0
    Factory produce BYD,1
    Factory produce BYD,2
    Marketting buy a BYD0
    Marketting buy a BYD1
    Marketting buy a BYD2
    复制代码

    生产太快,加一个延迟

    复制代码
    import random,time
    #消费者模型
    def consumer(q,name):
            while True:
                    car = q.get()
                    time.sleep(random.uniform(0.1,1))
                    print("%s buy a %s"%(name,car))
    #生产者模型
    def producer(q,name,car):
            for i in range(3):
                    time.sleep(random.uniform(0.1,1))
                    print("%s produce %s,%s"%(name,car,i))
                    q.put(car+str(i))
    
    q = Queue()
    #消费者
    c1 = Process(target=consumer,args=(q,"Marketting"))
    c1.start()
    
    #生产者
    p1 = Process(target=producer,args=(q,"Factory","BYD"))
    p1.start()
    复制代码

    执行

    复制代码
    [root@node10 python]# python3 test.py vi test.py
    Factory produce BYD,0
    Factory produce BYD,1
    Marketting buy a BYD0
    Marketting buy a BYD1
    Factory produce BYD,2
    Marketting buy a BYD2
    #这里有阻塞
    复制代码

    解决最终的阻塞问题

    复制代码
    from multiprocessing import Process,Queue
    import random,time
    #消费者模型
    def consumer(q,name):
            #死循环,即一直等待生产出来,然后消费,最终在这里会阻塞
            while True:
                    car = q.get()
                    #在这里判断,当car时空值,退出
                    if car is None:
                            break
                    #加入延迟阻塞
                    time.sleep(random.uniform(0.1,1))
                    print("%s buy a %s"%(name,car))
    #生产者模型
    def producer(q,name,car):
            for i in range(3):
                    time.sleep(random.uniform(0.1,1))
                    print("%s produce %s,%s"%(name,car,i))
                    q.put(car+str(i))
    
    q = Queue()
    #消费者
    c1 = Process(target=consumer,args=(q,"Marketting"))
    c1.start()
    
    #生产者
    p1 = Process(target=producer,args=(q,"Factory","BYD"))
    p1.start()
    
    #由于下面插入空值,而且在消费者做了判断,收到空值就会退出,这里执行完,主进程就会退出,导致消费者子进程无法完成消费,使用join
    p1.join()
    
    #在这里加一个控制,当执行完生产,在这里插入一个空值
    q.put(None)
    复制代码

    执行

    复制代码
    [root@node10 python]# python3 test.py vi test.py
    Factory produce BYD,0
    Marketting buy a BYD0
    Factory produce BYD,1
    Marketting buy a BYD1
    Factory produce BYD,2
    Marketting buy a BYD2
    复制代码

    则一个简单模型建立

    学习记录,小白一枚
  • 相关阅读:
    python3--生成器
    python3--列表生成式
    python3--装饰器高级学习版
    python3--嵌套函数
    python3-装饰器
    《Hexo+github搭建个人博客》
    Html网页的代码
    关于内联框架
    性能调优
    Java知识总结
  • 原文地址:https://www.cnblogs.com/wangsirde0428/p/14322778.html
Copyright © 2011-2022 走看看