并发编程之多进程
1.进程的三种状态
阻塞 运行 就绪
其中运行和就绪是非阻塞状态
2.进程的创建和销毁
对于通用计算机而言.必须具备创建和销毁进程的能力
创建
1.用户的交互式请求 ,鼠标双击
2.由一个正在运行的程序 调用了开启进程的接口 . 例如subprocess
3.一个批处理作业开始
4.系统初始化
销毁:
1.任务完成 自愿退出
2.强制结束 taskkill kill 非自愿
3.程序遇到了异常
4.严重错误 比如访问了不该访问的内存
3.进程和程序的区别
程序是 一堆代码放在一个文件中
进程是将代码从硬盘读取到内存然后执行产生的
进程是由程序产生的 ,一个程序可以产生多个进程
4.进程的层次结构
在linux中 进程具备父子关系,是一个树状结构 ,可以互相查找到对方
在windows 没有层级关系 , 父进程可以将子进程的句柄转让
5.PID 和 PPID
PID 是当前进程的编号,PPID 是父进程的编号
访问PID与PPID :
import os
os.getpid()
os.getppid()
6.python如何使用多进程
创建子进程的方式
1.导入multiprocessing中的Process类,实例化这个类指定要执行的任务 target
2.导入multiprocessing 中的Process类 继承这个类 覆盖run方法 将要执行的任务放入run中开启进程时会自动执行该函数
from multiprocessing import Process
def task(name):
print(name,'子进程')
if __name__ == '__main__':
p=Process(target=task,args=('zi',)) # 参数元组形式,元素以字符串形式
p.start() # 开启进程,触发实例化的p内run函数
print('主进程')
class SubProcess(Process): # 继承类
def run(self): # 重写run
print('子进程')
if __name__ == '__main__':
p=SubProcess()
p.start() # 触发run
print('主进程')
7.join函数
实例化子进程使用join函数会让主进程等待子进程执行完毕再继续执行
from multiprocessing import Process
import time,random
def task1(name):
time.sleep(random.random())
print(name,'子run')
def task2(name):
time.sleep(random.random())
print(name,'子run')
if __name__ == '__main__':
p1=Process(target=task1,args=('sub1',))
p2=Process(target=task2,args=('sub2',))
p=[p1,p2]
for i in p:
i.start()
for i in p:
i.join() # join是阻塞函数,直到子进程结束继续执行后面的
print('主over')
8.进程对象的常用属性
if __name__ == '__main__':
p = Process(target=task,name="老司机进程")
p.start()
p.join()
# print(p.name) #进程名
# p.daemon #守护进程
# p.join()
# print(p.exitcode) # 获取进程的退出码 就是exit()函数中传入的值
# print(p.is_alive()) # 查看进程是否存活
# print("zi",p.pid) # 获取进程id
# print(os.getpid())
# p.terminate() #终止进程 与strat 相同的是 不会立即终止,因为操作系统有很多事情要做
# print(p.is_alive())
9.僵尸进程与孤儿进程
孤儿进程 :当父进程已经结束 而子进程还在运行子进程就称为孤儿进程 ,有其存在的必要性,没有不良影响 。
僵尸进程 :当一个进程已经结束了但是,它仍然还有一些数据存在 ,此时称之为僵尸进程 。
10.守护进程
通过daemon这个函数可以给主进程设置守护进程
注意要在子进程开启start()前设置为守护进程
守护进程会在主进程代码执行结束后就终止
import time
from multiprocessing import Process
def task():
print('zi run')
if __name__ == '__main__':
p=Process(target=task)
p.daemon=True # 守护进程
p.start()
time.sleep(1)
print('zhu over')
11.进程安全问题
进程之间数据不共享,但是共享同一套文件系统。
所以在访问同一个文件,或同一个打印终端,是没有问题的,
但是共享带来的是竞争,竞争带来的结果就是错乱。
解决的方法:是将并发操作公共资源的代码 由并发变为串行 解决安全问题,但是牺牲效率
这里引入了互斥锁。
12.互斥锁
通过multiprocessing 模块中的Lock类可以为公共资源进行加锁,可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改
lock.acquire() # 加锁
lock.release() # 解锁
与join函数相比,join函数时执行完一个进程再执行下一个,而互斥锁只是在锁起来的部分起作用,不用对于整个进程使用,比join函数效率高。
import json
from multiprocessing import Process,Lock
def show():
with open('05.db.json') as fr:
data=json.load(fr)
print(f'剩余票数:{data["count"]}')
def buy():
with open('05.db.json') as fr:
data=json.load(fr)
if data['count']>0:
data['count']-=1
print('抢票成功!')
with open('05.db.json','w') as fw:
json.dump(data,fw)
def lock(l):
show()
l.acquire()
buy()
l.release()
if __name__ == '__main__':
l = Lock()
for i in range(10):
p=Process(target=lock,args=(l,))
p.start()
13.Manager的使用
虽然进程间数据独立,但可以通过multiprocessing 模块中的Manager类创建一个同步管理器来实现数据共享。,但是不加锁操作共享的数据会出现数据错乱。
from multiprocessing import Manager,Process,Lock
def add(num_list,lock):
lock.acquire()
a=num_list[-1]+1
num_list.append(a)
lock.release()
if __name__ == '__main__':
lock=Lock()
m=Manager() # 创建一个同步器
num_list=m.list([1,2,3]) # 同步列表或字典
p_list=[]
for i in range(5):
p=Process(target=add,args=(num_list,lock))
p.start()
p_list.append(p)
for p in p_list:
p.join()
print(num_list)
14.队列Queue
通过multiprocessing模块中的Queue类创建共享的进程队列实现多进程之间的数据传递,Queue是多进程安全的队列,其效率高,而且已经处理好锁的问题。
创建队列时,多个进程可以通过put函数将元素放入队列,取时通过get函数获取元素。
注意:创建队列时可以指定放入多少元素,默认为不限。当有限定个数时,队列中没有了位置则会阻塞。若存放超过限定个数,且设置了timeout参数,超过设置的时间则会捕捉异常queue.Full
在获取存放的元素时,有多少便可以取多少,在取空之后若还有get命令则会出现阻塞,若设置了timeout参数,超时会捕捉异常queue.Empty
15.函数的调用栈
栈也是一种特殊的容器,特殊在于存取顺序为先进后出
函数调用栈:调用函数时称之为函数入栈,函数执行结束称之为函数出栈
def a():
b()
def b():
c()
def c():
print('c')
raise Exception
a()
入栈出栈的顺序:最先执行的最后结束,最后执行的最先结束
16.生产者和消费者模式
生产者消费者模型总结:
程序中有两类角色
一类是负责生产数据,我们称为生产者
一类是负责处理数据,我们称为消费者
引入生产者消费者模型为了:
平衡生产者与消费者之间的工作能力,从而来提高程序整体处理数据的速度
具体的解决方法:
1.先将双方解开耦合,让不同的进程负责不同的任务
2.提供一个共享的容器来平衡双方的能力,这里可以使用进程队列,因为队列可以在进程间共享数据,而且不用担心安全问题。
但是仍旧存在一个问题,就是使用进程队列时消费者不知道何时结束。
可以使用JoinableQueue:joinableQueue继承了Queue,用法一致,而且增加了join 和taskDone
task_done标记进程队列里的调用次数等于存入的元素个数时表示任务处理完成。
join函数阻塞直到收到task_done函数的处理完成信息,继续往下执行进程。
通过这种方法可以直接结束消费者进程,解决了所有问题。
import os
import time
from multiprocessing import Process,JoinableQueue
# 进程队列可以共享数据,并且不用考虑安全问题
def customer(q):
while True:
res=q.get()
time.sleep(0.1)
print(os.getpid(),'吃了包子',res)
time.sleep(0.1)
q.task_done()
def product(q):
for i in range(5):
time.sleep(0.1)
print(os.getpid(),'生产了包子',i+1)
time.sleep(0.1)
q.put(i+1)
if __name__ == '__main__':
q=JoinableQueue()
c=Process(target=customer,args=(q,))
p=Process(target=product,args=(q,))
c.start()
p.start()
p.join()
q.join() # 等p先放完包子,否者没有p.join()时,当生产3个包子,task_done正好发送结束命令给q.join()执行完
print('over')
c.terminate()