Process
进程之间的数据隔离问题
守护进程 报活
几个进程模型 ----- 进程同步工具
有先后顺序的 就是同步
进程之间 就是异步
希望原本异步的多进程操作,维持一个顺序 --- 同步工具
1 锁 Lock 重要 买票
2信号量 Semaphore ktv
3事件 Event 信号灯
一 进程之间的数隔离
进程与进程之间的数据是隔离的
内存空间是不能共享的
所以要想进行通信,必须借助其他手段
且这两个进程都是自愿的
子进程的执行结果父进程获取不到
父进程依赖子进程的执行结果
父进程如何获取子进程的执行结果:父进程之间通过socket通信
守护进程:
会随着主进程的结束而结束
主进程创建守护进程:
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,负责抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是相互独立的,主进程代码运行结束,守护进程随即终止
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行 p.start() time.sleep(10) # 在sleep时查看进程id对应的进程ps -ef|grep id print('主') 守护进程的启动
from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------")#打印该行则主进程代码结束,则守护进程p1应该被终止.#可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止. 主进程代码执行结束守护进程立即结束
import time from multiprocessing import Process # 例一 # def func1(): # print('begin') # time.sleep(3) # print('wahaha') # if __name__ == '__main__': # p = Process(target=func1) # p.daemon = True # # 守护进程的属性,默认是False,如果设置成True,就表示设置这个子进程为一个守护进程 # # 设置守护进程的操作应该在开启子进程之前 # p.start() # # time.sleep(1) # print('主进程')
# 例二 def func1(): print('begin') time.sleep(3) print('wahaha') def func2(): while True: print('in func2') time.sleep(0.5) if __name__ == '__main__': Process(target=func1).start() p = Process(target=func2) p.daemon = True # 守护进程的属性,默认是False,如果设置成True,就表示设置这个子进程为一个守护进程 # 设置守护进程的操作应该在开启子进程之前 p.start() time.sleep(1) print('主进程')
设置守护进程之后,会有什么效果呢?
守护进程会在主进程的代码执行完毕后直接结束,无论守护进程是否执行完毕
应用:
报活 主进程还活着
100台机器 100个进程 10000进程
应用是否在正常工作 ----- 任务管理器来查看
守护进程如何向监测机制报活呢?
为什么要用守护进程来报活呢?为什么不用主进程来工作呢??
守护进程报活几乎不占用cpu ,也不需要操作系统去调度
主进程能不能严格的每60s就发送一条信息
使用并发的实例:
socet聊天并发实例:
所有的进程的基本使用
# 进程 : 同一时刻可以做多件事情 互相之间不影响
# socket tcp server
# 采用多进程的知识点 来解决原生socket同一时刻只能和一个conn通信的弊端
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start进程一定要写到这下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start() 使用多进程实现socket聊天并发-server
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8')) client端
import socket from multiprocessing import Process def talk(conn): try: while True: conn.send(b'hello') print(conn.recv(1024)) finally: conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',9091)) sk.listen() try: while True: conn,addr = sk.accept() Process(target=talk,args=(conn,)).start() finally: sk.close()
server
import socket import os sk = socket.socket() sk.connect(('127.0.0.1',9091)) while True: print(sk.recv(1024)) sk.send(str(os.getpid()).encode('utf-8'))
client
进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)
锁 —— multiprocess.Lock
对它的认识是数据的保护,在多人同时使用一个数据库时查看用不到,但是稍微更改一点点就需要一个一个来改,不能一起更改,会出现数据的错误,使用时的大方向是多人,但是只能一个一个的确认后才能执行接下来的人:
多进程共享一段数据的时候,数据会出现不安全的现象:
需要锁来维护数据的安全性:
lock= Lock()
lock.acquire() # 拿钥匙
print(111)
lock.release() # 还钥匙
lock.acquire() # 阻塞
print(222)
from multiprocessing import Lock from multiprocessing import Process # 锁 # lock = Lock() # 创造了一把锁 # lock.acquire() # 获取了这把锁的钥匙 # lock.release() # 归还这把锁的钥匙 # 抢票的例子 # 每个人都能 # 查看余票 # 买票 import json import time from multiprocessing import Lock from multiprocessing import Process def search(i): with open('db','r') as f:count_dic = json.load(f) time.sleep(0.2) print('person %s 余票 : %s张'%(i,count_dic['count'])) def buy(i): with open('db','r') as f:count_dic = json.load(f) time.sleep(0.2) if count_dic['count'] > 0: count_dic['count'] -= 1 print('person %s 购票成功'%i) time.sleep(0.2) with open('db','w') as f:json.dump(count_dic,f) def task(i,lock): search(i) lock.acquire() # 如果之前已经被acquire了 且 没有被release 那么进程会在这里阻塞 buy(i) lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=task,args=(i,lock)) p.start()
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' %(n,os.getpid())) time.sleep(random.random()) print('%s:%s is done' %(n,os.getpid())) if __name__ == '__main__': for i in range(3): p=Process(target=work,args=(i,)) p.start() 多进程抢占输出资源
# 由并发变成了串行,牺牲了运行效率,但避免了竞争 import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start() 使用锁维护执行顺序
上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。
接下来,我们以模拟抢票为例,来看看数据安全的重要性。
#文件db的内容为:{"count":1} #注意一定要用双引号,不然json无法识别 #并发运行,效率高,但竞争写同一文件,数据写入错乱 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print(' 33[43m剩余票数%s 33[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(0.1) #模拟读数据的网络延迟 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模拟写数据的网络延迟 json.dump(dic,open('db','w')) print(' 33[43m购票成功 33[0m') def task(): search() get() if __name__ == '__main__': for i in range(100): #模拟并发100个客户端抢票 p=Process(target=task) p.start() 多进程同时抢购余票
#文件db的内容为:{"count":5} #注意一定要用双引号,不然json无法识别 #并发运行,效率高,但竞争写同一文件,数据写入错乱 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print(' 33[43m剩余票数%s 33[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(random.random()) #模拟读数据的网络延迟 if dic['count'] >0: dic['count']-=1 time.sleep(random.random()) #模拟写数据的网络延迟 json.dump(dic,open('db','w')) print(' 33[32m购票成功 33[0m') else: print(' 33[31m购票失败 33[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(100): #模拟并发100个客户端抢票 p=Process(target=task,args=(lock,)) p.start() 使用锁来保证数据安全
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然可以用文件共享数据实现进程间通信,但问题是: 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 2.需要自己加锁处理 #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
信号量:
# 信号量的本质
# 多把钥匙对应一把锁
# lock+count计数
它是锁的派生,原理是锁+计数器:
可以控制多人的修改,但是修改的分别是不同的修改物'
# from multiprocessing import Process # from multiprocessing import Semaphore # 信号量 # ktv # 4个小房子 # 10个人站在房子外面要进去玩儿 # sem = Semaphore(3) # sem.acquire() # print(1) # sem.acquire() # print(2) # sem.acquire() # print(3) # sem.release() # sem.acquire() # 阻塞 # print(4) import time import random from multiprocessing import Process,Semaphore def ktv(num,sem): sem.acquire() print('person%s进入了ktv' % num) time.sleep(random.randint(1,4)) print('person%s走出了ktv' % num) sem.release() if __name__ == '__main__': sem = Semaphore(4) for i in range(10): p = Process(target=ktv,args=(i,sem)) p.start()
互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
from multiprocessing import Process,Semaphore import time,random def go_ktv(sem,user): sem.acquire() print('%s 占到一间ktv小屋' %user) time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同 sem.release() if __name__ == '__main__': sem=Semaphore(4) p_l=[] for i in range(13): p=Process(target=go_ktv,args=(sem,'user%s' %i,)) p.start() p_l.append(p) for i in p_l: i.join() print('============》') 例子
事件,
事件的会将事物分的更为细致:
is_set 查看此时的状态 状态有两种True 和Flase
set 设置为True
clear 设置为Flase
wait(参数) 在不设置参数时,是True为pass,而Flase阻塞,可以通过改变来改变
默认是阻塞,可以设置阻塞的时间按秒来进行,设置后时间过后不论是True或Flase都会pass
# 并发的时候 # 很多模型 # 事件 from multiprocessing import Event,Process # wait() 方法 等待 # 阻塞 如果这个标志是False 那么就阻塞 # 非阻塞 如果这个标志是True 那么就非阻塞 # 查看标志 is_set() # 修改标志 set()将标志设置为True #clear() 将标志设置为False # e = Event() # print(e.is_set()) # 在事件的创建之初 默认是False # e.set() # 将标志设置为True # print(e.is_set()) # e.wait() # 相当于什么都没做pass # e.clear() # 将标志设置为False # # e.wait() # 永远阻塞 # e.wait(timeout=10) # 如果信号在阻塞10s之内变为True,那么不继续阻塞直接pass, # # 如果就阻塞10s之后状态还是没变,那么继续, # print(e.is_set()) # 无论前面的wait的timeout是否通过,我的状态都不会因此改变 # 红绿灯模型 # 控制交通灯的进程 import time import random def traffic_light(e): print(' 33[1;31m 红灯亮 33[0m') while True: time.sleep(2) if e.is_set(): print(' 33[1;31m 红灯亮 33[0m') e.clear() else: print(' 33[1;32m 绿灯亮 33[0m') e.set() # 车 等或者通过 def car(id,e): if not e.is_set(): print('car %s 等待' % id) e.wait() print('car %s 通过'%id) def police_car(id,e): if not e.is_set(): e.wait(timeout = 0.5) print('police car %s 通过' % id) # 主进程 启动交通控制灯 启动车的进程 if __name__ == '__main__': e = Event() p = Process(target=traffic_light,args=(e,)) p.start() car_lst = [car,police_car] for i in range(20): p = Process(target=random.choice(car_lst), args=(i,e)) p.start() time.sleep(random.randrange(0,3,2))
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。 clear:将“Flag”设置为False set:将“Flag”设置为True 事件介绍
总结:
进程之间是数据隔离的
进程与进程之间是不能自由的交换内存数据的
全局变量在子进程中修改,其他进程是感知不到的
守护进程
特点 生命周期值和主进程的代码有关系,和其他子进程没关系
用处 报活
多进程启动tcp协议的socket来完成并发
进程的同步控制 -进程之间有一些简单的信号传递,但是用户并不能感知,且用户不能传递自己想传递的内容
锁
信号量 10个进程 4把钥匙
锁加 计数器实现的
事件 wait