zoukankan      html  css  js  c++  java
  • Python之并发编程(二)进程

    进程

    1. multiprocessing模块介绍

      1. python中的多线程无法利用CPU资源,在python中大部分情况使用多进程。python中提供了非常好的多进程包multiprocessing。
      2. multiprocessing模块用来开启子进程,并在子进程中执行功能(函数),该模块与多线程模块threading的编程接口类似。
      3. multiprocessing的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
    2. Process类的介绍

      1. 创建进程的类:

        Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
        
        强调:
        1. 需要使用关键字的方式来指定参数
        2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
        
      2. 参数介绍:

        group参数未使用,值始终为None
        
        target表示调用对象,即子进程要执行的任务
        
        args表示调用对象的位置参数元组,args=(1,2,'egon',)
        
        kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
        
        name为子进程的名称
        
      3. 方法介绍:

        p.start():启动进程,并调用该子进程中的p.run() 
        p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
        
        p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。
        如果p还保存了一个锁那么也将不会被释放,进而导致死锁
        p.is_alive():如果p仍然运行,返回True
        
        p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,
        需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
        
      4. 属性介绍:

        p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
        
        p.name:进程的名称
        
        p.pid:进程的pid
        
        p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
        
        p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
        
    3. Process类的使用

      1. 一定要把开进程的代码写在if name=='main':下面

        开一个进程和主进程是并发的关系,我start一下就是先告诉操作系统我要开一个进程
        ,然而它不会等待,他会去执行下面的代码,完了他吧进程开始后,就开始执行了

        strat():方法的功能

          1.开启进程
          2.执行功能

    4. 开启进程的两种方式:

      1. 第一种:

        from multiprocessing import Process
        import time
        import random
        def piao(name):
            print('%s is piaoing'%name)
            time.sleep(random.randint(1,3))
            print('%s is piao end'%name)
        if __name__ =='__main__':
            p1 = Process(target=piao,kwargs={'name':'alex'})# 创建一个进程对象
            p2 = Process(target=piao,kwargs={'name':'alex'})
            p3 = Process(target=piao,kwargs={'name':'alex'})
            p1.start() # 只是向操作系统发出一个开辟子进程的信号,然后就执行下一行了.
            # 这个信号操作系统接收到之后,会从内存中开辟一个子进程空间,然后在将主进程所有数据copy加载到子进程,然后在调用cpu去执行.
            # 开辟子进程开销是很大的
            p2.start()
            p3.start()
            print('主进程')
        
        #第一种方式
        
      2. 第二种:

        from multiprocessing import Process
        import time
        import random
        import os
        class Piao(Process):
            def __init__(self,name):
                super().__init__() #必须继承父类的一些属性
                self.name = name
            def run(self):  #必须得实现一个run方法
                print(os.getppid(),os.getpid())
                print('%s is piaoing'%self.name)
                time.sleep(random.randint(1,3))
                print('%s is piao end'%self.name)
        if __name__ =='__main__':
            p1 = Piao('alex')
            p2 = Piao('wupeiqi')
            p3 = Piao('yuanhao')
            p1.start()
            p2.start()
            p3.start()
            print('主进程',os.getpid())
        
        #第二种方式
        
    5. 验证进程之间的空间隔离:

      # 接下来我们验证一下进程之间的互相隔离。
      
      # 在一个进程中
      x = 1000
      
       def task():
           global x
           x = 2
      
       task()
       print(x)
      # 在不同的进程中:
       from multiprocessing import Process
       import time
       x = 1000
      
       def task():
           global x
           x = 2
      
       if __name__ == '__main__':
           p = Process(target=task)
           p.start()
           time.sleep(3)
           print(x)
      
      #代码验证
      
    6. 进程对象的join方法:

      from multiprocessing import Process
      import time
      
      # 父进程等待子进程结束之后在执行
      # 方法一 加sleep 不可取!
      
       def task(n):
           time.sleep(3)
           print('子进程结束....')
      
       if __name__ == '__main__':
           p = Process(target=task,args=('太白金星',))
           p.start()
           time.sleep(5)
           print('主进程开始运行....')
      
      # 这样虽然达到了目的,
      # 1,但是你在程序中故意加sleep极大影响程序的效率。
      # 2,sleep(3)只是虚拟子进程运行的时间,子进程运行完毕的时间是不固定的。
      
      
      # 方法二: join
      
       from multiprocessing import Process
       import time
      
      
       def task(n):
           time.sleep(3)
           print('子进程结束....')
      
      
       if __name__ == '__main__':
           p = Process(target=task,args=('太白金星',))
           p.start()
           p.join()  # 等待p这个子进程运行结束之后,在执行下面的代码(主进程).
           print('主进程开始运行....')
      
      # 接下来我要开启十个子进程,先看看效果
      
       from multiprocessing import Process
       import time
      
       def task(n):
           print('%s is running' %n)
      
       if __name__ == '__main__':
           for i in range(1, 11):
               p = Process(target=task,args=(i,))
               p.start()
      #         '''
      #         我这里是不是运行十个子进程之后,才会运行主进程?当然不会!!!
      #         1,p.start()只是向操作系统发送一个请求而已,剩下的操作系统在内存开启进程空间,运行进程程序不一定是马上执行。
      #         2,开启进程的开销是比较大的。
      #         '''
           print('主进程开始运行....')
          
      # 那么有人说,老师我对这个不理解,我给你拆解开来。
      
       from multiprocessing import Process
       import time
      
       def task(n):
           print('%s is running' %n)
      
       if __name__ == '__main__':
           p1 = Process(target=task,args=(1,))
           p2 = Process(target=task,args=(2,))
           p3 = Process(target=task,args=(3,))
           p4 = Process(target=task,args=(4,))
           p5 = Process(target=task,args=(5,))
      
           p1.start()
           p2.start()
           p3.start()
           p4.start()
           p5.start()
      
           print('主进程开始运行....')
      
      # 接下来 实现起多子个进程,然后等待这些子进程都结束之后,在开启主进程。
      
       from multiprocessing import Process
       import time
      
       def task(n):
           time.sleep(3)
           print('%s is running' %n)
      
       if __name__ == '__main__':
           start_time = time.time()
           p1 = Process(target=task,args=(1,))
           p2 = Process(target=task,args=(2,))
           p3 = Process(target=task,args=(3,))
      #     # 几乎同一个时刻发送三个请求
           p1.start()
           p2.start()
           p3.start()
      #     # 对着三个自己成使用三个join
      
           p1.join()
           p2.join()
           p3.join()
      
           print(time.time() - start_time,'主进程开始运行....')
           # 3s 多一点点这是来回切换的所用时间。
      
      # 那么在进行举例:
      
       from multiprocessing import Process
       import time
      
       def task(n):
           time.sleep(n)
           print('%s is running' %n)
      
       if __name__ == '__main__':
           start_time = time.time()
           p1 = Process(target=task,args=(1,))
           p2 = Process(target=task,args=(2,))
           p3 = Process(target=task,args=(3,))
           # 几乎同一个时刻发送三个请求
           p1.start()
           p2.start()
           p3.start()
           # 对着三个自己成使用三个join
      
           p1.join()  # 1s
           p2.join()  # 2s
           p3.join()  # 3s
      
           print(time.time() - start_time,'主进程开始运行....')
          # 3s 多一点点这是来回切换的所用时间。
          
      # 利用for循环精简上面的示例:
      
      
       from multiprocessing import Process
       import time
      
       def task(n):
           time.sleep(1)
           print('%s is running' %n)
      
       if __name__ == '__main__':
           start_time = time.time()
            for i in range(1,4):
                p = Process(target=task,args=(i,))
                p.start()
                p.join()
      
           p1 = Process(target=task,args=(1,))
           p2 = Process(target=task,args=(2,))
           p3 = Process(target=task,args=(3,))
      #     # 几乎同一个时刻发送三个请求
           p1.start()
           p1.join()
           p2.start()
           p2.join()
           p3.start()
           p3.join()
           # 上面的代码,p1.join()他的作用:你的主进程代码必须等我的p1子进程执行完毕之后,在执行
      #     # p2.start()这个命令是主进程的代码。
      #     # 而 如果你这样写:
      #     '''
      #     p1.join()
      #     p2.join()
       #    p3.join()
      #     '''
      
           print(time.time() - start_time,'主进程开始运行....')
      
      # 所以你上面的代码应该怎么写?
      
      
       from multiprocessing import Process
       import time
      
       def task(n):
           time.sleep(3)
           print('%s is running' %n)
      
       if __name__ == '__main__':
           p_l = []
           start_time = time.time()
           for i in range(1,4):
               p = Process(target=task,args=(i,))
               p.start()
               p_l.append(p)
           # 对着三个自己成使用三个join
           for i in p_l:
               i.join()
           print(time.time() - start_time,'主进程开始运行....')
      
    7. 进程对象的其他属性(了解):

      # from multiprocessing import Process
      # import time
      # import os
      #
      # def task(n):
      #     time.sleep(3)
      #     print('%s is running' %n,os.getpid(),os.getppid())
      #
      # if __name__ == '__main__':
      #     p1 = Process(target=task,args=(1,),name = '任务1')
      #     # print(p1.name) # 给子进程起名字
      #     # for i in range(3):
      #     #     p = Process(target=task, args=(1,))
      #     #     print(p.name)  # 给子进程起名字
      #     p1.start()
      #     # p1.terminate()
      #     # time.sleep(2)  # 睡一会,他就将我的子进程杀死了。
      #     # print(p1.is_alive())  # False
      #     print(p1.pid)
      #     # print('主')
      #     print(os.getpid())
      
    8. 守护进程

      1. 守护进程的概念:

        # 守护进程:
        # 古时候 太监守护这个皇帝,如果皇帝驾崩了,太监直接也就死了.
        # 子进程守护着主进程,只要主进程结束,子进程跟着就结束,
        
      2. 代码示例:

        from multiprocessing import Process
        import time
        
        def task(name):
            print(f'{name} is running')
            time.sleep(2)
            print(f'{name} is gone')
        
        if __name__ == '__main__':
            p = Process(target=task, args=('立业',))
            p.daemon = True #将子进程p设置成守护进程,只要主进程结束,守护进程也马上结束
            p.start()
            time.sleep(1)
            print('==主进程执行')
        #打印结果
        # 立业 is running  #主进程结束,子进程也随之结束,
        # ==主进程执行
        
    9. 僵尸进程与孤儿进程:

      1. 僵尸进程和孤儿进程只会出现在Unix,linux,macos上面,windows上不会出现

      2. 进程的概念:

        #1:主进程需要等待子进程结束之后,主进程才结束
        	主进程时刻监视子进程的运行状态,当子进程结束之后,一段时间之内,将子进程进行回收
        
      3. 为什么主进程不在子进程结束后马上对其进行回收

        #1:主进程和子进程时异步关系,主进程无法马上捕获子进程什么时候结束
        #2:如果子进程结束后马上在内存中释放资源,主进程就没有办法监视子进程的状态了
        
      4. Unix针对上面的问题,提供了一个机制

        #所有的子进程结束之后,立马会释放文件的操作链接,内存中的大部分数据,但会保留一些内容:进程号,结束时间,运行状态,等待主进程监测,回收
        
      5. 僵尸进程:

        #所有的子进程在结束之后,在被主进程回收之前都会进入僵尸状态
        
      6. 僵尸进程是有害的:

        #如果父进程不对僵尸进程进行回收,产生大量的僵尸进程,这样就会占用内存,占用进程pid号
        
      7. 僵尸进程如何解决:

        #父进程产生了大量子进程,但是不回收,这样就会形成大量的僵尸进程,解决方式就是直接杀死父进程,将所有的僵尸进程变成孤儿进程进程,由init进行回收
        
      8. 孤儿进程:

        #父进程由于某种原因结束了,但是你的子进程还在运行,这样你的这些子进程就变成了孤儿进程,你的父进程如果结束了,你的所有的孤儿进程,就会被init进程回收,init就变成了父进程,对孤儿进程进行回收
        
    10. 互斥锁:

      1. 什么是互斥锁:

        #互斥锁就是在保证子进程串行的同时,也保证了子进程执行顺序的随机性,以及数据的安全性
        
        使用互斥锁的注意点:
            使用互斥锁不能连续锁,不然会造成阻塞和死锁
        
      2. 代码示例:

        from multiprocessing import Process
        from multiprocessing import Lock
        import time
        import os
        import random
        import sys
        def task1(lock):
            lock.acquire()
            print(f'{os.getpid()}开始打印了')
            time.sleep(random.randint(1,3))
            print(f'{os.getpid()}打印结束了')
            lock.release()
        
        def task2(lock):
            lock.acquire()
            print(f'{os.getpid()}开始打印了')
            time.sleep(random.randint(1, 3))
            print(f'{os.getpid()}打印结束了')
            lock.release()
        
        def task3(lock):
            lock.acquire()
            print(f'{os.getpid()}开始打印了')
            time.sleep(random.randint(1, 3))
            print(f'{os.getpid()}打印结束了')
            lock.release()
        
        if __name__ == '__main__':
            lock=Lock()
            for i in  ['task1','task2','task3']:
                p=Process(target=getattr(sys.modules[__name__],i),args=(lock,))
                p.start()
        
      3. 示意图:

      4. join和Lock的区别:

        #共同点:
        	都可以把并发改成串行,保证了执行的顺序
        #不同点:
        	join是人为的设置顺序,Lock锁时让其争抢顺序,保证了公平性
        
      5. 基于文件的进程之间的通信:

        #进程在内存级别是隔离的,但是文件在磁盘上
        #使用lock锁实现文件上的通讯
        from multiprocessing import Process
        from multiprocessing import Lock
        import json
        import time
        import os
        import random
        # 当多个进程共抢一个数据时,如果要保证数据的安全,必须要串行.
        # 要想让购买环节进行串行,我们必须要加锁处理.
        
        def search():
            time.sleep(random.randint(1,3))  # 模拟网络延迟(查询环节)
            with open('ticket.json',encoding='utf-8') as f1:
                dic = json.load(f1)
                print(f'{os.getpid()} 查看了票数,剩余{dic["count"]}')
        def paid():
            with open('ticket.json', encoding='utf-8') as f1:
                dic = json.load(f1)
            if dic['count'] > 0:
                dic['count'] -= 1
                time.sleep(random.randint(1,3))  # 模拟网络延迟(购买环节)
                with open('ticket.json', encoding='utf-8',mode='w') as f1:
                    json.dump(dic,f1)
                print(f'{os.getpid()} 购买成功')
        def task(lock):
            search()
            lock.acquire()
            paid()
            lock.release()
        
        if __name__ == '__main__':
            mutex = Lock()
            for i in range(6):
                p = Process(target=task,args=(mutex,))
                p.start()
        #当很多进程共强一个资源(数据)时, 你要保证顺序(数据的安全),一定要串行.
        #互斥锁: 可以公平性的保证顺序以及数据的安全.
        
      6. 基于文件的进程通信的特点

        #1:效率低
        #2:自己加锁麻烦而且容易出现死锁
        
    11. 队列

      1. 队列的特点:

        #1:把队列当成一个容器,这个容器可以承载一些数据
        #2:队列的特性是数据保持先进先出原则,FIFO
        
      2. 队列示例代码:

        from multiprocessing import Queue
        q = Queue(3)
        
        q.put(1)
        q.put('alex')
        q.put([1,2,3])
        q.put(5555)  # 当队列满了时,在进程put数据就会阻塞.
        
        print(q.get())
        print(q.get())
        print(q.get())
        print(q.get())  # 当数据取完时,在进程get数据也会出现阻塞,直到某一个进程put数据.
        
        
        from multiprocessing import Queue
        q = Queue(3)  # maxsize
        
        q.put(1)
        q.put('alex')
        q.put([1,2,3])
        # q.put(5555,block=False)
        #
        print(q.get())
        print(q.get())
        print(q.get())
        print(q.get(timeout=3))  # 阻塞3秒,3秒之后还阻塞直接报错.
        # print(q.get(block=False))
        
        # block=False 只要遇到阻塞就会报错.
        
      3. 使用基于文件和队列的进程通信方式的优缺点:

        #基于文件的进程通讯方式,在写入文件的时候耗费性能,效率低,在枷锁的时候,又可能出现锁死或递归锁的情况,相比之下队列的效率更高,也不会出现锁死或者递归锁的情况
        
  • 相关阅读:
    解析数据库xml格式字段
    Spring切面表达式,hibernate,struts
    常见枚举
    经典冒泡排序,九九乘法表,三角形
    拦截器后台安全验证
    Oracle 将表中多条数据同一字段拼成一列显示
    idea 11 控制台日志乱码
    BigDecimal 分转元,元转分
    currtDownLatch可能会出现的三个问题
    CountDownLatch用法
  • 原文地址:https://www.cnblogs.com/zhangdadayou/p/11431904.html
Copyright © 2011-2022 走看看