zoukankan      html  css  js  c++  java
  • python之Queue

    一、多进程的消息队列

      “消息队列”是在消息的传输过程中保存消息的容器

      消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。

      操作系统提供了很多机制来实现进程中的通信,multiprocessing模块就提供了queue和pipe两种方法来实现

      使用multiprocessing里面的queue来实现消息队列,代码如下:

    from multiprocessing import Process
    from multiprocessing import Queue
    
    
    
    def write(q):
        for i in ["a","b","c","d"]:
            q.put(i)
            print ("put {0} to queue".format(i))
    
    
    def read(q):
        while 1:
            result = q.get()
            print ("get {0} from queue".format(result))
    
    def main():
        q = Queue()
        pw = Process(target=write,args=(q,))
        pr = Process(target=read,args=(q,))
        pw.start()
        pr.start()
        pw.join()
        pr.terminate()
    
    if __name__ == "__main__":
        main()

    结果:

    put a to queue
    put b to queue
    put c to queue
    put d to queue
    get a from queue
    get b from queue
    get c from queue
    get d from queue

      通过multiprocessing里面的pipe来实现消息队列。pipe方法返回(conn1,conn2)代表一个管道的两个端。pipe方法有duplex参数,如果duplex参数为true(默认值),那么管道是全双工模式,也就是说conn1和conn2均可收发。duplex为false,conn1只负责接受消息,conn2只负责发送消息。

      send和recv方法分别是发送和接收消息的方法,close方法表示关闭管道。当消息接收结束以后,关闭管道。实例代码如下:

    from multiprocessing import Process,Pipe
    
    import time
    
    
    def proc1(pipe):
        for i in xrange(1,10):
            pipe.send(i)
            print ("send {0} to pipe".format(i))
            time.sleep(1)
    def proc2(pipe):
        n = 9
        while n>0:
            result = pipe.recv()
            print ("recv {0} from pipe".format(result))
            n -= 1
    
    def main():
        pipe = Pipe(duplex=False)
        print (type(pipe))
        p1 = Process(target = proc1,args=(pipe[1],))
        p2 = Process(target= proc2,args=(pipe[0],))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        pipe[0].close()
        pipe[1].close()
    
    if __name__ == "__main__":
        main()

    结果:

    <type 'tuple'>
    send 1 to pipe
    recv 1 from pipe
    send 2 to pipe
    recv 2 from pipe
    send 3 to pipe
    recv 3 from pipe
    send 4 to piperecv 4 from pipe

    send 5 to pipe
    recv 5 from pipe
    send 6 to pipe
    recv 6 from pipe
    send 7 to pipe
    recv 7 from pipe
    send 8 to pipe
    recv 8 from pipe
    send 9 to pipe
    recv 9 from pipe

    二、Queue模块

      Python提供了Queue模块来专门实现消息队列。Queue对象实现一个FIFO队列(其他的还有lifo、priority队列)。Queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

      Queue.qsize:返回消息队列的当前空间。返回值不一定可靠

      Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。

      Queue.full():类似上边,判断消息队列是否满

      Queue.put(item,block=True,timeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception

      Queue.put_nowait(item):相当于put(itme,False)

      Queue.get(block = True,timeout = None):获取一个消息,其他同put

      Queue.task_done():接受消息的线程通过调用这个函数来说明消息对应的任务已完成

      Queue.join():实际上意味着等队列为空,再执行别的操作

    程序实例如下:

    from Queue import Queue
    from threading import Thread
    
    import time
    
    
    class Proceduer(Thread):
        def __init__(self,queue):
            super(Proceduer,self).__init__()
            self.queue = queue
    
        def run(self):
            try:
                for i in xrange(1,10):
                    print ("put data is : {0} to queue".format(i))
                    self.queue.put(i)
            except Exception as e:
                print ("put data error")
                raise e
    
    class Consumer_odd(Thread):
        def __init__(self,queue):
            super(Consumer_odd,self).__init__()
            self.queue = queue
        def run(self):
            try:
                while not self.queue.empty:
                    number = self.queue.get()
                    if number % 2 != 0:
                        print ("get {0} from queue ODD,thread name is :{1}".format(number,self.getName()))
                    else:
                        self.queue.put(number)
                    time.sleep(1)
            except Exception as e :
                raise  e
    
    
    class Consumer_even(Thread):
        def __init__(self,queue):
            super(Consumer_even,self).__init__()
            self.queue = queue
        def run(self):
            try:
                while not self.queue.empty:
                    number = self.queue.get()
                    if number % 2 == 0:
                        print ("get {0} from queue Even,thread name is :{1}".format(number,self.getName()))
                    else:
                        self.queue.put(number)
                    time.sleep(1)
            except Exception as e :
                raise  e
    
    def main():
        queue = Queue()
        p = Proceduer(queue = queue)
        p.start()
        p.join()
        time.sleep(1)
        c1 = Consumer_odd(queue=queue)
        c2 = Consumer_even(queue=queue)
        c1.start()
        c2.start()
        c1.join()
        c2.join()
        print ("All threads is terminate!")
    
    if __name__ == '__main__':
        main()

    结果:

    put data is : 2 to queue
    put data is : 3 to queue
    put data is : 4 to queue
    put data is : 5 to queue
    put data is : 6 to queue
    put data is : 7 to queue
    put data is : 8 to queue
    put data is : 9 to queue
    get 1 from queue ODD,thread name is :Thread-2
    get 2 from queue Even,thread name is :Thread-3
    get 3 from queue ODD,thread name is :Thread-2
     get 4 from queue Even,thread name is :Thread-3
    get 5 from queue ODD,thread name is :Thread-2
     get 6 from queue Even,thread name is :Thread-3
    get 9 from queue ODD,thread name is :Thread-2
    get 8 from queue Even,thread name is :Thread-3
     get 7 from queue ODD,thread name is :Thread-2

  • 相关阅读:
    黑马程序员——指针的应用
    黑马程序员——C语言基础常量、运算符、函数
    黑马程序员——数组
    黑马程序员——循环结构for,while,do..while
    webView去掉右侧导航条
    使用Eclipse构建Maven的SpringMVC项目
    win7 自动登录
    eclipse 自动提示
    apache+php+mysql 环境配置
    KMP子串查找算法
  • 原文地址:https://www.cnblogs.com/huangdongju/p/8043484.html
Copyright © 2011-2022 走看看