zoukankan      html  css  js  c++  java
  • 多进程编程——理论讲解与 multiprocessing 模块

    多进程编程
      import os
      pid = os .fork()
      功能 :创建新的进程
      参数: 无
      返回值 :失败返回一个负数
          成功:在原有进程中返回新的进程的PID号
              在新进程中返回为0
    * 子进程会复制父进程传问代码段,包括fork之前产生的内存空间
    * 子进程从fork的下一句开始执行,与父进程互不干扰
    * 父子进程的执行顺序是不一定的,父子进程公用一个终端显示
    * 父子进程通常会根据fork返回值得差异选择执行不同的代码。所以if 结构几乎是fork 的固定搭配
    * 父子进程空间独立,操作的都是本空间的内容,互不影响
    * 子进程也有自己的特性,比如PID号,PCB,命令集等

    获取进程PID
      os.getpid()
        功能: 获取当前进程的进程号
        返回值: 返回进程号
      os.getppid()
        功能: 当前进程父进程的PID号
        返回值: 返回进程号 
    进程退出
      os._exit(status)    结束进程之后不在执行后面的内容
        功能:进程退出状态
        参数:进程的退出状态
      sys.exit([status])   sys.exit('这是退出信息')   退出时,打印退出信息
        功能: 进程退出
        参数: 数字表示退出状态,不写默认为0
            字符串,表示退出时打印的内容
        此函数是抛出异常退出,这个异常可以通过【except SystemExit as e】被捕获,并显示出来,之后程序将不会退出

    Z :该程序应该已经终止,但是其父程序却无法正常的终止他,造成 zombie (疆尸) 程序的状态 (僵尸进程,孤儿进程)

      孤儿进程:定义

          父进程先于子进程退出,此时子进程就称为孤儿进程。

          * 孤儿进程会被操作系统指定的进程收养,系统进程就成为孤儿进程的新的父进程

        僵尸进程 :定义

    子进程先于父进程退出,但是父进程没有处理子进程的退出状态,此时子进程就会成为僵尸进程

          * 僵尸进程会存留少量PCB信息在内存中,大量的僵尸进程会消耗系统资源,应该避免僵尸进程的产生

              如何避免僵尸进程产生

                * 处理子进程退出状态

               1 pid,status = os.wait()

                   功能 :在父进程中阻塞等待处理子进程退出

                  

    import os
    from time import sleep
    pid = os.fork()
    if pid < 0:
        print('创建失败')
    elif pid == 0 :
        print('创建成功PID,',os.getpid())
        os._exit(9) #返回退出状态[状态码*256]
    else:
        print('主进程PID,',os.getpid())
        pid,status = os.wait() #获取子进程退出PID和进程状态,阻塞函数
        print(pid,status) #打印
        print(os.WEXITSTATUS(status)) #获取子进程退出状态
        while True:
            sleep(2)
    View Code wait

     

                2   返回值 : PID退出的子进程的PID号 ,,stastus 获取子进程退出状态

                 pid,status = os.waitpid(pid,option)  

                  功能 : 在父进程中阻塞等待处理子进程退出

                   参数 : pid  -1 表示等待任意子进程退出

                               >0 表示等待对应PID号的子进程退出

                          option 0 表示阻塞等待

                                 WNOHANG 表示非阻塞

                   返回值 : pid 退出的子进程的PID号

                            status 获取子进程退出状态

                   waitpid(-1,0) ==> wait(

     

    import os
    from time import sleep
    pid = os.fork()
    if pid < 0:
        print('创建失败')
    elif pid == 0 :
        print('创建成功PID,',os.getpid())
        os._exit(9) #返回退出状态[状态码*256]
    else:
        print('主进程PID,',os.getpid())
        while True:
            pid,status = os.waitpid(-1,os.WNOHANG)
            print(pid,status)
            print(os.WEXITSTATUS(status))
            sleep(1)
    View Code -非阻塞

     

     multiprocessing 模块创建进程

      1 需要将要执行的事情封装为函数

      2 使用 multiprocessing模块中 Process类创建进程对象

      3 通过对象属性设置和 Process的初始化函数对进程进行设置,绑定要执行的函数

      4 启动进程,会自动执行进程绑定的函数

      5 完成进程的回收

      Process()  

        功能 :创建进程对象

        参数 : name   进程名称  (默认为Process-1)

             target  绑定函数 (必填)

            args   元组 给target函数按照位置传参

            kwargs  字典 给target函数按照键值对传参

        返回需要创建进程对象

        p = Process(target = * , args(*,))

        p.start 

          功能 :启动进程

          *target 函数会自动执行,此时进程真正被创建

        p.join([timeout])

          功能 : 阻塞等待回收子进程

          参数 : 超时时间

          * 使用 multiprocessing 创建子进程,同样子进程复制父进程的全部代码段,父子进程各自执行互不影响,

              父子进程有各自的运行空间

          * 如果不使用join回收子进程则子进程退出后会成为僵尸进程

          * 使用 multiprocessing 创建子进程往往父进程只是用来创建进程回收进程

        ****************

          注意

            1 如果子进程从父进程拷贝对象,对象和网络或者文件相关联,那么父子进程会使用同一套对象属性,相互有一定的关联性

            2 如果在子进程中单独创建对象,则和父进程完全没有关联

        ****************

        p.is_alive() 判断进程生命周期状态,处于生命周期得到True ,否则返回False 

        p.name 进程名称 默认为Process-1

        p.pid 进程的PID 号

        p.daemon 

          默认为False ,主进程退出不会影响子进程执行

          如果设置为True 则子进程会随着主进程结束而结束

          * 要在start 前设置

          * 一般不和 join一起使用

      创建算定义进程类

        1 继承Process 

        2 编写自己的__init__,同时加载父类的init方法

        3 重写run方法,可以通过生成的对象调用start自动运行

    class Process_1(Process):
        def __init__(self,value):
            self.value = value
            super().__init__()
    
        def run(self):
            for i in range(3):
                print('%s--%s-'%(i,self.value))
    p=Process_1(5)
    p.start()
    p.join()
    View Code

       多进程

         优点:

            1 可以使用计算机多核,进行任务的并发执行,提高执行效率!

            2 空间独立,数据安全

            3 运行不受其他进程影响,创建方便

         缺点

            进程的创建和删除消耗的系统资源较多

    进程池技术:

      产生原因 : 如果有大量任务需要多进程完成,则可能需要频繁的创建删除进程,给计算机带来较多的资源消耗。

      原理: 创建适当的进程被放入进程池,用来处理待处理事件,处理完毕后进程不销毁,仍然在进程池中等待处理其他事件。

          进程的利用降低了资源的消耗

      使用方法:

        1 创建进程池,在池内放入适当的进程

        2 将事件加入到进程池等待队列

        3 不断取进程执行事件,直到所有事件执行完毕

        4 关闭进程池,回收进程

        from multiprocessing import Pool

        Poll(process)

          功能 :创建进程池对象

          参数 :表示进程池中有多少进程

        pool.apply_async(func,args,kwds)

          功能:将事件放入到进程池队列

          参数:func事件函数

          args:以元组形式给func传参

          kwds: 以字典形式给func传参

        pool.close()  关闭进程池

        poll.join()  回收进程池

    from multiprocessing import Process,Pool
    import time
    def we(msg):
        time.sleep(2)
        print(msg)
        return  time.time()
    pool = Pool(4)
    a=[]  #创建列表
    for i in range(10):
        msg = 'hell ___%s:::'%i
        c=pool.apply_async(we,(msg,)) #返回的是一个result对象
        a.append(c) 
    pool.close()
    pool.join()
    
    for i in a:
        print(i.get())##或获取进程的返回值
    View Code

         poll.map(func,iter)

            功能:将要做的事件放入进程池

            参数:func 要执行的函数

               iter  迭代对象

            返回值 :返回事件函数的返回值列表

    from multiprocessing import Process,Pool
    import time
    def we(msg):
        time.sleep(2)
        print(msg,Process.__name__)
        return  msg*2
    pool = Pool(4)
    a=pool.map(we,range(10)) #返回可迭代对象,此处map跟高阶map函数相差不多
    pool.close()
    pool.join()
    for i in a:
        print(i)
    View Code---map相差不多

     


     

     

     进程间通信(IPC)

        原因 :进程空间相对独立,资源无法相互获取,此时在不同进程间通信需要专门的方法

        进程间通信的方法: 管道 ,消息队列,共享内存,信号,信号量,套接字

      管道通信 pipe

        通信原理 :在内存中开辟管道空间,生成管道操作对象,多个进程使用‘同一个’管道对象进行操作即可实现通信

      multiprocessing --> Pipe

      fd1,fd2 = Pipe(duplex = True)

        功能 :创建管道

        参数 :默认表示双向管道

            如果设置为False则为单向管道

        返回值 :表示管道的两端

             如果是双向管理 都可以读写

             如果是单向管道 ,则fd1 只读, fd2只写

        fd.recv() 没有参数

          功能 :从管道读取信息  

          返回值:读取到的内容

          *如果管道为空则阻塞

        fd.send(data)

          功能:向管道写入内容

          参数: 要写入的内容

          * 可以发送python 任意数据类型

    from multiprocessing import Process,Pipe
    from time import sleep
    
    fd1,fd2 = Pipe() #建立管道
    def f1(fd1):
        a=0
        while a<5:
            date=fd1.recv()
            print(date)
            if date:
                a+=1
        for i in range(5): #单进程控制5条进程信息,所以 发5次
            fd1.send('完成')
        print('正在结束!')
        
    def f2(fd2):
        fd2.send('这是f2的name')
        date=fd2.recv()
        print(date)
    
    p1 = Process(target=f1,args=(fd1,))  #创建接收管道,单进程进行读写
    p1.start()
    
    a=[]
    for i in range(5):
        p = Process(target=f2,args=(fd2,))
        a.append(p)
        p.start()
    
    for i in a:
        i.join()
    print('子进程已结束,等待收进程结束')
    p1.join()
    View Code--管道读写

      消息队列

          队列 :先进先出

          通信原理 :在内存中建立队列数据结构模型。多个进程都可以通过队列存入内容,取出内容的顺序和存入顺序保持一致

        创建队列 

          q = Queue(maxsize = 0 )

            功能 : 创建消息队列、

            参数 : 表示最多存放多少消息,默认表示根据内存分配存储

            返回值:队列对象

          q.put(data,[block,timeout])

            功能:向队列存储消息

            参数 :data 要存的内容

                block 默认队列满时会阻塞,设置为False 则非阻塞

                timeout 超时时间

          data = q.get([block,timeout])

            功能 :获取队列消息

            参数 : block 默认队列空时会阻塞,设置为False则非阻塞

                timeout 超时时间

            返回值 :返回取出的内容

          q.full()  判断队列是否为满

          q.empty()  判断队列是否为空

          q.qsize()  判断队列中消息数量

          q.close()  关闭队列

      共享内存 

         通信原理:在内存中开辟一块空间,对多个进程可见,进程可以写入输入,但是每次写入的内容会覆盖之前的内容。

         obj = Value(ctype,obj)

          功能 :开辟共享内存空间

          参数 :ctype 要存储的数据类型('c','i')

                     

              obj 共享内存的初始化数据

          返回 :共享内存对象

          obj.value 即为共享内存值,对其修改即修改共享内存

     

    from multiprocessing import Process,Value
    import time
    import random
    money =Value('i',2000)
    
    def deposite():
        for i in range(100):
            time.sleep(0.01)
            money.value += random.randint(1,200)
    
    def withdraw():
        for i in range(100):
            time.sleep(0.02)
            money.value -= random.randint(1,100)
    p1 = Process(target=deposite)
    p2 = Process(target=withdraw)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(money.value)
    View Code--value用法

     

     

     

        obj = Array(ctype,obj)

          功能:开辟共享内存空间

          参数: ctype 要存储的数据格式

              obj 初始化存入的内容,比如列表,字符串

                如果是数组则表示开辟空间的个数

          返回值 :返回共享内存对象

               * 可以通过遍历获取每个元素的值

               * 如果存入的是字符串,可以通过obj.value 表示字符串的首地址    

    from multiprocessing import Process,Array
    import time
    shm = Array('i', [1,2,3,4])#创建共享内存,开辟4个整形空间
    print(shm)
    def f1(shm):
        for i in shm:
            print(i)
        shm[3]=145 #
    p = Process(target=f1,args=(shm,))
    p.start()
    p.join()
    View Code---Array-列表操作

     

    from multiprocessing import Process,Array
    import time
    shm = Array('c', b'hello')
    print(shm)
    def f1(shm):
        for i in shm:
            print(i)
        shm[0]=b'H'
    p = Process(target=f1,args=(shm,))
    p.start()
    p.join()
    print(shm.value)#打印字符串
    View Code --字符操作

     

     

    开辟空间 管道内存 消息队列 共享内存
    读写方式

    两端读写

    双向/单向

    先进先出 覆盖之前内容
    效率 一般 一般 较高
    应用 多用于父子进程 广泛灵活 需要注意进行互斥操作

       

      信号通信 

        一个进程向另一个进程发送一个信号来传递某种讯息,接收者根据接收到的信号进行相应的行为

        信号见《https://www.cnblogs.com/Skyda/p/9397913.html》

        os.signal

        os.kill(pid,sig)

          功能:发送信号

          参数 :PID 目标进程

              SIG 要发送的信号

        

        import signal
        signal.alarm(sec)
          功能:向自身改善时钟信号 --》SIGALRM
          参数:sec 时钟时间

          同步执行:按照顺序逐句执行,一步完成再做下一步

          异步执行:在执行过程中利用内核记录延迟发生或者准备处理的事件。这样不影响应用层的持续执行    

             当事件发生时再由内核告知应用层处理

          * 信号是唯一的异步通信方法

          进程中只能有一个时钟,第二个会覆盖第一个时钟

        signal.pause()

          功能:阻塞等待接收一个信号

       signal.signal(signum,handler)
         功能:处理信号
         参数:signum 要处理的信号
            handler 信号的处理方法
                SIG_DFL  表示使用默认的方法处理
                SIG_IGN  表示忽略这个信号
                func   传入一个函数表示用指定函数处理
                   def func(sig,frame)
                      sig: 捕获到的信号
                      frame: 信号对象
      常用的的信号,,,
        HUP     1    终端断线
        INT     2    中断(同 Ctrl + C)
        QUIT    3    退出(同 Ctrl + )
        TERM   15    终止
        KILL    9    强制终止一个进程
        CONT   18    继续(与STOP相反, fg/bg命令)
        STOP   19    暂停(同 Ctrl + Z)    
        ALRM  14   时钟信号    
        CHLD  17  子进程状态改变时给父进程发出 【在父进程中忽略子进程状态改变,子进程退出自动系统处理】
        SIGTSTP 20  (Ctrl + Z )
     
    import signal
    import time
    signal.alarm(3)
    
    
    #异步加载到内核如果还有alarm信号,依然忽略
    signal.signal(14,signal.SIG_IGN)
    while True:
        print('1')
        time.sleep(1)
    View Code--alarm函数
    import signal
    import time
    
    def a(sig,framer):
        if sig == 14:
            print('这是时钟信号')
        elif sig == signal.SIGINT:
            print('结束不了')
    
    signal.alarm(3)
    signal.signal(signal.SIGALRM,a)
    signal.signal(signal.SIGINT,a)
    
    while True:
        time.sleep(1)
        print('1')
    View Code---用函数来解决信号
    from multiprocessing import  Process
    import os,sys
    from signal import  *
    from time import  sleep
    
    def hand_(sig,frame):
        print(sig)#获取信号并判断
        if sig == SIGINT:
            os.kill(os.getppid(),SIGUSR1)
        elif sig == SIGQUIT:
            os.kill(os.getppid(),SIGUSR2)
        elif sig == SIGUSR1:
            print('这里是exit')
            os._exit(9) #退出发出状态码
    #子进程
    def son():
        signal(SIGINT,hand_)#需要处理的信号
        signal(SIGQUIT,hand_)
        signal(SIGUSR1,hand_)
        signal(SIGTSTP,SIG_IGN)#需要忽略的信号
        while True:#不让子进程结束
            sleep(2)
    p = Process(target=son)
    p.start()
    
    
    #父进程
    def parent(sig,frame):
        if sig == SIGUSR1:
            print('开车了')
        if sig == SIGUSR2:
            print('车速有点快')
        if sig == SIGTSTP:
            os.kill(p.pid,SIGUSR1)
    
    #使用信号的时候,需要注意,主进程和子进程 都能接收到发出的信号
    #所以需要在主进程上进行信号忽略处理
    signal(SIGUSR1,parent) #需要处理的信号
    signal(SIGUSR2,parent)#需要处理的信号
    signal(SIGTSTP,parent)#需要处理的信号
    signal(SIGINT,SIG_IGN)#需要忽略的信号
    signal(SIGQUIT,SIG_IGN)#需要忽略的信号
    while True:#不让主进程结束
        pid,status = os.wait()#获取子进程退出状态码,并判断主进程退出
        if status:
            sys.exit()
    View Code ---进程之间用信号通信

    信号量(信号灯)

      原理:给定一个数量,对多个进程可见,且多个进程都可以操作。进程通过对数量多少的判断执行各自的行为

      multiprocessing -->  Semaphore

      sem = Semaphore(num)

        功能 :创建信号量

        参数: 信号量初始值

        返回:信号量对象

       sem.get_value()  获取信号量值

       sem.acquire()  将信号量减1 当信号量为0会阻塞

       sem.release()  将信号量加1

       进程的同步互斥

         临界资源:多个进程或者线程都能够操作的共享资源

         临界区: 操作临界资源的代码段

        同步:同步是一种合作关系,为完成某个任务,多进程或者多线程之间形成一种协调,按照约定或条件执行操作临界资源。

        互斥:互斥是一种制约关系,当一个进程或者线程使用临界资源时进行上锁处理,当另一个进程使用时会阻塞等待,直到解锁后才能继续使用

    Event 事件

      from multiprocessing import Event

      创建事件对象

        e = Event

      设置事件阻塞

        e.wait([timeout])

      事件设置 --当事件被设置后 e.wait()不再阻塞

        e.set()

      清除设置 ,当事件设置被clear后 e.wait又会阻塞

        e.clear()

      事件状态判断

        e.is_set()

      

    from multiprocessing import Event,Process
    from time import sleep
    def wait_(e):
        print('临界区操作')
        e.wait()
        print('这里是临界资源1')
        with open('file.txt','rb') as f:
            print(f.read())
    
    e = Event()
    p1 = Process(target=wait_,args=(e,))
    p1.start()
    print('主进程执行')
    with open('file.txt','wb') as f:
        print('这里正在写')
        f.write(b'i l looo')
    e.set()
    p1.join()
    View Code--临界区的操作

      Lock 锁

       创建对象 lock = lock()

          lock.acquire()上锁 

            如果锁已经是上锁状态调用此函数会阻塞

          lock.release()解锁

         

     
  • 相关阅读:
    ansible常用模块及参数(1)
    ansible搭建wordpress,安装nfs和rsync
    ElasticSearch 集群
    kibana数据操作
    ElasticSearch 交互使用
    Elasticsearch的基础安装
    Elasticsearch的简介
    redis数据审计
    redis数据迁移
    redis集群节点添加与删除
  • 原文地址:https://www.cnblogs.com/Skyda/p/9621384.html
Copyright © 2011-2022 走看看