模型就是解决某个问题的固定方法和套路
1.1要解决什么问题
生产者:泛指产生数据的一方
消费者:泛指处理数据的一方
案例:
食堂饭店是生产者
吃饭的人就是消费者
问题:
效率低,双方速度不同,可能一个快一个慢,双方需要互相等待
具体解决方法:
-
-
提供一个共享容器,来平衡双方的能力,之所用进程队列是因为队列可以在进程间共享
案例:
from multiprocessing import Process,Queue
import requests
import re,os,time,random
# 生产者任务:
def product(urls,q):
i = 0
for url in urls:
response = requests.get(url)
text = response.text
time.sleep(random.random())
q.put(text)
i += 1
print(os.getpid(),f'生产了第{i}个数据')
def customer(q):
i = 0
while True:
text = q.get()
time.sleep(random.random())
res = re.findall('src=//(.*?) width',text)
i += 1
print(os.getpid(),f' 第{i}个任务获取到{len(res)}个img')
if __name__ == '__main__':
urls = [
"http://www.baidu.com",
"http://www.baidu.com",
"http://www.baidu.com",
"http://www.baidu.com",
]
# 创建一个双方都能共享的容器
q = Queue()
# 生产者进程
p1 = Process(target=product,args=(urls,q))
p1.start()
# 消费者进程
c = Process(target=customer,args=(q,))
c.start()
# *************************************
问题:消费不知道什么时候结束
joinableQueue继承自Queue 用法一致
增加了join 和taskDone
join是阻塞函数,会阻塞指导taskdone的调用次数等于存入元素个数 可以用来表示队列任务处理完成
from multiprocessing import Process,JoinableQueue
import requests
import re,os,time,random
"""
生产者 负责生产热狗
消费者 负责吃热狗
"""
# 生产者任务
def product(q,name):
for i in range(5):
dog = f'{name}的热狗{i+1}'
time.sleep(random.random())
print('ok',dog)
q.put(dog)
def customer(q):
while True:
dog = q.get()
time.sleep(random.random())
print(f'消费了{dog}')
q.task_done() # 标记这个任务完成
if __name__ == '__main__':
# 创建一个双方能共享的容器
q = JoinableQueue()
# 生产者进程
p1 = Process(target=product,args=(q,'上海分店'))
p2 = Process(target=product,args=(q,'北京分店'))
p1.start()
p2.start()
# 消费者进程
c = Process(target=product,args=(q,))
# c.daemon = True # 可以将消费者设置为守护进程,当主进程确定,任务全部完成时,可以随着主进程一起结束
c.start()
p1.join()
p2.join() # 代码走到这里意味着生产法完成
q.join()# 意味着队列中的任务都处理完成了
# 结束所有任务
c.terminate() # 直接终止消费者进程
# 如何判断今天的热狗真的吃完了
# 1.确定生产者任务完成
# 2.确定生出来的数据以及全部处理完成
redis 消息队列
MQ消息队列
用来做流量削峰,保证服务不会因为高并发而崩溃(比如活动假日的某些活动)
2.多线程
并发编程
1.什么是线程
回顾: 进程是操作系统可以调度的已经进行资源分配的基本单位,是一个资源单位,其中包含了运行这个程序所需的资源
线程是操作系统可以运算调度的最小单位,是真正的执行单位,琪包含在进程中,一个进程就是一条固定的控制流程
一个进程可以包含多个线程,同一进程中的线程共享进程内的资源
特点:系统会为每一个进程自动创建一条线程,称之为主线程,后续通过代码开启的线程称之为子线程
进程对比线程1
进程是资源单位 先是执行单位
创建进程的开销远大于线程
多个进程之间的内存是互相隔离的,而线程是共享进程内的所有资源
进程之间对于硬件资源是竞争关系,而线程是协作关系
当然开启线程也是需要消耗资源的
进程之间有层级关系。而线程之间没有是平等的
比喻:计算机是工厂 进程是车间 线程是流水线
为什么要是用线程
-
有多个任务要并发处理
-
当要并发处理的任务有很多的时,不能使用进程。进程资源开销太大,CPU吃不消,线程开销非常小,使用与任务数非常多的情况
使用线程
方法1 直接实例化Thread类
from threading import Thread
# def task():
# print('子线程 run')
#
# # 与进程1不同之处 不需要加判断 开启线程的代码放哪里都可以
# t = Thread(target=task)
# t.start()
# print('over')
# 使用方法2 继承Thread类
class MyTread(Thread):
def run(self):
# 把要在子线中执行的代码放入run中
print('子 run')
mt = MyTread()
mt.start()
print('over')
"""
子 run
over
"""
线程安全问题
只要并发访问了同一资源一定会产生安全问题,解读二方法和多进程一致,就是给操作公共资源代码加锁
from threading import Thread,Lock
import time
a = 10
l = Lock()
def task():
global a
l.acquire()
temp = a
time.sleep(0.1)
a = temp - 1
l.release()
ts = []
for i in range(10):
t = Thread(target=task)
t.start()
ts.append(t)
for t in ts:t.join()
print(a)
"""
0
"""
守护线程
一个线程a设置为b的守护线程,a会随着b的结束而结束
默认情况下,主线程级是代码执行完毕也会等待所有费守护线程完毕后程序才能技术,因为多个线程之间是协作关系
from threading import Thread
import time
# 妃子的一生
def task():
print('妃子 start')
time.sleep(5)
print('妃子 over')
def task2():
print('皇太后 over')
time.sleep(3)
print('皇太后 over')
# 皇帝的一生
print('主 start')
t = Thread(target=task)
t.daemon = True
t.start()
t2 = Thread(target=task2)
t2.start()
print('主 over')
"""
主 start
妃子 start
皇太后 over
主 over
皇太后 over
"""
线程中的常用属性和方法
from threading import Thread,currentThread,enumerate,activeCount
import time
# t = Thread()
# t.start()
# t.join()
# t.is_alive()
# t.isAlive()
# t.ident # 线程标识符 id
# t.daemon
# 获取当前线程对象
# print(currentThread())
# t = Thread(target=lambda :print(currentThread()))
# t.start()
t = Thread(target=lambda :time.sleep(1))
t.start()
t = Thread(target=lambda :time.sleep(1))
t.start()
t.join()
# 获取正在运行的所有线程对象 是一个列表
print(enumerate())
# 存活的线程数量
print(activeCount())
from threading import Thread
import time
# 妃子的一生
def task():
print('妃子 start')
time.sleep(5)
print('妃子 over')
def task2():
print('皇太后 over')
time.sleep(3)
print('皇太后 over')
# 皇帝的一生
print('主 start')
t = Thread(target=task)
t.daemon = True
t.start()
t2 = Thread(target=task2)
t2.start()
print('主 over')
"""
主 start
妃子 start
皇太后 over
主 over
皇太后 over
"""