zoukankan      html  css  js  c++  java
  • python 进程

    导航:

    1、创建进程的两种方式
    2、Process的方法
    3、进程间的通讯1,进程队列Queue--先进先出
    4、进程间的通讯2,管道通讯 Pipe
    5、进程间的数据共享,Manager
    6、多进程同步问题
    7、进程池Pool

    python中多进程可以解决cpython解释器多线程中GIL存在的问题,可以利用CPU的多核资源,实现真的并发效果。操作系统中每个线程有自己的内存空间,数据并不共享。

    python中使用multiprocessing包提供的接口给我们创建多进程,multiprocessing与threading的使用方法相似。

    1、创建进程的两种方式

    1)通过multiprocessing.Process创建

      Process(group=None, target=None, name=None, args=(), kwargs={})

    • group    线程组,没什么用,默认为空就好
    • target    要执行的方法
    • name    进程的名字
    • args/kwargs 执行target方法要传入的参数
     1 def fn(word):
     2     # ------子进程的逻辑-----
     3     print("父进程", os.getppid())
     4     print("子进程", os.getpid())
     5     print("传入的参数:", word)
     6 
     7 
     8 if __name__ == "__main__":
     9     p = Process(target=fn, args=("haha",))  # 创建子进程
    10     p.start()  # 开启子进程
    11     print("主进程", os.getpid())
    12 
    13 
    14 # 输出结果
    15 主进程 5916
    16 父进程 5916
    17 子进程 2936
    18 传入的参数: haha
    通过Process创建一个子进程

    很简单这样就可以创建一个子进程了,可以看第3 4 11行打印的进程id可以知道这是不同的进程。

    这里需要说明的是:在所有的进程当中子进程都是由父进程创建出来的,在这个例子中,子进程的的父进程就是主进程,可以看到第3 11行打印的进程id。

    在windows下,创建进程一定要放在__main__中,不然会报错

    2)通过继承Process类创建子进程

     1 from multiprocessing import Process
     2 import os
     3 
     4 class MyProcess(Process):
     5     def __init__(self, name):
     6         super().__init__()  # 如果重写了初始化方法,在初始化方法中一定要调用父类的初始化方法
     7         self.name = name
     8 
     9     def run(self):
    10         # ------子进程的逻辑-----
    11         print("父进程", os.getppid())
    12         print("子进程", os.getpid())
    13         print("子进程的名字", self.name)
    14 
    15 
    16 if __name__ == "__main__":
    17     p = MyProcess("haha")  # 创建子进程
    18     p.start()  # 开启子进程
    19     print("主进程", os.getpid())
    20 
    21 
    22 # 输出结果
    23 主进程 740
    24 父进程 740
    25 子进程 7052
    26 子进程的名字 haha
    通过继承Process创建子进程

    通过继承Process创建子进程需要重写 run 方法,这是子进程逻辑的入口,当开启子进程时会自动调用这个方法

    上边这两种方式中,主进程执行完所有的逻辑后会等待子进程结束在一起结束,与fork函数创建的方式不一样

    2、Process的方法

    1)is_alive()

    is_alive方法判断指定对象进程的存活状态。进程 start 后一直到该进程结束都返回True。进程 start 前 或进程已经结束返回False

     1 from multiprocessing import Process
     2 import time
     3 
     4 def fn():
     5     # ------子进程的逻辑-----
     6     time.sleep(1)  # 模拟子进程执行需要消耗的时间
     7 
     8 
     9 if __name__ == "__main__":
    10     p = Process(target=fn)  # 创建子进程
    11     print("p.start前-->", p.is_alive())
    12     p.start()  # 开启子进程
    13     print("p.start后-->", p.is_alive())
    14     time.sleep(2)  # 模拟主进程执行需要消耗的时间,为了确保子进程先结束
    15     print("子进程结束后-->", p.is_alive())
    16 
    17 
    18 # 输出结果
    19 p.start前--> False
    20 p.start后--> True
    21 子进程结束后--> False
    is_alive方法

    2)join()

    join(timeout=None)方法,堵塞当前环境的进程,直到调此方法的进程结束后再继续往下执行。可设置timeout值,最多堵塞timeout时间(秒)。注意:join方法只能在start()后才可以使用

     1 from multiprocessing import Process
     2 import time
     3 
     4 def fn():
     5     # ------子进程的逻辑-----
     6     time.sleep(3)  # 模拟子进程执行需要消耗的时间
     7     print("子进程中-->", time.ctime())
     8 
     9 
    10 if __name__ == "__main__":
    11     p = Process(target=fn)  # 创建子进程
    12     p.start()  # 开启子进程
    13     print("p.join前-->", time.ctime())
    14     p.join()  # 堵塞当前环境的进程,直到调此方法的进程结束后再继续往下执行
    15     print("p.join后-->", time.ctime())
    16 
    17 
    18 # 输出内容
    19 p.join前--> Sat Sep 29 16:03:40 2018
    20 子进程中--> Sat Sep 29 16:03:43 2018
    21 p.join后--> Sat Sep 29 16:03:43 2018
    join方法

    这里可以看到第15行语句一直等到第7行执行完才输出

     1 from multiprocessing import Process
     2 import time
     3 
     4 def fn():
     5     # ------子进程的逻辑-----
     6     time.sleep(2)  # 模拟子进程执行需要消耗的时间
     7     print("子进程中-->", time.ctime())
     8 
     9 
    10 if __name__ == "__main__":
    11     p = Process(target=fn)  # 创建子进程
    12     p.start()  # 开启子进程
    13     print("p.join前-->", time.ctime())
    14     p.join(1)  # 设置超时1秒
    15     print("p.join后-->", time.ctime())
    16 
    17 
    18 # 输出内容
    19 p.join前--> Sat Sep 29 16:04:02 2018
    20 p.join后--> Sat Sep 29 16:04:03 2018
    21 子进程中--> Sat Sep 29 16:04:04 2018
    join设置timeout值

    这里可以看到第15行语句只堵塞了1秒的时间

    3)start(),进程准备就绪,等待cpu的执行(调度)

    4)run(),继承Process类的子类,需要重写的方法,当进程对象调用 start 方法时自动执行 run 方法,也是进程的入口

    5)terminate(),不管进程是否执行完,直接终止进程

    6)daemon属性True/False

    与线程的setDeamon()一样。将该进程对象设置为守护进程,效果:父进程将不再等待子进程,父进程结束时,子进程一起结束。注意:daemon属性只能在 start() 前设置

     1 from multiprocessing import Process
     2 import time
     3 
     4 def fn():
     5     # ------子进程的逻辑-----
     6     time.sleep(2)  # 模拟子进程执行需要消耗的时间
     7     print("----------")
     8 
     9 
    10 if __name__ == "__main__":
    11     p = Process(target=fn)  # 创建子进程
    12     p.daemon = True
    13     p.start()  # 开启子进程
    14     print("++++++++++")
    15 
    16 
    17 # 输出内容
    18 ++++++++++
    daemon属性

    子进程中的第7行语句并没有执行,即:子进程在父进程结束时也跟着结束了

    3、进程间的通讯1,进程队列Queue--先进先出

    Queue(maxsize=-1),maxsize=-1默认队列长度没有最大值,maxsize=5表示队列长度最大值为5

    1)put(obj, block=True, timeout=None)

    • obj  添加进队列的值,可以添加任意类型的值
    • block 默认为True,当队列满时,继续添加则发生堵塞,直到队列get()值出去;block=False,队列满时继续添加不堵塞,但会抛出queue.Full异常
    • timeout  堵塞超时,当队列满时,继续添加发生堵塞,堵塞超时timeout秒,超时则会抛出queue.Full异常
     1 >>> from multiprocessing import Process, Queue
     2 >>> q = Queue(3)  # 创建进程队列,队列最大长度为3
     3 >>> q.put("haha")  # 往队列添加字符串
     4 >>> q.put([])  # 往队列添加列表
     5 >>> q.put({})  # 往队列添加字典
     6 >>> q.put(5)  # 往队列添加数字
     7 _  # 满时继续添加,发生堵塞(光标会一直卡在这)
     8 
     9 >>> q.put(5,block=False)  # 往队列添加数字,不堵塞添加
    10 Traceback (most recent call last):
    11   File "<stdin>", line 1, in <module>
    12   File "C:D_ProgramPythonPython37libmultiprocessingqueues.py", line 83, in put
    13     raise Full
    14 queue.Full
    15 
    16 >>> q.put(5, timeout=3)  # 往队列添加数字,并设置超时3秒
    17 Traceback (most recent call last):
    18   File "<stdin>", line 1, in <module>
    19   File "C:D_ProgramPythonPython37libmultiprocessingqueues.py", line 83, in put
    20     raise Full
    21 queue.Full
    put方法简单使用,这里还不涉及进程间的通讯

    2)get(block=True, timeout=None)  使用方法与put一样的用法,返回先添加的值

     1 >>> q.get()
     2 'haha'
     3 >>> q.get()
     4 []
     5 >>> q.get()
     6 {}
     7 >>> q.get()
     8 _  # 为空时继续取值,发生堵塞(光标会一直卡在这)
     9 
    10 >>> q.get(timeout=3)
    11 Traceback (most recent call last):
    12   File "<stdin>", line 1, in <module>
    13   File "C:D_ProgramPythonPython37libmultiprocessingqueues.py", line 105, in get
    14     raise Empty
    15 _queue.Empty
    get方法简单使用,这里还不涉及进程间的通讯

    3)qsize()  返回当前队列的长度

     1 >>> from multiprocessing import Queue
     2 >>> q = Queue(3)
     3 >>> q.put("haha")
     4 >>> q.qsize()
     5 1
     6 >>> q.put(100)
     7 >>> q.qsize()
     8 2
     9 >>> q.get()
    10 'haha'
    11 >>> q.qsize()
    12 1
    qsize返回当前队列长度,这里还不涉及进程间的通讯

    4)put_nowait(obj)  相当于put(obj, block=False)

    5)get_nowait()  相当于get(block=False)

    6)empty()   判断队列是否为空,True:空,False:不为空

    7)full()   判断队列是否已满,True:已满,False:未满

    8)close()  关闭队列,关闭后,将不能添加或取出值

    9)通过队列实现多个进程间的通讯

     1 from multiprocessing import Process, Queue
     2 import time
     3 
     4 def write_queue(q):
     5     for i in range(5):
     6         q.put(i)  # 往队列添加值
     7         print("put %d" % i)
     8 
     9 
    10 def read_queue(q):
    11     time.sleep(1)  # 确保write_queue中队列已经有值
    12     while not q.empty():
    13         s = q.get()  # 取出队列的值
    14         print("get %s" % s)
    15 
    16 
    17 if __name__ == "__main__":
    18     q = Queue(5)
    19     p1 = Process(target=write_queue, args=(q,))  # 创建子进程
    20     p2 = Process(target=read_queue, args=(q,))  # 创建子进程
    21     p1.start()  # 开启子进程
    22     p2.start()  # 开启子进程
    23 
    24 
    25 # 输出结果
    26 put 0
    27 put 1
    28 put 2
    29 put 3
    30 put 4
    31 get 0
    32 get 1
    33 get 2
    34 get 3
    35 get 4
    通过队列的多进程间通信,target
     1 from multiprocessing import Process, Queue
     2 import time
     3 
     4 class WriteProcess(Process):
     5     def __init__(self, q):
     6         super().__init__()
     7         self.q = q
     8 
     9     def run(self):
    10         print("子进程WriteProcess-->")
    11         for i in range(5):
    12             self.q.put(i)
    13             print("put %d" % i)
    14 
    15 
    16 class ReadProcess(Process):
    17     def __init__(self, q):
    18         super().__init__()
    19         self.q = q
    20 
    21     def run(self):
    22         time.sleep(1)  # 确保write_queue中队列已经有值
    23         print("子进程ReadProcess-->")
    24         while not self.q.empty():
    25             s = self.q.get()
    26             print("get %s" % s)
    27 
    28 
    29 if __name__ == "__main__":
    30     q = Queue(5)
    31     p1 = WriteProcess(q)  # 创建子进程
    32     p2 = ReadProcess(q)  # 创建子进程
    33     p1.start()  # 开启子进程
    34     p2.start()  # 开启子进程
    35 
    36 
    37 # 输出结果
    38 子进程WriteProcess-->
    39 put 0
    40 put 1
    41 put 2
    42 put 3
    43 put 4
    44 子进程ReadProcess-->
    45 get 0
    46 get 1
    47 get 2
    48 get 3
    49 get 4
    通过队列的多进程间通讯--run方法

    4、进程间的通讯2,管道通讯  Pipe

    1)Pipe(duplex=True)

    Pipe是一个函数,返回元组(Connection(), Connection()).   即返回管道的两端。默认duplex=True为全双工模式,duplex=Fasle中第一个Connection只能接收信息,第二个Connection只能发送消息

    2)Connection常用方法

    • send(obj)     将对象obj发送到管道另一端,发送的数据必须是可序列化的对象。
    • recv()           从管道的另一端接收send()发送的数据。没有数据可接收,将发生堵塞。
    • send_bytes(buffer, offset=-1, size=-1)    发送字节缓冲区,buffer是支持支持字节缓冲的任意对象,offset为buffer的字节偏移量(可以当初下标),size为要发送的字节数。
    • recv_bytes(maxlength=-1)    接收send_bytes()发送的一次数据,maxlength指定接收长度,超出这个长度则抛出将引发IOError异常,没有数据可接收,将发生堵塞。
    • poll([timeout])      返回 True/False,判断管道内是否有数据可以接收,True:数据可接收。timeout为堵塞的时间秒,timeout=None时一直堵塞,直到有数据可以接收
    • close()        关闭链接,关闭链接后将不能继续使用管道,当不再使用管道时可将其关闭
     1 from multiprocessing import Process, Pipe
     2 import time
     3 
     4 class MyProcess1(Process):
     5     def __init__(self, con):
     6         super().__init__()
     7         self.con = con
     8 
     9     def run(self):
    10         self.con.send("12345")
    11         print("MyProcess1--send-->", "12345")
    12         msg = self.con.recv_bytes().decode(encoding="utf-8")
    13         print("MyProcess1--recv_bytes-->", msg)
    14 
    15 
    16 class MyProcess2(Process):
    17     def __init__(self, con):
    18         super().__init__()
    19         self.con = con
    20 
    21     def run(self):
    22         msg = self.con.recv()
    23         print("MyProcess2--recv-->", msg)
    24         self.con.send_bytes("哈哈".encode("UTF-8"))
    25         print("MyProcess2--send_bytes-->", "哈哈")
    26 
    27 
    28 if __name__ == "__main__":
    29     con1, con2 = Pipe()
    30     p1 = MyProcess1(con1)  # 创建子进程
    31     p2 = MyProcess2(con2)  # 创建子进程
    32     p1.start()  # 开启子进程
    33     p2.start()  # 开启子进程
    34 
    35 
    36 # 输出结果
    37 MyProcess1--send--> 12345
    38 MyProcess2--recv--> 12345
    39 MyProcess2--send_bytes--> 哈哈
    40 MyProcess1--recv_bytes--> 哈哈
    多进程间的管道通讯

    5、进程间的数据共享,Manager

    Manager提供多进程间的数据共享,Manager内的主要方法有  dict(mapping_or_sequence), list(sequence), Value(typecode, value), Array(typecode, sequence) 

     

     1 from multiprocessing import Process, Manager
     2 
     3 def fn(dic, li, a):
     4     dic['count'] -= 1
     5     print("子进程-->a", a.value)  # a 为Value对象,通过a.value将值取出、修改
     6     print("子进程-->li", li)
     7 
     8 
     9 if __name__ == '__main__':
    10     with Manager() as manager:
    11         dic = manager.dict({'count': 1000})  # 字典
    12         li = manager.list([1, 2, 3, 4])  # 列表
    13         a = manager.Value("int", 100)  #  传值
    14         p = Process(target=fn, args=(dic, li, a))
    15         p.start()
    16         p.join()
    17         print("主进程-->dic", dic)  # 打印子进程修改后的内容
    18 
    19 
    20 # 输出结果
    21 子进程-->a 100
    22 子进程-->li [1, 2, 3, 4]
    23 主进程-->dic {'count': 0}
    多进程数据共享Manager

     

    6、多进程同步问题

    当多个进程同时访问同一个资源时就得涉及到同步的问题,如:向控制台打印数据,必须要确保同一时刻只能有一个进程在打印数据,否则将会出现乱序的效果;多进程同时对一个文件进行读写等

     1 from multiprocessing import Process, Lock
     2 import time
     3 
     4 def fn(lock):
     5     # ------子进程的逻辑-----
     6     # lock.acquire()  # 未加锁
     7     for i in range(3):
     8         print("12", end="")
     9         time.sleep(0.005)  # 为了看到效果,添加的延迟
    10         print("34")
    11     # lock.release()
    12 
    13 
    14 if __name__ == "__main__":
    15     lock = Lock()
    16     for i in range(3):
    17         p = Process(target=fn, args=(lock,))  # 创建子进程
    18         p.start()  # 开启子进程
    19 
    20 
    21 # 输出结果
    22 121234
    23 1234
    24 1234
    25 121234
    26 1234
    27 34
    28 1234
    29 34
    30 1234
    未加锁
     1 from multiprocessing import Process, Lock
     2 import time
     3 
     4 def fn(lock):
     5     # ------子进程的逻辑-----
     6     lock.acquire()  # 加锁
     7     for i in range(3):
     8         print("12", end="")
     9         time.sleep(0.005)  # 为了看到效果,添加的延迟
    10         print("34")
    11     lock.release()  # 释放锁
    12 
    13 
    14 if __name__ == "__main__":
    15     lock = Lock()
    16     for i in range(3):
    17         p = Process(target=fn, args=(lock,))  # 创建子进程
    18         p.start()  # 开启子进程
    19 
    20 
    21 # 输出结果
    22 1234
    23 1234
    24 1234
    25 1234
    26 1234
    27 1234
    28 1234
    29 1234
    30 1234
    加锁后

    7、进程池Pool

     

     当任务不多时可以可以自己手动创建进程,但任务量大且任务量多的时候手动创建进程的工作量就会比较大,而多次创建进程消耗较大的资源,这个时候进程池就是一种比较好的解决方法。

    在程序一开始就创建好进程池指定进程的数量,当任务来临的时候就从进程池里拿出一个进程,任务完成后进程不会被销毁,而是继续回到进程池等等下一个任务。

    当任务数量大于进程数时,将按照任务添加的先后顺序执行任务,未执行的任务将在任务列表中等待。进程池中的进程用法跟上边的一致。

     

    Pool()创建进程池的时候可以指定进程池里的进程数量,不指定的将默认为cpu的核心数。

    主要方法介绍

     apply_async(func, args=(), kwds={}, callback=None, error_callback=None)   func:任务执行的方法入口;callback:任务执行完毕后执行的回调函数,回调函数执行在父进程里边,即调用apply_async的进程;error:任务执行出错后执行的回调函数

     apply(func, args=(), kwds={})

    close()   关闭进程池,关闭后将不能再继续添加任务

    join()     等所有进程结束后再退出

     1 from multiprocessing import Process, Pool
     2 import time
     3 import os
     4 
     5 def fn(i):
     6     # ------子进程的逻辑-----
     7     print("进程pid--> %5d,参数--> %2d" % (os.getpid(), i))
     8     time.sleep(0.01)
     9 
    10 
    11 if __name__ == "__main__":
    12     pool = Pool(4)
    13     for i in range(12):
    14         # pool.apply(fn, (i,))  # 同步方式
    15         pool.apply_async(fn, (i,))  # 异步方式
    16     pool.close()
    17     pool.join()
    18 
    19 
    20 # 输出结果
    21 进程pid-->  1176,参数-->  0
    22 进程pid-->  1556,参数-->  1
    23 进程pid-->  8864,参数-->  2
    24 进程pid-->  1176,参数-->  3
    25 进程pid-->  1556,参数-->  4
    26 进程pid-->  9552,参数-->  5
    27 进程pid-->  8864,参数-->  6
    28 进程pid-->  1556,参数-->  7
    29 进程pid-->  1176,参数-->  8
    30 进程pid-->  9552,参数-->  9
    31 进程pid-->  8864,参数--> 10
    32 进程pid-->  1176,参数--> 11
    进程池apply_async()异步方式
     1 from multiprocessing import Process, Pool
     2 import time
     3 import os
     4 
     5 def fn(i):
     6     # ------子进程的逻辑-----
     7     print("进程pid--> %5d,参数--> %2d" % (os.getpid(), i))
     8     time.sleep(1)
     9 
    10 
    11 if __name__ == "__main__":
    12     pool = Pool(4)
    13     for i in range(12):
    14         pool.apply(fn, (i,))  # 同步方式
    15         # pool.apply_async(fn, (i,))  # 异步方式
    16     pool.close()
    17     pool.join()
    18 
    19 
    20 # 输出结果
    21 进程pid--> 15056,参数-->  0
    22 进程pid--> 14380,参数-->  1
    23 进程pid-->  2312,参数-->  2
    24 进程pid--> 14360,参数-->  3
    25 进程pid--> 15056,参数-->  4
    26 进程pid--> 14380,参数-->  5
    27 进程pid-->  2312,参数-->  6
    28 进程pid--> 14360,参数-->  7
    进程池apply()同步方式
     1 from multiprocessing import Process, Pool
     2 import time
     3 import os
     4 
     5 def fn(i):
     6     # ------子进程的逻辑-----
     7     time.sleep(1)
     8     print("子进程pid--> %5d,参数--> %2d" % (os.getpid(), i))
     9     if i == 2:
    10         raise ValueError("出错")
    11     return i
    12 
    13 
    14 def fc(args):
    15     print("callback--> pid:%5d, type(args)%s, args:%s" % (os.getpid(), type(args), args))
    16 
    17 
    18 def fce(args):
    19     print("error_callback--> pid:%5d, type(args)%s, args:%s" % (os.getpid(), type(args), args))
    20 
    21 
    22 if __name__ == "__main__":
    23     print("主进程pid:%d", os.getpid())
    24     pool = Pool(2)
    25     for i in range(4):
    26         # pool.apply(fn, (i,))  # 同步方式
    27         pool.apply_async(fn, (i,), callback=fc, error_callback=fce)  # 异步方式
    28     pool.close()
    29     pool.join()
    30 
    31 
    32 # 输出结果进程
    33 主进程pid:%d 7944
    34 子进程pid-->  9368,参数-->  0
    35 callback--> pid: 7944, type(args)<class 'int'>, args:0
    36 子进程pid--> 13724,参数-->  1
    37 callback--> pid: 7944, type(args)<class 'int'>, args:1
    38 子进程pid-->  9368,参数-->  2
    39 error_callback--> pid: 7944, type(args)<class 'ValueError'>, args:出错
    40 子进程pid--> 13724,参数-->  3
    41 callback--> pid: 7944, type(args)<class 'int'>, args:3
    多进程回调函数的使用
     1 from multiprocessing import Process, Pool, Manager
     2 import time
     3 import os
     4 
     5 def fn(i, li):
     6     # ------子进程的逻辑-----
     7     li[i] = i + 10
     8     print("子进程中-->", li[i])
     9 
    10 
    11 if __name__ == "__main__":
    12     with Manager() as manager:
    13         pool = Pool(2)
    14         li = manager.list([1, 2, 3, 4])  # 列表
    15         for i in range(4):
    16             pool.apply_async(fn, (i, li))  # 异步方式
    17         pool.close()
    18         pool.join()
    19         print("主进程-->", li)
    20 
    21 
    22 # 输出结果
    23 子进程中--> 10
    24 子进程中--> 12
    25 子进程中--> 11
    26 子进程中--> 13
    27 主进程--> [10, 11, 12, 13]
    进程池间的通讯

     

    在这里要说一下同步方式和异步方式的区别,同步方式即一个一个任务的执行,即使进程池有多个进程也会等上一个任务做完才执行下一个任务,达不到并发效果,一般不用 apply()

    注意点:① close()必须在join()前边;② 主进程默认不会等待进程池中的进程,必须要在主进程中添加close,join;③ 进程池间的通讯与进程的用法一致

     

  • 相关阅读:
    Codeforces899D Shovel Sale(思路)
    F
    Codeforces909D Colorful Points(缩点)
    LOD
    Instruments
    IO优化
    Unity JobSystem
    Android 设备指纹
    帧同步
    寻路
  • 原文地址:https://www.cnblogs.com/yhongji/p/9728634.html
Copyright © 2011-2022 走看看