day35
进程:生产者消费者模型
编程思想,模型,设计模式,理论等等,都是交给你一种编程的方法,以后你遇到类似的情况,套用即可
生产者消费者模型的三要素
- 生产者:产生数据的
- 消费者:接收数据做进一步处理的
- 容器:盆(队列)
队列容器起到什么作用?
- 起到缓冲的作用,平衡生产力与消费力,解耦
from multiprocessing import Process
from multiprocessing import Queue
import time
import random
def producer(q, name):
for i in range(1,6):
time.sleep(random.randint(1, 2))
res = f"{i}号包子"
q.put(res)
print(f"生产者{name} 生产了{res}")
def consumer(q, name):
while 1:
try:
food = q.get(timeout=3)
time.sleep(random.randint(1, 3))
print(f' 33[31;0m消费者{name} 吃了{food} 33[0m')
except Exception:
return
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q, "孙宇"))
p2 = Process(target=consumer, args=(q, "海狗"))
p1.start()
p2.start()
线程的理论知识
什么是线程
一条流水线的工作流程
进程:在内存中开启一个进程空间,然后将主进程的所有资源数据复制一份,然后调用cpu去执行这些代码
之前的描述不够具体:
开启一个进程:在内存中开启一个进程空间,然后将主进程的所有的资源数据复制一份,然后调用线程去执行代码
进程是最小的资源单位,线程是最小的执行单位
以后你描述开启一个进程:
开启一个进程:进程会在内存中开辟一个进程空间,将主进程的资料数据全部复制一份,线程会执行里面的代码
线程vs进程
- 开启进程的开销非常大,比开启线程的开销大很多
- 开启线程的速度非常快,要快几十倍到上百倍
- 线程与线程之间可以共享数据,进程与进程之间需借助队列等方法实现通信
线程的应用
并发:一个cpu看起来像是同时执行多个任务
-
单个进程开启三个线程,并发的执行任务
-
开启三个进程并发的执行任务
-
文本编辑器:
1、输入文字
2、在屏幕上显示
3、保存在磁盘中
开启多线程就非常好了:数据共享、开销小、速度快
-
线程没有地位之分,但是一个进程谁在干活?
只是我们自己的意思:我们把执行主程序的线程当作主线程
主线程在执行代码,当结束之后,你得等待其他线程结束之后,才能结束本进程
开启线程的两种方式
# 第一种方式
from threading import Thread
import time
def task(name):
print(f"{name} is running")
time.sleep(1)
print(f"{name} is gone")
if __name__ == '__main__':
t1 = Thread(target=task, args=("海狗",))
t1.start()
print("===主线程") # 线程是没有主次之分的
# 第二种方式
from threading import Thread
import time
class MyThread(Thread):
def __init__(self, name, l1, s1):
super().__init__()
self.name = name
self.l1 = l1
self.s1 = s1
def run(self):
print(f"{self.name} is running")
time.sleep(1)
print(f"{self.name} is gone")
if __name__ == '__main__':
t1 = MyThread("李业", [1,2,3], "180")
t1.start()
print("===主线程")
线程vs进程的代码对比
开启速度对比
多线程
from threading import Thread
import time
def task():
print("hello")
if __name__ == '__main__':
start_time = time.time()
# 在主进程下开启线程
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
print("主线程/主进程")
print(time.time() - start_time)
时间:0.0004723072052001953
多进程
from multiprocessing import Process
import time
def work():
print("hellow")
if __name__ == '__main__':
start_time = time.time()
# 在主进程下开启线程
p1 = Process(target=work)
p2 = Process(target=work)
p1.start()
p2.start()
print("主线程/主进程")
print(time.time() - start_time)
时间:0.023804903030395508
对比pid
进程
主进程和每个子进程的pid都不一样
from multiprocessing import Process
import os
def task():
print(f"子进程:{os.getpid()}")
print(f"主进程:{os.getppid()}")
if __name__ == '__main__':
p1 = Process(target=task) # 创建一个进程对象
p2 = Process(target=task) # 创建一个进程对象
p1.start()
p2.start()
print(f"==主{os.getpid()}")
结果:
==主12832
子进程:14176
主进程:12832
子进程:11756
主进程:12832
线程
只要是在一个进程内,主线程和每个线程都一样
from threading import Thread
import os
def task():
print(os.getpid())
if __name__ == '__main__':
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
print(f"===主线程{os.getpid()}")
结果:
14480
14480
===主线程14480
同一个进程内线程共享内部数据
from threading import Thread
x = 3
def task():
global x
x = 100
if __name__ == '__main__':
t1 = Thread(target=task)
t1.start()
t1.join()
print(f"===主进程{x}")
线程的相关其他方法(了解)
from threading import Thread
from threading import activeCount
from threading import currentThread
from threading import enumerate
import os
import time
# x = 3
def task():
# print(currentThread()) # 获取当前线程对象
# time.sleep(1)
print(333)
if __name__ == '__main__':
t1 = Thread(target=task, name="线程1")
# t2 = Thread(target=task, name="线程2")
t1.start()
# t1.setName("朱凡宇") # 添加name属性
# print(t1.getName()) # 查看name属性
# print(t1.name) # 查看name属性 ****
# print(t1.isAlive()) # 判断线程是否活着
# threading方法
# print(currentThread()) # 获取当前线程对象
# print(enumerate()) # 返回一个列表,包含所有的线程对象
print(activeCount()) # 返回存活线程的数量 ****
# t2.start()
join与守护进程(考点)
join:阻塞 告知主线程要等待我子线程执行完毕之后再执行下面的代码
from threading import Thread
import time
def task(name):
print(f"{name} is running")
time.sleep(1)
print(f"{name} is gone")
if __name__ == '__main__':
start_time = time.time()
t1 = Thread(target=task, args=("海狗1",))
t2 = Thread(target=task, args=("海狗2",))
t3 = Thread(target=task, args=("海狗3",))
t1.start()
t1.join()
t2.start()
t2.join()
t3.start()
t3.join()
print(f"===主线程{time.time() - start_time}")
结果:
海狗1 is running
海狗1 is gone
海狗2 is running
海狗2 is gone
海狗3 is running
海狗3 is gone
===主线程3.0027503967285156
守护进程
# 守护进程
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(2)
print("end456")
if __name__ == '__main__':
p1 = Process(target=foo)
p2 = Process(target=bar)
p1.daemon = True
p1.start()
p2.start()
print("===主")
结果:
===主
456
end456
守护线程
守护线程:如果守护线程的生命周期小于其他线程,则他肯定结束,否则等待其他非守护线程和主线程结束之后结束
# 单线程
from threading import Thread
import time
def sayhi(name):
# print("你滚!")
time.sleep(2)
print(f"{name} say hello")
if __name__ == '__main__':
t = Thread(target=sayhi, args=("egon",))
t.daemon = True
t.start()
print("主线程")
结果:
主线程
# 多线程一
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
t1 = Thread(target=foo)
t2 = Thread(target=bar)
t1.daemon = True
t1.start()
t2.start()
print("main------------")
结果:
123
456
main------------
end123
end456
# 多线程二
from threading import Thread
import time
def foo():
print(123)
time.sleep(3)
print("end123")
def bar():
print(456)
time.sleep(1)
print("end456")
t1 = Thread(target=foo)
t2 = Thread(target=bar)
t1.daemon = True
t1.start()
t2.start()
print("main------------")
结果:
123
456
main------------
end456
互斥锁(考点)
问题
from threading import Thread
import time
import random
x = 100
def task():
# time.sleep(random.randint(1, 2))
global x
temp = x
time.sleep(random.randint(1, 3))
x = temp -1
if __name__ == '__main__':
l1 = []
for i in range(100):
t = Thread(target=task)
l1.append(t)
t.start()
for i in l1: # 使主线程无法先运行print(f"主线程{x}")
i.join()
print(f"主线程{x}")
结果: 一直是99
解决:
from threading import Thread
from threading import Lock
import time
import random
x = 100
def task(lock):
lock.acquire()
# time.sleep(random.randint(1, 2))
global x
temp = x
# time.sleep(random.randint(1, 3))
x = temp -1
lock.release()
if __name__ == '__main__':
mutex = Lock()
l1 = []
for i in range(100):
t = Thread(target=task, args=(mutex,))
l1.append(t)
t.start()
for i in l1: # 使主线程无法先运行print(f"主线程{x}")
i.join()
print(f"主线程{x}")
结果:一直是0