zoukankan      html  css  js  c++  java
  • 多进程

    多进程

      同一时刻并行的处理多个任务,即为多进程。比如,你一边喝茶、看书还听着音乐。真正的并行多任务只能在多核的CPU上实现,由于任务数量是远远多于CPU的核数,所以操作系统会自动将多任务短时间轮流切换执行,给我们的感觉就像同时在执行一样。

      进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。编写的代码没有运行叫程序,正在运行的代码就是进程。

    fork

      Python中可以使用os模块fork()函数来创建子进程。程序执行os.fork()时,操作系统会创建一个子进程,然后复制父进程的所有信息到子进程中;调用一次os.fork()时,会返回两次值。返回给子进程的值一定是0,返回给父进程的是子进程中的pid号。返回给父进程pid(子进程的)号,是因为父进程可以fork出多个子进程,所以有必要记住子进程的pid号。

    import os
    
    print("当前进程pid= %d"% os.getpid())
    num = 0
    pid = os.fork()
    if pid == 0:
        print("我是子进程: %s,父进程是:%s"%(os.getpid(),os.getppid()))
        num += 1
        print("num = %d"%num)
    else:
        print("我是父进程:%s,我的子进程是:%s"%(os.getpid(),pid))
        num += 1
        print("num = %d"%num)
    
    print("父、子进程都可以执行")
    
    输出:
    当前进程pid= 4262
    我是父进程:4262,我的子进程是:4263
    num = 1
    父、子进程都可以执行
    我是子进程: 4263,父进程是:4262
    num = 1
    父、子进程都可以执行

    多进程中,每个进程都各自拥有一份,互不影响,如上num = 1。

    multiprocessing

      由于fork函数存在于Linux、Unix、Mac操作系统中,Windows操作系统无fork函数调用,Python作为一个跨平台的语言,使用multiprocessing模块封装fork函数来创建多进程。multiprocessing提供一个Process类来代表一个进程对象。

    Process使用:

    Process([group [, target [, name [, args [, kwargs]]]]])


        target:表示这个进程实例所调用对象;

        args:表示调用对象的位置参数元组;

        kwargs:表示调用对象的关键字参数字典;

        name:为当前进程实例的别名;

        group:大多数情况下用不到;

    Process类常用方法:

        is_alive():判断进程实例是否还在执行;

        join([timeout]):是否等待进程实例执行结束,或等待多少秒;

        start():启动进程实例(创建子进程);

        run():如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法;

        terminate():不管任务是否完成,立即终止;

    Process类常用属性:

        name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数;

        pid:当前进程实例的PID值;

    from multiprocessing import Process
    import os
    import time
    
    def run_proc(name,age,**kwargs):
        for i in range(10):
            print('子进程运行中,name= %s,age=%d,pid=%d'%(name,age,os.getpid()))
            print(kwargs)
            time.sleep(1)
    
    if __name__ == '__main__':
        print('父进程 %d'%os.getpid())
        p = Process(target=run_proc,args=('MI',18),kwargs={'S':99})
        print('将要执行子进程')
        p.start()
        time.sleep(2)
        p.terminate()
        p.join()
        print('子进程结束了')
    
    输出:
    父进程 4460
    将要执行子进程
    子进程运行中,name= MI,age=18,pid=4461
    {'S': 99}
    子进程运行中,name= MI,age=18,pid=4461
    {'S': 99}
    子进程结束了

       Process创建子进程,只需要传入一个函数和函数参数,创建一个Process实例对象,然后用start方法启动,这样创建比fork()更简单。join()方法可以等待子进程结束后主进程继续往下执行,通常用于进程同步。

    from multiprocessing import Process
    import os
    import time
    import signal
    
    def run_proc(name,age,**kwargs):
        for i in range(10):
            print('子进程运行中,name= %s,age=%d,pid=%d'%(name,age,os.getpid()))
            print(kwargs)
            time.sleep(0.1)
    
    if __name__ == '__main__':
        print('父进程 %d'%os.getpid())
        p = Process(target=run_proc,args=('MI',18),kwargs={'S':99})
        print('将要执行子进程')
        p.start()
        time.sleep(0.2)
        os.killpg(os.getpid(),signal.SIGKILL)  #杀死父进程
        print('子进程结束了')
    
    输出:
    父进程 3791
    将要执行子进程
    子进程运行中,name= MI,age=18,pid=3792
    {'S': 99}
    子进程运行中,name= MI,age=18,pid=3792
    {'S': 99}
    已杀死

      Process中主进程会默认等待子进程执行完后继续执行父进程,如果父进程退出或者异常被终止了,子进程也会退出运行。当进程中处理的事物比较复杂,一个函数不能完成时,我们可以让一个类去继承Process实现要处理的任务。

    进程池Pool

      当创建的子进程不多的时候,可以使用Process动态成生多个进程,如果需要创建成百上千时,手动创建工作量大,最主要的是不断创建和删除子进程调度系统分配资源很耗时间。所以我们可以创建一个进程池,预先放一些进程进去,要用的时候就直接调用,用完之后再把进程归还给进程池,省下创建删除进程的时间,提高了效率。

    multiprocessing.Pool常用函数:

    •     apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执  行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
    •     apply(func[, args[, kwds]]):使用阻塞方式调用func
    •     close():关闭Pool,使其不再接受新的任务;
    •     terminate():不管任务是否完成,立即终止;
    •     join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;
    from multiprocessing import Pool
    import os,time,random
    
    def worker(msg):
        tStart = time.time()
        print("%s开始执行,进程号为%d"%(msg,os.getpid()))
        time.sleep(random.random()*2)
        tStop = time.time()
        print(msg,"执行完毕,耗时%0.2f"%(tStop-tStart))
    
    if __name__ == '__main__':
    
        #创建一个进程池,最大进程数3
        po = Pool(3)
        for i in range(0,5):
            po.apply_async(worker,(i,))
        print('----start----')
        #关闭进程池,不再添加新的请求
        po.close()
        #必须放在close()之后,等待po中所有子进程执行完毕
        po.join()
        print('----end----')
    
    输出:
    ----start----
    0开始执行,进程号为4049
    1开始执行,进程号为4050
    2开始执行,进程号为4051
    1 执行完毕,耗时0.96
    3开始执行,进程号为4050
    0 执行完毕,耗时0.98
    4开始执行,进程号为4049
    2 执行完毕,耗时1.30
    4 执行完毕,耗时0.77
    3 执行完毕,耗时1.11
    ----end----

      初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。

    if __name__ == '__main__':
    
        #创建一个进程池,最大进程数3
        po = Pool(3)
        for i in range(0,5):
            po.apply_async(worker,(i,))
        print('----start----')
        #关闭进程池,不再添加新的请求
        #po.close()
        #必须放在close()之后,等待po中所有子进程执行完毕
        #po.join()
        print('----end----')
    
    输出:
    ----start----
    ----end----

      如果我们在进程池之后没有添加join()会导致进程池中的任务不被执行。主进程创建/添加任务后,主进程默认不会等待进程池中的任务执行完后才结束,而是当主进程的任务做完之后立马结束。

    进程通信Queue

      Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序。

     Queue使用:

      初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);
        Queue.qsize():返回当前队列包含的消息数量;  
        Queue.empty():如果队列为空,返回True,反之False ;
        Queue.full():如果队列满了,返回True,反之False;
        Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
      1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如    果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;
      2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;
        Queue.get_nowait():相当Queue.get(False);
        Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
      1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾  出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;
      2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;
          Queue.put_nowait(item):相当Queue.put(item, False);

    from multiprocessing import Process,Queue
    import os,time,random
    
    
    def write(q):
        print('Process to write: %s '%os.getpid())
        for value in ['A','B','C']:
            print('put %s to queue--'%value)
            q.put(value)
            time.sleep(random.random())
    
    def read(q):
        print('Process to read: %s '%os.getpid())
        while True:
                value = q.get(True)
                print('get %s from queue '%value)
    
    if __name__ == '__main__':
        q = Queue()
        pw = Process(target=write,args=(q,))
        pr = Process(target=read,args=(q,))
        pw.start()
        pr.start()
        pw.join()
        pr.terminate()
        print('Complete reading and writing')
    
    输出:
    Process to write: 4707 
    put A to queue--
    Process to read: 4708 
    get A from queue 
    put B to queue--
    get B from queue 
    put C to queue--
    get C from queue 
    Complete reading and writing

    参考:

    https://blog.csdn.net/sayhello_world/article/details/72829329

    https://blog.csdn.net/hello_bravo_/article/details/52528283

  • 相关阅读:
    3G来临,程序员你准备好了吗?
    如何从开发人员走向架构师
    中国营销六大怪
    什么是3G
    未来五年中国高端软件人才缺口20万
    项目需求分析的20条法则
    让好的销售习惯提升你的业绩
    有关精通时间管理的最佳理念
    李嘉诚:让员工忠诚的简单办法
    手机软件开发人才严重短缺
  • 原文地址:https://www.cnblogs.com/jsnhdream/p/10068495.html
Copyright © 2011-2022 走看看