并发编程 二、多进程 要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。 Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。 子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。 Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程: import os print('Process (%s) start...' % os.getpid())# Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid)) 运行结果如下: Process (876) start... I (876) just created a child process (877). I am child process (877) and my parent is 876. 由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的,推荐大家用Mac学Python! 有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。 补充: 同一个程序执行多次是多个进程 每一个进程有一个PID import os print('爹是:',os.getppid()) #父的pid (pycharm.exe) print('me是: ',os.getpid()) 1.开启进程的两种方式: indows (createprocess) 创建子进程时子辈除了拷贝父辈的信息还创建了些自己的东西 unix (fork) 创建子进程时拷贝父辈的信息,子进程的初始状态和父辈一致 第一种(比较常用) from multiprocessing import Process import time def task(name): print('%s is running' %name) time.sleep(3) print('%s is done' %name) if __name__ == '__main__': # 在windows系统之上,开启子进程的操作一定要放到这下面 # Process(target=task,kwargs={'name':'egon'}) #两种传参方式皆可 p=Process(target=task,args=('egon',)) #两种传参方式皆可 p.start() # 向操作系统发送请求,操作系统会申请内存空间,然后把父进程的数据拷贝给子进程,作为子进程的初始状态 print('======主') 第二种 from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super(MyProcess,self).__init__() #Process在init里面有相应设置,要遗传下来,否则报错 self.name=name def run(self): print('%s is running' %self.name) time.sleep(3) print('%s is done' %self.name) if __name__ == '__main__': p=MyProcess('egon') p.start() #p.start()调用了类中的run()方法(规定) print('主') 2.进程的内存空间相互隔离 from multiprocessing import Process import time x=1000 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了 def task(): time.sleep(3) global x x=0 print('儿子死啦',x) #在之前最好只有函数或变量的定义,没有具体的执行(print等) if __name__ == '__main__': p=Process(target=task) p.start() time.sleep(5) print(x) #在子进程中对变量x的修改不影响父进程中x的值 3.父进程等待子进程结束 p1.join() #笨 from multiprocessing import Process import time x=1000 def task(n): print('%s is runing' %n) time.sleep(n) if __name__ == '__main__': start_time=time.time() p1=Process(target=task,args=(1,)) p2=Process(target=task,args=(2,)) p3=Process(target=task,args=(3,)) p1.start() p2.start() p3.start() p3.join() #3s p1.join() p2.join( ) print('主',(time.time() - start_time)) #3.01637601852417 #用循环 from multiprocessing import Process import time x=1000 def task(n): print('%s is runing' %n) time.sleep(n) if __name__ == '__main__': start_time=time.time() p_l=[] for i in range(1,4): p=Process(target=task,args=(i,)) p_l.append(p) p.start() for p in p_l: p.join() print('主',(time.time() - start_time)) #3.0141923427581787 4.进程对象的其他属性 from multiprocessing import Process import time def task(n): print('%s is runing' %n) time.sleep(n) if __name__ == '__main__': start_time=time.time() p1=Process(target=task,args=(1,),name='任务1') p1.start() print(p1.pid) print(p1.name) #如前面不定义name,默认process-1 etc p1.terminate() #向操作系统发请求,关闭需要一点时间 p1.join() print(p1.is_alive()) print('主') from multiprocessing import Process import time,os def task(): print('self:%s parent:%s' %(os.getpid(),os.getppid())) time.sleep(3) if __name__ == '__main__': p1=Process(target=task,) p1.start() print(p1.pid) print('主',os.getpid()) 5.、僵尸进程与孤儿进程 在unix系统中init是所有进程的爹;创建进程用fork,回收进程用waitpid 僵尸进程(有害:占用pid):子代先于父代终结,其部分信息(pid等)没有从系统中删除,需要父代回收。join中含有回收子代信息的功能。 孤儿进程(无害):父代先于子代终结,子代终结后的部分信息由init代收。 from multiprocessing import Process import time,os def task(n): print('%s is running' %n) time.sleep(n) if __name__ == '__main__': p1=Process(target=task,args=(1,)) p1.start() p1.join() # join中含有回收子代信息的功能(wait) print('======主',os.getpid()) time.sleep(10000) 6.守护进程 from multiprocessing import Process import time def task(name): print('%s is running' % name) time.sleep(3) if __name__ == '__main__': obj = Process(target=task, args=('egon',)) obj.daemon=True #将obj变成守护进程,主进程执行完毕后子进程跟着结束 obj.start() # 发送信号给操作系统 print('主') 7.互斥锁 强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()。否者程序停在原地。 互斥锁vs join: 大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序(在多个程序共享一个资源时,为保证有序不乱,需将并发变成串行) 区别一:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行 区别二:互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行(详见抢票系统) from multiprocessing import Process,Lock import time,random mutex=Lock() def task1(lock): lock.acquire() print('task1:名字是egon') time.sleep(random.randint(1,3)) print('task1:性别是male') time.sleep(random.randint(1,3)) print('task1:年龄是18') lock.release() def task2(lock): lock.acquire() print('task2:名字是alex') time.sleep(random.randint(1,3)) print('task2:性别是male') time.sleep(random.randint(1,3)) print('task2:年龄是78') lock.release() def task3(lock): lock.acquire() print('task3:名字是lxx') time.sleep(random.randint(1,3)) print('task3:性别是female') time.sleep(random.randint(1,3)) print('task3:年龄是30') lock.release() if __name__ == '__main__': p1=Process(target=task1,args=(mutex,)) p2=Process(target=task2,args=(mutex,)) p3=Process(target=task3,args=(mutex,)) p1.start() p2.start() p3.start() 8.互斥锁之抢票系统 import json import time import random import os from multiprocessing import Process,Lock mutex=Lock() def search(): time.sleep(random.randint(1,3)) with open('db.json','r',encoding='utf-8') as f: dic=json.load(f) print('%s 剩余票数:%s' %(os.getpid(),dic['count'])) def get(): with open('db.json','r',encoding='utf-8') as f: dic=json.load(f) if dic['count'] > 0: dic['count']-=1 time.sleep(random.randint(1,3)) with open('db.json','w',encoding='utf-8') as f: json.dump(dic,f) print('%s 购票成功' %os.getpid()) def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': for i in range(10): p=Process(target=task,args=(mutex,)) p.start()