zoukankan      html  css  js  c++  java
  • 进程间通信--队列和管道

    1.进程间通信--队列和管道

    IPC(Inter-Process Communication)

    队列:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 

    # 队列 先进先出
    # IPC
    # from multiprocessing import Queue
    # q = Queue(5)
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # q.put(4)
    # q.put(5)
    # print(q.full())   # 队列是否满了
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.empty())
    # while True:
    #     try:
    #         q.get_nowait()
    #     except:
    #         print('队列已空')
    #         time.sleep(0.5)
    # for i in range(6):
    #     q.put(i)
    
    from multiprocessing import Queue,Process
    def produce(q):
        q.put('hello')
    
    def consume(q):
        print(q.get())
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=produce,args=(q,))
        p.start()
        c = Process(target=consume, args=(q,))
        c.start()
    队列
    # 队列
    # 生产者消费者模型
    
    # 生产者 进程
    # 消费者 进程
    import time
    import random
    from multiprocessing import Process,Queue
    def consumer(q,name):
        while True:
            food = q.get()
            if food is None:
                print('%s获取到了一个空'%name)
                break
            print('33[31m%s消费了%s33[0m' % (name,food))
            time.sleep(random.randint(1,3))
    
    def producer(name,food,q):
        for i in range(4):
            time.sleep(random.randint(1,3))
            f = '%s生产了%s%s'%(name,food,i)
            print(f)
            q.put(f)
    
    if __name__  == '__main__':
        q = Queue(20)
        p1 = Process(target=producer,args=('Egon','包子',q))
        p2 = Process(target=producer, args=('wusir','泔水', q))
        c1 = Process(target=consumer, args=(q,'alex'))
        c2 = Process(target=consumer, args=(q,'jinboss'))
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        q.put(None)
        q.put(None)
    生产者消费者模型
    import time
    import random
    from multiprocessing import Process,JoinableQueue
    def consumer(q,name):
        while True:
            food = q.get()
            print('33[31m%s消费了%s33[0m' % (name,food))
            time.sleep(random.randint(1,3))
            q.task_done()     # count - 1
    
    def producer(name,food,q):
        for i in range(4):
            time.sleep(random.randint(1,3))
            f = '%s生产了%s%s'%(name,food,i)
            print(f)
            q.put(f)
        q.join()    # 阻塞  直到一个队列中的所有数据 全部被处理完毕
    
    if __name__  == '__main__':
        q = JoinableQueue(20)
        p1 = Process(target=producer,args=('Egon','包子',q))
        p2 = Process(target=producer, args=('wusir','泔水', q))
        c1 = Process(target=consumer, args=(q,'alex'))
        c2 = Process(target=consumer, args=(q,'jinboss'))
        p1.start()
        p2.start()
        c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
        c2.daemon = True
        c1.start()
        c2.start()
        p1.join()
        p2.join()      # 感知一个进程的结束
    
    #  在消费者这一端:
        # 每次获取一个数据
        # 处理一个数据
        # 发送一个记号 : 标志一个数据被处理成功
    
    # 在生产者这一端:
        # 每一次生产一个数据,
        # 且每一次生产的数据都放在队列中
        # 在队列中刻上一个记号
        # 当生产者全部生产完毕之后,
        # join信号 : 已经停止生产数据了
                    # 且要等待之前被刻上的记号都被消费完
                    # 当数据都被处理完时,join阻塞结束
    
    # consumer 中把所有的任务消耗完
    # producer 端 的 join感知到,停止阻塞
    # 所有的producer进程结束
    # 主进程中的p.join结束
    # 主进程中代码结束
    # 守护进程(消费者的进程)结束
    joinableQueue解决生产者消费者问题

    管道(了解)

    #创建管道的类:
    Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
    #参数介绍:
    dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
    #主要方法:
        conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
        conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
     #其他方法:
    conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
    conn1.fileno():返回连接使用的整数文件描述符
    conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
     
    conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
    conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
     
    conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
    View Code
    from multiprocessing import Pipe,Process
    
    def func(conn1,conn2):
        conn2.close()
        while True:
            try :
                msg = conn1.recv()
                print(msg)
            except EOFError:
                conn1.close()
                break
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe()
        Process(target=func,args = (conn1,conn2)).start()
        conn1.close()
        for i in range(20):
            conn2.send('吃了么')
        conn2.close()
    View Code
  • 相关阅读:
    SharePoint 2010 新体验3 文档集
    Firebird 修改表名
    C++Builder XE7 up1 简单测试
    Firbird 将可 null 的列更新为 not null
    用delphiXE7 dbExpress Framework提供的功能获取数据表信息
    Linux的基本命令总结
    IOS 隐藏时间条
    最大流 ZQUOJ 10181 && POJ 1273
    最大二分匹配 匈牙利算法模板&&POJ 1469 COURSES
    新加坡第四天下午
  • 原文地址:https://www.cnblogs.com/shangchunhong/p/9277635.html
Copyright © 2011-2022 走看看