多进程编程 import os pid = os .fork() 功能 :创建新的进程 参数: 无 返回值 :失败返回一个负数 成功:在原有进程中返回新的进程的PID号 在新进程中返回为0
* 子进程会复制父进程传问代码段,包括fork之前产生的内存空间
* 子进程从fork的下一句开始执行,与父进程互不干扰
* 父子进程的执行顺序是不一定的,父子进程公用一个终端显示
* 父子进程通常会根据fork返回值得差异选择执行不同的代码。所以if 结构几乎是fork 的固定搭配
* 父子进程空间独立,操作的都是本空间的内容,互不影响
* 子进程也有自己的特性,比如PID号,PCB,命令集等
获取进程PID
os.getpid()
功能: 获取当前进程的进程号
返回值: 返回进程号
os.getppid()
功能: 当前进程父进程的PID号
返回值: 返回进程号
进程退出
os._exit(status) 结束进程之后不在执行后面的内容
功能:进程退出状态
参数:进程的退出状态
sys.exit([status]) sys.exit('这是退出信息') 退出时,打印退出信息
功能: 进程退出
参数: 数字表示退出状态,不写默认为0
字符串,表示退出时打印的内容
此函数是抛出异常退出,这个异常可以通过【except SystemExit as e】被捕获,并显示出来,之后程序将不会退出
Z :该程序应该已经终止,但是其父程序却无法正常的终止他,造成 zombie (疆尸) 程序的状态 (僵尸进程,孤儿进程)
孤儿进程:定义
父进程先于子进程退出,此时子进程就称为孤儿进程。
* 孤儿进程会被操作系统指定的进程收养,系统进程就成为孤儿进程的新的父进程
僵尸进程 :定义
子进程先于父进程退出,但是父进程没有处理子进程的退出状态,此时子进程就会成为僵尸进程
* 僵尸进程会存留少量PCB信息在内存中,大量的僵尸进程会消耗系统资源,应该避免僵尸进程的产生
如何避免僵尸进程产生
* 处理子进程退出状态
1 pid,status = os.wait()
功能 :在父进程中阻塞等待处理子进程退出

