zoukankan      html  css  js  c++  java
  • 多进程

    multiprocessing模块下Process、Lock

    进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。

    无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

    并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发

    并行:同时运行,只有具备多个cpu才能实现并行

    开启子进程

    方式一:实例化Process对象,指定参数target,args

    方式二:重构Process类的run方法,直接实例化

    from multiprocessing import Process
    import time
    
    def task(name):
        print('%s is running' %name)
        time.sleep(3)
        print('%s is done' %name)
    
    if __name__ == '__main__':
        # p = Process(target=task,kwargs={'name':'子进程1'})
        p = Process(target=task,args=('子进程1',))
        p.start() #仅仅只是给操作系统发送了一个信号
    
        print('')
    方式一
    from multiprocessing import Process
    import time
    
    
    class MyProcess(Process):
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run(self):  # 方法名一定要叫run
            print('%s is running' % self.name)
            time.sleep(3)
            print('%s is done' % self.name)
    
    
    # windows系统里开进程一定要放在if __name__ == '__main__':
    
    if __name__ == '__main__':
        p = MyProcess('子进程1')
        p.start()  # 向操作系统发信号,告诉操作系统开启子进程
        print('')
    方式二

    查看进程号pid

    os.getpid():当前文件的进程id
    os.getppid():当前文件的父进程id,即pycharm
    windows 系统cmd查看进程id:tasklist | findstr pycharm(进程名)
           杀死进程:taskkill /F /PID 进程号xxx
    from multiprocessing import Process
    import time, os
    
    
    def task():
        print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
        time.sleep(3)
        print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))
    
    
    if __name__ == '__main__':
        p = Process(target=task, )
        p.start()
    
        print('', os.getpid(), os.getppid())
        # os.getpid():当前文件的进程id
        # os.getppid():当前文件的父进程id,即pycharm
        '''
        windows 系统cmd查看进程id命令:
        tasklist | findstr pycharm
        '''
    View Code

    join方法

    # join方法
    # from multiprocessing import Process
    # import time,os
    #
    # def task():
    #     print('%s is running,parent id is <%s>' %(os.getpid(),os.getppid()))
    #     time.sleep(3)
    #     print('%s is done,parent id is <%s>' %(os.getpid(),os.getppid()))
    #
    # if __name__ == '__main__':
    #     p=Process(target=task,)
    #     p.start()
    """"""
    #     p.join()  # 进程p执行结束以后程序才会往下走
    #     print('主',os.getpid(),os.getppid())
    #     print(p.pid)  # 验证僵尸进程,子进程已结束但pid未回收,
    #                   # 如果主进程pid被回收,子进程pid也被回收
    
    
    # from multiprocessing import Process
    # import time,os
    #
    # def task(name,n):
    #     print('%s is running' %name)
    #     time.sleep(n)
    #
    #
    # if __name__ == '__main__':
    #     start=time.time()
    #     p1=Process(target=task,args=('子进程1',5))
    #     p2=Process(target=task,args=('子进程2',3))
    #     p3=Process(target=task,args=('子进程3',2))
    #     p_l=[p1,p2,p3]
    #
    #     # p1.start()  # 向操作系统发信号,告诉操作系统开启子进程
    #     # p2.start()  # 但是操作系统什么时候开,先开谁
    #     # p3.start()  # 不知道
    #     for p in p_l:
    #         p.start()
    #
    #     # p1.join()
    #     # p2.join()
    #     # p3.join()
    #     for p in p_l:
    #         p.join()
    #
    #     print('主',(time.time()-start))
    
    
    # from multiprocessing import Process
    # import time,os
    #
    # def task(name,n):
    #     print('%s is running' %name)
    #     time.sleep(n)
    #
    #
    # if __name__ == '__main__':
    #     start=time.time()
    #     p1=Process(target=task,args=('子进程1',5))
    #     p2=Process(target=task,args=('子进程2',3))
    #     p3=Process(target=task,args=('子进程3',2))
    #
    #     p1.start()
    #     p1.join()
    #     p2.start()
    #     p2.join()
    #     p3.start()
    #     p3.join()
    #
    #     print('主',(time.time()-start))
    
    
    # 了解:is_alive
    from multiprocessing import Process
    import time, os
    
    
    def task():
        print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid()))
        time.sleep(3)
        print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid()))
    
    
    if __name__ == '__main__':
        # p=Process(target=task,)
        # p.start()
        # # print(p.is_alive()) # p进程是否结束
        # p.join()
        # print('主',os.getpid(),os.getppid())
        # print(p.pid)
        # # print(p.is_alive())
    
        p = Process(target=task, name='sub——Precsss')
        p.start()
        p.terminate()  # 向系统发信号:"干掉p进程"。系统什么时候执行?不知道
        time.sleep(3)
        print(p.is_alive())   # 进程p是否还活着
        print('')
        print(p.name)  # 进程名,在产生进程的时候可以设置,否则使用默认值
    #                  # 进程p已被干掉但进程名,进程id还在
    View Code

    守护进程:p.daemon=True   在p.start()之前设置

        主进程结束则整个程序结束,不会再等子进程执行完

    from multiprocessing import Process
    import time
    
    def task(name):
        print('%s is running' %name)
        time.sleep(2)
        p=Process(target=time.sleep,args=(3,))
        p.start()
    
    
    if __name__ == '__main__':
        p=Process(target=task,args=('子进程1',))
        p.daemon=True  # 设置p为守护进程,一定要在start前设置
        p.start()      # 守护进程内不能再开子进程
    
        p.join()
        print('')
    View Code
    
    

    互斥锁:多个进程时,牺牲效率,保证数据安全

      在开启进程之前先生成锁,之后抢锁,谁抢到了先执行谁,全部执行完后释放锁,剩下task的再抢

    # 互斥锁:牺牲效率,保证数据安全
    from multiprocessing import Process, Lock
    import time
    
    
    def task(name, mutex):
        mutex.acquire()  # 获取锁
        print('%s 1' % name)
        time.sleep(1)
        print('%s 2' % name)
        time.sleep(1)
        print('%s 3' % name)
        mutex.release()  # 释放锁
    
    
    if __name__ == '__main__':
        mutex = Lock()  # 生成锁
        for i in range(3):  # 3个task抢锁,谁抢到了先执行谁,全部执行完后释放锁,剩下task的再抢
            p = Process(target=task, args=('进程%s' % i, mutex))
            p.start()
    View Code
    if __name__ == '__main__':
        # mutex = Lock()  # 生成锁
        for i in range(3):  # 3个task抢锁,谁抢到了先执行谁,全部执行完后释放锁,剩下task的再抢
            p = Process(target=task, args=('进程%s' % i, Lock() ))
            p.start()
    
    
    可不可以这样呢?如果Lock() 是单例模式,那么没问题,否则不可。三个进程拿到三把不同的锁,就没有意义了。
    question

    join():把整个程序变成串行
    Lock():把并行的程序,需要串行的地方加锁改成串行

    加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

    虽然可以用文件共享数据实现进程间通信,但问题是:

      1、效率低(共享数据基于文件,而文件是硬盘上的数据)

      2、需要自己加锁处理

    因此我们最好找寻一种解决方案能够兼顾:

      1、效率高(多个进程共享一块内存的数据)

      2、帮我们处理好锁问题。

    这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

    队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。

    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    队列:Queue

      先进先出

    from multiprocessing import Queue
    
    q = Queue(3)
    
    q.put('hello')
    q.put({'a': 1})
    q.put([3, 3, 3, ])
    print(q.full())
    
    # q.put(4)
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())
    View Code
    from multiprocessing import Process, Queue
    import time
    
    
    def producer(q):
        for i in range(3):
            res = '包子%s' % i
            time.sleep(0.5)
            print('生产者生产了%s' % res)
    
            q.put(res)
    
    
    def consumer(q):
        while True:
            res = q.get()
            # if res is None:break
            if not res: break
            time.sleep(1)
            print('消费者吃了%s' % res)
    
    
    if __name__ == '__main__':
        # 容器
        q = Queue()
    
        # 生产者们
        p1 = Process(target=producer, args=(q,))
        p2 = Process(target=producer, args=(q,))
        p3 = Process(target=producer, args=(q,))
    
        # 消费者们
        c1 = Process(target=consumer, args=(q,))
        c2 = Process(target=consumer, args=(q,))
    
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        p1.join()
        p2.join()
        p3.join()  # 等所有的生产者生产完毕以后,向队列里放入结束信号
        q.put(None)  # 有几个消费者放几个结束信号
        q.put('')
        print('')
    View Code

    JoinableQueue

    from multiprocessing import Process, JoinableQueue
    import time
    
    
    def producer(q):
        for i in range(2):
            res = '包子%s' % i
            time.sleep(0.5)
            print('生产者生产了%s' % res)
    
            q.put(res)
        q.join()
    
    
    def consumer(q):
        while True:
            res = q.get()
            if res is None: break
            time.sleep(1)
            print('消费者吃了%s' % res)
            q.task_done()
    
    
    if __name__ == '__main__':
        # 容器
        q = JoinableQueue()
    
        # 生产者们
        p1 = Process(target=producer, args=(q,))
        p2 = Process(target=producer, args=(q,))
        p3 = Process(target=producer, args=(q,))
    
        # 消费者们
        c1 = Process(target=consumer, args=(q,))
        c2 = Process(target=consumer, args=(q,))
        c1.daemon = True
        c2.daemon = True
    
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        p1.join()
        p2.join()
        p3.join()
        print('')
    View Code
    这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
  • 相关阅读:
    [转]深度理解依赖注入(Dependence Injection)
    [转]控制反转(IOC)和依赖注入(DI)
    [转]依赖注入的概念
    [转]struct实例字段的内存布局(Layout)和大小(Size)
    异步编程模式
    HTTP协议返回代码含义
    [转]StructLayout特性
    Stack的三种含义
    FineUI登入的例子中遇到的一些问题
    编程以外积累: 如何给项目生成类似VS2008的说明文档
  • 原文地址:https://www.cnblogs.com/webc/p/9167970.html
Copyright © 2011-2022 走看看