zoukankan      html  css  js  c++  java
  • 快速了解Python并发编程的工程实现(下)

    关于我
    一个有思想的程序猿,终身学习实践者,目前在一个创业团队任team lead,技术栈涉及Android、Python、Java和Go,这个也是我们团队的主要技术栈。
    Github:https://github.com/hylinux1024
    微信公众号:终身开发者(angrycode)

    0x00 使用进程实现并发

    上一篇文章介绍了线程的使用。然而Python中由于Global Interpreter Lock(全局解释锁GIL)的存在,每个线程在在执行时需要获取到这个GIL,在同一时刻中只有一个线程得到解释锁的执行,Python中的线程并没有真正意义上的并发执行,多线程的执行效率也不一定比单线程的效率更高。
    如果要充分利用现代多核CPU的并发能力,就要使用multipleprocessing模块了。

    0x01 multipleprocessing

    与使用线程的threading模块类似,multipleprocessing模块提供许多高级API。最常见的是Pool对象了,使用它的接口能很方便地写出并发执行的代码。

    from multiprocessing import Pool
    
    def f(x):
        return x * x
    
    if __name__ == '__main__':
        with Pool(5) as p:
            # map方法的作用是将f()方法并发地映射到列表中的每个元素
            print(p.map(f, [1, 2, 3]))
    
    # 执行结果
    # [1, 4, 9]
    

    关于Pool下文中还会提到,这里我们先来看Process

    Process

    要创建一个进程可以使用Process类,使用start()方法启动进程。

    from multiprocessing import Process
    import os
    
    def echo(text):
        # 父进程ID
        print("Process Parent ID : ", os.getppid())
        # 进程ID
        print("Process PID : ", os.getpid())
        print('echo : ', text)
    
    if __name__ == '__main__':
        p = Process(target=echo, args=('hello process',))
        p.start()
        p.join()
        
    # 执行结果
    # Process Parent ID :  27382
    # Process PID :  27383
    # echo :  hello process
    
    进程池

    正如开篇提到的multiprocessing模块提供了Pool类可以很方便地实现一些简单多进程场景。
    它主要有以下接口

    • apply(func[, args[, kwds]])
      执行func(args,kwds)方法,在方法结束返回前会阻塞。
    • apply_async(func[, args[, kwds[, callback[, error_callback]]]])
      异步执行func(args,kwds),会立即返回一个result对象,如果指定了callback参数,结果会通过回调方法返回,还可以指定执行出错的回调方法error_callback()
    • map(func, iterable[, chunksize])
      类似内置函数map(),可以并发执行func,是同步方法
    • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
      异步版本的map
    • close()
      关闭进程池。当池中的所有工作进程都执行完毕时,进程会退出。
    • terminate()
      终止进程池
    • join()
      等待工作进程执行完,必需先调用close()或者terminate()
    from multiprocessing import Pool
    
    def f(x):
        return x * x
    
    if __name__ == '__main__':
        with Pool(5) as p:
            # map方法的作用是将f()方法并发地映射到列表中的每个元素
            a = p.map(f, [1, 2, 3])
            print(a)
            # 异步执行map
            b = p.map_async(f, [3, 5, 7, 11])
            # b 是一个result对象,代表方法的执行结果
            print(b)
            # 为了拿到结果,使用join方法等待池中工作进程退出
            p.close()
            # 调用join方法前,需先执行close或terminate方法
            p.join()
            # 获取执行结果
            print(b.get())
    
    # 执行结果
    # [1, 4, 9]
    # <multiprocessing.pool.MapResult object at 0x10631b710>
    # [9, 25, 49, 121]
    

    map_async()apply_async()执行后会返回一个class multiprocessing.pool.AsyncResult对象,通过它的get()可以获取到执行结果,ready()可以判断AsyncResult的结果是否准备好。

    进程间数据的传输

    multiprocessing模块提供了两种方式用于进程间的数据共享:队列(Queue)和管道(Pipe)

    Queue是线程安全,也是进程安全的。使用Queue可以实现进程间的数据共享,例如下面的demo中子进程put一个对象,在主进程中就能get到这个对象。
    任何可以序列化的对象都可以通过Queue来传输。

    from multiprocessing import Process, Queue
    
    def f(q):
        q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        # 使用Queue进行数据通信
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        # 主进程取得子进程中的数据
        print(q.get())  # prints "[42, None, 'hello']"
        p.join()
    
    # 执行结果
    # [42, None, 'hello']
    

    Pipe()返回一对通过管道连接的Connection对象。这两个对象可以理解为管道的两端,它们通过send()recv()发送和接收数据。

    from multiprocessing import Process, Pipe
    
    def write(conn):
        # 子进程中发送一个对象
        conn.send([42, None, 'hello'])
        conn.close()
    
    def read(conn):
        # 在读的进程中通过recv接收对象
        data = conn.recv()
        print(data)
    
    if __name__ == '__main__':
        # Pipe()方法返回一对连接对象
        w_conn, r_conn = Pipe()
    
        wp = Process(target=write, args=(w_conn,))
        rp = Process(target=read, args=(r_conn,))
    
        wp.start()
        rp.start()
    
    # 执行结果
    # [42, None, 'hello']
    
    

    需要注意的是,两个进程不能同时对一个连接对象进行sendrecv操作。

    同步

    我们知道线程间的同步是通过锁机制来实现的,进程也一样。

    from multiprocessing import Process, Lock
    import time
    
    def print_with_lock(l, i):
        l.acquire()
        try:
            time.sleep(1)
            print('hello world', i)
        finally:
            l.release()
    
    def print_without_lock(i):
        time.sleep(1)
        print('hello world', i)
    
    if __name__ == '__main__':
        lock = Lock()
    
        # 先执行有锁的
        for num in range(5):
            Process(target=print_with_lock, args=(lock, num)).start()
        # 再执行无锁的
        # for num in range(5):
        #     Process(target=print_without_lock, args=(num,)).start()
    
    

    有锁的代码将每秒依次打印

    hello world 0
    hello world 1
    hello world 2
    hello world 3
    hello world 4
    

    如果执行无锁的代码,则在我的电脑上执行结果是这样的

    hello worldhello world  0
    1
    hello world 2
    hello world 3
    hello world 4
    

    除了Lock,还包括RLockConditionSemaphoreEvent等进程间的同步原语。其用法也与线程间的同步原语很类似。API使用可以参考文末中引用的文档链接。
    在工程中实现进程间的数据共享应当优先使用队列或管道。

    0x02 总结

    本文对multiprocessing模块中常见的API作了简单的介绍。讲述了ProcessPool的常见用法,同时介绍了进程间的数据方式:队列和管道。最后简单了解了进程间的同步原语。
    通过与上篇的对比学习,本文的内容应该是更加容易掌握的。

    0x03 引用

  • 相关阅读:
    又见Python<4>:Pandas之DataFrame对象的使用
    又见Python<3>:Pandas之Series对象的使用
    使用tdload工具将本地数据导入到Teradata数据库中
    解决ubuntu系统root用户下Chrome无法启动问题
    又见Python<2>:如何安装第三方库(Windows)
    又见Python<1>:使用Anaconda搭建Python开发环境(Windows7)
    数据仓库原理<4>:联机分析处理(OLAP)
    数据仓库原理<3>:数据仓库与ODS
    配置hibernate,Struts。文件
    jQuery +ajax +json+实现分页
  • 原文地址:https://www.cnblogs.com/angrycode/p/11391635.html
Copyright © 2011-2022 走看看