zoukankan      html  css  js  c++  java
  • 三十一、管道,进程数据共享,进程池,进程池的返回值,回调函数

    一、管道

    管道:进程与进程之间能相互通信
    通信原理:是基于管道双向通信
    from multiprocessing import Pipe, Process
    conn1, conn2 = Pipe()
    conn1.send("123456")
    print(conn2.recv())
    from multiprocessing import Pipe, Process
    def func(conn):
        while True:
            msg = conn.recv()
            if msg == None:
                break
            print(msg)
    
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe()
        Process(target=func, args=(conn1,)).start()
        for i in range(10):
            conn2.send("吃了么")
        conn2.send(None)
    
    
    
    """
    #第二种:
    def func(conn1, conn2):
        conn2.close()
        while True:
            try:
                msg = conn1.recv()
                print(msg)
            except EOFError :
                conn1.close()
                break
    
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe()
        Process(target=func, args=(conn1,conn2)).start()
        conn1.close()
        for i in range(10):
            conn2.send("吃了么")
        conn2.close()
        # 疑问:主进程关闭通道不会影响子进程接收数据
        #
        # ********应该特别注意管道端点的正确管理问题,如果是生产者或者消费者中都没有使用管道的某个端点,就应该将它关闭
        这也说明为何在生产中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中
        的recv()操作阻塞。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常
        因此,在生产中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点
        # """
    例子
    from multiprocessing import Pipe, Process, Lock
    import random, time
    
    
    def producer(con, pro, name, food):
        con.close()
        for i in range(4):
            time.sleep(random.randint(1, 3))
            f = "%s生产%s%s" % (name, food, i)
            print(f)
            pro.send(f)
        pro.close()
    
    
    def consumer(con, pro, name):
        pro.close()
        while True:
            try:
                # lock.acquire()
                food = con.recv()
                # lock.release()
                print("%s吃了%s" % (name, food))
                time.sleep(random.randint(1, 3))
            except EOFError:
                print("%s吃完啦" % name)
                con.close()
                break
    
    
    if __name__ == '__main__':
        con, pro = Pipe()
        # lock = Lock()
        p = Process(target=producer, args=(con, pro, "猪狗", "泔水"))
        p.start()
        c = Process(target=consumer, args=(con, pro, "逗比"))
        c1 = Process(target=consumer, args=(con, pro, "坦克" ))
        c.start()
        c1.start()
        con.close()
        pro.close()
    
    # Pipe 管道数据不安全(可能发生多进程抢一个数据)枷锁来控制抢资源对象   管道属于最底层的东西
    # 队列是 管道+锁 所以比较安全,一般用队列
    """
        应该特别注意管道端点的正确管理问题,如果是生产者或者消费者中都没有使用管道的某个端点,就应该将它关闭
        这也说明为何在生产中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中
        的recv()操作阻塞。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常
        因此,在生产中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点
    """
    管道消费者模型

    二、进程之间的数据共享

    模块:Manager

    from multiprocessing import Manager, Process, Lock
    def main(dic, lock): lock.acquire() dic["count"] -= 1 lock.release() print(dic) if __name__ == '__main__': lock = Lock() # 牺牲效率变成串行(同步),但是安全 m = Manager() # 可以把数据共用多个进程共享,但是会发生多个子进程抢同一个资源,造成数据混乱,使用必须加锁 dic = m.dict({"count": 100}) p_lis = [] for i in range(50): p = Process(target=main, args=(dic, lock)) p_lis.append(p) # 把对象添加到列表 p.start() for i in p_lis: i.join() # 把每个对象添加join,所有子进程结束,主进程才运行 print("主进程:", dic)

    三、进程池

    效率:
    每次开启进程,开启属于这个进程的内存空间
    寄存器 堆栈 文件
    进程过多 操作系统的调度

    进程池:
    python中 先创建一个属于进程的池子
    这个池子指定能存放n个进程
    先将这些进程创建好,如果有n个任务,就让n个进程去执行,这样减少cpu的占用率,提高效率(防止一起去执行,占用资源过大)
    1.Pool模块进程池
    from multiprocessing import Pool, Process
    import time
    
    
    def fun(n):
        for i in range(10):
            print(n + 1)
    
    
    if __name__ == '__main__':
        start1 = time.time()
        pool = Pool(5)  # 5个进程
        pool.map(fun, range(100))  # 100个任务 自带join 和close   map参数必须是可迭代
        t1 = time.time() - start1
        start2 = time.time()
        p_lis = []
        for i in range(100):
            p = Process(target=fun, args=(i,))
            p_lis.append(p)
            p.start()
        for i in p_lis: i.join()
        t2 = time.time() - start2
        print(t1, t2)  # 0.1535499095916748 2.3756473064422607
    
    # 进程池:是同时并行执行5个程序,差不多执行20次  多进程:是100个子进程分别(异步)执行一次,100次
    # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用join,等待进程池内任务都处理完,然后可以用get收集结果
    # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了"""
    from multiprocessing import Pool
    import time, os
    
    
    def func(n):
        print("start func %s" % n, os.getpid())
        time.sleep(1)
        print("end func%s" % n, os.getpid())
    
    
    if __name__ == '__main__':
        pool = Pool(5)
        # pool.map(func,)
        for i in range(20):
            pool.apply_async(func, args=(i,))  # apply同步提交任务  apply_async异步提交任务
        time.sleep(5)
        # pool.close()  # 结束进程池接收任务    关闭进程池,防止进一步操作。如果所有操作持续阻塞,它们将在工作进程终止前完成
        # pool.join()  # 感知进程池中的任务执行结束
    # 5个进程交替执行任务
    # apply_async()需要先close 后join 来保持子进程和主进程代码的同步性

    2.第二种方法:from concurrent.futures import ProcessPoolExecutor

    from concurrent.futures import ProcessPoolExecutor
    import time
    import os
    
    
    def task(n):
        print(n, os.getppid())
        time.sleep(2)
        return n * 2
    
    
    def call_back(n):
        print("拿到异步提交任务返回结果:", n.result())
    
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(5)
        t_lis = []
        for i in range(10):
            res = pool.submit(task, i).add_done_callback(call_back)  #异步多进程
            t_lis.append(res)
        pool.shutdown()  # 相当于p.join和p.close()  等待子进程结束
        print("主进程:", os.getpid())

    三、进程池的返回值

    第一种:

    from multiprocessing import Pool
    import time
    
    
    def func(i):
        time.sleep(1)
        return i * i
    
    
    if __name__ == '__main__':
        p = Pool(5)
        # res = p.map(func, range(10))   # 自带close和join 进程结束后,一次性打印
        # print(res)
        res_lis = []
        for i in range(10):
            res = p.apply_async(func, args=(i,))
            res_lis.append(res)
            # print(res.get())  # get()阻塞等待结果
        p.close()
        p.join()
          for i in res_lis:
        print(i.get())  # 拿到返回值
     

    第二种:

    from concurrent.futures import ProcessPoolExecutor
    import time
    
    
    def func(i):
        time.sleep(1)
        return i ** 2
    
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(5)
        res_list = []
        for i in range(10):
            res = pool.submit(func, i)
            res_list.append(res)
        pool.shutdown()  # 相当于close()和join()
        for i in res_list:
            print(i.result())  # 不同于上面是get()换成result()

    *****模块不一样方式不同

    四、回调函数

    回调函数会在主函数中进行

    第一种:

    from concurrent.futures import ProcessPoolExecutor
    import os
    
    
    def func1(n):
        print("in func1 ", os.getpid())
        return n * n
    
    
    def func2(nn):
        print("in func2 ", os.getpid())
        print(nn.result())
    
    if __name__ == '__main__':
        pool =ProcessPoolExecutor(5)
        pool.submit(func1,10).add_done_callback(func2)
        pool.shutdown()  # 关机
        print("主进程:",os.getpid())

    第二种:

    from multiprocessing import Pool
    import os
    
    
    def func1(n):
        print("in func1", os.getpid())
        return n * n
    
    
    def func2(nn):
        print("in func2", os.getpid())
        print(nn)
    
    
    if __name__ == '__main__':
        p = Pool(5)
        # for i in range(10):
        p.apply_async(func1, args=(10,), callback=func2)
        p.close()
        p.join()
        print("主进程:", os.getpid())  # func2进程数和主进程一样,证明回调

    五、进程池socket 通信高并发

    #服务端
    from multiprocessing import Pool
    import os
    
    
    def func1(n):
        print("in func1", os.getpid())
        return n * n
    
    
    def func2(nn):
        print("in func2", os.getpid())
        print(nn)
    
    
    if __name__ == '__main__':
        p = Pool(5)
        # for i in range(10):
        p.apply_async(func1, args=(10,), callback=func2)
        p.close()
        p.join()
        print("主进程:", os.getpid())  # func2进程数和主进程一样,证明回调
    #客户端
    import socket
    
    client = socket.socket()
    client.connect(("127.0.0.1", 8080))
    
    ret = client.recv(1024).decode("utf8")
    print(ret)
    msg = input(">>>>>>>:").encode("utf8")
    client.send(msg)
    
    client.close()

    六、利用多线程,爬虫例子

    """ 装模块方法:cmd中: pip3 install 模块名"""
    import requests
    from multiprocessing import Pool
    
    
    def get(url):
        response = requests.get(url)
        if response.status_code == 200:
            return url, response.content.decode("utf8")
    
    
    def call_back(args):
        url, content = args
        print(url, len(content))
    
    
    if __name__ == '__main__':
        url_lst = ["http://www.baidu.com/",
                   "https://www.cnblogs.com",
                   "https://www.sogou.com/"]
        p = Pool(5)
        for url in url_lst:
            p.apply_async(get, args=(url,),callback=call_back)
        p.close()
        p.join()
    爬虫
     
  • 相关阅读:
    Kendo UI开发教程(8): Kendo UI 特效概述
    6.3 计算字符在字符串中出现的次数
    有意思的GacUI
    Qt信号量QSemaphore(在线程里使用,结合生产者消费者的问题)
    Qt 自定义事件详细实例(继承QEvent,然后QCoreApplication::postEvent()、sendEvent())
    为什么选择使用 Dropbox 而不是其他品牌同步工具(不要加上多余的功能,要极致和专注)
    DropBox与Box的区别,包括直接的投资人的评价(本地Sync可能还是挺重要的)
    ASP.NET所谓前台调用后台、后台调用前台想到HTTP——实践篇
    8个免费实用的C++GUI库
    .NET 利用反射将对象数据添加到数据库
  • 原文地址:https://www.cnblogs.com/wukai66/p/11358991.html
Copyright © 2011-2022 走看看