zoukankan      html  css  js  c++  java
  • 第三十九天 数据通信和进程池

    1.管道:Pipe,管道是双向通信的,左边传入的数据,输送到管道内的右边,输出时从右边输出,从左边接受到的数据传输到右边从从左边输出

      1.1主进程和子进程之间的通讯:

    from multiprocessing import Pipe,Process
    def func(conn1):
        pass
        conn1.send('helloword')
    if __name__=='__main__':
       conn1,conn2= Pipe()  #创建一个管道
       p=Process(target=func,args=(conn1,))
       p.start()
       print(conn2.recv())#当管道内没有数据的时候程序会进入堵塞状态
    结果为
    C:pycharmpython.exe D:/python练习程序/第三十九天/pipe.py
    helloword
    View Code

      1.2子进程与子进程之间的通讯:

    from multiprocessing import Pipe,Process
    def func(conn1):
        conn1.send('helloword')
    def func2(conn2):
        msg=conn2.recv()
        print(msg)
    if __name__=='__main__':
       conn1,conn2= Pipe()  #创建一个管道
       p=Process(target=func,args=(conn1,))
       p.start()
       p1=Process(target=func2,args=(conn2,))
       p1.start()
    结果为
    helloword
    View Code

        1.3看下面程序有什么现象,并且如何进行改进:

    from multiprocessing import Pipe,Process
    def func(conn1):
       while True:
           msg=conn1.recv()
           print(msg)
    if __name__=='__main__':
       conn1,conn2= Pipe()  #创建一个管道
       p=Process(target=func,args=(conn1,))
       p.start()
       for i in range(20):
           conn2.send('是否结束')
    View Code

    结果为

     通过结果我们可以发现程序一直在堵塞阶段无法进行结束

      1.4改进措施:

    from multiprocessing import Pipe,Process
    def func(conn1):
       while True:
           msg=conn1.recv()
           if msg is None:break
           print(msg)
    if __name__=='__main__':
       conn1,conn2= Pipe()  #创建一个管道
       p=Process(target=func,args=(conn1,))
       p.start()
       for i in range(20):
           conn2.send('是否结束')
       conn2.send(None) #当数据传输完成之后在发送一个数据判断位
    View Code

      改进措施二:

    from multiprocessing import Pipe,Process
    def func(conn1,conn2):
       while True:
           conn2.close()
           try:
                msg=conn1.recv()
                print(msg)
           except EOFError:
               conn1.close()
               break
    if __name__=='__main__':
       conn1,conn2= Pipe()  #创建一个管道
       p=Process(target=func,args=(conn1,conn2))
       p.start()
       conn1.close()
       for i in range(20):
           conn2.send('是否结束')
       conn2.close()
    View Code

      1.5总结:

    管道端点的正确管理问题,如果生产者或者消费者中没有使用管道中的某个端点,就应该将其关闭,这也就是说为什么在生产过程中关闭了输出端管道,在消费

    者中关闭了输入管道。如果忘记执行这些步骤,程序可能在消费者的recv中启动进程。管道是由操作系统进行计数的,必须在所有进程中关闭管道后才会生成Eoferror异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

      1.6用管道做生产包子的程序:

    from multiprocessing import Pipe,Process
    import time
    import random
    def producer(conn1,conn2,food,name):
        conn2.close()
        for i in range(15):
            time.sleep(random.randint(1,3))
            f='%s生产了第%s%s'%(name,i,food)
            conn1.send(f)
            print(f)
        conn1.close()
    def customer(conn1,conn2,name):
        conn1.close()
        while True:
            try:
                msg=conn2.recv()
                msg='%s吃了%s'%(name,msg)
                print(msg)
            except EOFError:#进行数据读取时没有数据会抛出异常
                conn2.close()
                break
    if __name__=='__main__':
       conn1,conn2= Pipe()  #创建一个管道
       p=Process(target=producer,args=(conn1,conn2,'包子','alex'))
       p.start()
       p1=Process(target=customer,args=(conn1,conn2,'wusir'))
       p1.start()
       conn1.close()
       conn2.close()
    View Code

      在多个进程进行数据读取时,有可能会出现多个进程抢一个数据的情况。这个时候我们需要进行加锁(同步进行)来保证数据的安全性:

    from multiprocessing import Pipe,Process,Lock
    import time
    import random
    def producer(conn1,conn2,food,name):
        conn2.close()
        for i in range(15):
            time.sleep(random.randint(1,3))
            f='%s生产了第%s%s'%(name,i,food)
            conn1.send(f)
            print(f)
        conn1.close()
    def customer(conn1,conn2,name,lock):
        conn1.close()
        while True:
            try:
                lock.acquire()
                msg=conn2.recv()
                msg='%s吃了%s'%(name,msg)
                print(msg)
                lock.release()
            except EOFError:#进行数据读取时没有数据会抛出异常
                conn2.close()
                break
    if __name__=='__main__':
       conn1,conn2= Pipe()  #创建一个管道
       l=Lock()
       p=Process(target=producer,args=(conn1,conn2,'包子','alex'))
       p.start()
       p1=Process(target=producer,args=(conn1,conn2,'干会','suyi'))
       p1.start()
       p2=Process(target=customer,args=(conn1,conn2,'wusir',l))
       p2.start()
       p3=Process(target=customer,args=(conn1,conn2,'sires',l))
       p3.start()
       conn1.close()
       conn2.close()
    View Code

    2进程之间的数据共享:Manager

    from multiprocessing import Process,Manager
    def func(dic):
        dic['num']-=1
    if __name__=='__main__':
        m=Manager()
        dic=m.dict({'num':100})
        p_list=[]
        for i in range(50):
            p=Process(target=func,args=(dic,))
            p.start()
            p_list.append(p)
        for i in p_list:i.join()
        print(dic)
    结果为
    {'num': 51}
    View Code

    从结果来看数据是不正确的,说明进程在进行的过程当中出现了数据争抢的问题,需要使用lock来进行解决此问题

    from multiprocessing import Process,Manager,Lock
    def func(dic,lock):
        lock.acquire()
        dic['num']-=1
        lock.release()
    if __name__=='__main__':
        m=Manager()
        dic=m.dict({'num':100})
        l=Lock()
        p_list=[]
        for i in range(50):
            p=Process(target=func,args=(dic,l))
            p.start()
            p_list.append(p)
        for i in p_list:i.join()
        print(dic)
    结果为
    {'num': 50}
    View Code

      加了锁以后就使多个异步的进程变成了同步进程,虽然效率降低了但是数据安全了。

    3.总结:在进程之间进行通讯,为了数据的安全和速率一般我们使用Queue(队列)

    4.进程池:

      python中先创建一个属于进程的池子(pool)

      ·这个池子指定放入多少个进程(一般是cpu的个数或者cpu个数加一)

      执行过程一般是:创建5个进程放入到进程池里,然后再创建100个任务,这100个任务先去进程池中拿出5个进程去执行任务,当任务执行结束以后并不是结束任务,而是把进程放到进程池中,下面任务再去调用使用。虽然 操作系统依然要调度,但是总的操作时间会减少。(如果开始多个进程,创建需要消耗·时间,销毁也需要浪费时间,第二开始成千上万个进程,操作系统也不是让他们同时执行,而是采用时间片轮转的形式进行,这样反而会影响效率。不如使用进程池效率高。进程池在一定程度上可以实现并发的效果。

     1. 进程的第一例子map

    from multiprocessing import Pool,Process
    def func(n):
        for i in range(10):
            print(n+1,end='	')
    if __name__=='__main__':
        pool=Pool(5)#当进程数大于3个以上就不要使用呢process使用进程池
        pool.map(func,range(100))  #第一输入函数,第二个填写可迭代对象
    结果为
    C:pycharmpython.exe D:/python练习程序/第三十九天/pipe.py
    1    1    1    1    1    1    1    1    1    1    2    2    2    2    2    2    2    2    2    2    3    3    3    3    3    3    3    3    3    3    4    4    4    4    4    4    4    4    4    4    5    5    5    5    5    5    5    5    5    5    6    6    6    6    6    6    6    6    6    6    7    7    7    7    7    7    7    7    7    7    8    8    8    8    8    8    8    8    8    8    9    9    9    9    9    9    9    9    9    9    10    10    10    10    10    10    10    10    10    10    11    11    11    11    11    11    11    11    11    11    12    12    12    12    12    12    12    12    12    12    13    13    13    13    13    13    13    13    13    13    14    14    14    14    14    14    14    14    14    14    15    15    15    15    15    15    15    15    15    15    16    16    16    16    16    16    16    16    16    16    17    17    17    17    17    17    17    17    17    17    18    18    18    18    18    18    18    18    18    18    19    19    19    19    19    19    19    19    19    19    20    20    20    20    20    20    20    20    20    20    21    21    21    21    21    21    21    21    21    21    22    22    22    22    22    22    22    22    22    22    23    23    23    23    23    23    23    23    23    23    24    24    24    24    24    24    24    24    24    24    25    25    25    25    25    25    25    25    25    25    26    26    26    26    26    26    26    26    26    26    27    27    27    27    27    27    27    27    27    27    28    28    28    28    28    28    28    28    28    28    29    29    29    29    29    29    29    29    29    29    30    30    30    30    30    30    30    30    30    30    31    31    31    31    31    31    31    31    31    31    32    32    32    32    32    32    32    32    32    32    33    33    33    33    33    33    33    33    33    33    34    34    34    34    34    34    34    34    34    34    35    35    35    35    35    35    35    35    35    35    36    36    36    36    36    36    36    36    36    36    37    37    37    37    37    37    37    37    37    37    38    38    38    38    38    38    38    38    38    38    39    39    39    39    39    39    39    39    39    39    40    40    40    40    40    40    40    40    40    40    41    41    41    41    41    41    41    41    41    41    42    42    42    42    42    42    42    42    42    42    43    43    43    43    43    43    43    43    43    43    44    44    44    44    44    44    44    44    44    44    45    45    45    45    45    45    45    45    45    45    46    46    46    46    46    46    46    46    46    46    47    47    47    47    47    47    47    47    47    47    48    48    48    48    48    48    48    48    48    48    49    49    49    49    49    49    49    49    49    49    50    50    50    50    50    50    50    50    50    50    51    51    51    51    51    51    51    51    51    51    52    52    52    52    52    52    52    52    52    52    53    53    53    53    53    53    53    53    53    53    54    54    54    54    54    54    54    54    54    54    55    55    55    55    55    55    55    55    55    55    56    56    56    56    56    56    56    56    56    56    57    57    57    57    57    57    57    57    57    57    58    58    58    58    58    58    58    58    58    58    59    59    59    59    59    59    59    59    59    59    60    60    60    60    60    60    60    60    60    60    61    61    61    61    61    61    61    61    61    61    62    62    62    62    62    62    62    62    62    62    63    63    63    63    63    63    63    63    63    63    64    64    64    64    64    64    64    64    64    64    65    65    65    65    65    65    65    65    65    65    66    66    66    66    66    66    66    66    66    66    67    67    67    67    67    67    67    67    67    67    68    68    68    68    68    68    68    68    68    68    69    69    69    69    69    69    69    69    69    69    70    70    70    70    70    70    70    70    70    70    71    71    71    71    71    71    71    71    71    71    72    72    72    72    72    72    72    72    72    72    73    73    73    73    73    73    73    73    73    73    74    74    74    74    74    74    74    74    74    74    75    75    75    75    75    75    75    75    75    75    76    76    76    76    76    76    76    76    76    76    77    77    77    77    77    77    77    77    77    77    78    78    78    78    78    78    78    78    78    78    79    79    79    79    79    79    79    79    79    79    80    80    80    80    80    80    80    80    80    80    81    81    81    81    81    81    81    81    81    81    82    82    82    82    82    82    82    82    82    82    83    83    83    83    83    83    83    83    83    83    84    84    84    84    84    84    84    84    84    84    85    85    85    85    85    85    85    85    85    85    86    86    86    86    86    86    86    86    86    86    87    87    87    87    87    87    87    87    87    87    88    88    88    88    88    88    88    88    88    88    89    89    89    89    89    89    89    89    89    89    90    90    90    90    90    90    90    90    90    90    91    91    91    91    91    91    91    91    91    91    92    92    92    92    92    92    92    92    92    92    93    93    93    93    93    93    93    93    93    93    94    94    94    94    94    94    94    94    94    94    95    95    95    95    95    95    95    95    95    95    96    96    96    96    96    96    96    96    96    96    97    97    97    97    97    97    97    97    97    97    98    98    98    98    98    98    98    98    98    98    99    99    99    99    99    99    99    99    99    99    100    100    100    100    100    100    100    100    100    100    
    Process finished with exit code 0
    View Code

      2.比较多个开启100个进程和5个进程池执行100个任务看谁的时间短:

    from multiprocessing import Pool,Process
    import time
    def func(n):
        for i in range(10):
            n+1
    if __name__=='__main__':
        pool=Pool(5)#当进程数大于3个以上就不要使用呢process使用进程池
        start2=time.time()
        pool.map(func,range(100))  #第一输入函数,第二个填写可迭代对象
        t1=time.time()-start2
        start1=time.time()
        p_list=[]
        for i in range(100):
            p=Process(target=func,args=(i,))
            p.start()
            p_list.append(p)
        for i in p_list: i.join()
        t2=time.time()-start1
        print(t1,t2)
    结果为
    0.15658092498779297 2.4231150150299072
    View Code

    从结果我们可以看的出进程池所消耗的时间要比同时开启n个进程消耗的时间少。

      3.检验多个任务执行时,工作机制:使用同步进程

    from multiprocessing import Pool,Process
    import time
    import os
    def func(n):
        print('start func%s'%n,os.getpid())
        time.sleep(1)
        print('end func%s'%n,os.getpid())
    if __name__=='__main__':
        p=Pool(5)#当进程数大于3个以上就不要使用呢process使用进程池
        for i in range(20):
            p.apply(func,args=(i,))  #第一输入函数,第二个填写可迭代对象
    结果为
    start func0 11404
    end func0 11404
    start func1 19700
    end func1 19700
    start func2 17784
    end func2 17784
    start func3 19008
    end func3 19008
    start func4 10492
    end func4 10492
    start func5 11404
    end func5 11404
    start func6 19700
    end func6 19700
    start func7 17784
    end func7 17784
    start func8 19008
    end func8 19008
    start func9 10492
    end func9 10492
    start func10 11404
    end func10 11404
    start func11 19700
    end func11 19700
    start func12 17784
    end func12 17784
    start func13 19008
    end func13 19008
    start func14 10492
    end func14 10492
    start func15 11404
    end func15 11404
    start func16 19700
    end func16 19700
    start func17 17784
    end func17 17784
    start func18 19008
    end func18 19008
    start func19 10492
    end func19 10492
    View Code

      4.使用异步进程池:

    from multiprocessing import Pool,Process
    import time
    import os
    def func(n):
        print('start func%s'%n,os.getpid())
        time.sleep(1)
        print('end func%s'%n,os.getpid())
    if __name__=='__main__':
        p=Pool(5)#当进程数大于3个以上就不要使用呢process使用进程池
        for i in range(20):
            p.apply_async(func,args=(i,))  #第一输入函数,第二个填写可迭代对象
    View Code

    结果为

     从结果可以看出主进程和子进程也是异步执行,互不影响,主进程执行结束,子进程执行结束

    解决办法:使用close()

    from multiprocessing import Pool,Process
    import time
    import os
    def func(n):
        print('start func%s'%n,os.getpid())
        time.sleep(1)
        print('end func%s'%n,os.getpid())
    if __name__=='__main__':
        p=Pool(5)#当进程数大于3个以上就不要使用呢process使用进程池
        for i in range(20):
            p.apply_async(func,args=(i,))  #第一输入函数,第二个填写可迭代对象
        p.close()  #当进程池把所有任务结束以后子进程结束
        p.join()#将子进程与主进程改为同步
    View Code

    使用进程池来写tcp中的服务:

    server端程序:

    import socket
    from multiprocessing import Pool
    def func(conn):
        conn.send('你好'.encode('utf-8'))
        ret= conn.recv(1024).decode('utf-8')
        print(ret)
        conn.close()
    if __name__=='__main__':
        sk=socket.socket()
        sk.bind(('127.0.0.1',8080))
        sk.listen()
        p=Pool(5)#默认位cpu的个数
        while True:
                conn,addr=sk.accept()
                p.apply_async(func,args=(conn,))
        p.close()
        p.join()
        sk.close()
    View Code

    client端程序:

    import socket
    sk=socket.socket()
    sk.connect(('127.0.0.1',8080))
    ret=sk.recv(1024).decode('utf-8')
    print(ret)
    info=input('》》》').encode('utf-8')
    sk.send(info)
    View Code

     总结:使用p.apply_async 异步调用和主进程是完全异步的需要手动进行close和join

    5.进程池里返回值的问题:

      1·.map函数的返回值:

    from multiprocessing import Pool
    import time
    def func(n):
        time.sleep(1)    
        return n*n
    if __name__=='__main__':
        p=Pool()#默认起cpu的个数
        ret=p.map(func,range(10))
        print(ret)#返回值回以列表 ,等所有结果全部计算结束以后再出结果
    结果为
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    View Code

      2.apply的返回值:

    from multiprocessing import  Pool
    import time
    def func(n):
        time.sleep(1)
        return n*n
    if __name__=='__main__':
        p=Pool()#默认起cpu的个数
        for i in range(10):
            ret=p.apply(func,args=(i,))
            print(ret)#同步进行没执行完一次结果打印输出一个值
    结果位
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    View Code

      apply_async的返回值:

    from multiprocessing import  Pool
    import time
    def func(n):
        time.sleep(1)
        return n*n
    if __name__=='__main__':
        p=Pool()#默认起cpu的个数
        for i in range(10):
            ret=p.apply_async(func,args=(i,))
            print(ret)#同步进行没执行完一次结果打印输出一个值
    结果为
    <multiprocessing.pool.ApplyResult object at 0x000001A1AADF5E10>
    <multiprocessing.pool.ApplyResult object at 0x000001A1AADF5EF0>
    <multiprocessing.pool.ApplyResult object at 0x000001A1AADF5F98>
    <multiprocessing.pool.ApplyResult object at 0x000001A1AAE130B8>
    <multiprocessing.pool.ApplyResult object at 0x000001A1AAE13160>
    <multiprocessing.pool.ApplyResult object at 0x000001A1AAE13208>
    <multiprocessing.pool.ApplyResult object at 0x000001A1AAE132B0>
    <multiprocessing.pool.ApplyResult object at 0x000001A1AAE13390>
    <multiprocessing.pool.ApplyResult object at 0x000001A1AAE13470>
    <multiprocessing.pool.ApplyResult object at 0x000001A1AAE13518>
    View Code

    发现结果放入池子中要使用get来进行取出

    from multiprocessing import  Pool
    import time
    def func(n):
        time.sleep(1)
        return n*n
    if __name__=='__main__':
        p=Pool()#默认起cpu的个数
        start=time.time()
        for i in range(10):
            ret=p.apply_async(func,args=(i,))
            print(ret.get())#进入堵塞状态 ,程序进入同步状态
        t=time.time()-start
        print(t)
    结果为
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    10.11383843421936
    View Code
    from multiprocessing import  Pool
    import time
    def func(n):
        time.sleep(1)
        return n*n
    if __name__=='__main__':
        p=Pool()#默认起cpu的个数
        start=time.time()
        for i in range(10):
            ret=p.apply_async(func,args=(i,))
        print(ret.get())#只是在最后一次进入堵塞 ,程序进入同步状态
        t=time.time()-start
        print(t)
    结果为
    81
    2.1083409786224365
    View Code
    from multiprocessing import  Pool
    import time
    def func(n):
        time.sleep(1)
        return n*n
    if __name__=='__main__':
        p=Pool()#默认起cpu的个数
        start=time.time()
        for i in range(10):
            ret=p.apply_async(func,args=(i,))
        t=time.time()-start
        print(t)
    结果为
    C:pycharmpython.exe D:/python练习程序/第三十九天/demo1/client.py
    0.000995635986328125
    View Code

    结果办法:使用join()

    from multiprocessing import  Pool
    import time
    def func(n):
        time.sleep(1)
        return n*n
    if __name__=='__main__':
        p_list=[]
        p=Pool()#默认起cpu的个数
        start=time.time()
        for i in range(10):
            ret=p.apply_async(func,args=(i,))
            p_list.append(ret)  #把异步执行的结果放到列表中,然后在进行读取
        for i in p_list:
            print(i.get())
        t=time.time()-start
        print(t)
    结果为
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    2.1015079021453857
    View Code

    回调函数,就是把上一次函数执行返回的值,作为下一次函数执行的传入值(常用于爬虫,爬虫时间主要浪费再爬取网页和延时上,我么可以把爬取来的数据通过回调函数来进行写)

    from multiprocessing import  Pool
    import time
    import os
    def func(n):
        time.sleep(1)
        print('子进程pid',os.getpid())
        return n*n
    def func2(m):
        print('infunc2')
        print(m)
        print('调用函数进程pid',os.getpid())
    if __name__=='__main__':
        print('主进程pid',os.getpid())
        p=Pool()#默认起cpu的个数
        for i in range(10):
            ret=p.apply_async(func,args=(i,),callback=func2)
        p.close()
        p.join()
    
    
    结果为
    C:pycharmpython.exe D:/python练习程序/第三十九天/demo1/client.py
    主进程pid 11280
    子进程pid 1648
    infunc2
    0
    调用函数进程pid 11280
    子进程pid 6972
    infunc2
    1
    调用函数进程pid 11280
    子进程pid 15168
    子进程pid 12876
    子进程pid 14224
    infunc2
    9
    调用函数进程pid 11280
    infunc2
    4
    调用函数进程pid 11280
    infunc2
    16
    调用函数进程pid 11280
    子进程pid 10124
    infunc2
    25
    调用函数进程pid 11280
    子进程pid 14644
    infunc2
    View Code

    从返回结果来看,回调函数是在主函数里进行的

  • 相关阅读:
    poj 3278 catch that cow
    POJ 1028 Web Navigation
    poj 2643 election
    hdu 1908 double queues
    hdu_2669 Romantic(扩展欧几里得)
    0/1背包 dp学习~6
    校验码
    最长上升子序列(LIS经典变型) dp学习~5
    LCS最长公共子序列~dp学习~4
    最长上升子序列(LIS) dp学习~3
  • 原文地址:https://www.cnblogs.com/ab461087603/p/12492206.html
Copyright © 2011-2022 走看看