zoukankan      html  css  js  c++  java
  • python 进程

    导航:

    1、创建进程的两种方式
    2、Process的方法
    3、进程间的通讯1,进程队列Queue--先进先出
    4、进程间的通讯2,管道通讯 Pipe
    5、进程间的数据共享,Manager
    6、多进程同步问题
    7、进程池Pool

    python中多进程可以解决cpython解释器多线程中GIL存在的问题,可以利用CPU的多核资源,实现真的并发效果。操作系统中每个线程有自己的内存空间,数据并不共享。

    python中使用multiprocessing包提供的接口给我们创建多进程,multiprocessing与threading的使用方法相似。

    1、创建进程的两种方式

    1)通过multiprocessing.Process创建

      Process(group=None, target=None, name=None, args=(), kwargs={})

    • group    线程组,没什么用,默认为空就好
    • target    要执行的方法
    • name    进程的名字
    • args/kwargs 执行target方法要传入的参数
     1 def fn(word):
     2     # ------子进程的逻辑-----
     3     print("父进程", os.getppid())
     4     print("子进程", os.getpid())
     5     print("传入的参数:", word)
     6 
     7 
     8 if __name__ == "__main__":
     9     p = Process(target=fn, args=("haha",))  # 创建子进程
    10     p.start()  # 开启子进程
    11     print("主进程", os.getpid())
    12 
    13 
    14 # 输出结果
    15 主进程 5916
    16 父进程 5916
    17 子进程 2936
    18 传入的参数: haha
    通过Process创建一个子进程

    很简单这样就可以创建一个子进程了,可以看第3 4 11行打印的进程id可以知道这是不同的进程。

    这里需要说明的是:在所有的进程当中子进程都是由父进程创建出来的,在这个例子中,子进程的的父进程就是主进程,可以看到第3 11行打印的进程id。

    在windows下,创建进程一定要放在__main__中,不然会报错

    2)通过继承Process类创建子进程

     1 from multiprocessing import Process
     2 import os
     3 
     4 class MyProcess(Process):
     5     def __init__(self, name):
     6         super().__init__()  # 如果重写了初始化方法,在初始化方法中一定要调用父类的初始化方法
     7         self.name = name
     8 
     9     def run(self):
    10         # ------子进程的逻辑-----
    11         print("父进程", os.getppid())
    12         print("子进程", os.getpid())
    13         print("子进程的名字", self.name)
    14 
    15 
    16 if __name__ == "__main__":
    17     p = MyProcess("haha")  # 创建子进程
    18     p.start()  # 开启子进程
    19     print("主进程", os.getpid())
    20 
    21 
    22 # 输出结果
    23 主进程 740
    24 父进程 740
    25 子进程 7052
    26 子进程的名字 haha
    通过继承Process创建子进程

    通过继承Process创建子进程需要重写 run 方法,这是子进程逻辑的入口,当开启子进程时会自动调用这个方法

    上边这两种方式中,主进程执行完所有的逻辑后会等待子进程结束在一起结束,与fork函数创建的方式不一样

    2、Process的方法

    1)is_alive()

    is_alive方法判断指定对象进程的存活状态。进程 start 后一直到该进程结束都返回True。进程 start 前 或进程已经结束返回False

     1 from multiprocessing import Process
     2 import time
     3 
     4 def fn():
     5     # ------子进程的逻辑-----
     6     time.sleep(1)  # 模拟子进程执行需要消耗的时间
     7 
     8 
     9 if __name__ == "__main__":
    10     p = Process(target=fn)  # 创建子进程
    11     print("p.start前-->", p.is_alive())
    12     p.start()  # 开启子进程
    13     print("p.start后-->", p.is_alive())
    14     time.sleep(2)  # 模拟主进程执行需要消耗的时间,为了确保子进程先结束
    15     print("子进程结束后-->", p.is_alive())
    16 
    17 
    18 # 输出结果
    19 p.start前--> False
    20 p.start后--> True
    21 子进程结束后--> False
    is_alive方法

    2)join()

    join(timeout=None)方法,堵塞当前环境的进程,直到调此方法的进程结束后再继续往下执行。可设置timeout值,最多堵塞timeout时间(秒)。注意:join方法只能在start()后才可以使用

     1 from multiprocessing import Process
     2 import time
     3 
     4 def fn():
     5     # ------子进程的逻辑-----
     6     time.sleep(3)  # 模拟子进程执行需要消耗的时间
     7     print("子进程中-->", time.ctime())
     8 
     9 
    10 if __name__ == "__main__":
    11     p = Process(target=fn)  # 创建子进程
    12     p.start()  # 开启子进程
    13     print("p.join前-->", time.ctime())
    14     p.join()  # 堵塞当前环境的进程,直到调此方法的进程结束后再继续往下执行
    15     print("p.join后-->", time.ctime())
    16 
    17 
    18 # 输出内容
    19 p.join前--> Sat Sep 29 16:03:40 2018
    20 子进程中--> Sat Sep 29 16:03:43 2018
    21 p.join后--> Sat Sep 29 16:03:43 2018
    join方法

    这里可以看到第15行语句一直等到第7行执行完才输出

     1 from multiprocessing import Process
     2 import time
     3 
     4 def fn():
     5     # ------子进程的逻辑-----
     6     time.sleep(2)  # 模拟子进程执行需要消耗的时间
     7     print("子进程中-->", time.ctime())
     8 
     9 
    10 if __name__ == "__main__":
    11     p = Process(target=fn)  # 创建子进程
    12     p.start()  # 开启子进程
    13     print("p.join前-->", time.ctime())
    14     p.join(1)  # 设置超时1秒
    15     print("p.join后-->", time.ctime())
    16 
    17 
    18 # 输出内容
    19 p.join前--> Sat Sep 29 16:04:02 2018
    20 p.join后--> Sat Sep 29 16:04:03 2018
    21 子进程中--> Sat Sep 29 16:04:04 2018
    join设置timeout值

    这里可以看到第15行语句只堵塞了1秒的时间

    3)start(),进程准备就绪,等待cpu的执行(调度)

    4)run(),继承Process类的子类,需要重写的方法,当进程对象调用 start 方法时自动执行 run 方法,也是进程的入口

    5)terminate(),不管进程是否执行完,直接终止进程

    6)daemon属性True/False

    与线程的setDeamon()一样。将该进程对象设置为守护进程,效果:父进程将不再等待子进程,父进程结束时,子进程一起结束。注意:daemon属性只能在 start() 前设置

     1 from multiprocessing import Process
     2 import time
     3 
     4 def fn():
     5     # ------子进程的逻辑-----
     6     time.sleep(2)  # 模拟子进程执行需要消耗的时间
     7     print("----------")
     8 
     9 
    10 if __name__ == "__main__":
    11     p = Process(target=fn)  # 创建子进程
    12     p.daemon = True
    13     p.start()  # 开启子进程
    14     print("++++++++++")
    15 
    16 
    17 # 输出内容
    18 ++++++++++
    daemon属性

    子进程中的第7行语句并没有执行,即:子进程在父进程结束时也跟着结束了

    3、进程间的通讯1,进程队列Queue--先进先出

    Queue(maxsize=-1),maxsize=-1默认队列长度没有最大值,maxsize=5表示队列长度最大值为5

    1)put(obj, block=True, timeout=None)

    • obj  添加进队列的值,可以添加任意类型的值
    • block 默认为True,当队列满时,继续添加则发生堵塞,直到队列get()值出去;block=False,队列满时继续添加不堵塞,但会抛出queue.Full异常
    • timeout  堵塞超时,当队列满时,继续添加发生堵塞,堵塞超时timeout秒,超时则会抛出queue.Full异常
     1 >>> from multiprocessing import Process, Queue
     2 >>> q = Queue(3)  # 创建进程队列,队列最大长度为3
     3 >>> q.put("haha")  # 往队列添加字符串
     4 >>> q.put([])  # 往队列添加列表
     5 >>> q.put({})  # 往队列添加字典
     6 >>> q.put(5)  # 往队列添加数字
     7 _  # 满时继续添加,发生堵塞(光标会一直卡在这)
     8 
     9 >>> q.put(5,block=False)  # 往队列添加数字,不堵塞添加
    10 Traceback (most recent call last):
    11   File "<stdin>", line 1, in <module>
    12   File "C:D_ProgramPythonPython37libmultiprocessingqueues.py", line 83, in put
    13     raise Full
    14 queue.Full
    15 
    16 >>> q.put(5, timeout=3)  # 往队列添加数字,并设置超时3秒
    17 Traceback (most recent call last):
    18   File "<stdin>", line 1, in <module>
    19   File "C:D_ProgramPythonPython37libmultiprocessingqueues.py", line 83, in put
    20     raise Full
    21 queue.Full
    put方法简单使用,这里还不涉及进程间的通讯

    2)get(block=True, timeout=None)  使用方法与put一样的用法,返回先添加的值

     1 >>> q.get()
     2 'haha'
     3 >>> q.get()
     4 []
     5 >>> q.get()
     6 {}
     7 >>> q.get()
     8 _  # 为空时继续取值,发生堵塞(光标会一直卡在这)
     9 
    10 >>> q.get(timeout=3)
    11 Traceback (most recent call last):
    12   File "<stdin>", line 1, in <module>
    13   File "C:D_ProgramPythonPython37libmultiprocessingqueues.py", line 105, in get
    14     raise Empty
    15 _queue.Empty
    get方法简单使用,这里还不涉及进程间的通讯

    3)qsize()  返回当前队列的长度

     1 >>> from multiprocessing import Queue
     2 >>> q = Queue(3)
     3 >>> q.put("haha")
     4 >>> q.qsize()
     5 1
     6 >>> q.put(100)
     7 >>> q.qsize()
     8 2
     9 >>> q.get()
    10 'haha'
    11 >>> q.qsize()
    12 1
    qsize返回当前队列长度,这里还不涉及进程间的通讯

    4)put_nowait(obj)  相当于put(obj, block=False)

    5)get_nowait()  相当于get(block=False)

    6)empty()   判断队列是否为空,True:空,False:不为空

    7)full()   判断队列是否已满,True:已满,False:未满

    8)close()  关闭队列,关闭后,将不能添加或取出值

    9)通过队列实现多个进程间的通讯

     1 from multiprocessing import Process, Queue
     2 import time
     3 
     4 def write_queue(q):
     5     for i in range(5):
     6         q.put(i)  # 往队列添加值
     7         print("put %d" % i)
     8 
     9 
    10 def read_queue(q):
    11     time.sleep(1)  # 确保write_queue中队列已经有值
    12     while not q.empty():
    13         s = q.get()  # 取出队列的值
    14         print("get %s" % s)
    15 
    16 
    17 if __name__ == "__main__":
    18     q = Queue(5)
    19     p1 = Process(target=write_queue, args=(q,))  # 创建子进程
    20     p2 = Process(target=read_queue, args=(q,))  # 创建子进程
    21     p1.start()  # 开启子进程
    22     p2.start()  # 开启子进程
    23 
    24 
    25 # 输出结果
    26 put 0
    27 put 1
    28 put 2
    29 put 3
    30 put 4
    31 get 0
    32 get 1
    33 get 2
    34 get 3
    35 get 4
    通过队列的多进程间通信,target
     1 from multiprocessing import Process, Queue
     2 import time
     3 
     4 class WriteProcess(Process):
     5     def __init__(self, q):
     6         super().__init__()
     7         self.q = q
     8 
     9     def run(self):
    10         print("子进程WriteProcess-->")
    11         for i in range(5):
    12             self.q.put(i)
    13             print("put %d" % i)
    14 
    15 
    16 class ReadProcess(Process):
    17     def __init__(self, q):
    18         super().__init__()
    19         self.q = q
    20 
    21     def run(self):
    22         time.sleep(1)  # 确保write_queue中队列已经有值
    23         print("子进程ReadProcess-->")
    24         while not self.q.empty():
    25             s = self.q.get()
    26             print("get %s" % s)
    27 
    28 
    29 if __name__ == "__main__":
    30     q = Queue(5)
    31     p1 = WriteProcess(q)  # 创建子进程
    32     p2 = ReadProcess(q)  # 创建子进程
    33     p1.start()  # 开启子进程
    34     p2.start()  # 开启子进程
    35 
    36 
    37 # 输出结果
    38 子进程WriteProcess-->
    39 put 0
    40 put 1
    41 put 2
    42 put 3
    43 put 4
    44 子进程ReadProcess-->
    45 get 0
    46 get 1
    47 get 2
    48 get 3
    49 get 4
    通过队列的多进程间通讯--run方法

    4、进程间的通讯2,管道通讯  Pipe

    1)Pipe(duplex=True)

    Pipe是一个函数,返回元组(Connection(), Connection()).   即返回管道的两端。默认duplex=True为全双工模式,duplex=Fasle中第一个Connection只能接收信息,第二个Connection只能发送消息

    2)Connection常用方法

    • send(obj)     将对象obj发送到管道另一端,发送的数据必须是可序列化的对象。
    • recv()           从管道的另一端接收send()发送的数据。没有数据可接收,将发生堵塞。
    • send_bytes(buffer, offset=-1, size=-1)    发送字节缓冲区,buffer是支持支持字节缓冲的任意对象,offset为buffer的字节偏移量(可以当初下标),size为要发送的字节数。
    • recv_bytes(maxlength=-1)    接收send_bytes()发送的一次数据,maxlength指定接收长度,超出这个长度则抛出将引发IOError异常,没有数据可接收,将发生堵塞。
    • poll([timeout])      返回 True/False,判断管道内是否有数据可以接收,True:数据可接收。timeout为堵塞的时间秒,timeout=None时一直堵塞,直到有数据可以接收
    • close()        关闭链接,关闭链接后将不能继续使用管道,当不再使用管道时可将其关闭
     1 from multiprocessing import Process, Pipe
     2 import time
     3 
     4 class MyProcess1(Process):
     5     def __init__(self, con):
     6         super().__init__()
     7         self.con = con
     8 
     9     def run(self):
    10         self.con.send("12345")
    11         print("MyProcess1--send-->", "12345")
    12         msg = self.con.recv_bytes().decode(encoding="utf-8")
    13         print("MyProcess1--recv_bytes-->", msg)
    14 
    15 
    16 class MyProcess2(Process):
    17     def __init__(self, con):
    18         super().__init__()
    19         self.con = con
    20 
    21     def run(self):
    22         msg = self.con.recv()
    23         print("MyProcess2--recv-->", msg)
    24         self.con.send_bytes("哈哈".encode("UTF-8"))
    25         print("MyProcess2--send_bytes-->", "哈哈")
    26 
    27 
    28 if __name__ == "__main__":
    29     con1, con2 = Pipe()
    30     p1 = MyProcess1(con1)  # 创建子进程
    31     p2 = MyProcess2(con2)  # 创建子进程
    32     p1.start()  # 开启子进程
    33     p2.start()  # 开启子进程
    34 
    35 
    36 # 输出结果
    37 MyProcess1--send--> 12345
    38 MyProcess2--recv--> 12345
    39 MyProcess2--send_bytes--> 哈哈
    40 MyProcess1--recv_bytes--> 哈哈
    多进程间的管道通讯

    5、进程间的数据共享,Manager

    Manager提供多进程间的数据共享,Manager内的主要方法有  dict(mapping_or_sequence), list(sequence), Value(typecode, value), Array(typecode, sequence) 

     

     1 from multiprocessing import Process, Manager
     2 
     3 def fn(dic, li, a):
     4     dic['count'] -= 1
     5     print("子进程-->a", a.value)  # a 为Value对象,通过a.value将值取出、修改
     6     print("子进程-->li", li)
     7 
     8 
     9 if __name__ == '__main__':
    10     with Manager() as manager:
    11         dic = manager.dict({'count': 1000})  # 字典
    12         li = manager.list([1, 2, 3, 4])  # 列表
    13         a = manager.Value("int", 100)  #  传值
    14         p = Process(target=fn, args=(dic, li, a))
    15         p.start()
    16         p.join()
    17         print("主进程-->dic", dic)  # 打印子进程修改后的内容
    18 
    19 
    20 # 输出结果
    21 子进程-->a 100
    22 子进程-->li [1, 2, 3, 4]
    23 主进程-->dic {'count': 0}
    多进程数据共享Manager

     

    6、多进程同步问题

    当多个进程同时访问同一个资源时就得涉及到同步的问题,如:向控制台打印数据,必须要确保同一时刻只能有一个进程在打印数据,否则将会出现乱序的效果;多进程同时对一个文件进行读写等

     1 from multiprocessing import Process, Lock
     2 import time
     3 
     4 def fn(lock):
     5     # ------子进程的逻辑-----
     6     # lock.acquire()  # 未加锁
     7     for i in range(3):
     8         print("12", end="")
     9         time.sleep(0.005)  # 为了看到效果,添加的延迟
    10         print("34")
    11     # lock.release()
    12 
    13 
    14 if __name__ == "__main__":
    15     lock = Lock()
    16     for i in range(3):
    17         p = Process(target=fn, args=(lock,))  # 创建子进程
    18         p.start()  # 开启子进程
    19 
    20 
    21 # 输出结果
    22 121234
    23 1234
    24 1234
    25 121234
    26 1234
    27 34
    28 1234
    29 34
    30 1234
    未加锁
     1 from multiprocessing import Process, Lock
     2 import time
     3 
     4 def fn(lock):
     5     # ------子进程的逻辑-----
     6     lock.acquire()  # 加锁
     7     for i in range(3):
     8         print("12", end="")
     9         time.sleep(0.005)  # 为了看到效果,添加的延迟
    10         print("34")
    11     lock.release()  # 释放锁
    12 
    13 
    14 if __name__ == "__main__":
    15     lock = Lock()
    16     for i in range(3):
    17         p = Process(target=fn, args=(lock,))  # 创建子进程
    18         p.start()  # 开启子进程
    19 
    20 
    21 # 输出结果
    22 1234
    23 1234
    24 1234
    25 1234
    26 1234
    27 1234
    28 1234
    29 1234
    30 1234
    加锁后

    7、进程池Pool

     

     当任务不多时可以可以自己手动创建进程,但任务量大且任务量多的时候手动创建进程的工作量就会比较大,而多次创建进程消耗较大的资源,这个时候进程池就是一种比较好的解决方法。

    在程序一开始就创建好进程池指定进程的数量,当任务来临的时候就从进程池里拿出一个进程,任务完成后进程不会被销毁,而是继续回到进程池等等下一个任务。

    当任务数量大于进程数时,将按照任务添加的先后顺序执行任务,未执行的任务将在任务列表中等待。进程池中的进程用法跟上边的一致。

     

    Pool()创建进程池的时候可以指定进程池里的进程数量,不指定的将默认为cpu的核心数。

    主要方法介绍

     apply_async(func, args=(), kwds={}, callback=None, error_callback=None)   func:任务执行的方法入口;callback:任务执行完毕后执行的回调函数,回调函数执行在父进程里边,即调用apply_async的进程;error:任务执行出错后执行的回调函数

     apply(func, args=(), kwds={})

    close()   关闭进程池,关闭后将不能再继续添加任务

    join()     等所有进程结束后再退出

     1 from multiprocessing import Process, Pool
     2 import time
     3 import os
     4 
     5 def fn(i):
     6     # ------子进程的逻辑-----
     7     print("进程pid--> %5d,参数--> %2d" % (os.getpid(), i))
     8     time.sleep(0.01)
     9 
    10 
    11 if __name__ == "__main__":
    12     pool = Pool(4)
    13     for i in range(12):
    14         # pool.apply(fn, (i,))  # 同步方式
    15         pool.apply_async(fn, (i,))  # 异步方式
    16     pool.close()
    17     pool.join()
    18 
    19 
    20 # 输出结果
    21 进程pid-->  1176,参数-->  0
    22 进程pid-->  1556,参数-->  1
    23 进程pid-->  8864,参数-->  2
    24 进程pid-->  1176,参数-->  3
    25 进程pid-->  1556,参数-->  4
    26 进程pid-->  9552,参数-->  5
    27 进程pid-->  8864,参数-->  6
    28 进程pid-->  1556,参数-->  7
    29 进程pid-->  1176,参数-->  8
    30 进程pid-->  9552,参数-->  9
    31 进程pid-->  8864,参数--> 10
    32 进程pid-->  1176,参数--> 11
    进程池apply_async()异步方式
     1 from multiprocessing import Process, Pool
     2 import time
     3 import os
     4 
     5 def fn(i):
     6     # ------子进程的逻辑-----
     7     print("进程pid--> %5d,参数--> %2d" % (os.getpid(), i))
     8     time.sleep(1)
     9 
    10 
    11 if __name__ == "__main__":
    12     pool = Pool(4)
    13     for i in range(12):
    14         pool.apply(fn, (i,))  # 同步方式
    15         # pool.apply_async(fn, (i,))  # 异步方式
    16     pool.close()
    17     pool.join()
    18 
    19 
    20 # 输出结果
    21 进程pid--> 15056,参数-->  0
    22 进程pid--> 14380,参数-->  1
    23 进程pid-->  2312,参数-->  2
    24 进程pid--> 14360,参数-->  3
    25 进程pid--> 15056,参数-->  4
    26 进程pid--> 14380,参数-->  5
    27 进程pid-->  2312,参数-->  6
    28 进程pid--> 14360,参数-->  7
    进程池apply()同步方式
     1 from multiprocessing import Process, Pool
     2 import time
     3 import os
     4 
     5 def fn(i):
     6     # ------子进程的逻辑-----
     7     time.sleep(1)
     8     print("子进程pid--> %5d,参数--> %2d" % (os.getpid(), i))
     9     if i == 2:
    10         raise ValueError("出错")
    11     return i
    12 
    13 
    14 def fc(args):
    15     print("callback--> pid:%5d, type(args)%s, args:%s" % (os.getpid(), type(args), args))
    16 
    17 
    18 def fce(args):
    19     print("error_callback--> pid:%5d, type(args)%s, args:%s" % (os.getpid(), type(args), args))
    20 
    21 
    22 if __name__ == "__main__":
    23     print("主进程pid:%d", os.getpid())
    24     pool = Pool(2)
    25     for i in range(4):
    26         # pool.apply(fn, (i,))  # 同步方式
    27         pool.apply_async(fn, (i,), callback=fc, error_callback=fce)  # 异步方式
    28     pool.close()
    29     pool.join()
    30 
    31 
    32 # 输出结果进程
    33 主进程pid:%d 7944
    34 子进程pid-->  9368,参数-->  0
    35 callback--> pid: 7944, type(args)<class 'int'>, args:0
    36 子进程pid--> 13724,参数-->  1
    37 callback--> pid: 7944, type(args)<class 'int'>, args:1
    38 子进程pid-->  9368,参数-->  2
    39 error_callback--> pid: 7944, type(args)<class 'ValueError'>, args:出错
    40 子进程pid--> 13724,参数-->  3
    41 callback--> pid: 7944, type(args)<class 'int'>, args:3
    多进程回调函数的使用
     1 from multiprocessing import Process, Pool, Manager
     2 import time
     3 import os
     4 
     5 def fn(i, li):
     6     # ------子进程的逻辑-----
     7     li[i] = i + 10
     8     print("子进程中-->", li[i])
     9 
    10 
    11 if __name__ == "__main__":
    12     with Manager() as manager:
    13         pool = Pool(2)
    14         li = manager.list([1, 2, 3, 4])  # 列表
    15         for i in range(4):
    16             pool.apply_async(fn, (i, li))  # 异步方式
    17         pool.close()
    18         pool.join()
    19         print("主进程-->", li)
    20 
    21 
    22 # 输出结果
    23 子进程中--> 10
    24 子进程中--> 12
    25 子进程中--> 11
    26 子进程中--> 13
    27 主进程--> [10, 11, 12, 13]
    进程池间的通讯

     

    在这里要说一下同步方式和异步方式的区别,同步方式即一个一个任务的执行,即使进程池有多个进程也会等上一个任务做完才执行下一个任务,达不到并发效果,一般不用 apply()

    注意点:① close()必须在join()前边;② 主进程默认不会等待进程池中的进程,必须要在主进程中添加close,join;③ 进程池间的通讯与进程的用法一致

     

  • 相关阅读:
    修复PLSQL Developer 与 Office 2010的集成导出Excel 功能
    Using svn in CLI with Batch
    mysql 备份数据库 mysqldump
    Red Hat 5.8 CentOS 6.5 共用 输入法
    HP 4411s Install Red Hat Enterprise Linux 5.8) Wireless Driver
    变更RHEL(Red Hat Enterprise Linux 5.8)更新源使之自动更新
    RedHat 5.6 问题简记
    Weblogic 9.2和10.3 改密码 一站完成
    ExtJS Tab里放Grid高度自适应问题,官方Perfect方案。
    文件和目录之utime函数
  • 原文地址:https://www.cnblogs.com/yhongji/p/9728634.html
Copyright © 2011-2022 走看看