前情回顾
1. 进程相关函数
os.getpid() 获取当前进程的PID
os.getppid() 获取父进程PID
os._exit() 退出进程
sys.exit() 退出进程
2. 孤儿进程和僵尸进程
如何避免僵尸进程
【1】 使用wait,waitpid处理僵尸进程
【2】 创建二级子进程处理僵尸进程
3. 聊天室程序
确定需求--> 基本的技术分析--》整体设计--》功能分析--》代码实现--》代码测试完善
**********************************************************************************************
一.群聊聊天室
1.退出功能
【1】服务端
* 接收消息确定消息类型
* 将用户从字典移除
* 将退出消息发送给其他人
* 给该用户发送特殊标志
【2】客户端
* 输入quit退出
* 将退出请求发送给服务器然后结束进程
* recv接收服务器信息后退出
2.管理员消息
二.multiprocessing 模块创建进程
1.流程特点
【1】将需要子进程执行的事件封装为函数
【2】通过模块的Process类创建进程对象,关联函数
【3】通过进程对象设置进程信息及属性
【4】通过进程对象调用Start启动进程
【5】通过进程对象调用join回收进程
2.基本接口使用
【1】Process()
功能:创建进程对象
参数:target(绑定要执行的目标函数)(必选)
args 元组,用于给target函数位置传参
kwargs 字典,给target函数键值传参
1 import multiprocessing as mp 2 from time import sleep 3 4 #编写进程函数 5 def fun(): 6 sleep(3) 7 print("子进程") 8 9 #创建进程对象 10 p= mp.Process(target=fun) 11 #启动进程 12 p.start() 13 14 sleep(2) 15 print("父进程") 16 #回收进程 17 p.join()
1 import multiprocessing as mp 2 from time import sleep 3 4 a = 1 5 6 #编写进程函数 7 def fun(): 8 sleep(3) 9 global a 10 print("a=",a) 11 a = 10000 12 print("子进程") 13 14 #创建进程对象 15 p= mp.Process(target=fun) 16 #启动进程 17 p.start() 18 19 sleep(2) 20 print("父进程") 21 #回收进程(处理僵尸进程) 22 p.join() 23 24 print("---------------") 25 print("parent a=",a)
1 from multiprocessing import Process 2 from time import sleep 3 import os 4 5 def th1(): 6 sleep(3) 7 print("茶饭") 8 print(os.getppid(),'----',os.getpid()) 9 def th2(): 10 sleep(2) 11 print("睡觉") 12 print(os.getppid(),'----',os.getpid()) 13 def th3(): 14 sleep(4) 15 print("xxx") 16 print(os.getppid(),'----',os.getpid()) 17 18 things = [th1,th2,th3] 19 processes = [] 20 for th in things: 21 p = Process(target = th) 22 processes.append(p)#用列表保存进程对象 23 p.start() 24 25 for i in processes: 26 i.join()
1 from multiprocessing import Process 2 from time import sleep 3 4 #带参数的进程函数 5 def Worker(sec,name): 6 for i in range(3): 7 sleep(sec) 8 print("I'm %s"%name) 9 print("I'm working...") 10 #3# p = Process(target = Worker,args = (2,"Levt")) 11 #4#p = Process(target = Worker,kwargs={'name':'Tom','sec':2})#两种方式 12 p = Process(target = Worker,args=(2,),kwargs={'name':'Tom'})#3种方式 13 p.start() 14 p.join()
1 from multiprocessing import Process 2 from time import sleep 3 4 #带参数的进程函数 5 def Worker(sec,name): 6 for i in range(3): 7 sleep(sec) 8 print("I'm %s"%name) 9 print("I'm working...") 10 #3# p = Process(target = Worker,args = (2,"Levt")) 11 #4#p = Process(target = Worker,kwargs={'name':'Tom','sec':2})#两种方式 12 p = Process(target = Worker,args=(2,),kwargs={'name':'Tom'})#3种方式 13 p.start() 14 p.join()
1 from multiprocessing import Process 2 from time import sleep,ctime 3 4 def tm(): 5 for i in range(3): 6 sleep(2) 7 print(ctime()) 8 9 # p = Process(target = tm) 10 p = Process(target = tm,name = "tedu") 11 12 p.daemon = True#主进程退出之后子进程接着退出,p.join()二选一 13 14 p.start() 15 print("Process name",p.name)#进程名称 16 print("Process pid",p.pid)#对应子进程的PID 17 print("alive:",p.is_alive())#查看子进程是否在声明周期 18 19 20 p.join(2)#如果daemon设置成True通常就不使用join 21 print("------------------------") 22
【2】p.start()
功能:启动进程
* 启动进程此时target绑定函数开始执行,该函数作为子进程执行内容,
此时进程真正被创建
【3】p.join([timeout])
功能:阻塞等待回收进程
参数:超时时间
注意:* multiprocessing创建进程同样是子进程复制父进程空间代码段,父子进程运行互不影响
* 子进程只运行target绑定的函数部分,其余均是父进程的执行内容
* 父进程执行内容
multiprocessing中父进程往往只用来创建子进程回收子进程,
具体事情由子进程完成
* multiprocessing 创建的子进程中无法使用标准输入
3.进程对象属性
p.name 进程名称
p.pid 对应子进程的PID
p.is_alive() 查看子进程是否在声明周期
p.daemon 设置父子进程的退出关系
*如果设置为True则子进程会随父进程的退出而结束
*要求必须在start()前设置
*如果daemon设置成True通常就不使用join
4.自定义进程类
编写流程:
【1】定义进程类继承Process
【2】编写__init__方法,使用super重新加载父类的__init__方法
【3】重写Process中的run方法
使用方法:
【1】使用自定义类,实例化对象,
【2】通过对象调用start创建进程,自动运行run方法
【3】实例化对象调用jion()回收子进程
1 from multiprocessing import Process 2 import time 3 4 #自定义进程类 5 class ClockProcess(Process): 6 def __init__(self,value): 7 self.value = value 8 super().__init__() 9 10 #重写run 方法 11 def run(self): 12 for i in range(5): 13 print("The time is %s"%time.ctime()) 14 time.sleep(self.value) 15 16 #创建进程对象 17 p = ClockProcess(2) 18 #启动新的进程 19 p.start() 20 p.join()
三.进程池技术
1.必要性:
[1]:进程的创建和销毁过程消耗的资源较多
[2]:当任务众多,每个任务在很短时间内完成时,需要频繁创建和销毁进程,此时对计算机压力较大
[3]:进程池技术很好的解决了以上问题
2.原理:
创建一定数量的进程类处理事件,事件处理完进程不退出而是继续处理其他事件,
直到所有事件全部处理完毕同一销毁。进价进程重新利用,降低资源消耗。
3.实现
【1】创建进程对象,放入适当的进程
from multiprocessing import pool
Pool(process)
功能:创建进程池对象
参数:指定进程数量,默认根据系统自动判断
【2】事件加入进程池队列执行
pool.apply_async(func,args,kwds)
功能:使用进程池执行func事件
参数:func 事件函数
args 元组 给func按位置传参
kwds 字典 给func按键值传参
返回值:返回函数事件对象
【3】关闭进程池
pool.close()
【4】回收进程池中的进程
pool.join()
【5】通过map添加进程池事件
pool.map(func,iter)
map执行自动调用func,参数为:执行iter的可迭代
功能:将要做的事件加入进程池
参数:func 事件函数
iter 迭代对象,将迭代值传给func
返回值:得到函数返回值列表
1 from multiprocessing import Pool 2 from time import sleep,ctime 3 4 #进程事件 5 def worker(msg): 6 sleep(2) 7 print(msg)#参数不同 8 return ctime() 9 #创建进程池 10 pool = Pool() 11 # pool = Pool(4)#同事启动四个进程 12 13 result = [] 14 #向进程池添加事件 15 for i in range(10): 16 msg = "hello %d"%i 17 r =pool.apply_async(func=worker,args=(msg,))#传给形参-->(msg) 18 result.append(r)#存储事件对象 19 20 #关闭进程池 21 pool.close() 22 #回收进程池 23 pool.join() 24 for i in result: 25 print(i.get())#通过对象get()可以获取事件函数返回值
1 from multiprocessing import Pool 2 import time 3 4 def fun(n): 5 time.sleep(1) 6 return n * n 7 pool = Pool() 8 #使用map将事件放入进程池 9 r = pool.map(fun,[1,2,3,4,5]) 10 pool.close() 11 pool.join() 12 13 print("结果",r) 14 #3s
四.进程间通信(IPC)
1.必要性:进程间空间独立,资源不共享,
此时在需要进程间数据传输时就需要特定的手段进行数据通信
2.进程间通信方法:
管道 消息队列 共享内存 信号 信号量 套接字
3.管道通信(Pipe)
[1]通信原理:在内存中开辟管道空间,生成管道操作对象,
多个进程使用同一个管道对象进行读写即可实现通信
[2]实现方法:
from multiprocessing import Pipe
fd1,fd2 = Pipe(duplex = True)
功能:创建管道
参数:默认True表示双向管道(都可读可写)
如果为False 表示单向管道
返回值:表示管道两端从的读写对象
如果是双向管道均可读写
如果是单向管道fd1只读 fd2只写
fd.recv()
功能:从管道获取内容
返回值:获取到的数据
fd.send(data)
功能:向管道写入内容
参数:要写入的数据
1 from multiprocessing import Process,Pipe 2 import os,time 3 4 #创建管道对象 5 fd1,fd2 = Pipe() 6 7 def fun(name): 8 time.sleep(3) 9 #向管道写入内容 10 fd1.send(name) 11 12 13 jobs =[] 14 for i in range(5): 15 p = Process(target = fun,args = (i,)) 16 jobs.append(p) 17 p.start() 18 19 20 for i in range(5): 21 #读取管道内容 22 data = fd2.recv() 23 print(data) 24 25 26 for i in jobs: 27 i.join()
1 from multiprocessing import Process,Pipe 2 import os,time 3 4 5 fd1,fd2 = Pipe() 6 7 def fun(name): 8 time.sleep(3) 9 #向管道写入内容 10 fd1.send({name:os.getpid()}) 11 12 jobs =[] 13 for i in range(5): 14 p = Process(target = fun,args = (i,)) 15 jobs.append(p) 16 p.start() 17 18 19 for i in range(5): 20 #读取管道内容 21 data = fd2.recv() 22 print(data) 23 24 25 for i in jobs: 26 i.join()
1 from multiprocessing import Process,Pipe 2 import os,time 3 4 5 fd1,fd2 = Pipe(False) 6 7 def fun(name): 8 time.sleep(3) 9 #向管道写入内容 10 fd1.send({name:os.getpid()}) 11 12 jobs =[] 13 for i in range(5): 14 p = Process(target = fun,args = (i,)) 15 jobs.append(p) 16 p.start() 17 18 19 for i in range(5): 20 #读取管道内容 21 data = fd2.recv() 22 print(data) 23 24 25 for i in jobs: 26 i.join()
4.消息队列
【1】通信原理:在内存中建立通信模型,进程通过队列将消息存入,
或者从队列取出完成进程间通信
【2】实现方法:
from multiprocessing import Queue
q = Queue(maxsize=0)
功能:创建队列队列
参数:最多存放的消息个数
返回值:队列对象
q.put(data,[block,timeout])
功能:向队列存入消息
参数:data 要存入的内容
block 设置是否阻塞 ,默认True阻塞,False为非阻塞
timeout 超时检测
q.get([block,timeout])
功能:从队列取出消息
参数:data 要存入的内容
block 设置是否阻塞 ,默认True阻塞,False为非阻塞
timeout 超时检测
返回值:返回获取到的内容
q.full()判断队列是否为满
q.empty()判断队列是否为空
q.qsize()获取队列的消息个数
q.close()关闭队列
1 from multiprocessing import Queue,Process 2 from time import sleep 3 4 #创建消息对列 5 q = Queue(3) 6 #写 7 def fun1(): 8 for i in range(3): 9 sleep(1) 10 q.put((1,3)) 11 #读 12 def fun2(): 13 for i in range(4): 14 a,b = q.get(timeout=3) 15 print("sum=",a+b) 16 17 p1 = Process(target=fun1) 18 p2 = Process(target=fun2) 19 p1.start() 20 p2.start() 21 p1.join() 22 p2.join()
作业:1.父子进程共同复制一个文件,分别复制文件上半部分和下半部分到另一新文件中。
2.类的设计和函数传参
1 import os 2 3 filename = 'xl.txt' 4 5 #获取文件大小 6 size = os.path.getsize(filename) 7 8 # 父子进程共用一个文件对象偏移量会相互影响 9 # f = open(filename,'rb') 10 11 pid = os.fork() 12 if pid < 0: 13 print("Error") 14 elif pid == 0: 15 #复制上半部分 16 f = open(filename,'rb') 17 fw = open("1",'wb') 18 n = size // 2 19 while True: 20 if n < 1024: 21 data = f.read(n) 22 fw.write(data) 23 break 24 data = f.read(1024) 25 fw.write(data) 26 n -= 1024 27 f.close() 28 fw.close() 29 else: 30 #复制下半部分 31 f = open(filename,'rb') 32 fw = open("2",'wb') 33 f.seek(size//2,0) 34 while True: 35 data = f.read(1024) 36 if not data: 37 break 38 fw.write(data) 39 f.close() 40 fw.close() 41 42