几个模块:
队列-----multiprocess.Queue(重要)
进程池-------multiprocess.Pool(重要)
信号量 —— multiprocess.Semaphore(挺重要)
事件 —— multiprocess.Event(挺重要)
队列的扩展------multiprocess.JoinableQueue(挺重要)
管道---------multiprocess.pipe(挺重要)
数据共享------multiprocess.Mange(挺重要)
锁 —— multiprocess.Lock(挺重要)
为什么要有进程?
合理利用cpu,提高用户体验
什么是进程?
计算机中程序关于某集合上的一次运行活动,是系统进行资源分配的基本单位.
就是为了形容执行中的程序的一种称呼.
操作系统中资源分配的最小单位
进程之间数据是隔离的,占用操作系统资源相对多
独立存在的
进程的三大特性:
就绪:创建完成
阻塞:遇到IO
运行:系统调度
进程的调度?
要想要多个程序交替运行,操作系统必须对这些进程进行调度,这个调度也不是随机进行的,而是要遵循一定的法则---->调度法则
1.先来先服务
2.短作业优先调度优先算法
3.时间片轮转法
4.多级反馈队列(合理)
进程的 并行与并发和区别?
并行:两者同时执行,例如赛跑,都在跑,(资源够用)
并发:资源有限的情况下,两者交替轮流使用资源,目的是提高效率
区别:并行:从微观上看,也就是一个精确的时间片刻,有不同的程序在执行,这就必须要有多个处理器.
并发:从宏观上,在一个时间段上可以看出是同时执行的.
什么是阻塞:
recv,recvfrom,accept 这三个才是真正意义上的阻塞,
程序由不符合某个条件或者要等待某个条件满足在某个地方进行的等待状态.
IO操作:
简单来说: input:(输入到内存):read,load,accept,recvfrom,input output:(从内存输出):wlite,dump,send,connect,sendto,print
同步和异步:
同步:不是同时,按顺序做某些事,简单来说,就是N件事,做完一件,在做另一件.
异步:n件事,宏观上是同时进行,简单来说,同时做几件事,分配各自时间段都做,各不相干
进程的创建与结束:
创建:
1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)
2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)
4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)
无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的
结束:
1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出错退出(自愿,python a.py中a.py不存在)
3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)
4. 被其他进程杀死(非自愿,如kill -9)
multiprocess 模块
仔细来说他并不是一个模块,而是python中的一个包,之所以叫multi是取自multiple,的多功能的意思
这个包中几乎包含了和进程有关的所有模块,为了方便记忆,可以大致将这几个部分分为四个部分,:创建进程部分,进程同步部分,进程池部分,进程之间的数据共享.
multiprocess.process
process: 是一个创建进程的模块,借助这个模块,就可以完成进程的创建
class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): self.name = '' self.daemon = False self.authkey = None self.exitcode = None self.ident = 0 self.pid = 0 self.sentinel = None强调:
1. 需要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号 参数介绍: 1 group参数未使用,值始终为None 2 target表示调用对象,即子进程要执行的任务 3 args表示调用对象的位置参数元组,args=(1,2,'egon',) 4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} 5 name为子进程的名称
注意:
1 p.start():启动进程,并调用该子进程中的p.run() 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,
使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 4 p.is_alive():如果p仍然运行,返回True 5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。
timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('wupeiqi') p2=MyProcess('yuanhao') p3=MyProcess('nezha') p1.start() #start会自动调用run p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join() print('主线程')
进程间的数据隔离:
# n = [100] # def sub_n(): # global n # 子进程对于主进程中的全局变量的修改是不生效的 # n.append(1) # print('子进程n : ',n) # 子进程n : [100, 1] # if __name__ == '__main__': # p = Process(target = sub_n) # p.start() # p.join() # 阻塞 直到子进程p结束 # print('主进程n : ',n) # 子程序会新开辟一个空间,两者时间是独立的 ,
所以子程序该的只是他自己复制过去的那份数据 # 主进程n : [100]
守护进程:(设置子进程为守护进程,守护进程会随着主进程代码的结束而结束)
def alive():
while True:
print('连接监控程序,并且发送报活信息')
time.sleep(0.6)
def func():
'主进程中的核心代码'
# while True:
print('选择的项目')
time.sleep(1)
print('根据用户的选择做一些事儿')
if __name__ == '__main__':
p = Process(target=alive)
p.daemon = True # 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束
p.start()
p = Process(target=func)
p.start()
p.terminate()
time.sleep(0.2)
print(p.is_alive())
# 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束
# 由于主进程要负责给所有的子进程收尸,所以主进程必须是最后结束,
守护进程只能在2主进程的代码结束之后就认为主进程结束了
# 守护进程在主进程的代码结束之后就结束了,不会等待其他子进程结束
socket聊天并发的例子:
sever端
import socket,time from multiprocessing import Process def talk(conn): conn, addr = sk.accept() print(conn) while True: msg = conn.recv(1024).decode() time.sleep(10) conn.send(msg.upper().encode()) if __name__ == '__main__': # 这句话下面的所有代码都只在主进程中执行 sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.listen() while True: conn,addr = sk.accept() Process(target=talk,args=(sk,)).start()
clinet端:
import socket sk = socket.socket() sk.connect(('127.0.0.1',9000)) while True: sk.send(b'hello') print(sk.recv(1024))
锁 —— multiprocess.Lock
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
def search(name):
# 查询余票的功能
with open('ticket') as f:
dic = json.load(f)
print(name , dic['count'])
def buy(name):
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.1)
if dic['count'] > 0:
print(name,'买到票了')
dic['count'] -=1
time.sleep(0.1)
with open('ticket','w') as f:
json.dump(dic,f)
def get_ticket(name,lock):
search(name)
lock.acquire() # 只有第一个到达的进程才能获取锁,剩下的其他人都需要在这里阻塞
buy(name)
lock.release() # 有一个人还锁,会有一个人再结束阻塞拿到钥匙
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=get_ticket,args=('name%s'%i,lock))
p.start()
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,
即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
进程之间的两种通信:(Queue和pipe)
队列-----multiprocess.Queue(重要) IPC----- Inter-Process Communication
概念:创建共享的进程队列,Queue是多进程安全的队列,可以它实现多进程之间的数据传递
方法介绍 :
Queue([maxsize]) # 设置队列中的最大存放数据量,不写默认为随便多少
Queue实例化的对象具有以下几种方法:
1. q.get() # 从队列中获取
2.q put() # 可以向队列中放东西
这三个都不够准确,因为队列有时候是动态的
3.q.qsize() 队列的大小
4 .q.full() 是否满了 满返回True
5.q.empty() 是否空了 空返回True
import time from multiprocessing import Queue,Process def producer(name,food,num,q): '''生产者''' for i in range(num): time.sleep(0.3) foodi = food + str(i) print('%s生产了%s'%(name,foodi)) q.put(foodi) def consumer(name,q): while True: food = q.get() # 等待接收数据 if food == None:break print('%s吃了%s'%(name,food)) time.sleep(1) if __name__ == '__main__': q = Queue(maxsize=10) p1 = Process(target=producer,args = ('宝元','泔水',20,q)) p2 = Process(target=producer,args = ('战山','鱼刺',10,q)) c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('wusir', q)) p1.start() # 开始生产 p2.start() # 开始生产 c1.start() c2.start() p1.join() # 生产者结束生产了 p2.join() # 生产者结束生产了 q.put(None) # put None 操作永远放在所有的生产者结束生产之后 q.put(None) # 有几个消费者 就put多少个None
管道---------multiprocess.pipe(不太重要)
例子:
from multiprocessing import Queue,Process def consumer(q): print( '子进程 :', q.get() # 从管道中获取 ) if __name__ == '__main__': q = Queue() p = Process(target=consumer,args=(q,)) p.start() q.put('hello,world') # 发送到管道
数据共享------multiprocess.Mange(不太重要)
进程之间数据是独立的,可以借助队列或者管道进行通信,二者都是基于消息传递的,
虽然进程数据是独立的,但是通过Manager实现数据共享,有人说他的功能远不止于此
from multiprocessing import Manager,Process,Lock def func(dic,lock): with lock:# 不加锁操作共享数据,肯定会出现错乱 dic['count']-=1 if __name__ == '__main__': lock=Lock() with Manager()as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=func,args=(dic,lock)) p_l.append((p)) p.start() for i in p_l: p.join() print(dic)
进程池-------multiprocess.Pool(重要)
池 :固定几个进程,执行无限任务,目的是节省创建和回收和切换任务的时间,可以有返回值.
常规的话:一个任务一个进程,每个任务都需要系统来分配资源创造进程,并且任务结束后,还要回收资源,多个进程切换也需要时间,进程池就可以避免多次创建和回收的时间.
参数介绍:
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None 3 initargs:是要传给initializer的参数组
主要方法:
p.apply()
p.apply_async(func,args,callback)
p.close() 关闭进程池
p.jion() 等待所有工作进程退出.此方法只能在close(),或teminate()之后调用
向进程池提交任务的三种方式:
同步 apply(没什么用)
异步提交: map(简便算法 接收的参数必须是 子进程要执行的func,可迭代的(可迭代中的每一项都会作为参数被传递给子进程),能够传递的参数是有限的,所以比起apply_async限制性比较强
异步提交:apply_async ():首先 apply_async提交的任务和主进程完全异步,
可以通过先close进程池,再join进程池的方式,强制主进程等待进程池中任务的完成
也可以通过get获取返回值的方式,来等待任务的返回值
异步 map
from multiprocessing import Pool # 池 def func(i): i -= 1 return i**2 # 你的池中打算放多少个进程,个数cpu的个数 * 1|2 if __name__ == '__main__': p = Pool(5) # 创建进程的个数 ret = p.map(func,range(100)) # 自动带join print(ret) # 可以获取函数的返回值
异步apply_async
import time from multiprocessing import Pool # 池 def func(i): i -= 1 time.sleep(1) return i**2 # 你的池中打算放多少个进程,个数cpu的个数 * 1|2 if __name__ == '__main__': p = Pool(5) l = [] for i in range(100): ret = p.apply_async(func,args=(i,)) # 异步的 apply_async异步提交任务 l.append(ret) for ret in l: print(ret.get())
回调函数:
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:
我算完了,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
import os import time import random from multiprocessing import Pool # 池 def func(i): # [2,1,1,5,0,0.2] i -= 1 time.sleep(random.uniform(0,2)) return i**2 def back_func(args): print(args,os.getpid()) if __name__ == '__main__': print(os.getpid()) p = Pool(5) l = [] for i in range(100): ret = p.apply_async(func,args=(i,),callback=back_func) # 5个任务 p.close() p.join()
主动执行func,然后在func执行完毕之后的返回值,直接传递给back_func作为参数,调用back_func
处理池中任务的返回值
回调函数是由谁执行的? 主进程