守护进程
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止(主进程和子进程是异步的),当主进程停止,该守护进程不在继续执行.守护进程也是一种子进程.
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止.(但本质上是在主进程结束之前结束的,主进程需要负责回收资源)
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process import time import os def func(num): print(f'{num},pid:{os.getpid()},ppid:{os.getppid()}') while True: print('is alive') time.sleep(0.5) def wahaha(): i = 0 while i < 5: i += 1 print(f'第{i}秒') time.sleep(1) if __name__ == '__main__': Process(target=wahaha).start() #子进程在主进程结束后仍然正常执行 p = Process(target=func,args=(1,)) p.daemon = True #主进程结束,该守护进程结束 p.start() time.sleep(3) print(f'pid:{os.getpid()},ppid:{os.getppid()}') print('主进程结束') ============================= 第1秒 1,pid:8200,ppid:2000 is alive is alive 第2秒 is alive is alive 第3秒 is alive is alive pid:2000,ppid:7244 主进程结束 第4秒 第5秒
多进程中的方法
p = Process(target=func,args=(1,)) #创建一个进程对象
p.start() 启动一个进程
p.daemon = True 设置进程为守护进程,随主进程结束而结束.
p.is_alive() 判断进程是否存活,返回bool值
p.terminate() 发送给操作系统指令,关闭进程
p.pid() 查看进程pid

from multiprocessing import Process import time import os def func(num): print(f'{num},pid:{os.getpid()},ppid:{os.getppid()}') while True: print('is alive') time.sleep(0.5) def wahaha(): i = 0 while i < 10: i += 1 print(f'第{i}秒') time.sleep(1) if __name__ == '__main__': p2 = Process(target=wahaha) p2.start() # p = Process(target=func,args=(1,)) p.daemon = True #主进程结束,该子进程结束 p.start() time.sleep(3) print(p.is_alive()) print(p2.is_alive()) p2.terminate() time.sleep(0.1) print(p.is_alive()) print(p2.is_alive()) print(f'pid:{os.getpid()},ppid:{os.getppid()}') print('主进程结束') p2.join()

import socket from multiprocessing import Process def talk(conn,addr): while True: msg_r = conn.recv(1024).decode('utf-8') print(addr,msg_r) msg_s = 'client{}登陆'.format(addr) conn.send(msg_s.encode('utf-8')) conn.close() if __name__ == '__main__': sk = socket.socket() sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) sk.bind(('127.0.0.1',8091)) sk.listen(5) try: while True: conn,addr = sk.accept() Process(target=talk,args=(conn,addr)).start() finally: sk.close()

import socket sk = socket.socket() sk.connect(('127.0.0.1',8091)) while True: msg_s = input('请输入内容:') sk.send(msg_s.encode('utf-8')) msg_r = sk.recv(1024).decode('utf-8') print(msg_r) sk.close()
进程同步(multiprocessing.Lock、multiprocessing.Semaphore、multiprocessing.Event)
锁 multiprocessing.Lock (*****)
避免同一段代码被多个进程同时执行
lock = Lock() 创建锁对象
lock.acquire() 查询钥匙,如果有就拿走,如果没有就等待
lock.release() 归还钥匙
lock可以使用with上下文进行管理(类似于文件读取)
with lock:
print('hello' )

维护数据的安全
降低了程序的效率
所有的效率都是建立在数据安全的角度上的
但凡涉及到并发编程都要考虑数据的安全性
我们需要在并发部分对数据修改的操作格外小心,如果会涉及到数据的不安全,就需要进行加锁控制
lock acquire release的另外一种用法
lock 内部实现了进程之间的通信,使得谁acquire了谁release了能够在多个拥有lock参数的子进程中透明

from multiprocessing import Lock lock = Lock() #创建一个锁对象 lock.acquire() #想拿钥匙,如果有就拿,没有就一直等 print('拿到要钥匙了1') lock.release() #还钥匙 lock.acquire() #想拿钥匙 print('拿到要钥匙了2') lock.release() #还钥匙

#db文件内容 {"count": 0} import json import time from multiprocessing import Process,Lock def search(i): f =open('db') ticket_dic =json.load(f) f.close() print(f"{i} 正在查票,剩余票数{ticket_dic['count']}") def buy(i): with open('db') as f: ticket_dic = json.load(f) time.sleep(0.2) if ticket_dic['count'] > 0: ticket_dic['count'] -= 1 print(f'{i} 买到票了') time.sleep(0.2) with open('db','w') as f :json.dump(ticket_dic,f) else: print(f"{i} 太火爆被抢购一空了,剩余票数{ticket_dic['count']}") # def get_ticket(i,lock): # search(i) # lock.acquire() # buy(i) # lock.release() def get_ticket(i,lock): search(i) with lock: buy(i) if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=get_ticket,args=(i,lock)) p.start()
信号量(标志True False) multiprocessing.Semaphore(***) (锁+计数器)
有多个钥匙的锁

互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。 假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。 实现: 信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。 信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
sem = Semaphore(4) 创建锁对象,4把钥匙,可以被连续acquire4次
sem.acquire() 查询钥匙,如果有就拿走,如果没有就等待
sem.release() 归还钥匙
sem 可以使用with上下文进行管理(类似于文件读取)
with sem:
print('hello' )

from multiprocessing import Semaphore sem = Semaphore(4) #4把钥匙 sem.acquire() print(1) sem.acquire() print(2) sem.release() sem.acquire() print(3) sem.acquire() print(4) sem.acquire() print(5) sem.acquire() print(6)

from multiprocessing import Semaphore,Process import time import random # def ktv(sem,i): # sem.acquire() # print(f'{i}走进ktv') # time.sleep(random.randint(1,3)) # print(f'{i}走出ktv') # sem.release() def ktv(sem,i): with sem: print(f'{i}走进ktv') time.sleep(random.randint(1,3)) print(f'{i}走出ktv') if __name__ == '__main__': sem = Semaphore(4) for i in range(10): p = Process(target=ktv,args=(sem,i)) p.start()
事件 multiprocessing.Event(**)
控制子进程执行还是阻塞的一个机制
e = Event() 创建一个事件对象
Event方法 在事件中有一个信号(标志)
wait() 如果这个标志是True wait的执行效果就是pass ,如果是False,wait方法的效果就是阻塞,直到这个标志变成True
控制标志方法
is_set() 判断标志的状态,返回bool值
set() 将标志设置为True
clear() 将标志设置为False

from multiprocessing import Event e = Event() #阻塞,事件的创建之初标志的状态是False print(e.is_set()) e.set() #将标志改为True print(e.is_set()) e.wait() #当标志为True是pass,不阻塞

from multiprocessing import Event,Process import time def func1(e): print('start func1') print(e.is_set()) #事件创建之初是False e.wait(1) #不修改状态(网络测试,发送短信,发送邮件),超时后继续执行,不继续阻塞 print(e.is_set()) e.wait() #持续阻塞 print(e.is_set()) #主进程3(异步)s后修改信号标志为True ,继续执行 print('end func1') if __name__ == '__main__': e = Event() Process(target=func1,args=(e,)).start() time.sleep(3) e.set()

from multiprocessing import Event,Process import time import random def tarffic_light(e): while True: while e.is_set(): print('