zoukankan      html  css  js  c++  java
  • Day035--Python--管道, Manager, 进程池, 线程切换

    管道

    #创建管道的类:
    Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
    #参数介绍:
    dumplex:默认管道是全双工的,如果将duplex设置成False,conn1只能用于接收,conn2只能用于发送。
    #主要方法:
        conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
        conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
     #其他方法:
    conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
    conn1.fileno():返回连接使用的整数文件描述符
    conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout设成None,操作将无限期地等待数据到达。
     
    conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
    conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
     
    conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
    
    管道介绍
    View Code 了解

     

    from multiprocessing import Process, Pipe
    
    conn1, conn2 = Pipe()
    conn1.send('你好')
    print('>>>>>>>>>>>')
    msg = conn2.recv()
    print(msg)
    from multiprocessing import Process, Pipe
    def func1(conn2):
        msg = conn2.recv()
        print(msg)
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe()
        p = Process(target=func1, args=(conn2,))
        p.start()
        conn1.send('你好啊,我叫赛利亚')

       管道错误模拟: 管道关闭, 异常处理

    from multiprocessing import Process, Pipe
    
    def func(conn2):
        while 1:
            try:
                # 如果管道一端关闭了, 另外一端接收消息时会报错, 要使用异常处理
                msg = conn2.recv()
                print(msg)
            except EOFError:
                print('对方管道已关闭')
                conn2.close()
                break
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe()
        p = Process(target=func, args=(conn2,))
        p.start()
        conn1.send('你好啊')
        conn1.close()
        # conn1.recv()  # OSError: handle is closed

    数据共享

      Manager

    进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
    虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
    
    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
    
    A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
    from multiprocessing import Process, Manager
    
    def func(m_dic):
        m_dic['name'] = '大猪蹄子'    # 修改共享数据
    
    if __name__ == '__main__':
        m = Manager()
        m_dic = m.dict({'name': '大佬'})  # 创建共享数据
        print('原始>>>', m_dic)                # 打印初始共享数据
        p = Process(target=func, args=(m_dic,))
        p.start()
        p.join()
        print('主进程>>>>', m_dic)    # 打印的是修改后的共享数据
    '''
    多进程同时获取数据, 修改后重新赋值, 可能同时拿到100, 减1后都把99赋值回去, 得到的数据不准确,数据不安全
    可以通过加锁解决
    '''
    from multiprocessing import Process, Manager
    
    def func(m_dic):
        m_dic['count'] -= 1
    
    if __name__ == '__main__':
        m = Manager()
        m_dic = m.dict({'count': 100})
        lst = []
        for i in range(50):
            p = Process(target=func, args=(m_dic,))
            p.start()
            lst.append(p)
    
        [p.join() for p in lst]
        print('主进程>>>', m_dic)
    # 加锁, 解决数据错乱问题
    from multiprocessing import Process, Manager, Lock
    
    def func(m_dic, ml):
        # with ml: 下面的缩进内容等同于 ml.acquire()  ml.release() 之间的内容, 作用: 加锁
        with ml:   
            m_dic['count'] -= 1
    
    if __name__ == '__main__':
        m = Manager()
        ml = Lock()
        m_dic = m.dict({'count': 100})
        lst = []
        for i in range(50):
            p = Process(target=func, args=(m_dic, ml))
            p.start()
            lst.append(p)
    
        [p.join() for p in lst]
    
        print(m_dic)

    进程池  

      什么是进程池?进程池的作用. 并行 并发 同步 异步 阻塞 非阻塞 互斥 死锁.

    import time
    from multiprocessing import Process, Pool
    
    def func(n):
        time.sleep(1)
        print(n)
    
    if __name__ == '__main__':
        pool = Pool(4)  # 设置进程数量, 如果不设置, 默认是CPU数量
        pool.map(func, range(100))   # map自带join功能, 异步执行任务, 参数是可迭代对象

       进程池比多进程效率高

    import time
    from multiprocessing import Process, Pool
    
    def func(n):
        for i in range(100):
            n += 1
        print(n)
    
    if __name__ == '__main__':
        pool_start_time = time.time()
        pool = Pool()
        pool.map(func, range(100))
        pool_end_time = time.time()
        pool_dif_time = pool_end_time - pool_start_time
    
        lst = []
        p_s_time = time.time()
        for i in range(100):
            p = Process(target=func, args=(i,))
            p.start()
            lst.append(p)
        [p.join() for p in lst]
        p_e_time = time.time()
        pd_time = p_e_time - p_s_time
    
        print('进程池执行时间>>>', pool_dif_time)
        print('多进程执行时间>>>', pd_time)
    View Code 进程池与多进程运行时间对比

       

    import time
    from multiprocessing import Process, Pool
    
    def func(i):
        time.sleep(0.5)
        print(i**2)
    
    if __name__ == '__main__':
        pool = Pool(4)
        pool_s_time = time.time()
        pool.map(func, range(100))
        pool_e_time = time.time()
        pool_dif_time = pool_e_time - pool_s_time
    
        p_lst = []
        p_s_time = time.time()
        for i in range(100):
            p = Process(target=func, args=(i,))
            p.start()
            p_lst.append(p)
        [p.join() for p in p_lst]
        p_e_time = time.time()
        p_dif_time = p_e_time - p_s_time
    
        print('数据池执行时间:', pool_dif_time)
        print('多进程运行时间:', p_dif_time)
    View Code 这种情况下进程池比多进程 运行慢

      进程池的同步方法: apply

    import time
    from multiprocessing import Process, Pool
    
    def fun(i):
        time.sleep(0.5)
        # print(i)
        return i**2
    
    if __name__ == '__main__':
        p = Pool(4)
        for i in range(10):
            res = p.apply(fun, args=(i,))   # 同步执行的方法, 它会等待任务的返回结果(return)
            print(res)                     # 打印的是fun的返回值(return)

       print('主进程结束') # 子进程都结束后打印

      进程池的异步方法:  apply_async      # [ apply()方法的变体,它返回一个结果对象。]

    import time
    from multiprocessing import Process, Pool
    
    def fun(i):
        time.sleep(0.5)
        return i**2
    
    if __name__ == '__main__':
        p = Pool(4)
        res_lst = []
        for i in range(10):
            res = p.apply_async(fun, args=(i,))    # 异步执行, res是对象multiprocessing.pool.ApplyResult object  主进程代码执行完毕不会等待子进程, 直接关闭主进程.
            res_lst.append(res)
        for i in res_lst:
            print(i.get())

       print('主进程结束') # 如果没有i.get()方法, 则主进程不会等待子进程执行完就会结束

    get([timeout])

    在产生结果时返回该结果。如果超时限制不是空, 而且结果没有在时限内返回, 则抛出多进程超时错误。 如果远程调用报出异常,那么get()方法将再次抛出这个异常。

    # 如果不加close和join, 程序会直接随主进程结束运行,不会等待打印i. 加join后可以感知进程的运行
    
    import time
    from multiprocessing import Process, Pool
    
    def fun(i):
        time.sleep(0.5)
        print(i)
        return i**2
    
    if __name__ == '__main__':
        p = Pool(4)
        res_lst = []
        for i in range(10):
            res = p.apply_async(fun, args=(i,))   
            res_lst.append(res)
            # print(res)      # 异步执行, res是多个对象  <multiprocessing.pool.ApplyResult object at 0x000001B5BBD7C128>
    
        p.close()     # 不是关闭进程池,而是不允许再有其他任务来使用进程池
        p.join()       # 这是感知进程池中任务的方法,等待进程池的任务全部执行完
        for el in res_lst:
            print('结果>>>', el.get())
    
       # time.sleep(4) # 如果把close和join还有for循环都注释掉, 此处等待几秒也可以打印出i
    print('主进程结束')

      

      回调函数 callback

    import os
    from multiprocessing import Process, Pool
    def func1(n):
        print('func1', os.getpid())
        return n*n
    
    def func2(nn):
        print('func2', os.getpid())
        print(nn)
    
    if __name__ == '__main__':
        print('主进程:', os.getpid())
        p = Pool(4)
        p.apply_async(func1, args=(10,), callback=func2)   #把func的返回结果传参给func2, func2 在主进程中运行    如果func1返回多个结果, 那么将以元组的形式传给func2
        p.close()
        p.join()

    线程切换

    #什么是线程:
    #指的是一条流水线的工作过程,关键的一句话:一个进程内最少自带一个线程,其实进程根本不能执行,进程不是执行单位,是资源的单位,分配资源的单位
    #线程才是执行单位
    #进程:做手机屏幕的工作过程,刚才讲的
    #我们的py文件在执行的时候,如果你站在资源单位的角度来看,我们称为一个主进程,如果站在代码执行的角度来看,它叫做主线程,只是一种形象的说法,其实整个代码的执行过程成为线程,也就是干这个活儿的本身称为线程,但是我们后面学习的时候,我们就称为线程去执行某个任务,其实那某个任务的执行过程称为一个线程,一条流水线的执行过程为线程
    
    #进程vs线程
    #1 同一个进程内的多个线程是共享该进程的资源的,不同进程内的线程资源肯定是隔离的
    #2 创建线程的开销比创建进程的开销要小的多
    
    
    #并发三个任务:1启动三个进程:因为每个进程中有一个线程,但是我一个进程中开启三个线程就够了
    #同一个程序中的三个任务需要执行,你是用三个进程好 ,还是三个线程好?
    #例子:
        # pycharm 三个任务:键盘输入  屏幕输出  自动保存到硬盘
        #如果三个任务是同步的话,你键盘输入的时候,屏幕看不到
        #咱们的pycharm是不是一边输入你边看啊,就是将串行变为了三个并发的任务
        #解决方案:三个进程或者三个线程,哪个方案可行。如果是三个进程,进程的资源是不是隔离的并且开销大,最致命的就是资源隔离,但是用户输入的数据还要给另外一个进程发送过去,进程之间能直接给数据吗?你是不是copy一份给他或者通信啊,但是数据是同一份,我们有必要搞多个进程吗,线程是不是共享资源的,我们是不是可以使用多线程来搞,你线程1输入的数据,线程2能不能看到,你以后的场景还是应用多线程多,而且起线程我们说是不是很快啊,占用资源也小,还能共享同一个进程的资源,不需要将数据来回的copy!
    View Code什么是线程

       线程的创建方法一:

    # 线程和进程很像, 一个进程中至少有一个线程, 进程是资源层面的, 线程负责实际的操作
    
    import time
    from threading import Thread
    
    def func(n):
        time.sleep(1)   # 子线程运行地太快了, 如果不加time.sleep,会在打印主线程之前跑完
        print(123)
    
    if __name__ == '__main__':
        t = Thread(target=func, args=(1,))
        t.start()
        t.join()      # 等待子线程跑完再执行主线程下面的内容
        print('主线程')

      线程的创建方法二:

    from threading import Thread
    
    class MyThread(Thread):
        def __init__(self, n):
            super().__init__()
            self.n = n
        def run(self):
            print('换汤不换药')
            print('self.n>>>', self.n)
    
    if __name__ == '__main__':
        t = MyThread('你好')
        t.start()
        t.join()
        print('主线程结束')
  • 相关阅读:
    【struts2】【2】添加interceptor出错
    C++本质:类的赋值运算符=的重载,以及深拷贝和浅拷贝
    Garbage Collection Essence.Net中Garbage Collection深入探讨
    数据结构C#实现二叉查找树的创建,查找,以及各种递归(非递归)遍历算法
    C#中不安全代码的编写和指针的应用
    C#中的安全策略
    系统诊断概述如何通过windbg来dump特定process的memory.
    经典数据结构之栈的应用迷宫问题
    CPU Scheduling进程调度算法
    ASP.NET中将检索出的数据写入Exel形成Report的一种solution
  • 原文地址:https://www.cnblogs.com/surasun/p/9848072.html
Copyright © 2011-2022 走看看