zoukankan      html  css  js  c++  java
  • Python-多进程

    1、多进程:

      由于Python的GIL,多线程未必是CPU密集型程序的最好选择。

      多进程可以完全独立的进程环境中运行程序,可以充分的利用多处理器。

      但是进程本身的隔离带来的数据不共享也是一个问题,而且线程比进程轻量级。

    2、multiprocessing:

      Process类遵循了Thread类的API 

      测试:开启三个进程,计算一个CPU密集型程序。

     1 import multiprocessing
     2 import datetime
     3 
     4 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
     5 def calc(i):
     6     sum = 0
     7     for _ in range(100000000):
     8         sum += 1
     9     print(i, sum)
    10 
    11 if __name__ == '__main__':
    12     start = datetime.datetime.now()
    13 
    14     ps = []
    15     for i in range(3):# 启动三个进程。
    16         p = multiprocessing.Process(target=calc, args=(i,), name='cac={}'.format(i))
    17         ps.append(p)
    18         p.start() # 分别启动三个进程。
    19     for p in ps:
    20         p.join() # 当前主进程 的主线程等待所有的子进程结束后,再退出。
    21 
    22     delta = (datetime.datetime.now() - start).total_seconds()
    23     print(delta)
    24     print('end---')
    测试:多进程
    1 1 100000000
    2 0 100000000
    3 2 100000000
    4 10.221419
    5 end---

      测试:同时CPU 密集型 ,单线程,执行3次:

     1 import multiprocessing
     2 import datetime
     3 import logging
     4 
     5 
     6 logging.basicConfig(level=logging.INFO, format='%(thread)s %(message)s')
     7 start = datetime.datetime.now()
     8 def calc(i):
     9     sum = 0
    10     for _ in range(100000000):
    11         sum += 1
    12     print(i, sum)
    13 
    14 calc(0)
    15 calc(1)
    16 calc(2)
    17 
    18 delta = (datetime.datetime.now() - start).total_seconds()
    19 logging.info(delta)
    测试:单线程执行三次
    1 0 100000000
    2 1 100000000
    3 6088 23.520346
    4 2 100000000

      测试:同上CPU 密集型,多线程,执行;

     1 import multiprocessing
     2 import datetime
     3 import logging
     4 import threading
     5 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
     6 
     7 logging.basicConfig(level=logging.INFO, format='%(thread)s %(message)s')
     8 start = datetime.datetime.now()
     9 def calc(i):
    10     sum = 0
    11     for _ in range(100000000):
    12         sum += 1
    13     print(i, sum)
    14 
    15 t1 = threading.Thread(target=calc, args=(1,))
    16 t2 = threading.Thread(target=calc, args=(2,))
    17 t3 = threading.Thread(target=calc, args=(3,))
    18 
    19 t1.start()
    20 t2.start()
    21 t3.start()
    22 t1.join()
    23 t2.join()
    24 t3.join()
    25 
    26 
    27 delta = (datetime.datetime.now() - start).total_seconds()
    28 logging.info(delta)
    测试:多线程开启三个子线程
    2 100000000
    3 100000000
    9908 22.107265
    1 100000000

      总结:

        1、可以看到:CPU密集型,多线程和单线程执行的时间是差不多的,也就是说,由于GIL(全局解释器锁,进程锁)的存在,导致,即便是多线程,也是串行执行

        2、可以看到,多进程执行,时间要很短,而且充分利用CPU资源,每个进程调度到一个CPU上,但是并没有绑定。但是如果绑定的话,就分别在自己的绑定的CPU上运行,此外CPU绑定,是为了利用cpu缓存。

        3、注意的一点,多进程代码一定要在 __name__ == '__main__' 中执行。

                                      线程是没有如下属性或方法:

    名称 说明
    pid 进程ID
    exitcode 进程的退出状态
    terminate() 终止指定的进程

    3、进程间同步:(不怎么用,不过注意,方法,类相同,但是来自不同的模块下,线程是线程的,进程是进程的)

      Python在进程间同步提供了和线程同步一样的类,使用的方法一样,使用的效果也类似。

      不过,进程间代价要高于线程间,而且 底层实现是不同的,只不过Python屏蔽了这些不同之处,让用户简单使用多进程。

      multiprocessing 还提供共享内存,服务器进程来共享数据,还提供了用于 进程间通讯的Queue队列,Pipe管道

      通信方式不同:

        1、多进程就是启动多个解释器进程,进程间通信必须序列化,反序列化

        2、数据的线程安全性问题

          如果 每个进程中没有实现多线程,GIL 可以说没什么用了

      注:

        queue模块是给线程用的,但是进程的Queue是multiprocessing提供的,一般不用,是本机使用

        进程和进程之间通信:使用第三方Queue,如:kfakfa,而且基本都是网络通信(进程间通讯)

        多进程,就是多解释器进程

        进程间通讯:一般都要序列话,反序列化。

        进程间通讯的锁:分布式锁, zookeper

    4、进程池举例:

      multiprocessing.Pool 是进程池类

    名称 说明
    apply(self,func,args=(),kwds={}) 阻塞执行,导致主进程执行其他子进程就像一个执行

    apply_async(self,func, args=(),kwds={},

    callback=None, error_callback=None)

    与apply方法用法一致,非阻塞异步执行,得到结果后执行回调
    cose() 关闭池,池不能再接受新的任务
    terminate() 结束工作进程,不再处理未处理的任务
    join() 主进程阻塞等待子进程的退出,join方法要在close或terminate之后使用

      测试:进程池的使用 apply_async

     1 import multiprocessing
     2 import datetime
     3 import logging
     4 import threading
     5 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
     6 start = datetime.datetime.now()
     7 
     8 def calc(i):
     9     sum = 0
    10     for _ in range(100000000):
    11         sum += 1
    12     print(i, sum)
    13 
    14 if __name__ == '__main__':
    15 
    16 
    17     ps = []
    18 
    19     pool = multiprocessing.Pool(3)
    20     for i in range(3):
    21         pool.apply_async(calc, args=(i,))
    22     pool.close()
    23     pool.join()
    24 
    25     delta = (datetime.datetime.now() - start).total_seconds()
    26     print(delta)
    27     print('end---')
    4核心CPU处理三个进程
    2 100000000
    10.15358
    end---

      总结:

        可以看出,利用进程池得出的结果和上面的结果差不多的。

          有个前提要注意:

            首先测试的电脑是4核心cpu,处理三个进程不会出现不够用,如果处理的进程如果大于CPU 核心数,就会出现等待,

            但是这样,利用进程池,可以有效的使用有限资源,剩余的资源可以做别的事,但是,像上面的情况,就会出现明显的资源抢占,资源不够用。

            所以,一般推荐使用线程池。

      测试:进程池的使用 apply,可以看到,一个一个执行完,没有多进程优势存在

     1 import multiprocessing
     2 import datetime
     3 import logging
     4 import threading
     5 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
     6 start = datetime.datetime.now()
     7 
     8 def calc(i):
     9     sum = 0
    10     for _ in range(100000000):
    11         sum += 1
    12     print(i, sum)
    13 
    14 if __name__ == '__main__':
    15 
    16 
    17     ps = []
    18 
    19     pool = multiprocessing.Pool(3)
    20     for i in range(3):
    21         pool.apply(calc, args=(i,))
    22     pool.close()
    23     pool.join()
    24 
    25     delta = (datetime.datetime.now() - start).total_seconds()
    26     print(delta)
    27     print('end---')
    apply
    0 100000000
    1 100000000
    2 100000000
    22.069262
    end---

      测试:回调

     1 import multiprocessing
     2 import datetime
     3 import logging
     4 import threading
     5 #   次函数是一个CPU 密集型,所以,多线程和单线程 分别计算5 次, 计算是一样的效果,至少4分钟左右
     6 logging.basicConfig(level=logging.INFO, format='%(process)s %(processName)s %(thread)s %(message)s')
     7 start = datetime.datetime.now()
     8 
     9 def calc(i):
    10     sum = 0
    11     for _ in range(100000000):
    12         sum += 1
    13     return i
    14 
    15 if __name__ == '__main__':
    16     ps = []
    17 
    18     pool = multiprocessing.Pool(3)
    19     for i in range(3):
    20         pool.apply_async(calc, args=(i,), callback=lambda x: logging.info('x ={}'.format(x)))
    21     pool.close()
    22     pool.join()
    23 
    24     delta = (datetime.datetime.now() - start).total_seconds()
    25     print(delta)
    26     print('end---')
    callback
    3848 MainProcess 7764 x =0
    3848 MainProcess 7764 x =2
    3848 MainProcess 7764 x =1
    13.521773
    end---

        回调:自己去主动调用,这里的回调就是,每个任务结束后的返回值,作为回调的参数传入。

        而使用多线程,线程的返回值 是不能被用到的。结束就结束了。

    5、多进程,多线程的选择:

      1、CPU密集型

      CPython 中使用到了GIL ,多线程的时候锁相互竞争,且多核优势不能发挥。Python多进程效率更高。

      2、IO密集型

      适合是用多线程,可以减少多进程间IO 的序列化开销,且在IO等待的视乎,切换大其他线程继续执行,效率不错。

    6、应用

      请求/ 应答 模型:WEB应用中常见的处理模型

      master 启动朵儿worker工作进程,一般 额CPU 数目相同,发挥多核优势。

      worker工作进程中,往往需要操作网络IO 和 磁盘 IO ,启动多线程,提高并发处理能力worker处理用户的请求,往往需要等待数据,处理完请求还要通过网络IO 返回响应。

      这就是Nginx 工作模式。

      worker进程开启多线程,因为要进行IO ,所以多线程相对合适。

      
      
    
    
    
            apply 是同步阻塞,不推荐使用
            apply_async :是异步非阻塞
    
            回调就是调用你的函数。调用空闲函数
    
    
            自己的任务结束后,才会执行,calback必须有一个参数,接受前面函数的返回值
    
    
            concurrent :
    
            多进程和多线程,注意是cpu密集型还是IO 密集型
    
    
    
    logging模块:
        
    为什么要坚持,想一想当初!
  • 相关阅读:
    批量替换文本的工具
    wcf异常显示错误到客户端
    文件以二进制存入数据库和从数据库读取二进制文件
    关于关系数据库的范式
    对于挑战书上的很久之前都看不懂的DP看懂的突破
    操作系统概念
    关于P,V操作理解的突破,关于并发设计与并行
    关于快速沃尔什变换
    我觉得我应该养成经常翻收藏夹的习惯
    目前我的思考模式
  • 原文地址:https://www.cnblogs.com/JerryZao/p/9873664.html
Copyright © 2011-2022 走看看