zoukankan      html  css  js  c++  java
  • python并发之multiprocessing

    由于GIL(全局解释锁)的问题,python多线程并不能充分利用多核处理器。如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。multiprocessing可以给每个进程赋予单独的Python解释器,这样就规避了全局解释锁所带来的问题。与threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

    Process

    单进程

    # coding: utf-8
    
    from multiprocessing import Process
    import time
    
    
    def task(msg):
        print 'hello, %s' % msg
        time.sleep(1)
    
    
    if __name__ == '__main__':
        p = Process(target=task, args=('world',))
    
        p.start()
        if p.is_alive():
            print 'Process: %s is running' % p.pid
        p.join()

    这段代码的执行过程:在主进程中创建子进程,然后调用start()启动子进程,调用join()等待子进程执行完,再继续执行主进程的整个的执行流程。 
    控制子进程进入不同阶段的是 start(), join(), is_alive(), terminate(), exitcode() 方法,这些方法只能在创建子进程的进程中执行。

    创建:创建进程需要一个 function 和相关参数,参数可以是dictProcess(target=func, args=(), kwargs = {}),name 可以用来标识进程。

    关闭:close停止接收新的任务,如果还有任务来,就会抛出异常。 join 是等待所有任务完成。 join 必须要在 close 之后调用,否则会抛出异常。

    等待:在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以在这里,我们调用了Process对象的join()方法 ,实际上等同于wait的作用。 
    对于多线程来说,由于只有一个进程,所以不存在此必要性。

    结束:terminate() 结束工作进程,不再处理未完成的任务。

    多进程

    # coding: utf-8
    
    from multiprocessing import Process
    import multiprocessing
    import time
    
    
    def task1(msg):
        print 'task1: hello, %s' % msg
        time.sleep(1)
    
    
    def task2(msg):
        print 'task2: hello, %s' % msg
        time.sleep(1)
    
    
    def task3(msg):
        print 'task3: hello, %s' % msg
        time.sleep(1)
    
    
    if __name__ == '__main__':
        p1 = Process(target=task1, args=('one',))
        p2 = Process(target=task2, args=('two',))
        p3 = Process(target=task3, args=('three',))
    
        start = time.time()
    
        p1.start()
        p2.start()
        p3.start()
    
        print("The number of CPU is:" + str(multiprocessing.cpu_count()))
        for p in multiprocessing.active_children():
            print("child p.name: " + p.name + "	p.id: " + str(p.pid))
    
        p1.join()
        p2.join()
        p3.join()
    
        end = time.time()
        print('3 processes take %s seconds' % (end - start))
    

    执行结果:

    task1: hello, one
    task2: hello, two
    task3: hello, three
    The number of CPU is:4
    child p.name: Process-1:p.id: 99359
    child p.name: Process-2:p.id: 99360
    child p.name: Process-3:p.id: 99361
    3 processes take 1.00933504105 seconds

    这里三个进程执行花费约1s,说明程序是并发执行的。对于更多的并发进程,我们可以放到一个循环中进行处理。

    Lock

    当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突

    # coding: utf-8
    
    from multiprocessing import Lock, Process
    import time
    
    
    def task1(lock, f):
        with lock:
            f = open(f, 'w+')
            f.write('hello ')
            time.sleep(1)
            f.close()
    
    
    def task2(lock, f):
        lock.acquire()
        try:
            f = open(f, 'a+')
            time.sleep(1)
            f.write('world!')
        except Exception as e:
            print(e)
        finally:
            f.close()
            lock.release()
    
    
    if __name__ == '__main__':
        lock = Lock()
        fn = './file.txt'
    
        start = time.time()
        p1 = Process(target=task1, args=(lock, fn,))
        p2 = Process(target=task2, args=(lock, fn,))
    
        p1.start()
        p2.start()
    
        p1.join()
        p2.join()
    
        end = time.time()
        print 'time cost: %s seconds' % (end - start)
    
        with open(fn, 'r') as f:
            for x in f.readlines():
                print x

    执行结果:

    time cost: 2.0059568882 seconds
    hello world!

    因为要访问共享文件,先获得锁的进程会阻塞后面的进程,因此程序运行耗时约2s。

    Semaphore

    Semaphore 和 Lock 稍有不同,Semaphore 相当于 N 把锁,获取其中一把就可以执行了。 信号量的总数 N 在构造时传入,s = Semaphore(N)。 和 Lock 一样,如果信号量为0,则进程堵塞,直到信号大于0。Semaphore可用来控制对共享资源的访问数量,例如池的最大连接数。

    # coding: utf-8
    
    from multiprocessing import Semaphore, Process
    import time
    
    
    def task(s, msg):
        s.acquire()
        print 'hello, %s' % msg
        time.sleep(1)
        s.release()
    
    
    if __name__ == '__main__':
        s = Semaphore(2)
    
        processes = []
        for x in range(8):
            p = Process(target=task, args=(s, x,))
            processes.append(p)
    
        start = time.time()
        for p in processes:
            p.start()
    
        for p in processes:
            p.join()
    
        end = time.time()
    
        print '8 process takes %s seconds' % (end - start)

    执行结果:

    hello, 0
    hello, 1
    hello, 3
    hello, 2
    hello, 4
    hello, 5
    hello, 7
    hello, 6
    8 process takes 4.00831484795 seconds

    信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是Dijkstra信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器、文件这样的有限资源。

    Event

    Event用来实现进程间同步通信。

    # coding: utf-8
    
    from multiprocessing import Process, Event
    import time
    
    
    def task1(e, msg):
        print 'task1 is waitting...'
        e.wait()
        time.sleep(1)
        print 'hello, %s, e.is_set(): %s' % (msg, e.is_set())
    
    
    def task2(e, msg):
        print 'task2 is waitting...'
        e.wait(msg)
        print 'hello, %s, e.is_set(): %s' % (msg, e.is_set())
    
    
    if __name__ == '__main__':
        e = Event()
    
        p1 = Process(target=task1, args=(e, 1))
        p2 = Process(target=task2, args=(e, 2))
    
        p1.start()
        p2.start()
    
        time.sleep(3)
    
        e.set()
        print 'main: event is set'

    执行结果:

    task1 is waitting...
    task2 is waitting...
    hello, 2, e.is_set(): False
    main: event is set
    hello, 1, e.is_set(): True

    Pool

    如果有50个task要执行,但 CPU 只有4核,我们当然可以循环创建50个进程来做这个事情,但这样处理大大增加进程管理和调度的开销。 
    如果可以只创建4个进程,让它们轮流工作完成任务,不用我们自己去管理具体的进程的创建、销毁和调度,岂不更好。multiprocessing中的 Pool 可以帮助我们做到这一点。

    多进程异步

    # coding: utf-8
    
    from multiprocessing import Pool
    import time
    
    
    def task(msg):
        print 'hello, %s' % msg
        time.sleep(1)
    
    
    if __name__ == '__main__':
        pool = Pool(processes=4)
    
        for x in range(10):
            pool.apply_async(task, args=(x,))
    
        pool.close()
        pool.join()
    
        print 'processes done.'

    注意:这里使用的是apply_async,多个进程异步执行;如果调用async,就变成阻塞版本了。 
    执行结果:

    hello, 0
    hello, 1
    hello, 2
    hello, 3
    hello, 4
    hello, 5
    hello, 6
    hello, 7
    hello, 8
    hello, 9
    10 processes take 3.09226489067 seconds

    Pool 进程池创建4个进程,不管有没有任务,都一直在进程池中等候。官网的描述:Worker processes within a Pool typically live for the complete duration of the Pool’s work queue。数据来的时候,若有空闲进程,则利用空闲的进程完成任务,直到所有任务完成为止;若没有空闲的进程,则需要等待,直到池中有进程结束。

    获取进程执行结果

    更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,我们可以通过获取apply_async的返回值得到执行结果。

    # coding: utf-8
    
    from multiprocessing import Pool
    import time
    
    
    def task(msg):
        print 'hello, %s' % msg
        time.sleep(1)
        return 'msg: %s' % msg
    
    
    if __name__ == '__main__':
        pool = Pool(processes=4)
    
        results = []
        for x in range(10):
            ret = pool.apply_async(task, args=(x,))
            results.append(ret)
    
        pool.close()
        pool.join()
    
        print 'processes done, result:'
    
        for x in results:
            print x.get()
    

    pool中的map方法

    上面我们是通过一个循环往进程池添加任务,Pool提供了更优雅的map方法来管理任务的提交,只需对上面的代码稍作修改。

    # coding: utf-8
    
    from multiprocessing import Pool
    import time
    
    
    def task(msg):
        print 'hello, %s' % msg
        time.sleep(1)
        return 'msg: %s' % msg
    
    
    if __name__ == '__main__':
        pool = Pool(processes=4)
    
        results = []
        msgs = [x for x in range(10)]
        results = pool.map(task, msgs)
    
        pool.close()
        pool.join()
    
        print 'processes done, result:'
    
        for x in results:
            print x
  • 相关阅读:
    如何创建一个基于 .NET Core 3 的 WPF 项目
    阅读源码系列
    近期计划
    Redis 源码走读(二)对象系统
    Redis 源码走读(一)事件驱动机制与命令处理
    basic paxos解析
    ThreadLocal深度解析
    MySQL InnoDB MVCC深度分析
    java泛型详解
    内存可见性,指令重排序,JIT。。。。。。从一个知乎问题谈起
  • 原文地址:https://www.cnblogs.com/kiko0o0/p/8494118.html
Copyright © 2011-2022 走看看