2019.09.16 学习整理
进程锁
问题:当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
优先抢票
from multiprocessing import Process,Lock
import json,time,os
def search():
time.sleep(1) # 模拟网络io
with open('db.txt',mode='rt',encoding='utf-8') as f:
res = json.load(f)
print(f'还剩{res["count"]}')
def get():
with open('db.txt',mode='rt',encoding='utf-8') as f:
res = json.load(f)
# print(f'还剩{res["count"]}')
time.sleep(1) # 模拟网络io
if res['count'] > 0:
res['count'] -= 1
with open('db.txt',mode='wt',encoding='utf-8') as f:
json.dump(res,f)
print(f'进程{os.getpid()} 抢票成功')
time.sleep(1.5) # 模拟网络io
else:
print('票已经售空啦!!!!!!!!!!!')
def task(lock):
search()
# 锁住
lock.acquire()
get()
lock.release()
# 释放锁头
if __name__ == '__main__':
lock = Lock() # 写在主进程是为了让子进程拿到同一把锁.
for i in range(15):
p = Process(target=task,args=(lock,))
p.start()
# p.join()
# 进程锁 是把锁住的代码变成了串行
# join 是把所有的子进程变成了串行
# 为了保证数据的安全,串行牺牲掉效率.
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改
队列
ipc机制 进程通讯
管道:pipe 基于共享的内存空间
队列:pipe+锁 queue
Queue
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
Queue([maxsize])
创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现。
2.1.1 方法介绍
Queue([maxsize])
:创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
:返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait()
:同q.get(False)
方法。
q.put(item [, block [,timeout ] ] )
:将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
:返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
:如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
:如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()
方法)。
2.1.2 其他方法(了解)
q.close()
:关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()
操作上,关闭生产者中的队列不会导致get()
方法返回错误。
q.cancel_join_thread()
:不会再进程退出时自动连接后台线程。这可以防止join_thread()
方法阻塞。
q.join_thread()
:连接队列的后台线程。此方法用于在调用q.close()
方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()
方法可以禁止这种行为
案例
### 案例一
# q = Queue()
# q.put('鲁照山')
# q.put([1,2,4])
# q.put(2)
# print(q.get())
# print(q.get())
# print(q.get())
# # q.put(5)
# # q.put(5)
# print(q.get()) # 默认就会一直等着拿值
### 案例2
# q = Queue(4)
# q.put('鲁照山')
# q.put([1,2,4])
# q.put([1,2,4])
# q.put(2)
#
# q.put('乔碧萝') #队列满了的情况再放值,会阻塞
### 案例3 (从这往下都是了解)
# q = Queue(3)
# q.put('zhao',block=True,timeout=2) #
# q.put('zhao',block=True,timeout=2) #
# q.put('zhao',block=True,timeout=2) #
#
# q.put('zhao',block=True,timeout=5) # put里的 block=True 如果满了会等待,timeout最多等待n s,如果ns还是队列还是满的就报错了
### 案例4
# q = Queue()
# q.put('yyyy')
# q.get()
# q.get(block=True,timeout=5) # block=True 阻塞等待,timeout最多等5s, 剩下同上
### 案例5
# q = Queue(3)
# q.put('qwe')
# q.put('qwe')
# q.put('qwe')
#
# q.put('qwe',block=False) # 对于put来说block=False 如果队列满了就直接报错
# q = Queue(3)
# q.put('qwe')
# q.get()
#
#
# q.get(block=False)
# block = Flase 拿不到不阻塞,直接报错
### 案例6
q = Queue(1)
q.put('123')
# q.get()
q.put_nowait('666') # block = False
# q.get_nowait() # block = False
#
生产者消费者模型
生产者: 生产数据的任务
消费者: 处理数据的任务
生产者--队列(盆)-->消费者
生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消费效率.
生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率
补充: queue不适合传大文件,通产传一些消息.
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
from multiprocessing import Process,Queue,JoinableQueue
import time,random
def producer(q,name,food):
'''生产者'''
for i in range(3):
print(f'{name}生产了{food}{i}')
time.sleep(random.randint(1, 3))
res = f'{food}{i}'
q.put(res)
# q.put(None)
def consumer(q,name):
'''消费者'''
while True:
res = q.get()
# if res is None:break
time.sleep(random.randint(1,3))
print(f'{name}吃了{res}')
q.task_done() #
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer,args=(q,'rocky','包子'))
p2 = Process(target=producer,args=(q,'mac','韭菜'))
p3 = Process(target=producer,args=(q,'nick','蒜泥'))
c1 = Process(target=consumer,args=(q,'成哥'))
c2 = Process(target=consumer,args=(q,'浩南哥'))
p1.start()
p2.start()
p3.start()
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
p3.join() # 生产者生产完毕
# q.put(None)# 几个消费者put几次
# q.put(None)
q.join() # 分析
# 生产者生产完毕--这是主进程最后一行代码结束--q.join()消费者已经取干净了,没有存在的意义了.
#这是主进程最后一行代码结束,消费者已经取干净了,没有存在的意义了.守护进程的概念.
JoinableQueue
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
方法介绍
JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:
q.task_done()
:使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
q.join()
:生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。
from multiprocessing import Process,Queue,JoinableQueue
q = JoinableQueue()
q.put('zhao') # 放队列里一个任务
q.put('qian')
print(q.get())
q.task_done() # 完成了一次任务
print(q.get())
q.task_done() # 完成了一次任务
q.join() #计数器不为0的时候 阻塞等待计数器为0后通过
# 想象成一个计数器 :put +1 task_done -1
初识线程
初识别线程.
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程
在工厂中, 每个车间都有房子,而且每个车间默认就有一条流水线.
操作系统 ===> 工厂
进程 ===> 车间
线程 ===> 流水线
cpu ===> 电源
线程:cpu最小的执行单位
进程:资源集合/资源单位.
线程运行 = 运行代码
进程运行 = 各种资源 + 线程