zoukankan      html  css  js  c++  java
  • day 27 Python中进程的操作

    进程的创建和结束:

    multiprocess模块:

    multiprocess不是一个模块而是python中一个操作、管理进程的包

    分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    
    参数介绍:
    1 group参数未使用,值始终为None
    2 target表示调用对象,即子进程要执行的任务
    3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
    4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
    5 name为子进程的名称
    1 p.start():启动进程,并调用该子进程中的p.run() 
    2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
    3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    4 p.is_alive():如果p仍然运行,返回True
    5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程  
    方法介绍
    1 p.daemon:守护进程 
        默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随            之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
    2 p.name:进程的名称
    3 p.pid:进程的pid
    4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
    5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)                
    属性介绍
    在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候  ,就不会递归运行了。
    windows下操作进程

    使用process模块创建进程

    启动两个进程(可以用for循环启动多个进程)

    from multiprocessing import Process
    
    def func(name):
        print(666)
        print('我是子进程', name)
    
    if __name__ == '__main__':
        p1 = Process(target=func, args=('alex',))  # 传的必须是元组,所以alex后面要逗号
        p2 = Process(target=func, args=('bob',))  # 传的必须是元组,所以alex后面要逗号
        p1.start()  # 第一个进程
        p2.start()  # 第二个进程
        print('我是主进程'))

     join用法:

    from multiprocessing import Process
    import time
    
    
    def func(a):
        time.sleep(2)
        print('子进程', a)
    
    
    if __name__ == '__main__':
        p = Process(target=func, args=(1,))
        p.start()  # 把自己程交给CPU,就开始执行下面的代码。非阻塞态不用等
        p.join()  # 等待子进程运行完后,父进程才运行,父进程要回收子进程内存空间
        print('主进程') 
    join用法

    查看子进程和父进程 id: 子进程:os.getpid   父进程:os.getppid

    from multiprocessing import Process
    import time
    import os
    
    
    def func(a):
        time.sleep(2)
        print('子进程', os.getpid())  #  os.getpid获得子进程ID
        print('父进程', os.getppid()) # os.getppid 获得主进程ID
    
    
    if __name__ == '__main__':
        print('主进程ID', os.getpid())  # os.getpid获得主进程ID
        p = Process(target=func, args=(1,))
        p.start()

    同时启动多个子进程:

    from multiprocessing import Process
    import time
    import os
    
    def func(a):
        time.sleep(1)
        print('子进程ID', os.getpid())
    
    
    if __name__ == '__main__':
        for i in range(5): # 启动五个子进程
            p = Process(target=func, args=(1,))
            p.start()
            print('主进程ID', os.getppid())

    多个子进程中join的用法:

    from multiprocessing import Process
    import os
    
    def func(a):
        print('子进程ID%s'% a, os.getpid())
    
    
    if __name__ == '__main__':
        for i in range(5): # 启动五个子进程
            p = Process(target=func, args=(i,))
            p.start()
            # p.join() # 放这里会等待第一个子进程结束后,在执行下一个子进程。不能实现同时启动运行
        p.join() #放这里,只会等待进程4结束后,就执行下面的程序。此时可能进程1,2,3并没有执行完。无法达到阻隔子进程的效果
        print('主进程ID', os.getppid())
    简单版
    from multiprocessing import Process
    
    def func(a):
        time.sleep(0.3)
        print('子进程%s正在发发邮件' % a)
    
    
    if __name__ == '__main__':
        p_lis = []
        for i in range(5): # 启动五个子进程
            p = Process(target=func, args=(i,))
            p.start()
            p_lis.append(p)
        for i in p_lis:
            p.join()   #遍历列表,列表里面是每一个进程。都会被join阻隔
        print('所有邮件都已经发出')  # 等所有进程都给join阻隔完后,才运行
    升级版,发邮件例子

    进程间的数据隔离:

    进程间数据是相互隔离的

    from multiprocessing import Process
    n = 100
    def func():
        global n
        for i in range(5):
            n = n - 1
            print(n)  #  每一个进程执行的结果都是 95,说明子进程间彼此隔离
    if __name__ == '__main__':
        p_lst = []
        for i in range(10):
            p = Process(target=func)
            p.start()
            p_lst.append(p)
        for p in p_lst:
            p.join()
        print(n)  # 100
    数据隔离,和进程间隔离

    开启进程的另一种方式: 继承Process类

    class Myprocess(Process):
        def run(self):   
            print('子进程',os.getpid())
    
    if __name__ == '__main__':
        p = Myprocess()
        p.start()
        print('主进程', os.getppid())
    固定格式
    class Myprocess(Process):
        def __init__(self,arg):  # 用于接收参数
            super().__init__()   # 必须执行父类的init方法
            self.arg = arg
        def run(self):
            print('子进程',os.getpid(), self.arg)   # 使用参数
    
    if __name__ == '__main__':
        p = Myprocess('我是参数')  # 实例化是传输参数
        p.start()
        print('主进程', os.getppid())
    带参数形式

    多进程中的其他方法:

    p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    print(p1.is_alive()) #判断进程是否存活,结果为True or False
    from multiprocessing import Process
    import time
    class Myprocess(Process):
        def __init__(self,arg):
            super().__init__()
            self.arg = arg
        def run(self):
            print('子进程',os.getpid(), self.arg)
    
    if __name__ == '__main__':
        p = Myprocess('我是参数')
        p.start()
        print(p.is_alive())  # 判断进程是否存活,返回True,False
        print('主进程', os.getppid())
        p.terminate()  # 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
        time.sleep(1)
        print(p.is_alive())
    多进程中的其他方法

    守护进程:

    # 1.守护进程 会随着主进程代码的结束而结束
    # 2.守护进程不会守护除了主进程代码之外的其他子进程
    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
    p.daemon = True # 定义p为守护进程
    import time
    from multiprocessing import Process
    
    def guard():
        while True:
            print('我是守护进程,正在工作')  # 只守护主进程
            time.sleep(1)
    
    def main_main():
        print('我不知道自己是不是主进程,正在运行....')
        time.sleep(5)
        print('结束')
    
    def func():
        print('我不知道自己是不是主进程')
        time.sleep(3)
        print('func结束了')
    
    if __name__ == '__main__':
        p = Process(target=guard)
        p.daemon = True  # 定义一个守护进程
        p.start()
        print('大结局')  # 这也是主进程
        # p2 = Process(target=func())
        # p2.start()  # 如果用Process启动,则是子进程
        p1 = Process(target=main_main)  # 非主进程
        p1.start()
        func()  # 这是主进程
    守护进程
    多进程 实现socket tcp协议 server端的并发
    
    
    import socket
    from multiprocessing import Process
    
    def func(conn):
        while True:
            conn.send(b'hello')
    
    if __name__ == '__main__':
        sk = socket.socket()
        sk.bind(('127.0.0.1',9000))
        sk.listen()
        while True:
            conn,addr = sk.accept()
            p = Process(target=func,args = (conn,))
            p.start()
    server端
    
    
    import socket
    
    sk = socket.socket()
    sk.connect(('127.0.0.1',9000))
    
    while True:
        msg = sk.recv(1024)
        print(msg)
    client端
    
    

    进程同步: (multiprocess.Lock), 抢票例子

    保证数据安全,会让程序重新变成串行,浪费时间

    # 多个进程 抢占同一个数据资源 会造成 数据不安全
    # 我们必须要牺牲效率来保证数据的安全性
    import json
    import time
    from multiprocessing import Lock
    from multiprocessing import Process
    def search(name):
        with open('ticket') as f:
            ticket_count = json.load(f)
        if ticket_count['count'] >=1:
            print('%s : 有余票%s张'%(name,ticket_count['count']))
        else:
            print('%s : 没票了'%name)
    
    def buy(name):
        with open('ticket') as f:
            ticket_count = json.load(f)
        time.sleep(0.2)
        if ticket_count['count'] >=1:
            print('有余票%s张'%ticket_count['count'])
            ticket_count['count'] -= 1
            print('%s买到票了'%name)
        else:
            print('%s没买到票' % name)
        time.sleep(0.2)
        with open('ticket','w') as f:
            json.dump(ticket_count,f)
    
    def opt(lock,name):
        search(name)
        lock.acquire()   # 拿走钥匙
        buy(name)
        lock.release()   # 归还钥匙
    
    if __name__ == '__main__':
        lock = Lock()   # 锁  互斥锁
        for i in range(10):
            p = Process(target=opt,args = (lock,'alex'+str(i),))
            p.start()
    抢票例子
    #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理
    
    #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
    队列和管道都是将数据存放于内存中
    队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    队列(multiprocess.Queue)

    IPC: 进程间通信

    q.get( [ block [ ,timeout ] ] ) 
    返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    
    q.put(item [, block [,timeout ] ] ) 
    将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    
    q.qsize() 
    返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    
    q.close() 
    关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    队列参数

    生产者消费者模型:

    import time
    from multiprocessing import Process, Queue
    
    def consumer(name,q):
        while True:
            food = q.get()
            if not food: break
            time.sleep(1)
            print('%s吃了一个%s' % (name, food))
    
    def producer(q, food_name):
        for i in range(20):
            time.sleep(0.1)
            food='%s%s' % (food_name, i )
            print('制造了%s' % food)
            q.put(food)
    
    if __name__ == '__main__':
        q = Queue(5)
        p1 = Process(target=consumer, args=('alex',q))
        p2 = Process(target=consumer, args=('wusir',q))
        p3 = Process(target=producer, args=(q, '泔水'))
        p1.start()
        p2.start()
        p3.start()
        p2.join()
        q.put(None)
        q.put(None)
    生产消费者模型

    数据共享   Manager

    进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
    虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
    from multiprocessing import Manager,Process,Lock
    def work(d,lock):
        with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
            d['count']-=1
    
    if __name__ == '__main__':
        lock=Lock()
        with Manager() as m:
            dic=m.dict({'count':100})
            p_l=[]
            for i in range(100):
                p=Process(target=work,args=(dic,lock))
                p_l.append(p)
                p.start()
            for p in p_l:
                p.join()
            print(dic)
    
    Manager例子

    进程池:  multiprocessing.Pool

    定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

    import os
    import time
    from multiprocessing import Pool
    
    def func(i):
        time.sleep(0.1)
        print(i,os.getpid())
    
    if __name__ == '__main__':
        p = Pool(4)   # cpu个数 + 1/cpu的个数
        for i in range(10):
            p.apply_async(func,args=(i,))   # async异步的提交任务
        p.close() # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务
        p.join()  # 阻塞,直到池子中的任务都执行完毕
    进程池
    起多进程的意义
    # 1.为了更好的利用CPU,所以如果我们的程序中都是网络IO,文件IO就不适合起多进程
    # 2.为了数据的隔离,如果我们的程序中总是要用到数据共享,那么就不适合使用多进程
    # 3.超过了cpu个数的任务数,都应该使用进程池来解决问题,而不能无限的开启子进程
    Pool([numprocess  [,initializer [, initargs]]]):创建进程池
    numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
    2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    3 initargs:是要传给initializer的参数组
    参数介绍
     p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执func(*args,**kwargs),然后返回结果。
     p.close():关闭进程池,防止进一步操作
    P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
    主要方法

    多进程与进程池性能测试:

    import os
    import time
    from multiprocessing import Pool,Process
    
    def func(i):
        print(i,os.getpid())
    
    if __name__ == '__main__':
        start = time.time()
        p_lst = []
        for i in range(100):
            p = Process(target=func,args = (i,))
            p.start()
            p_lst.append(p)
        for p in p_lst:
            p.join()
        end = time.time()
        pro_time = end-start
        start = time.time()
        p = Pool(4)
        for i in range(100):
            p.apply_async(func,args=(i,))   # async异步的提交任务
        p.close() # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务
        p.join()  # 阻塞,直到池子中的任务都执行完毕
        end = time.time()
        pool_time = end - start
        print(pro_time,pool_time)
    View Code

    同步和异步:

    import os,time
    from multiprocessing import Pool
    
    def work(n):
        print('%s run' %os.getpid())
        time.sleep(3)
        return n**2
    
    if __name__ == '__main__':
        p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]
        for i in range(10):
            res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
                                        # 但不管该任务是否存在阻塞,同步调用都会在原地等着
        print(res_l)
    
    进程池的同步调用
    进程池的同步调用
    import os
    import time
    import random
    from multiprocessing import Pool
    
    def work(n):
        print('%s run' %os.getpid())
        time.sleep(random.random())
        return n**2
    
    if __name__ == '__main__':
        p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]
        for i in range(10):
            res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
                                              # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
                                              # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
                                              # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。  
            res_l.append(res)
    
        # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果
        # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
        p.close()
        p.join()
        for res in res_l:
            print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
    
    进程池的异步调用
    进程池的异步调用

    回调函数:

    需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
    
    我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
    import time
    import random
    from multiprocessing import Process,Pool
    def get(i):    # 进程池的子进程执行的
        time.sleep(random.random())
        print('从网页获取一个网页的内容', i)
        return i,'网页的内容'*i
    
    def call_back(content):   # 主进程执行的
        print(content)
    
    if __name__ == '__main__':
        # p = Process(target=get)
        # p.start()
        p = Pool(5)
        ret_l = []
        # for i in range(10):
        #     ret = p.apply_async(get,args=(i,))
        #     ret_l.append(ret)
        # for ret in ret_l:
        #     content = ret.get()
        #     print(len(content))
        for i in range(10):
            p.apply_async(get,args=(i,),callback=call_back)
        p.close()
        p.join()
    
    
    # 将n个任务交给n个进程去执行
    # 每一个进程在执行完毕之后会有一个返回值,这个返回值会直接交给callback参数指定的那个函数去进行处理
    # 这样的话 所有的进程 哪一个执行的最快,哪一个就可以先进性统计工作
    # 能在最短的时间内得到结果
    View Code



  • 相关阅读:
    FEniCS 1.1.0 发布,计算算术模型
    Piwik 1.10 发布,增加社交网站统计
    淘宝褚霸谈做技术的心态
    CyanogenMod 10.1 M1 发布
    Druid 发布 0.2.11 版本,数据库连接池
    GNU Gatekeeper 3.2 发布
    Phalcon 0.9.0 BETA版本发布,新增大量功能
    EUGene 2.6.1 发布,UML 模型操作工具
    CVSps 3.10 发布,CVS 资料库更改收集
    Opera 移动版将采用 WebKit 引擎
  • 原文地址:https://www.cnblogs.com/echo2019/p/10322009.html
Copyright © 2011-2022 走看看