############### 守护进程 ##############
""" 守护进程 父进程中将一个子进程设置为守护进程,那么这个子进程会随着主进程的结束而结束。 主进程创建守护进程 其一:守护进程会在主进程代码执行结束后就终止 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止 """ # 第一版:主进程结束了,子进程还没有结束, # import time # from multiprocessing import Process # # def func(): # while True: # time.sleep(1) # print("我还活着") # # # if __name__ == '__main__': # p=Process(target=func) # p.start() # i = 0 # while i<10: # time.sleep(1) # i+=1 # print("主进程结束") # 守护进程,就是主进程代码结束了而结束,记住不是主进程彻底结束,而是代码结束, import time from multiprocessing import Process def func(): while True: time.sleep(1) print("我还活着") if __name__ == '__main__': p = Process(target=func) p.daemon = True # 设置子进程为守护进程, #一定要在p.start()前设置,设置p为守护进程 p.start() i = 0 while i < 5: time.sleep(1) i += 1 print("主进程代码结束")
其他的方法:
from multiprocessing import Process import time def func(name): print("%s在test...."%name) if __name__ == "__main__": p = Process(target=func,args=("andy",)) p.start() print(p.is_alive()) # # 判断一个进程是否活着 p.terminate() # 结束一个进程, time.sleep(1) print(p.is_alive())
################## 进程锁 #####################
""" 互斥锁:
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理, 他们之间的运行没有顺序,一旦开启也不受我们控制。 尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。 当多个进程使用同一份数据资源的时候,就会因为竞争而引发数据安全或顺序混乱问题。 """
下面的代码演示了不同的任务争抢一个资源(终端输出)的场景。
from multiprocessing import Process import time import random def task1(): print('这是 task1 任务'.center(30, '-')) print('task1 进了洗手间') time.sleep(random.randint(1, 3)) print('task1 办事呢...') time.sleep(random.randint(1, 3)) print('task1 走出了洗手间') def task2(): print('这是 task2 任务'.center(30, '-')) print('task2 进了洗手间') time.sleep(random.randint(1, 3)) print('task2 办事呢...') time.sleep(random.randint(1, 3)) print('task2 走出了洗手间') def task3(): print('这是 task3 任务'.center(30, '-')) print('task3 进了洗手间') time.sleep(random.randint(1, 3)) print('task3 办事呢...') time.sleep(random.randint(1, 3)) print('task3 走出了洗手间') if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) p3 = Process(target=task3) p1.start() p2.start() p3.start() """ ---------这是 task1 任务---------- task1 进了洗手间 ---------这是 task2 任务---------- task2 进了洗手间 ---------这是 task3 任务---------- task3 进了洗手间 task3 办事呢... task1 办事呢... task3 走出了洗手间 task2 办事呢... task2 走出了洗手间 task1 走出了洗手间 """
通过加锁来控制
from multiprocessing import Process, Lock import time import random # 生成一个互斥锁 mutex_lock = Lock() def task1(lock): # 锁门 lock.acquire() print('这是 task1 任务'.center(30, '-')) print('task1 进了洗手间') time.sleep(random.randint(1, 3)) print('task1 办事呢...') time.sleep(random.randint(1, 3)) print('task1 走出了洗手间') # 释放锁 lock.release() def task2(lock): # 锁门 lock.acquire() print('这是 task2 任务'.center(30, '-')) print('task2 进了洗手间') time.sleep(random.randint(1, 3)) print('task2 办事呢...') time.sleep(random.randint(1, 3)) print('task2 走出了洗手间') # 释放锁 lock.release() def task3(lock): # 锁门 lock.acquire() print('这是 task3 任务'.center(30, '-')) print('task3 进了洗手间') time.sleep(random.randint(1, 3)) print('task3 办事呢...') time.sleep(random.randint(1, 3)) print('task3 走出了洗手间') # 释放锁 lock.release() if __name__ == '__main__': p1 = Process(target=task1, args=(mutex_lock, )) p2 = Process(target=task2, args=(mutex_lock, )) p3 = Process(target=task3, args=(mutex_lock, )) # 释放新建进程的信号,具体谁先启动无法确定 p1.start() p2.start() p3.start() """ ---------这是 task2 任务---------- task2 进了洗手间 task2 办事呢... task2 走出了洗手间 ---------这是 task1 任务---------- task1 进了洗手间 task1 办事呢... task1 走出了洗手间 ---------这是 task3 任务---------- task3 进了洗手间 task3 办事呢... task3 走出了洗手间 """
买票的案例:
并发出错:
from multiprocessing import Process, Lock import json import time import random import os def search(): time.sleep(0.5) with open('db.json', 'r', encoding='utf8') as f: data = json.load(f) print('剩余票数:{}'.format(data.get('count'))) def buy(): with open('db.json', 'r', encoding='utf8') as f: data = json.load(f) if data.get('count', 0) > 0: data['count'] -= 1 time.sleep(random.randint(1, 3)) with open('db.json', 'w', encoding='utf8') as f2: json.dump(data, f2) print('{}购票成功!'.format(os.getpid())) else: print('购票失败') def task(): search() # 查票并发 buy() # 串行买票 if __name__ == '__main__': for i in range(10): p = Process(target=task) p.start()
加上锁
from multiprocessing import Process, Lock import json import time import random import os # 设置互斥锁 mutex_lock = Lock() def search(): time.sleep(0.5) with open('db.json', 'r', encoding='utf8') as f: data = json.load(f) print('剩余票数:{}'.format(data.get('count'))) def buy(): with open('db.json', 'r', encoding='utf8') as f: data = json.load(f) if data.get('count', 0) > 0: data['count'] -= 1 time.sleep(random.randint(1, 3)) with open('db.json', 'w', encoding='utf8') as f2: json.dump(data, f2) print('{}购票成功!'.format(os.getpid())) else: print('购票失败') def task(lock): search() # 查票并发 lock.acquire() buy() # 串行买票 lock.release() if __name__ == '__main__': for i in range(10): p = Process(target=task, args=(mutex_lock, )) p.start()
############### 进程间的通信 ##############
""" 进程间的三种通信(IPC)方式: 方式一:队列(推荐使用) 方式二:管道(不推荐使用,了解即可) 管道相当于队列,但是管道不自动加锁 方式三:共享数据(不推荐使用,了解即可) 共享数据也没有自动加锁的功能,所以还是推荐用队列的。感兴趣的可以研究研究管道和共享数据 """
############### 进程间的通信---队列 ##############
""" Queue介绍
我们可以创建一个共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 Queue的实例q常用方法: ################################### Queue([maxsize]) 创建共享的进程队列。 参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。 底层队列使用管道和锁定实现。 q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。 block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。 timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。 block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。 timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。 此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。 在某些系统上,此方法可能引发NotImplementedError异常。 q.empty() 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。 也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。 """
基本的队列操作:
''' multiprocessing模块支持进程间通信的两种主要形式:管道和队列 都是基于消息传递实现的,但是队列接口 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。 # 如果队列中的数据一直不被取走,程序就会永远停在这里。 try: q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。 except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。 print('队列已经满了') # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。 print(q.full()) #满了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。 except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。 print('队列已经空了') print(q.empty()) #空了
上面还没有设计到进程间的通信,下面看一个简单的主进程和子进程之间通信的例子:
import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), 'hi', 'hello']) #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。 if __name__ == '__main__': q = Queue() #创建一个Queue对象 p = Process(target=f, args=(q,)) #创建一个进程 p.start() print(q.get()) p.join()
############ 生产者消费者模型 ##############
""" 什么是生产者消费者模式?两个角色、一个场所 两个角色: 产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者; 一个场所: 生产者和消费者之间的中介就叫做缓冲区。 为什么要使用生产者和消费者模式? 如果不使用这种模式, 那么生产者就必须等待消费者处理完,才能继续生产数据。这就阻塞了,不能并发, 同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。 为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。 使用了这种模式: 解决生产者和消费者的强耦合问题,生产者不需要等待消费者消费完了才可以生产了,而是直接扔给阻塞队列, 消费者也不需要等待生产者了,直接到阻塞队列取数据, 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力 生产者/消费者模型的优点: 1、解耦,即降低生产者和消费者之间的依赖关系。 2、支持并发,即生产者和消费者可以是两个独立的并发主体,互不干扰的运行。 3、支持忙闲不均,平衡了生产者和消费者的处理能力 """
# 队列的生产者和消费者模型
# 买包子的例子
# 有蒸包子的人,这就是生产者,有买包子的人,这就是消费者,
# 实际中,可能会有数据供需不平衡的问题,
# 就是数据生产的多了没有消费,所以我们要增加消费者,或者减少生产
# 数据消费的多了,我们要增加生产者,来解决这个问题,
# 基于队列实现生产者消费者模型 from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('