生产者消费者模型
1,生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题
2,生产者和消费者彼此之间不直接通讯,而阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
3,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
初级生产者消费者模型
import time
import random
from multiprocessing import Process, Queue
def producer(name, food, q):
for i in range(10):
data = '%s 制造了%s' % (name, food)
# 模拟制造食物延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(food)
def consumer(name, q):
while True:
food = q.get()
# 模拟吃食物延迟
time.sleep(random.randint(1, 3))
print('%s消费了%s' % (name, food))
if __name__ == '__main__':
q = Queue()
# 创造一个生产者
p = Process(target=producer, args=('egon', '包子', q))
p.start()
# 创造一个消费者
c = Process(target=consumer, args=('鸡哥', q))
c.start()
# 如果queue中没有数据了,消费者会一直卡住
import time
import random
from multiprocessing import Process, Queue
def producer(name, food, q):
for i in range(10):
data = '%s 制造了%s' % (name, food)
# 模拟制造食物延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(food)
q.put(None)
def consumer(name, q):
while True:
food = q.get()
if food is None:return # 当队列中取出None,之间结束
# 模拟吃食物延迟
time.sleep(random.randint(1, 3))
print('%s消费了%s' % (name, food))
if __name__ == '__main__':
q = Queue()
# 创造一个生产者
p = Process(target=producer, args=('egon', '包子', q))
p.start()
# 创造一个消费者
c = Process(target=consumer, args=('鸡哥', q))
c.start()
# 制造两个消费者
import time
import random
from multiprocessing import Process, Queue
def producer(name, food, q):
for i in range(10):
data = '%s 制造了%s' % (name, food)
# 模拟制造食物延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(food)
def consumer(name, q):
while True:
food = q.get()
if food is None:return # 当队列中取出None,之间结束
# 模拟吃食物延迟
time.sleep(random.randint(1, 3))
print('%s消费了%s' % (name, food))
if __name__ == '__main__':
q = Queue()
# 创造一个生产者
p = Process(target=producer, args=('egon', '包子', q))
p.start()
# 创造一个消费者
c = Process(target=consumer, args=('鸡哥', q))
c.start()
c1 = Process(target=consumer, args=('王铁蛋', q))
c1.start()
# 生产者生产完毕,放两个None
p.join() # 等待p进程执行完成再放
q.put(None)
q.put(None)
多个生产者和多个消费者
制造两个消费者
import time
import random
from multiprocessing import Process, Queue
def producer(name, food, q):
for i in range(10):
data = '%s 制造了%s' % (name, food)
# 模拟制造食物延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(food)
def consumer(name, q):
while True:
food = q.get()
if food is None:return # 当队列中取出None,之间结束
# 模拟吃食物延迟
time.sleep(random.randint(1, 3))
print('%s消费了%s' % (name, food))
if __name__ == '__main__':
q = Queue()
# 创造一个生产者
p = Process(target=producer, args=('egon', '包子', q))
p.start()
p1 = Process(target=producer, args=('alex', '泔水', q))
p1.start()
# 创造一个消费者
c = Process(target=consumer, args=('鸡哥', q))
c.start()
c1 = Process(target=consumer, args=('王铁蛋', q))
c1.start()
c2 = Process(target=consumer, args=('李铁柱', q))
c2.start()
# 生产者生产完毕,放两个None
p.join() # 等待p进程执行完成再放
p1.join() # 等待p1(另一个生产者)进程执行完成再放
q.put(None)
q.put(None)
q.put(None)
# 最终版本,不用放None
import time
import random
from multiprocessing import Process, Queue,JoinableQueue
def producer(name, food, q):
for i in range(10):
data = '%s 制造了%s' % (name, food)
# 模拟制造食物延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(food)
def consumer(name, q):
while True:
food = q.get()
# 模拟吃食物延迟
time.sleep(random.randint(1, 3))
print('%s消费了%s' % (name, food))
q.task_done() # 把队列中维护的数字减一
if __name__ == '__main__':
# q = Queue()
# 内部为何了一个数字,放一个数字会加一
# 消费一个数字减一
q = JoinableQueue()
# 创造一个生产者
p = Process(target=producer, args=('egon', '包子', q))
p.start()
p1 = Process(target=producer, args=('alex', '泔水', q))
p1.start()
# 创造一个消费者
c = Process(target=consumer, args=('鸡哥', q))
# c.daemon = True
c.start()
c1 = Process(target=consumer, args=('王铁蛋', q))
# c1.daemon = True
c1.start()
c2 = Process(target=consumer, args=('李铁柱', q))
# c2.daemon = True
c2.start()
# 主结束,消费进程也结束,把每个消费进程都设置成守护进程
# 等待所有生产者生产结束,主进程再结束
p.join()
p1.join()
q.join() # 会卡再者,一直等待q队列中数据没有了,才继续往下走
print('生产者结束了,主进程结束')
# JoinableQueue()
# 每放一个值,数字加一
# 取值不会减一,q.task_done()
# q.join() 一直阻塞,当q没有值了,才继续走
线程
进程是资源分配的最小单位,线程是cpu调度的最小单位,每一个进程中至少有一个线程
线程开销更小,更轻量级
特点:
1,轻型实体
2,独立调度和分派的基本单位
3,共享进程资源
4,可执行并发
线程和进程的区别
1,地址空间和其他资源(打开文件):进程间相互独立,同一进程的各线程间共享.某进程内的线程在其它进程不可见
2,通信:进程间通信ipc,线程间可以直接读写进程数据段(如全局变量)来进行通信-_需要进程同步和互斥手段的辅助,保证数据的一致性
3,调度和切换:线程上下文切换比进程上下文切换要快得多
4,在多线程操作系统中,进程不是一个可执行的实体
开启线程的两种方式
第一种
from threading import Thread
import time
def task():
print('开始')
time.sleep(1)
print('结束')
if __name__ == '__main__':
t=Thread(target=task,) # 实例化得到一个对象
t.start() # 对象.start()启动线程
print('主')
第二种,通过类继承的方式
from threading import Thread
import time
class MyThread(Thread):
def run(self):
print('开始')
time.sleep(1)
print('结束')
if __name__ == '__main__':
t=MyThread()
t.start()
print('主')
线程对象join方法
# 1 等待子线程执行结束
from threading import Thread
import time
def task(n):
print('开始')
time.sleep(n)
print('结束')
if __name__ == '__main__':
t=Thread(target=task,args=(2,))
t.start()
t1=Thread(target=task,args=(3,))
t1.start()
t.join() # 等待子进程执行结束
t1.join()
print('主')
同一个进程下的多个线程数据共享
from threading import Thread
import time
money = 99
def task(n):
global money
money = n
print('开始')
print('结束')
if __name__=='__main__':
t = Thread(target=task,args=(2,))
t.start()
t1 = Thread(target=task,args=(45,))
t1.start()
t.join()
t1.join()
print(money)
print('主')
线程对象及其他方法
Thread实例对象的方法
t.name t.getName():线程名字
active_count 当前进程下有几个线程存活
is_alive() 当前线程是否存活
.ident 当做是线程id号
threading模块提供的一些方法:
threading.currentThread():返回当前的线程变量
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量 len(threading.enumerate())有相同的结果。
from threading import Thread, current_thread,active_count
import time
import os
def task(n):
print('开始')
print(current_thread().name) # 线程名字
# 如果打印进程id号,会是什么
print(os.getpid())
time.sleep(n)
print('结束')
if __name__ == '__main__':
t1 = Thread(target=task,name='egon',args=(2,))
t2 = Thread(target=task,args=(8,))
t1.start()
t2.start()
t1.join()
print('---------',t1.is_alive())
print('---------',t2.is_alive())
# 当作线程id号
print('*********',t1.ident)
print('*********',t2.ident)
print(os.getpid())
print(active_count()) # 打印出3 ,开了两个线程,还有一个主线程
守护线程
from threading import Thread, current_thread,active_count
import time
import os
def task(n):
print('开始')
time.sleep(n)
# print('-----',active_count())
print('结束')
if __name__ == '__main__':
t1 = Thread(target=task,name='egon',args=(10,))
# t1.daemon = True
t1.setDaemon(True)
t1.start()
t2 = Thread(target=task,name='egon',args=(4,))
t2.start()
print('主')
线程互斥锁
from threading import Thread,Lock
import time
import random
money = 99
def task(n,mutex):
global money
# 在修改数据的时候,枷锁
mutex.acquire()
temp = money
time.sleep(0.1)
money = temp - 1
# 修改完以后,释放锁,其它线程就能再次抢到锁
mutex.release()
if __name__ == '__main__':
ll=[]
mutex=Lock()
for i in range(10):
t = Thread(target=task, args=(i,mutex))
t.start()
ll.append(t)
for i in ll:
i.join()
print(money)
GIL全局解释器锁理论
1 python的解释器有很多,cpython,jpython,pypy(python写的解释器)
2 python的库多,库都是基于cpython写起来的,其他解释器没有那么多的库
3 cpython中有一个全局大锁,每条线程要执行,必须获取到这个锁
4 为什么会有这个锁呢?python的垃圾回收机制
5 python的多线程其实就是单线程
6 某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行
7 总结:cpython解释器中有一个全局锁(GIL),线程必须获取到GIL才能执行,我们开的多线程,不管有几个cpu,同一时刻,只有一个线程在执行(python的多线程,不能利用多核优势)
8 如果是io密集型操作:开多线程
9如果是计算密集型:开多进程
以上两句话,只针对与cpython解释器