说明
在某些情况下,需要通过多进程或者多线程来提高程序的运行效率,那么python中的多进程和多线程是如何实现的呢,今天来详细讨论一下。
多进程
类似于C语言,可以通过系统的fork来进行多进程的创建(只可在linux下运行),以下是多进程运行的示例:
import os
pid = os.fork() # 父进程中返回值会是子进程PID值,子进程中返回值为0
print('父子进程都会输出。') # 运行两次,分别是父进程和子进程
if pid == 0:
print('子进程的PID是%d,父进程的PID是%d'%(os.getpid(),os.getppid()))
else:
print('父进程的PID是%d'%os.getpid()) #当父进程使用getpid的时候获得的是父进程的pid。
同样地,进程的fork也可以通过运行多次进行嵌套,以达到实现进程的进程的目的。示例代码如下所示:
import os
pid = os.fork() #创建子进程,接收pid的返回值。
if pid > 0: #判断是子进程还是父进程。
print('父进程') #当pid的返回值是0的时候,会运行父进程
else:
print('子进程') #否则就是子进程
pid =os.fork() #让之前的父子进程再次创建各自的子进程
if pid > 0: #判断父子进程
print('父进程的子进程') #这里会运行2次父进程
else:
print('子进程的子进程') #这里也会运行两次子进程
Process库实现多进程
在win上是默认不支持fork的,因此想要使用fork方法就必须通过其他的第三方库来实现:
def child():
while True:
print("this is child process!")
time.sleep(1)
def test_process():
p = Process(target=child)
p.start()
p.join(3) # 3秒后运行父进程
print('this is father process')
当然也可以通过自定义的方式来生成新的进程类,重写run方法以满足有不同需要的多进程。 示例代码如下:
class MyProcess(Process):
def run(self):
while True:
print("this is child process!")
time.sleep(1)
进程池Pool的用法
进程池会创建n个进程,指定进程的个数,这样在计算是就可以得到固定数量的子进程:
def child(num):
for i in range(2):
print('进程的pid是%d,进程值是%d'%(os.getpid(),num))
time.sleep(1)
def test_pool():
p = Pool(2) # 进程池大小
for i in range(6):
print('--%d--'%i)
p.apply_async(child, (i,))
p.close() # 关闭进程池
p.join()
进程的Queue用法
一个较为常见的问题为生产消费问题,在Process库中通过Queue来实现:
def write(q):
for v in range(10):
print('Put %s to Queue'%v)
q.put(v)
time.sleep(0.5)
def read(q):
while not q.empty():
v = q.get(True)
print('Get %s from Queue'%v)
time.sleep(1)
def test_queue():
q = Queue() # 可以声明队列的存储空间
pw = Process(target=write, args=(q, ))
pr = Process(target=read2, args=(q, ))
pw.start() #开始执行pw。
pr.start() #开始执行pr。
pw.join() #等待pw结束
pr.join() #等待pr结束
print('Over') #主进程结束
当生产者和消费者的速度不一致时,就会出现等待态的情景。
Pool中Queue的用法
使用Pool的Queue相当于多生产者多消费者的模式,Queue中可以有多个进程的读写。
def write(q, num):
for v in range(10):
print('Put %s to Queue'%v)
q.put(v)
print('write进程的pid是%d,进程值是%d'%(os.getpid(),num))
time.sleep(0.5)
def read(q, num):
while not q.empty():
v = q.get(True)
print('Get %s from Queue'%v)
print('read进程的pid是%d,进程值是%d'%(os.getpid(),num))
time.sleep(1)
def test_pool_queue():
q = Manager().Queue(2) #这里实例化的时候是使用Manager的Queue
p = Pool(10)
for i in range(6):
p.apply_async(write,(q,i)) #将任务加入Pool的进程池,注意这里的参数于Process不同。
p.apply_async(read,(q,i)) #将任务加入Pool的进程池,注意这里的参数于Process不同。
p.close() #关闭进程池,不再接收进程。
p.join() #子进程完毕,运行以下的主进程。
print('Over')
多线程
基本用法
多线程的用法和多进程基本类似,只不过将Process转化为
import threading
from threading import Thread, Lock
import time
num = 0
def work1():
global num
# time.sleep(0.9)
for i in range(int(1e6)):
num += 1
print("work1 的num为 %d"%(num))
def work2():
global num
# time.sleep(1)
for i in range(int(1e7)):
num += 1
print("work2 的num为 %d"%(num))
def test_thread():
t = Thread(target=work1) # 创建第一个线程内置的self.name属性为Thread-1,并指向work1
tt = Thread(target=work2) #创建第二个线程内置的self.name属性为Thread-2,并指向work2
t.start() #开始执行
tt.start() #开始执行
time.sleep(2) #主线程休息2秒
print('最后的num值是%d'%num) # 最后的num值是10263017
为什么最终的结果不是11000000,而是10263017,是因为同时读写,当拿到全局变量num时,work1和work2同时对num增加操作,所以最终结果不符合预期。
互斥锁
一种解决上述冲突的方法是加入互斥锁,即每次对变量进行读写的时候加锁,等到写操作完成后就释放锁:
def work1_lock(l):
global num
l.acquire() # 加锁
for i in range(int(1e6)):
num += 1
print("work1 的num为 %d"%(num))
l.release() # 释放锁
def work2_lock(l):
global num
l.acquire()
for i in range(int(1e7)):
num += 1
print("work2 的num为 %d"%(num))
l.release()
def test_lock():
print('最后的num值是%d'%num)
l = Lock() #实例化互斥锁,互斥锁是为了保护子线程不争抢数据而使用的一个类。
t = Thread(target=work1_lock, args=(l,))
tt = Thread(target=work2_lock, args=(l,))
t.start()
tt.start()
t.join()
tt.join()
print('最后的num值是%d'%num) # 11000000
threadlocal
threadlocal提供了一种线程内部使用工具的方法,在多个线程使用全局数据时,不可避免地出现锁的情况,而解决方法之一便是通过个线程之间各有自己私有的数据来解决,threadlocal虽然是一个全局量,但是能够为不同的线程提供私有数据的机会。以下是示例代码:
def work1_local(l, name):
l.name = name #将参数name传递给local实例对象的name属性。注意:这里的l.name是创建的对象属性。
work2_local(l) #调用work函数
print('work1: hello,%s,线程的name是%s'%(name,threading.current_thread().name))
def work2_local(l,):
name = l.name # 获取的name将是每个线程之间互不影响
print('work2: hello,%s,线程的name是%s'%(name,threading.current_thread().name))
def test_local():
l = threading.local()
t1 = threading.Thread(target=work1_local,args=(l, '小李')) #实例化线程对象,并调用work,参数name是小李。
t2 = threading.Thread(target=work1_local,args=(l, '小王'))#实例化线程对象,并调用work,参数name是小王。
t1.start()
t2.start()
t1.join()
t2.join()