zoukankan      html  css  js  c++  java
  • 进程,进程

    一,结束while循环

    from        multiprocessing import Process
    from  multiprocessing import Queue
    import time
    import random
    def aa(q,food):
        for i in range(10):
            q.put('%s%s号'%(food,i))
            print('%s%s号'%(food,i))
            time.sleep(random.random())
    def bb(q,man):
    
        while True:
            print('%s吃了'%(man),q.get())
    if __name__=='__main__':
        q=Queue()
        p1=Process(target=aa,args=(q,'泔水'))
        p1.start()
        p2 = Process(target=aa, args=(q,'馒头'))
        p2.start()
        c1 = Process(target=bb, args=(q,'小高'))
        c1.start()
        c2 = Process(target=bb, args=(q,'小赵'))
        c2.start()
        c3 = Process(target=bb, args=(q,'小陈'))
        c3.start()
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/37day/37.py
    泔水0号
    馒头0号
    小高吃了 泔水0号
    小高吃了 馒头0号
    泔水1号
    小高吃了 泔水1号
    馒头1号
    小赵吃了 馒头1号
    馒头2号
    小陈吃了 馒头2号
    泔水2号
    小高吃了 泔水2号
    泔水3号
    小赵吃了 泔水3号
    馒头3号
    小陈吃了 馒头3号
    馒头4号
    小高吃了 馒头4号
    馒头5号
    小赵吃了 馒头5号
    泔水4号
    小陈吃了 泔水4号
    泔水5号
    小高吃了 泔水5号
    馒头6号
    小赵吃了 馒头6号
    泔水6号
    小陈吃了 泔水6号
    小高吃了 馒头7号
    馒头7号
    泔水7号
    小赵吃了 泔水7号
    馒头8号
    小陈吃了 馒头8号
    泔水8号
    小高吃了 泔水8号
    馒头9号
    小赵吃了 馒头9号
    泔水9号
    小陈吃了 泔水9号

    由此可见,当生产者结束了以后,消费者并没有退出循环

    from        multiprocessing import Process
    from  multiprocessing import Queue
    import time
    import random
    def aa(q,food):
        for i in range(10):
            q.put('%s%s号'%(food,i))
            print('%s%s号'%(food,i))
            time.sleep(random.random())
        q.put(None)
        q.put(None)
        q.put(None)
    def bb(q,man):
    
        while True:
            d=q.get()
            if d==None:break
            print('%s吃了'%(man),d)
    if __name__=='__main__':
        q=Queue()
        p1=Process(target=aa,args=(q,'泔水'))
        p1.start()
        p2 = Process(target=aa, args=(q,'馒头'))
        p2.start()
        c1 = Process(target=bb, args=(q,'小高'))
        c1.start()
        c2 = Process(target=bb, args=(q,'小赵'))
        c2.start()
        c3 = Process(target=bb, args=(q,'小陈'))
        c3.start()
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/37day/37.py
    泔水0号
    馒头0号
    小高吃了 泔水0号
    小高吃了 馒头0号
    馒头1号
    小高吃了 馒头1号
    泔水1号
    小赵吃了 泔水1号
    泔水2号
    小陈吃了 泔水2号
    馒头2号
    小高吃了 馒头2号
    馒头3号
    小赵吃了 馒头3号
    馒头4号
    小陈吃了 馒头4号
    馒头5号
    小高吃了 馒头5号
    泔水3号
    小赵吃了 泔水3号
    泔水4号
    小陈吃了 泔水4号
    泔水5号
    小高吃了 泔水5号
    泔水6号
    小赵吃了 泔水6号
    泔水7号
    小陈吃了 泔水7号
    馒头6号
    小高吃了 馒头6号
    泔水8号
    小赵吃了 泔水8号
    泔水9号
    小陈吃了 泔水9号
    馒头7号
    小高吃了 馒头7号
    馒头8号
    馒头9号
    
    Process finished with exit code 0

    加个判断就能解决这个问题,可是新的问题又来了,在你不知道有几个消费者,或者消费者过多的情况下,你该如何呢?

    from        multiprocessing import Process
    from  multiprocessing import JoinableQueue
    import time
    import random
    def aa(q,food):
        for i in range(10):
            q.put('%s%s号'%(food,i))
            print('%s%s号'%(food,i))
            time.sleep(random.random())
        q.join()
    def bb(q,man):
    
        while True:
            d=q.get()
            print('%s吃了'%(man),d)
            q.task_done()
    if __name__=='__main__':
        q=JoinableQueue()
        p1=Process(target=aa,args=(q,'泔水'))
        p1.start()
        p2 = Process(target=aa, args=(q,'馒头'))
        p2.start()
        c1 = Process(target=bb, args=(q,'小高'))
        c1.daemon=True
        c1.start()
        c2 = Process(target=bb, args=(q,'小赵'))
        c2.daemon = True
        c2.start()
        c3 = Process(target=bb, args=(q,'小陈'))
        c3.daemon = True
        c3.start()
        p1.join()
        p2.join()
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/37day/37.py
    泔水0号
    馒头0号
    小高吃了 泔水0号
    小高吃了 馒头0号
    馒头1号
    小高吃了 馒头1号
    馒头2号
    小赵吃了 馒头2号
    馒头3号
    小高吃了 馒头3号
    泔水1号
    小陈吃了 泔水1号
    馒头4号
    小赵吃了 馒头4号
    泔水2号
    小高吃了 泔水2号
    馒头5号
    小陈吃了 馒头5号
    泔水3号
    小赵吃了 泔水3号
    馒头6号
    小高吃了 馒头6号
    小陈吃了 泔水4号
    泔水4号
    泔水5号
    小赵吃了 泔水5号
    泔水6号
    小高吃了 泔水6号
    馒头7号
    小陈吃了 馒头7号
    泔水7号
    小赵吃了 泔水7号
    馒头8号
    小高吃了 馒头8号
    馒头9号
    小陈吃了 馒头9号
    泔水8号
    小赵吃了 泔水8号
    小高吃了 泔水9号
    泔水9号
    
    Process finished with exit code 0

    这里,引用一个模块,task_done当消费者完成以后,将这个发送给生产者,q.join生产者接收后,结束,由于消费者事先被设置成守护进程,主进程结束,随之结束,

    ipc机制

    二,pipe管道

    from  multiprocessing import Pipe
    p1,p2=Pipe()
    p1.send('hello')
    print(p2.recv())
    p2.send('heo')
    
    print(p1.recv())
    print(p1.recv())
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/37day/37.py
    hello
    heo

    管道支持双向通信,但是,当接收方大于发送方的时候,程序会被阻塞在这里

    三进程池

    map,,拿到返回值,存放在一个列表中

    传参的时候,传的必须是一个可迭代的

    import time
    from multiprocessing import Pool
    def fun(i):
        time.sleep(1)
        i+=1
        return i
    if __name__ == '__main__':
        s = time.time()
        p = Pool(4)
        ret = p.map(fun,range(10))
        print(ret)
        print(time.time() - s)
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/38day/39.py
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    3.408907413482666
    
    Process finished with exit code 0
    from  multiprocessing import Process
    from  multiprocessing import Pool
    import time
    def aa(i):
        i+=1
    if __name__=='__main__':
        p=Pool(5)
        start=time.time()
        p.map(aa,range(19))
        p.close()
        p.join()
        print(time.time()-start)
        l=[]
        start=time.time()
        for i in range(19):
            p=Process(target=aa,args=(i,))
            p.start()
            l.append(p)
        [i.join() for i in l]
        print(time.time() - start)
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/37day/37.py
    0.21370530128479004
    1.504941463470459
    
    Process finished with exit code 0

    通过对比,很明显,用了进程池和不用的差距,

    1,基于进程池下的同步提交

    def aa(i):
        time.sleep(1)
        i+=1
        print(i)
    if __name__=='__main__':
        p=Pool(5)
        for i in range(19):
            p.apply(aa, args=(i,))

    2,基于进程池下的异步提交

    from  multiprocessing import Pool
    import time
    import time
    def aa(i):
        time.sleep(1)
        i+=1
        print(i)
    if __name__=='__main__':
        p=Pool(5)
        l=[]
        for i in range(19):
            p.apply_async(aa, args=(i,))
            
        p.close()
        p.join()

    Manager模块里提供了很多数据类型来进行进程之间的数据共享,但是几乎数据都不安全

    from multiprocessing import Process
    def func(dic):
        dic['count']+=1
        print(dic)
    
    # m=Manager()
    # dic=m.dict()
    dic={}
    dic['count'] =0
    
    if __name__=='__main__':
        for i in range(2):
            p=Process(target=func,args=(dic,))
            p.start()
        p.join()
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/38day/39.py
    {'count': 1}
    {'count': 1}
    
    Process finished with exit code 0

    正常情况下进程之间数据是不共享的,所以一直是1

    from multiprocessing import Manager
    from multiprocessing import Process
    def func(dic):
        dic['count']+=1
        print(dic)
    if __name__=='__main__':
        m = Manager()
        dic = m.dict()
        dic['count'] = 0
        for i in range(10):
            p=Process(target=func,args=(dic,))
            p.start()
        p.join()
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/38day/39.py
    {'count': 1}
    {'count': 2}
    {'count': 3}
    {'count': 4}
    {'count': 5}
    {'count': 6}
    {'count': 7}
    {'count': 8}
    {'count': 9}
    {'count': 10}
    
    Process finished with exit code 0

    Manager的作用就是是进程之间的数据可以共享

    回调函数:

    回调函数是在主进程中完成的,不能传参,只能接受多进程中的返回值

    import os
    import time
    from multiprocessing import Pool
    def func(i):
        print('子进程%s:%s'%(i,os.getpid()))
        return i*'*'
    
    def call(arg):
        print('回调 :',os.getpid())
        print(arg)
    if __name__ == '__main__':
        print('---->',os.getpid())
        p = Pool(5)
        for i in range(10):
            p.apply_async(func,args=(i,),callback=call)
        p.close()
        p.join()
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/38day/39.py
    ----> 11904
    子进程0:7060
    回调 : 11904
    
    子进程1:9504
    子进程2:7060
    回调 : 11904
    **
    子进程3:7060
    子进程4:9504
    回调 : 11904
    *
    子进程5:7060
    回调 : 11904
    ***
    回调 : 11904
    ****
    子进程6:9504
    回调 : 11904
    *****
    回调 : 11904
    ******
    子进程7:7060
    回调 : 11904
    *******
    子进程8:9504
    回调 : 11904
    ********
    子进程9:7060
    回调 : 11904
    *********
    
    Process finished with exit code 0

    关于回调函数的一个实例:

    用到requests模块

    import requests
    from multiprocessing import Pool
    def get_url(url):
        ret = requests.get(url)
        return {'url':url,
                'status_code':ret.status_code,
                'content':ret.text}
    
    def parser(dic):
        print(dic['url'],dic['status_code'],len(dic['content']))
        # 把分析结果写到文件里
    if __name__ == '__main__':
        url_l = [
            'http://www.baidu.com',
            'http://www.sogou.com',
            'http://www.hao123.com',
            'http://www.yangxiaoer.cc',
            'http://www.python.org'
        ]
        p = Pool(4)
        for url in url_l:
            p.apply_async(get_url,args=(url,),callback=parser)
        p.close()
        p.join()
    C:UsershcAppDataLocalProgramsPythonPython36python3.exe C:/s9/38day/39.py
    http://www.baidu.com 200 2381
    http://www.sogou.com 200 22431
    http://www.hao123.com 200 498845
    http://www.yangxiaoer.cc 200 35667
    http://www.python.org 200 48871
    
    Process finished with exit code 0
  • 相关阅读:
    Middleware
    Languages
    Errors
    Config
    CLI Console
    Linux远程复制文件
    CentOS下安装Gitlab
    Maven_POM配置结构
    Maven_POM配置详解
    MySQL索引背后的数据结构及算法原理
  • 原文地址:https://www.cnblogs.com/xuguangzong/p/8418912.html
Copyright © 2011-2022 走看看