小结
初始别线程
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程
在工厂中,每个车间都有房子,而且每个车间默认就有一条流水线
操作系统=》工厂
进程=》车间
线程=》流水线
cpu=>电源
线程:cpu最小的执行单位
进程:资源集合/资源单位
线程运行 = 运行代码
进程运行 = 各种资源+线程
上节课回顾
'''
多道技术:
空间复用:多个程序共用一个内存条,彼此隔离,物理级别隔离
时间复用:共用一个cpu(实现并发)
切换的情况:
遇到io切换,占用时间过长也切换
串行:一个任务完完整整的运行结束后,再运行下一个任务
并发:看起来是同时执行多个任务 针对于单核
并行:真正的做到同时执行多个任务 针对于多核
'''
# 开启子进程方式一
# from multiprocessing import Process
# def foo():
# print('子进程 start')
# # 可以导入time模块让程序睡一会
# print('子进程 end')
#
# if __name__ == '__main__':
# p = Process(target=foo)
# p.start()
# print('主进程')
# 开启子进程方式二
# from multiprocessing import Process
# class Myp(Process):
# def run(self):
# print('子进程 start')
# print('子进程 end')
#
# if __name__ == '__main__':
# p = Myp()
# #p.run() 这是直接调用函数
# p.start() #这才是开启子进程 #给操作系统发送一个请求,操作系统去开启进程
# print('主进程')
##### 开启子进程, 申请一个新的内存空间,把父进程的代码完整copy一份放进去,然后去运行代码
##### join回顾
# from multiprocessing import Process
# import time
#
#
# def task(s):
# time.sleep(s)
#
# if __name__ == '__main__':
# p = Process(target=task, args=(1,))
# p1 = Process(target=task, args=(3,))
# p2 = Process(target=task, args=(5,))
#
# p.start()
# p1.start()
# p2.start()
#
# p.join()
# p1.join()
# p2.join() #join内部会调用wait()
# print(p.pid)
# 主进程要等待所有子进程结束后才会结束,因为主进程要在结束前回收僵尸进程#####
#####僵尸进程:没有死透的子进程
#孤儿进程:子进程运行的过程中父进程死了就变成了孤儿进程,最终会被系统的init接管回收
# 父进程一直不死,一直在开启子进程,意味着占用过多的pid并且不回收
# 解决方案:强制杀死这个父进程
##### 守护进程
# 本质也是一个子进程
# 主进程的代码执行完毕守护进程直接结束
# 大前提:主进程结束之前守护进程一直运行着
# from multiprocessing import Process
# import time
# def task():
# print('守护进程 start')
# time.sleep(4)
# print('守护进程 end')
#
# if __name__ == '__main__':
# p =Process(target=task)
# p.daemon = True
# p.start()
# time.sleep(2)
# print('主进程')
# 进程锁(***)
# 进程锁:是把锁住的代码变成了串行
# 队列(*****)
# 主要掌握 Put 和 get 用法
# 生产者消费者模型(*****)
# 自己敲一遍 重点掌握这种思想
# JoinableQueue(*)
# 看一下里面的守护进程
# 初始线程(好好理解一下)(*****)
优化抢票
from multiprocessing import Process, Lock
import json, time, os
def search():
time.sleep(1) # 模拟网络io
with open('db.txt', 'rt', encoding='utf-8')as f:
res = json.load(f)
print(f'还剩下{res["count"]}票')
def get():
with open('db.txt', 'rt', encoding='utf-8')as f:
res = json.load(f)
time.sleep(1)#模拟网络io
if res['count'] > 0:
res['count'] -= 1
with open('db.txt', 'wt', encoding='utf-8')as f:
json.dump(res, f)
print(f'进程{os.getpid()}抢票成功')
time.sleep(1) #模拟网络io
else:
print('票已售空!!!')
def task(lock):
search()
#锁住
lock.acquire() # acquire(获得),对象拿到锁
get()
lock.release() # release(释放),对象打开锁
# 释放锁
if __name__ == '__main__':
lock = Lock() # 这一步写在主进程里是为了让所有的子进程拿到同一把锁
for i in range(15):
p = Process(target=task, args=(lock,))
p.start()
#### 进程锁:是把锁住的代码变成串行(相对提高效率)
#### join 是把所有的子进程都变成串行
# 为了数据安全,串行牺牲了效率
队列
'''
ipc 机制: 进程之间互相隔离,要实现进程之间的通讯(IPC),multiprocess模块支持两种形式:队列(Queue)和管道(Pipe),这两种方式都是使用消息传递的
创建队列的类(Queue(maxsize)(底层就是以管道+锁的方式实现)
maxsize是队列中允许最大项数,省略则无大小限制
队列:Queue 的 put 和 get 方法
get 取值时有先进先出的原则
'''
from multiprocessing import Process, Queue
###案例一
# q = Queue()
# q.put('鲁照山')
# q.put([1,2,3])
# q.put(2)
# print(q.get()) #鲁照山
# print(q.get()) #[1, 2, 3]
# print(q.get()) #2
# #队列里没有了,再取值会怎样?
# print(q.get()) #默认会一直卡在这里等着拿值
###案例二
# q = Queue(3)
# q.put('鲁照山')
# q.put([1,2,4])
# q.put(2)
# # 对列放满了再往里放会怎样?
# #q.put(111) #会卡主,等待队列空出位置往里放
#
# q.put(111,block=True,timeout=3) #队列满了再放值,会阻塞,利用timeout限制,等待时间超过后就会报错(这里表示会等待3秒,超时报错)
###案列3 (从这往下都是了解)
# q = Queue(3)
# q.put('鲁照山',block=True, timeout=2)
# q.put([1,2,4],block=True, timeout=2)
# q.put(2,block=True, timeout=2)
#
# q.put('zhao', block=True, timeout=5) # put里的 如果是block=True,队列满了会等待,timeout是最多等待ns, 如果 ns 后还是满的就会报错
###案例4
# q = Queue()
# q.put('yyy')
# print(q.get()) #yyy
#
# #如果队列中没有,还取值会怎样?
# q.get(block=True,timeout=3) #block = True,阻塞等待,timeout最多等待时间,超出报错
###案例5
# q = Queue(3)
# q.put('qwe')
# q.put('qwe')
# q.put('qwe')
#
# q.put('qwe',block= False) #对于put来说直接用block= False, 队列满了再放直接报错
# 同理,取值时用block= False,队列里没有了还取就是直接报错
# q = Queue()
# q.put(1)
# q.get()
# q.get(block= False) #用block=False ,拿不到不阻塞,直接报错
###案列6
q = Queue(1)
q.put('yyy')
q.put_nowait('666')
# q.put_nowait()相当于用了block= False
生产者消费者模型
'''
生产者:生产数据的任务
消费者:处理数据的任务
生产者---》队列(盆)---》消费者
生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,达到自己最大的消费效率
生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率
# 补充:queue不适合传大文件,通常传一些消息
'''
from multiprocessing import Process,Queue
def producer(q,name,food):
'''生产者'''
for i in range(10):
print(f'{name}生产了{food}{i}')
res = f'{food}{i}'
q.put(res)
q.put(None)
def consumer(q,name):
'''消费者'''
while True:
res = q.get(timeout=5)
if res is None:
break
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue()
p = Process(target=producer, args=(q,'rocky', '包子'))
c = Process(target=consumer, args=(q,'成哥'))
p.start()
c.start()
生产者消费者模型2
'''
生产者: 生产数据的任务
消费者: 处理数据的任务
生产者--队列(盆)-->消费者
生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消费效率.
生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率.
# 补充: queue不适合传大文件,通产传一些消息.
'''
from multiprocessing import Process, Queue
import time,random
def producer(q, name, food):
'''生产者'''
for i in range(3):
print(f'{name}生产了{food}{i}')
time.sleep(random.randint(1,3))
res = f'{food}{i}'
q.put(res)
def consumer(q, name):
'''消费者'''
while True:
res = q.get(timeout=5)
if res is None:
break
time.sleep(random.randint(1,3))
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,'rocky','包子'))
p2 = Process(target=producer, args=(q,'mac','韭菜'))
p3 = Process(target=producer, args=(q, '成哥', '大蒜'))
c1 = Process(target=consumer, args=(q,'山鸡哥'))
c2 = Process(target=consumer, args=(q, '浩南哥'))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join() # 保证生产者生完毕
q.put(None) # 有几个消费者就put几次
q.put(None)
生产者消费者模型03
'''
生产者: 生产数据的任务
消费者: 处理数据的任务
生产者--队列(盆)-->消费者
生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消费效率.
生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率.
# 补充: queue不适合传大文件,通产传一些消息.
'''
from multiprocessing import Process,JoinableQueue
import time,random
def producer(q,name,food):
'''生产者'''
for i in range(3):
print(f'{name}生产了{food}{i}')
time.sleep(random.randint(1, 3))
res = f'{food}{i}'
q.put(res)
def consumer(q,name):
'''消费者'''
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print(f'{name}吃了{res}')
q.task_done()
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer, args=(q, 'rocky', '包子'))
p2 = Process(target=producer, args=(q, 'mac', '韭菜'))
p3 = Process(target=producer, args=(q, 'nick', '蒜泥'))
c1 = Process(target=consumer, args=(q, '成哥'))
c2 = Process(target=consumer, args=(q, '浩南哥'))
p1.start()
p2.start()
p3.start()
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
p3.join() # 生产者生产完毕
q.join() #分析
# 生产者生产完毕--这是主程序最后一行代码结束--q.join()消费者已经取干净了,没有存在的意义了
# 这是主进程最后一行代码结束,消费者已经取干净了,没有存在的意义,守护进程的概念
测试JoinableQueue
from multiprocessing import Process, Queue, JoinableQueue
q = JoinableQueue()
q.put('zhao') # 放队列一个任务
q.put('qian')
print(q.get())
q.task_done() # 完成了一次任务
print(q.get())
q.task_done() # 完成了一次任务
q.join() #计数器不为0时,阻塞等待计数器为0后通过
# 想象成一个计数器: put +1 task_done -1