import os from time import sleep pid = os.fork() if pid < 0: print('创建失败') elif pid == 0 : print('创建成功PID,',os.getpid()) os._exit(9) #返回退出状态[状态码*256] else: print('主进程PID,',os.getpid()) pid,status = os.wait() #获取子进程退出PID和进程状态,阻塞函数 print(pid,status) #打印 print(os.WEXITSTATUS(status)) #获取子进程退出状态 while True: sleep(2)
2 返回值 : PID退出的子进程的PID号 ,,stastus 获取子进程退出状态
pid,status = os.waitpid(pid,option)
功能 : 在父进程中阻塞等待处理子进程退出
参数 : pid -1 表示等待任意子进程退出
>0 表示等待对应PID号的子进程退出
option 0 表示阻塞等待
WNOHANG 表示非阻塞
返回值 : pid 退出的子进程的PID号
status 获取子进程退出状态
waitpid(-1,0) ==> wait(

import os from time import sleep pid = os.fork() if pid < 0: print('创建失败') elif pid == 0 : print('创建成功PID,',os.getpid()) os._exit(9) #返回退出状态[状态码*256] else: print('主进程PID,',os.getpid()) while True: pid,status = os.waitpid(-1,os.WNOHANG) print(pid,status) print(os.WEXITSTATUS(status)) sleep(1)
multiprocessing 模块创建进程
1 需要将要执行的事情封装为函数
2 使用 multiprocessing模块中 Process类创建进程对象
3 通过对象属性设置和 Process的初始化函数对进程进行设置,绑定要执行的函数
4 启动进程,会自动执行进程绑定的函数
5 完成进程的回收
Process()
功能 :创建进程对象
参数 : name 进程名称 (默认为Process-1)
target 绑定函数 (必填)
args 元组 给target函数按照位置传参
kwargs 字典 给target函数按照键值对传参
返回需要创建进程对象
p = Process(target = * , args(*,))
p.start
功能 :启动进程
*target 函数会自动执行,此时进程真正被创建
p.join([timeout])
功能 : 阻塞等待回收子进程
参数 : 超时时间
* 使用 multiprocessing 创建子进程,同样子进程复制父进程的全部代码段,父子进程各自执行互不影响,
父子进程有各自的运行空间
* 如果不使用join回收子进程则子进程退出后会成为僵尸进程
* 使用 multiprocessing 创建子进程往往父进程只是用来创建进程回收进程
****************
注意
1 如果子进程从父进程拷贝对象,对象和网络或者文件相关联,那么父子进程会使用同一套对象属性,相互有一定的关联性
2 如果在子进程中单独创建对象,则和父进程完全没有关联
****************
p.is_alive() 判断进程生命周期状态,处于生命周期得到True ,否则返回False
p.name 进程名称 默认为Process-1
p.pid 进程的PID 号
p.daemon
默认为False ,主进程退出不会影响子进程执行
如果设置为True 则子进程会随着主进程结束而结束
* 要在start 前设置
* 一般不和 join一起使用
创建算定义进程类
1 继承Process
2 编写自己的__init__,同时加载父类的init方法
3 重写run方法,可以通过生成的对象调用start自动运行

class Process_1(Process): def __init__(self,value): self.value = value super().__init__() def run(self): for i in range(3): print('%s--%s-'%(i,self.value)) p=Process_1(5) p.start() p.join()
多进程
优点:
1 可以使用计算机多核,进行任务的并发执行,提高执行效率!
2 空间独立,数据安全
3 运行不受其他进程影响,创建方便
缺点
进程的创建和删除消耗的系统资源较多
进程池技术:
产生原因 : 如果有大量任务需要多进程完成,则可能需要频繁的创建删除进程,给计算机带来较多的资源消耗。
原理: 创建适当的进程被放入进程池,用来处理待处理事件,处理完毕后进程不销毁,仍然在进程池中等待处理其他事件。
进程的利用降低了资源的消耗
使用方法:
1 创建进程池,在池内放入适当的进程
2 将事件加入到进程池等待队列
3 不断取进程执行事件,直到所有事件执行完毕
4 关闭进程池,回收进程
from multiprocessing import Pool
Poll(process)
功能 :创建进程池对象
参数 :表示进程池中有多少进程
pool.apply_async(func,args,kwds)
功能:将事件放入到进程池队列
参数:func事件函数
args:以元组形式给func传参
kwds: 以字典形式给func传参
pool.close() 关闭进程池
poll.join() 回收进程池

from multiprocessing import Process,Pool import time def we(msg): time.sleep(2) print(msg) return time.time() pool = Pool(4) a=[] #创建列表 for i in range(10): msg = 'hell ___%s:::'%i c=pool.apply_async(we,(msg,)) #返回的是一个result对象 a.append(c) pool.close() pool.join() for i in a: print(i.get())##或获取进程的返回值
poll.map(func,iter)
功能:将要做的事件放入进程池
参数:func 要执行的函数
iter 迭代对象
返回值 :返回事件函数的返回值列表

from multiprocessing import Process,Pool import time def we(msg): time.sleep(2) print(msg,Process.__name__) return msg*2 pool = Pool(4) a=pool.map(we,range(10)) #返回可迭代对象,此处map跟高阶map函数相差不多 pool.close() pool.join() for i in a: print(i)
进程间通信(IPC)
原因 :进程空间相对独立,资源无法相互获取,此时在不同进程间通信需要专门的方法
进程间通信的方法: 管道 ,消息队列,共享内存,信号,信号量,套接字
管道通信 pipe
通信原理 :在内存中开辟管道空间,生成管道操作对象,多个进程使用‘同一个’管道对象进行操作即可实现通信
multiprocessing --> Pipe
fd1,fd2 = Pipe(duplex = True)
功能 :创建管道
参数 :默认表示双向管道
如果设置为False则为单向管道
返回值 :表示管道的两端
如果是双向管理 都可以读写
如果是单向管道 ,则fd1 只读, fd2只写
fd.recv() 没有参数
功能 :从管道读取信息
返回值:读取到的内容
*如果管道为空则阻塞
fd.send(data)
功能:向管道写入内容
参数: 要写入的内容
* 可以发送python 任意数据类型

from multiprocessing import Process,Pipe from time import sleep fd1,fd2 = Pipe() #建立管道 def f1(fd1): a=0 while a<5: date=fd1.recv() print(date) if date: a+=1 for i in range(5): #单进程控制5条进程信息,所以 发5次 fd1.send('完成') print('正在结束!') def f2(fd2): fd2.send('这是f2的name') date=fd2.recv() print(date) p1 = Process(target=f1,args=(fd1,)) #创建接收管道,单进程进行读写 p1.start() a=[] for i in range(5): p = Process(target=f2,args=(fd2,)) a.append(p) p.start() for i in a: i.join() print('子进程已结束,等待收进程结束') p1.join()
消息队列
队列 :先进先出
通信原理 :在内存中建立队列数据结构模型。多个进程都可以通过队列存入内容,取出内容的顺序和存入顺序保持一致
创建队列
q = Queue(maxsize = 0 )
功能 : 创建消息队列、
参数 : 表示最多存放多少消息,默认表示根据内存分配存储
返回值:队列对象
q.put(data,[block,timeout])
功能:向队列存储消息
参数 :data 要存的内容
block 默认队列满时会阻塞,设置为False 则非阻塞
timeout 超时时间
data = q.get([block,timeout])
功能 :获取队列消息
参数 : block 默认队列空时会阻塞,设置为False则非阻塞
timeout 超时时间
返回值 :返回取出的内容
q.full() 判断队列是否为满
q.empty() 判断队列是否为空
q.qsize() 判断队列中消息数量
q.close() 关闭队列
共享内存
通信原理:在内存中开辟一块空间,对多个进程可见,进程可以写入输入,但是每次写入的内容会覆盖之前的内容。
obj = Value(ctype,obj)
功能 :开辟共享内存空间
参数 :ctype 要存储的数据类型('c','i')
obj 共享内存的初始化数据
返回 :共享内存对象
obj.value 即为共享内存值,对其修改即修改共享内存

from multiprocessing import Process,Value import time import random money =Value('i',2000) def deposite(): for i in range(100): time.sleep(0.01) money.value += random.randint(1,200) def withdraw(): for i in range(100): time.sleep(0.02) money.value -= random.randint(1,100) p1 = Process(target=deposite) p2 = Process(target=withdraw) p1.start() p2.start() p1.join() p2.join() print(money.value)
obj = Array(ctype,obj)
功能:开辟共享内存空间
参数: ctype 要存储的数据格式
obj 初始化存入的内容,比如列表,字符串
如果是数组则表示开辟空间的个数
返回值 :返回共享内存对象
* 可以通过遍历获取每个元素的值
* 如果存入的是字符串,可以通过obj.value 表示字符串的首地址

from multiprocessing import Process,Array import time shm = Array('i', [1,2,3,4])#创建共享内存,开辟4个整形空间 print(shm) def f1(shm): for i in shm: print(i) shm[3]=145 # p = Process(target=f1,args=(shm,)) p.start() p.join()

from multiprocessing import Process,Array import time shm = Array('c', b'hello') print(shm) def f1(shm): for i in shm: print(i) shm[0]=b'H' p = Process(target=f1,args=(shm,)) p.start() p.join() print(shm.value)#打印字符串
开辟空间 | 管道内存 | 消息队列 | 共享内存 |
读写方式 |
两端读写 双向/单向 |
先进先出 | 覆盖之前内容 |
效率 | 一般 | 一般 | 较高 |
应用 | 多用于父子进程 | 广泛灵活 | 需要注意进行互斥操作 |
信号通信
一个进程向另一个进程发送一个信号来传递某种讯息,接收者根据接收到的信号进行相应的行为
信号见《https://www.cnblogs.com/Skyda/p/9397913.html》
os.signal
os.kill(pid,sig)
功能:发送信号
参数 :PID 目标进程
SIG 要发送的信号
import signal
signal.alarm(sec)
功能:向自身改善时钟信号 --》SIGALRM
参数:sec 时钟时间
同步执行:按照顺序逐句执行,一步完成再做下一步
异步执行:在执行过程中利用内核记录延迟发生或者准备处理的事件。这样不影响应用层的持续执行
当事件发生时再由内核告知应用层处理
* 信号是唯一的异步通信方法
进程中只能有一个时钟,第二个会覆盖第一个时钟
signal.pause()
功能:阻塞等待接收一个信号
signal.signal(signum,handler)
功能:处理信号
参数:signum 要处理的信号
handler 信号的处理方法
SIG_DFL 表示使用默认的方法处理
SIG_IGN 表示忽略这个信号
func 传入一个函数表示用指定函数处理
def func(sig,frame)
sig: 捕获到的信号
frame: 信号对象
常用的的信号,,,
HUP 1 终端断线
INT 2 中断(同 Ctrl + C)
QUIT 3 退出(同 Ctrl + )
TERM 15 终止
KILL 9 强制终止一个进程
CONT 18 继续(与STOP相反, fg/bg命令)
STOP 19 暂停(同 Ctrl + Z)
ALRM 14 时钟信号
CHLD 17 子进程状态改变时给父进程发出 【在父进程中忽略子进程状态改变,子进程退出自动系统处理】
SIGTSTP 20 (Ctrl + Z )

import signal import time signal.alarm(3) #异步加载到内核如果还有alarm信号,依然忽略 signal.signal(14,signal.SIG_IGN) while True: print('1') time.sleep(1)

import signal import time def a(sig,framer): if sig == 14: print('这是时钟信号') elif sig == signal.SIGINT: print('结束不了') signal.alarm(3) signal.signal(signal.SIGALRM,a) signal.signal(signal.SIGINT,a) while True: time.sleep(1) print('1')

from multiprocessing import Process
import os,sys
from signal import *
from time import sleep
def hand_(sig,frame):
print(sig)#获取信号并判断
if sig == SIGINT:
os.kill(os.getppid(),SIGUSR1)
elif sig == SIGQUIT:
os.kill(os.getppid(),SIGUSR2)
elif sig == SIGUSR1:
print('这里是exit')
os._exit(9) #退出发出状态码
#子进程
def son():
signal(SIGINT,hand_)#需要处理的信号
signal(SIGQUIT,hand_)
signal(SIGUSR1,hand_)
signal(SIGTSTP,SIG_IGN)#需要忽略的信号
while True:#不让子进程结束
sleep(2)
p = Process(target=son)
p.start()
#父进程
def parent(sig,frame):
if sig == SIGUSR1:
print('开车了')
if sig == SIGUSR2:
print('车速有点快')
if sig == SIGTSTP:
os.kill(p.pid,SIGUSR1)
#使用信号的时候,需要注意,主进程和子进程 都能接收到发出的信号
#所以需要在主进程上进行信号忽略处理
signal(SIGUSR1,parent) #需要处理的信号
signal(SIGUSR2,parent)#需要处理的信号
signal(SIGTSTP,parent)#需要处理的信号
signal(SIGINT,SIG_IGN)#需要忽略的信号
signal(SIGQUIT,SIG_IGN)#需要忽略的信号
while True:#不让主进程结束
pid,status = os.wait()#获取子进程退出状态码,并判断主进程退出
if status:
sys.exit()
信号量(信号灯)
原理:给定一个数量,对多个进程可见,且多个进程都可以操作。进程通过对数量多少的判断执行各自的行为
multiprocessing --> Semaphore
sem = Semaphore(num)
功能 :创建信号量
参数: 信号量初始值
返回:信号量对象
sem.get_value() 获取信号量值
sem.acquire() 将信号量减1 当信号量为0会阻塞
sem.release() 将信号量加1
进程的同步互斥
临界资源:多个进程或者线程都能够操作的共享资源
临界区: 操作临界资源的代码段
同步:同步是一种合作关系,为完成某个任务,多进程或者多线程之间形成一种协调,按照约定或条件执行操作临界资源。
互斥:互斥是一种制约关系,当一个进程或者线程使用临界资源时进行上锁处理,当另一个进程使用时会阻塞等待,直到解锁后才能继续使用
Event 事件
from multiprocessing import Event
创建事件对象
e = Event
设置事件阻塞
e.wait([timeout])
事件设置 --当事件被设置后 e.wait()不再阻塞
e.set()
清除设置 ,当事件设置被clear后 e.wait又会阻塞
e.clear()
事件状态判断
e.is_set()

from multiprocessing import Event,Process from time import sleep def wait_(e): print('临界区操作') e.wait() print('这里是临界资源1') with open('file.txt','rb') as f: print(f.read()) e = Event() p1 = Process(target=wait_,args=(e,)) p1.start() print('主进程执行') with open('file.txt','wb') as f: print('这里正在写') f.write(b'i l looo') e.set() p1.join()
Lock 锁
创建对象 lock = lock()
lock.acquire()上锁
如果锁已经是上锁状态调用此函数会阻塞
lock.release()解锁