zoukankan      html  css  js  c++  java
  • python消息队列Queue

    实例1:消息队列Queue,不要将文件命名为“queue.py”,否则会报异常“ImportError: cannot import name 'Queue'”

    #coding=utf-8
    from multiprocessing import Queue 
     
    q = Queue(3)#初始化一个Queue对象,最多可接收三条put消息
    q.put('message-1')
    q.put('message-2')
    print(q.full())#False,是否满了
    q.put('message-3')
    print(q.full())#True
     
    #因为消息队列已满,下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个try会立即抛出异常
    try:
        q.put('message-4',True,2)
    except:
        print('except1,消息队列已满,现有消息数量:%s'%q.qsize())
     
    try:
        q.put_nowait('message-4')
    except:
        print('except2,消息队列已满,现有消息数量:%s'%q.qsize())
     
    #判断队列是否已满
    if not q.full():
        q.put_nowait('message-4')
     
    #读取消息时,先判断消息队列是否为空,在读取
    if not q.empty():
        for i in range(q.qsize()):
            print(q.get())#q.get会阻塞,q.get_nowait()不阻塞,但会抛异常   

    False

    True

    except1,消息队列已满,现有消息数量:3

    except2,消息队列已满,现有消息数量:3

    message-1

    message-2

    message-3

    实例二:通过Process进程间通信


    from multiprocessing import Process,Queue
    import os,time,random 
     
    #写数据
    def write(q):
        for value in ['A','B','C']:
            print('Put %s to queue...'%value)
            q.put(value)
            time.sleep(random.random())
     
    #读数据
    def read(q):
        while True:
            if not q.empty():
                value = q.get(True)
                print('Get %s from queue...'%value)
                time.sleep(random.random())
            else:
                break
     
    if __name__ == '__main__':
        print('start...')
        q = Queue()
        #父进程的queue传递给子进程
        pw = Process(target=write,args=(q,))
        pr = Process(target=read,args=(q,))              
        #写进程
        pw.start()
        pw.join()
        #读进程
        pr.start()
        pr.join()
        print('done...')

    start...

    Put A to queue...

    Put B to queue...

    Put C to queue...

    Get A from queue...

    Get B from queue...

    Get C from queue...

    done...

    实例三:通过Manager进程间通信
    from multiprocessing import Manager,Pool
    import os,time,random 
     
    #写数据
    def writer(q):
        print('writer启动(%s),父进程为(%s)'%(os.getpid(),os.getppid()))
        for i in 'chaoge':
            q.put(i)
     
    #读数据
    def reader(q):
        print('reader启动(%s),父进程为(%s)'%(os.getpid(),os.getppid()))
        for i in range(q.qsize()):
            print('reader 从Queue获取到消息:%s'%q.get())
     
     
    if __name__ == '__main__':
        print('(%s) start'%os.getpid())
        q = Manager().Queue()#使用Manager中的Queue来初始化
        po=Pool()
        #使用阻塞模式创建进程,这样就不需要再reader中使用死循环了,可以等write执行完成后,再用reader
        po.apply(writer,(q,))
        po.apply(reader,(q,))
        #写进程
        po.close()
        po.join()
        print('(%s) End'%os.getpid())

    (7720) start

    writer启动(7284),父进程为(7720)

    reader启动(8712),父进程为(7720)

    reader 从Queue获取到消息:c

    reader 从Queue获取到消息:h

    reader 从Queue获取到消息:a

    reader 从Queue获取到消息:o

    reader 从Queue获取到消息:g

    reader 从Queue获取到消息:e

    (7720) End




  • 相关阅读:
    async函数
    Generator生成器
    ES6中的迭代器iterator
    Java多线程系列---“JUC锁”06之 公平锁(下)
    Java多线程系列---“JUC锁”05之 公平锁(上)
    Java多线程系列---“基础篇”14之 wait,sleep,join,yield,park,unpark,notify等通信机制对比
    Java多线程系列---“JUC锁”04之 LockSupport
    Java多线程系列---“JUC锁”03之 Condition
    Java多线程系列---“JUC锁”02之 ReentrantLock
    Java多线程系列---“JUC锁”01之 框架
  • 原文地址:https://www.cnblogs.com/fonyer/p/9784870.html
Copyright © 2011-2022 走看看