zoukankan      html  css  js  c++  java
  • 网络编程 队列 管道 数据共享 数据池

    队列 :是进程之间安全的 自带锁
    from queue import Queue
    q = Queue(2)
    给参数控制队列的大小如果超过大小且没人取值会一直阻塞
    1. 
    q.put(1) 添加数据 添加一个多一个
    2. 
    q.put_nowait(2) 超过队列大小 将引发Queue.Empty异常
    3. 
    print(q.get()) 取一个少一个 取值没值的时候阻塞
    4. 
    print(q.get_nowait()) 取值没有值的值的时候 报错 queue.Empty
    5. 
    print(q.full()) 看队列满不满 在多进程中是不准确的
    6. 
    print(q.empty()) 看列表空不空 在多进程中是不准确的
    7. 
    print(q.qsize()) 返回队列中目前项目正确数量

    1队列 先进先出FIFO 维护秩序的时候用的较多, 买票  秒杀

    from multiprocessing import Queue
    q=Queue()
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.qsize())
    print(q.get())
    print(q.qsize())
    print(q.get())
    print(q.get())
    print(q.get())  #没有值的时候回堵塞
    print(q.get_nowait())    #有值的时候会取值
    print(q.get_nowait())   #没有值的时候会报错
    print(q.full())  #在实例化的时候加一个值 当队列里的值满了会报True
    print(q.empty())   #当队列里没有值了 会报True
    print(q.qsize())   #获取管道里数据的个数
    View Code

     子程序进 子程序出

    # from multiprocessing import Queue,Process
    # def func(q):
    # q.put(22)
    #
    # def func1(q):
    # print(q.get())
    #
    # if __name__ == '__main__':
    # q=Queue()
    # p=Process(target=func,args=(q,))
    # p.start()
    # p = Process(target=func1, args=(q,))
    # p.start()

    # 生产者消费者模型 - 解决创造(生产)数据和处理(消费)数据的效率不平衡问题
    # 把创造数据 和  处理数据放在不同的进程中,
    # 根据他们的效率来调整进程的个数
    # 生产数据快 消费数据慢 内存空间的浪费
    # 消费数据快 生产数据慢 效率低下 
    解决办法
    第一版本 
    from multiprocessing import Queue,Process
    
    def func1(q,name):
        while True:
            food=q.get()
            print("%s吃了%s"  %(name,food))
    
    def func2(q,name,food,n=10):
        for i in range(n):
            fd=food+str(i)
            print('%s 生产了 %s' % (name, fd))
            q.put(fd)
    
    if __name__ == '__main__':
        q=Queue()
        p=Process(target=func1,args=(q,"姚志强"))
        p.start()
        p1 = Process(target=func2, args=(q,"戴恩","泔水"))
        p1.start()
    View Code

    第二版本 采用from multiprocessing import JoinableQueue 这个模块

    import time
    import random
    from multiprocessing import JoinableQueue,Process
    def func1(q,name):
        while True:
            food=q.get()
            print("%s吃了%s" % (name,food))
            time.sleep(1)
            q.task_done()
    def  func2(q,name,food,n=10):
        for i  in range(n):
            time.sleep(random.random())
            fd=food+str(i)
            print("%s制造了%s" % (name,fd))
            q.put(fd)
        q.join()
    
    if __name__ == '__main__':
        q=JoinableQueue(1)
        c=Process(target=func1,args=(q,"姚志强"))
        c.daemon = True
        c.start()
        c1=Process(target=func1,args=(q,"姚志强"))
        c1.daemon = True
        c1.start()
        z=Process(target=func2,args=(q,"戴恩","泔水"))
        z.start()
        z1 = Process(target=func2, args=(q, "戴恩", "清泉"))
        z1.start()
        z.join()
        z1.join()
    View Code
    # 只有multiprocessing中的队列 才能帮助你 实现 IPC
    # 永远不可能出现数据不安全的情况,多个进程不会同时取走同一个数据
    # 提供给你的方法
    # 由于先进先出的特点+进程通信的功能+数据进程安全,经常用它来完成进程之间的通信
    # 生产者消费者模型
        # 生产者和消费者的效率平衡的问题
        # 内存的控制 - 队列的长度限制
        # 让消费者自动停下来
    # JoinableQueue
        # 在消费数据的时候 task_done
        # 在生产端主进程 join

    二 管道

    # 管道
    # 队列就是基于管道实现的
    # 队列 数据安全的
    # 管道 数据不安全的
    # 队列 = 管道 + 锁

    # from multiprocessing import Pipe
    # left,right = Pipe()
    # left.send('aaa')
    # print(right.recv())
    
    # from multiprocessing import Pipe,Process
    # def consumer(left,right):
    #     left.close()
    #     while True:
    #         try:
    #             print(right.recv())
    #         except EOFError:
    #             break
    #
    # if __name__ == '__main__':
    #     left,right = Pipe()
    #     p = Process(target=consumer,args=(left,right))
    #     p.start()
    #     right.close()
    #     for i in range(10):
    #         left.send('hello')
    #     left.close()
    View Code

    EOF异常的触发
        # 在这一个进程中 如果不在用这个端点了,应该close
        # 这一在recv的时候,如果其他端点都被关闭了,就能够知道不会在有新的消息传进来
        # 此时就不会在这里阻塞等待,而是抛出一个EOFError
        # * close并不是关闭了整个管道,而是修改了操作系统对管道端点的引用计数的处理

    # from multiprocessing import Process,Pipe
    #
    # def consumer(p,name):
    #     produce, consume=p
    #     produce.close()
    #     while True:
    #         try:
    #             baozi=consume.recv()
    #             print('%s 收到包子:%s' %(name,baozi))
    #         except EOFError:
    #             break
    #
    # def producer(p,seq=10):
    #     produce, consume=p
    #     consume.close()
    #     for i in range(seq):
    #         produce.send(i)
    #
    # if __name__ == '__main__':
    #     produce,consume=Pipe()
    #     for i in range(5):
    #         c=Process(target=consumer,args=((produce,consume),'c1'))
    #         c.start()
    #     for i in range(5):
    #         p = Process(target=producer, args=((produce, consume)))
    #         p.start()
    #     producer((produce,consume))
    #     produce.close()
    #     consume.close()
    View Code

    三  数据共享

    from multiprocessing import Manager,Process,Lock
    def work(d,lock):
        with lock:
            d['count']-=1
    
    if __name__ == '__main__':
        lock = Lock()
        m = Manager()
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
    
     # with as 的机制
        # __enter__
        # __exit__
    View Code

    四 数据池

        # 进程不能无限开 会给操作系统调度增加负担
        # 且真正能被同时执行的进程最多也就和CPU个数相同等
        # 进程的开启和销毁都要消耗资源和时间

    import os
    import time
    from multiprocessing import Pool
    def func(i):
        time.sleep(0.1)
        print(os.getpid(),i)
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(20):
            p.apply_async(func,args=(i,))
        p.close()
        p.join()
    View Code
  • 相关阅读:
    SQL Server 附加数据库,报只读文件,无权修改其中某些文件
    NLog.config 配置
    系统架构设计师论文可靠性设计
    二、软件设计原则
    JavaScript 判断数组是否含有重复值
    mysql 添加索引 mysql 如何创建和删除索引
    利用pandas,BytesIO,zipfile打包csv文件,生成压缩文件
    不良人mysql索引
    转mysql数据库允许空值索引问题
    多线程中ThreadPoolExecutor.map()中传递多个参数
  • 原文地址:https://www.cnblogs.com/daien522556/p/9372304.html
Copyright © 2011-2022 走看看