zoukankan      html  css  js  c++  java
  • Python Day34 python并发编程之多进程

    一 multiprocessing模块介绍:

    python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。
        multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

      multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

        需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内

    二 Process类的介绍:

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    ~~~~参数介绍:
        1.group参数未使用,值始终为None
        2.target表示调用对象,即子进程要执行的任务
        3.args表示调用对象的位置参数元祖,args=(1,2,‘egon’,‘age’:18)
        4.kwargs表示调用对象的字典,kwargs={‘name’:‘egon’,‘age’:18}
        5.name为子进程的名称
    ~~~~方法介绍:
        1.p.start():
    启动进程,并调用该子进程中的p.run()
        2.p.run():进程启动时运行的方法,正是他去调用target指定的函数,一定要实现该方法我们自定义类的类中一定要实现该方法
        3.p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸程序,使用该方法需要特别小心这种情况。如果p还保存了一个锁n那么也将不会被释放,进而导致死锁
        4.p.is_alive():如果p仍然运行,返回Ture
        5.p.join([timeout]):主线程等待p终止。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能jion住run开启的进程
    
    
    ~~~~属性介绍:

          1.p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

          2.p.name:进程的名称

          3.p.pid:进程的pid

          4.p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)

          5.p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

    三 Process类的使用:

    注意:在windows中Process()必须放到# if __name__ == '__main__':下

    由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 
    如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 
    这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。
    1.创建并开启子进程的两种方式
    #开进程的方法一:
    import time
    import random
    from multiprocessing import Process
    def piao(name):
        print('%s piaoing' %name)
        time.sleep(random.randrange(1,5))
        print('%s piao end' %name)
    
    
    
    p1=Process(target=piao,args=('egon',)) #必须加,号
    p2=Process(target=piao,args=('alex',))
    p3=Process(target=piao,args=('wupeqi',))
    p4=Process(target=piao,args=('yuanhao',))
    
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('主线程')
    
    方法一
    #开进程的方法二:
    import time
    import random
    from multiprocessing import Process
    
    
    class Piao(Process):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            print('%s piaoing' %self.name)
    
            time.sleep(random.randrange(1,5))
            print('%s piao end' %self.name)
    
    p1=Piao('egon')
    p2=Piao('alex')
    p3=Piao('wupeiqi')
    p4=Piao('yuanhao')
    
    p1.start() #start会自动调用run
    p2.start()
    p3.start()
    p4.start()
    print('主线程')
    
    方法二

    2.Process对象的join方法:

    from multiprocessing import Process
    import time
    import random
    
    class Piao(Process):
        def __init__(self,name):
            self.name=name
            super().__init__()
        def run(self):
            print('%s is piaoing' %self.name)
            time.sleep(random.randrange(1,3))
            print('%s is piao end' %self.name)
    
    
    p=Piao('egon')
    p.start()
    p.join(0.0001) #等待p停止,等0.0001秒就不再等了
    print('开始')
    
    join:主进程等,等待子进程结束
    from multiprocessing import Process
    import time
    import random
    def piao(name):
        print('%s is piaoing' %name)
        time.sleep(random.randint(1,3))
        print('%s is piao end' %name)
    
    p1=Process(target=piao,args=('egon',))
    p2=Process(target=piao,args=('alex',))
    p3=Process(target=piao,args=('yuanhao',))
    p4=Process(target=piao,args=('wupeiqi',))
    
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    
    #有的同学会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
    #当然不是了,必须明确:p.join()是让谁等?
    #很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,
    
    #详细解析如下:
    #进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
    #而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
    #join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
    # 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    
    print('主线程')
    
    
    #上述启动进程与join进程可以简写为
    # p_l=[p1,p2,p3,p4]
    # 
    # for p in p_l:
    #     p.start()
    # 
    # for p in p_l:
    #     p.join()
    
    有了join,程序不就是串行了吗???

    四 守护进程:

    主进程创建守护进程

      其一:守护进程会在主进程代码执行结束后就终止

      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

    from multiprocessing import Process
    import time
    import random
    
    class Piao(Process):
        def __init__(self,name):
            self.name=name
            super().__init__()
        def run(self):
            print('%s is piaoing' %self.name)
            time.sleep(random.randrange(1,3))
            print('%s is piao end' %self.name)
    
    
    p=Piao('egon')
    p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.start()
    print('')

    五 进程同步(锁,也叫互斥锁)

    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

    竞争带来的结果就是错乱,如何控制,就是加锁处理

    part1:多个进程共享同一打印终端

    #并发运行,效率高,但竞争同一打印终端,带来了打印错乱
    from multiprocessing import Process
    import os,time
    def work():
        print('%s is running' %os.getpid())
        time.sleep(2)
        print('%s is done' %os.getpid())
    
    if __name__ == '__main__':
        for i in range(3):
            p=Process(target=work)
            p.start()
    
    并发运行,效率高,但竞争同一打印终端,带来了打印错乱
    #由并发变成了串行,牺牲了运行效率,但避免了竞争
    from multiprocessing import Process,Lock
    import os,time
    def work(lock):
        lock.acquire()
        print('%s is running' %os.getpid())
        time.sleep(2)
        print('%s is done' %os.getpid())
        lock.release()
    if __name__ == '__main__':
        lock=Lock()
        for i in range(3):
            p=Process(target=work,args=(lock,))
            p.start()
    
    加锁:由并发变成了串行,牺牲了运行效率,但避免了竞争

    part2:多个进程共享同一文件

    文件当数据库,模拟抢票

    #文件db的内容为:{"count":1}
    #注意一定要用双引号,不然json无法识别
    from multiprocessing import Process,Lock
    import time,json,random
    def search():
        dic=json.load(open('db.txt'))
        print('33[43m剩余票数%s33[0m' %dic['count'])
    
    def get():
        dic=json.load(open('db.txt'))
        time.sleep(0.1) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(0.2) #模拟写数据的网络延迟
            json.dump(dic,open('db.txt','w'))
            print('33[43m购票成功33[0m')
    
    def task(lock):
        search()
        get()
    if __name__ == '__main__':
        lock=Lock()
        for i in range(100): #模拟并发100个客户端抢票
            p=Process(target=task,args=(lock,))
            p.start()
    
    并发运行,效率高,但竞争写同一文件,数据写入错乱
    #文件db的内容为:{"count":1}
    #注意一定要用双引号,不然json无法识别
    from multiprocessing import Process,Lock
    import time,json,random
    def search():
        dic=json.load(open('db.txt'))
        print('33[43m剩余票数%s33[0m' %dic['count'])
    
    def get():
        dic=json.load(open('db.txt'))
        time.sleep(0.1) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(0.2) #模拟写数据的网络延迟
            json.dump(dic,open('db.txt','w'))
            print('33[43m购票成功33[0m')
    
    def task(lock):
        search()
        lock.acquire()
        get()
        lock.release()
    if __name__ == '__main__':
        lock=Lock()
        for i in range(100): #模拟并发100个客户端抢票
            p=Process(target=task,args=(lock,))
            p.start()
    
    加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
     
    
    
    总结:

    加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理

    因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

    队列和管道都是将数据存放于内存中
    队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    六 队列:

    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

     创建队列的类(底层就是以管道和锁定的方式实现)

    Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 
    参数介绍:
    maxsize是队列中允许最大项数,省略则无大小限制。
    方法介绍:
        

      q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

    q.get_nowait():同q.get(False)
    q.put_nowait():同q.put(False)

    q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
    q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
    q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

    ~~~ 应用:

    '''
    multiprocessing模块支持进程间通信的两种主要形式:管道和队列
    都是基于消息传递实现的,但是队列接口
    '''
    
    from multiprocessing import Process,Queue
    import time
    q=Queue(3)
    
    
    #put ,get ,put_nowait,get_nowait,full,empty
    q.put(3)
    q.put(3)
    q.put(3)
    print(q.full()) #满了
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty()) #空了





  • 相关阅读:
    文本比较算法Ⅳ——Nakatsu算法
    网游中,“时间停止的实现”的设想
    GDI+绘制自定义行距的文本(续)
    GDI+绘制自定义行距的文本的三种方法。
    文本比较算法Ⅱ——Needleman/Wunsch算法
    文本比较算法Ⅲ——计算文本的相似度
    javascript笔记:深入分析javascript里对象的创建(上)续篇
    java笔记:自己动手写javaEE框架(五)Spring事务管理学习
    java笔记:自己动手写javaEE框架(三)引入SQL监控技术P6spy
    java笔记:自己动手写javaEE框架(一)数据访问层DAO以及DAO的单元测试
  • 原文地址:https://www.cnblogs.com/liuduo/p/7651402.html
Copyright © 2011-2022 走看看