python并发编程之进程
part 1:
操作系统基础--I/O操作
I/O操作:相对内存来说
- 输入Input输出Output
- 输入是怎么输入: 键盘input
ead
ecv
- 输出是怎么输出: 显示器打印机播放音乐printwritesend
- 文件操作: read write
- 网络操作: send recv recvfrom
- 函数:print input
计算机的工作分为两个状态
- CPU工作:做计算(对内存中的数据进行操作)的时候工作
- CPU不工作:IO操作的时候
- CPU的工作效率500000条指令/ms
多道操作系统/分时操作系统
多道操作系统:一个程序遇到IO就把CPU让给别人
- 顺序的一个一个执行的思路变成
- 共同存在在一台计算机中,其中一个程序执行让出cpu之后,另一个程序能继续使用cpu
- 来提高cpu的利用率+
- 单纯的切换会不会占用时间:会
- 但是多道操作系统的原理整体上还是节省了时间,提高了CPU的利用率
时空复用的概念
分时操作系统:
- 单cpu分时操作系统:把时间分成很小很小的段,每一个时间都是一个时间片,
- 每一个程序轮流执行一个时间片的时间,自己的时间片到了就轮到下一个程序执行--时间片的轮转
- 没有提高CPU的利用率提高了用户体验
例子:
老教授24h全是计算没有io
先来先服务FCFS
研究生5min全是计算没有io
短作业优先
研究生25min全是计算没有io
并发概念
- 进程:进行中的程序就是一个进程
- 占用资源需要操作系统调度
- pid:能够唯一标识一个进程
- 计算机中最小的资源分配单位
- 线程: 首先由进程开辟空间等,线程是执行具体的代码
- 线程是进程中的一个单位,不能脱离进程存在
- 线程是计算机中能够被CPU调度的最小单位
- 并发:
- 多个程序同时执行:只有一个cpu,多个程序轮流在一个cpu上执行
- 宏观上:多个程序在同时执行
- 微观上:多个程序轮流在一个cpu上执行本质上还是串行
- 并行:
- 多个程序同时执行,并且同时在多个CPU上执行(能不能利用多核)
- 同步:
- 在做A事情的时候发起B件事,必须等待B件事情结束之后才能继续做A件事情
- 异步:
- 在做A事的时候发起B时间,不需要等待B事件结束就可以继续A事件
- 阻塞:
- 如果CPU不工作 input accept recv recv from sleep connect
- 非阻塞:
part 2:
进程的概念
- 进程的三状态图:
- windows操作系统下
- Linux系统操作下:
multiprocessing模块:
- 为什么要用if__name__=='main'?
- 能不能给子进程传递参数?能
- 能不能获取子进程的返回值?不能
- 能不能同时开启多个子进程?可以
- 同步阻塞异步非阻塞
- 同步阻塞:join()
- 异步非阻塞:start()
from multiprocessing import Process
import os
import time
def func(name,age):
print(f'{name}start')
time.sleep(1)
print(os.getpid(),os.getppid(),name,age)#pid(进程id):processidppid(父进程id):parentprocessid
#print('123')
if __name__=='__main__':
#只会在主进程中执行的所有的代码写在name=main下
print('main:',os.getpid(),os.getppid())
arg_lst=[('alex',84),('taibai',73),('wusir',96)]
for arg in arg_lst:
p=Process(target=func,args=arg)#可以给子进程传递参数
p.start()#开启子进程,异步非阻塞
#p=Process(target=func,args=('haha',23))#可以开启多个子进程
#p.start()
join()的用法:
from multiprocessing import Process
import os
import time
def func(name,age):
print(f'发送一封邮件给{age}的{name}')
time.sleep(1)
print('发送完毕')
if __name__=="__main__":
arg_lst=[('alex',84),('taibai',73),('wusir',96)]
p_lst=[]
for arg in arg_lst:
p=Process(target=func,args=arg)
p.start()
p_lst.append(p)
for p in p_lst:
p.join()#阻塞:直到p这个进程执行完毕才继续执行代码
print('所有邮件已经发送完毕')#要求:这句话最后打印,并且要求上面的能够异步执行
多进程之间的数据是否隔离
from multiprocessing import Process
n=0
def func():
global n
n += 1
if __name__=='__main__':
p_l=[]
for i in range(50):
p=Process(target=func)
p.start()
p_l.append(p)
for p in p_l:
p.join()
print(n)#0对主进程进行了隔离
使用多进程实现一个并发的socketserver
- 服务器端:
import socket
from multiprocessing import Process
def talk(conn):
while True:
msg=conn.recv(1024).decode('utf-8')
ret=msg.upper().encode('utf-8')
conn.send(ret)
conn.close()
if __name__=='__main__':
sk = socket.socket()
sk.bind(('127.0.0.1',9001))
sk.listen()
while True:
conn,addr=sk.accept()
Process(target=talk,args=(conn,)).start()# args为元组
sk.close()
- 客户端:
import socket
import time
sk=socket.socket()
sk.connect(('127.0.0.1',9001))
while True:
sk.send(b'hello')
msg = sk.recv(1024).decode('utf-8')
print(msg)
time.sleep(0.5)
sk.close()
part 3:
开启进程的例外一种方式
- 面向对象的方法,通过继承和重写run方法完成了启动子进程
- 通过重写init和调用父类的inin完成了给子进程传参数
import os
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,a,b,c):#如果需要传参数,需要写__init__方法
self.a=a
self.b=b
self.c=c
super().__init__()#调用上级的__init__方法
def run(self):#实现run的同名方法
time.sleep(1)
print(os.getppid(),os.getpid(),self.a)
if __name__=='__main__':
print('主进程-->',os.getpid())
for i in range(10):
p=MyProcess(1,2,3)#传参数
p.start()
Process类的其他方法
name pid ident daemon(在start之前设置)
terminate() is_alive()
import os
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,a,b,c):#如果需要传参数,需要写__init__方法
self.a=a
self.b=b
self.c=c
super().__init__()#调用上级的__init__方法
def run(self):#实现run的同名方法
time.sleep(1)
print(os.getppid(),os.getpid(),self.a)
if __name__=='__main__':
p=MyProcess(1,2,3)
p.start()
print(p.pid, p.ident)#查看子进程的pid 22672 22672(同)
print(p.name)#查看子进程的名字
print(p.is_alive())#查看进程是否活着
p.terminate()#强制结束一个子进程异步非阻塞
print(p.is_alive()) #True?因为操作系统响应需要一定时间
time.sleep(0.01)
print(p.is_alive())
守护进程
- 守护进程
- 在start一个进程之前设置daemon=True
- 守护进程会等待主进程的代码结束就立即结束
- 为什么守护进程只守护主进程的代码?而不是等主进程结束之后才结束
- 因为主进程要最后结束,为了给守护进程回收资源
- 守护进程会等待其他子进程结束么?不会
import time
from multiprocessing import Process
def son1():
while True:
print('--->inson1')
time.sleep(1)
def son2():#10s
for i in range(10):
print('inson2')
time.sleep(1)
if __name__=='__main__':#3s
p1=Process(target=son1)
p1.daemon=True#表示设置p1是一个守护进程需要在start之前设置
p1.start()
p2=Process(target=son2)
p2.start()
time.sleep(3)
print('>>>>inmain')
p2.join()#等待p2结束之后才结束
- 等待p2结束-->主进程的代码才结束-->守护进程结束
- 主进程会等待所有的子进程结束,是为了回收子进程的资源
- 守护进程会等待主进程的代码执行结束之后再结束,而不是等待整个主进程结束
- 主进程的代码什么时候结束,守护进程就什么时候结束,和其他子进程的执行进度无关
进程同步---Lock 锁 *****
锁:
import time
from multiprocessing import Lock,Process
def func(i,lock):
lock.acquire()#拿钥匙
print(f'{i}被锁起来的代码')
#time.sleep(1)
lock.release()#还钥匙
if __name__=='__main__':
lock=Lock()
for i in range(10):
p=Process(target=func,args=(i,lock))
p.start()
抢票的例子:加锁
import json
import time
from multiprocessing import Process,Lock
def search(i):
#查询余票
with open('ticket',encoding='utf-8') as f:
ticket=json.load(f)
print(f'{i}:当前的余票为{ticket["count"]}张')
def buy_ticket(name):
#查询余票
with open('ticket',encoding='utf-8') as f:
ticket=json.load(f)
if ticket['count']>0:
ticket['count'] -= 1
print(f'{name}买到票了')
time.sleep(0.1)
with open('ticket',mode='w',encoding='utf-8') as f:
json.dump(ticket,f)
def get_ticket(i,lock):
search(i)
with lock:#代替acquire和release并在此基础上做一些异常处理,保证即便一个进程的代码出错了,也会归还钥匙
buy_ticket(i)
#lock.acquire()
#buy_ticket(i)
#lock.release()
if __name__=='__main__':
lock=Lock()#互斥锁
for i in range(10):
p=Process(target=get_ticket,args=(i,lock))
p.start()
为啥叫互斥锁?
from multiprocessing import Lock#互斥锁不能再同一个进程中连续acquire多次
lock=Lock()
lock.acquire()
print(1) # 只能打印一个
lock.acquire()
print(2)
进程之间数据隔离---队列
- 进程之间通信(IPC) Inter Process Communication
- 生产者消费者模型
- 基于文件:同一台机器上的多个进程之间的通信
- 基于网络:同一台机器或者多台机器上的多进程间通信
- 第三方工具(消息中间件)
- memcache
- redis
- rabbitmq
- kafka
from multiprocessing import Queue, Process
def pro(q):
for i in range(11):
print('--->',q.get())
def son(q):
for i in range(10):
q.put(f'hello{i}')
if __name__=='__main__':
q=Queue()
Process(target=son,args=(q,)).start()
Process(target=pro,args=(q,)).start()
- 生产者消费者模型背下来
- 本质:就是让生产数据和消费数据的效率达到平衡并且最大化的效率
from multiprocessing import Queue,Process
import time
import random
def consumer(q,name):#消费者:通常取到数据之后还要进行某些操作
while True:
food=q.get()
if food:
print(f'{name}吃了{q.get()}')
else:
break
def producer(q,name,food):#生产者:通常再放数据之前先通过某些代码来获取数据
for i in range(10):
foodi = f'{food}{i}'
print(f'{name}生产了{foodi}')
time.sleep(random.random())
q.put(foodi)
if __name__ == '__main__':
q=Queue()
c1=Process(target=consumer,args=(q,'alex'))
p1=Process(target=producer,args=(q,'大壮','苹果'))
p2=Process(target=producer,args=(q,'b哥','梨子'))
c1.start()
p1.start()
p2.start()
p1.join()
p2.join()
q.put(None)#有多少个消费者就put多少个None进来