线程
IPC机制
两个进程,产生数据先产生在内存中,数据交互就是一个进程把它在内存里的数据交给另一个进程,进程之间的内存空间是隔离的,所以一个进程没办法直接将数据交给另一个进程,内存空间是隔离的,可以找到一个共享的空间
主进程要想获取子进程的数据,将子进程数据输入管道内,也就是共享空间(硬盘(在输入输出数据的时候需要通过硬盘的io,速度比较慢),还有管道(在内存空间,速度快)),有时候需要修改不同的共享数据,为了确保数据的安全性,所以要加多把锁,能实现这两种需求的只有队列(内存空间中,并且可以解决锁的问题)
# 队列:先进先出
# 堆栈:先进后出
利用队列实现进程间通信
队列=管道+锁
#(一般队列中放小数据,文件放入需要转为二进制,视频就不建议放入队列中了)
from multiprocessing import Queue
# q=Queue() 不设置大小的话,可以无限放,其实也是有限制的,因为再怎么大也大不过内存空间
q=Queue(3) # 这是造出内存空间,括号内放入的是队列的大小
q.put(['first',]) # put中可放任意python类型,容器类型如果只有一个值,一定要加括号
q.put({'x':2})
q.put(3)
# q.put(4) 第四个,这里只是设置了3个队列大小,队列已满,会造成阻塞
print(q.get())# 会一直等,等到天荒地老,直到取到值为止,
print(q.get())
print(q.get())
# print(q.get()) 造成阻塞,等着需要放入的数据再取
结果:(先进先出)
['first',]
{'x':2}
3
了解:
# q=Queue(3)
# q.put(['first',],block=True,timeout=3) block=True表示队列满的时候就锁你,timeout=3表示就是阻你三秒
# q.put({'x':2},block=True,timeout=3)
# q.put(3,block=True,timeout=3)
# q.put(4,block=True,timeout=3) timeout=3阻你三秒,如果队列满了,block=True,就会阻塞,然后抛异常说队列满了,本来是可以干其他事,可现在不能了,因为阻塞在了原地,等到别人取走一个,我立马放入,放完才能生产数据,接着往里面放 block 和timeout是联合使用的,tomeout一般默认是-1表示就是我会一直等着,直到有数据放入
q.put_nowait(1) #q.put(1,block=False) 不用等
q.put_nowait(2)
q.put_nowait(3)
q.put_nowait(4) # 第四个,队列已满,所以会抛异常
print(q.get(block=True,timeout=3)) #如果取到值就不用等,如果没有就是等三秒,还没的话就会报错
print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3))
print(q.get(block=True))# 如果不写timeout就会一直等死
print(q.get_nowait()) #q.get(block=false)
print(q.get_nowait()) #q.get(block=false)
print(q.get_nowait()) #q.get(block=false)
print(q.get_nowait()) #q.get(block=false) 连等都不等就报错了
生产者消费者模型
(三种运行方式:并发,并行,串行)
(生产者生产时,不能消费,消费时,不能生产,所以只能是并发,并发只有进程)
(两个进程:生产进程,消费进程)
'''
1. 什么是生产者消费者模型
生产者:代指生产数据的任务
消费者:代指处理数据的任务
该模型的工作方式:
生产者生产数据传递消费者处理
实现方式:
生产者---->队列<------消费者
2. 为何要用
当程序中出现明细的两类任务,一类负责生产数据,一类负责处理数据
就可以引入生产者消费者模型来实现生产者与消费者的解耦合,平衡生产能力与消费能力,从提升效率
3. 如何用
'''
# 在造数据的过程当中,消费者不能去消费,在消费的过程当中,是不能去造数据
#而以下数据就是我生产一个包子,要等它吃了才能再生产,这会造成效率问题,
# 在你吃包子的过程中,我应该是有能力去造包子的,而因为你,我还要等,白白浪费了生产的资源,
# 然后消费者同时吃完了还要等生产包子
# 以下就是耦合
def producers():
for i in range(10):
# 造数据
consumers(i)
def consumers(res):
pass
# 提高效率:消费者和生产者之间可以有一个沟通的介质(一个盆,生产包子的时候就往里丢),厨师在后台做,
# 消费者可以不停的吃(在盆里挑包子)
# 结耦合
import time, random
from multiprocessing import Queue, Process
def producer(name, food, q):
for i in range(10):
res = '%s%s' % (food, i)
time.sleep(random.randint(1, 3)) #模拟生产数据的时间
q.put(res)
print('厨师%s生产了%s' % (name, res))
def consumer(name, q):
while True:
res = q.get() # 会一直等,等到取到值为止才会运行下面的代码,
# 所以不会出现包子产生了一个,我吃了两个
time.sleep(random.randint(1, 3))
print('吃货%s吃了%s' % (name, res))
if __name__ == '__main__':
q = Queue()
#生产者们
p1 = Process(target=producer, args=('egon', '包子', q)) #子进程是一个独立的内存空间,
# 现在的子进程功能只是主进程的一个相同功能
# 消费者
c1 = Process(target=consumer, args=('刘清政', q))
p1.start() # 是生产者先告诉操作系统的,所以操作系统会先造出生产者造出包子,才到消费
c1.start()
print('主') # 主进程包括这两个功能
#以上的代码是生产者生产完了(一个函数运行完就结束了),
# 消费者因为while True而没有结束函数循环,所以会执行res = q.get(),
# 此时生产者已经没有数据了,就会造成阻塞,一直在等
#以下优化
import time , random
from multiprocessing import Queue, Process
def produce(food, q, name):
for i in range(10):
res = '%s%s' % (food, i)
time.sleep(random.randint(1, 3))
q.put(res)
print('厨师%s生产了%s' % (name, res))
q.put(None)
def cost(q, name):
while True:
res = q.get()
if res is None: break
time.sleep(random.randint(1, 3))
print('吃货%s吃了%s' % (name, res))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=produce, args=('包子', q, 'egon'))
c1 = Process(target=cost, args=(q, '刘清政'))
p1.start()
c1.start()
print('主')
# 多用户
import time , random
from multiprocessing import Queue, Process
def made(name, q, food):
for i in range(5):
res = '%s%s' % (food, i)
time.sleep(random.randint(1, 3))
q.put(res)
print('厨师%s生产了%s' % (name, res))
q.put(None)
def spend(name, q):
while True:
res = q.get()
if res is None: break
time.sleep(random.randint(1, 3))
print('吃货%s吃了%s' % (name, res))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=made, args=('yaya', q, '包子'))
p2 = Process(target=made, args=('dd', q, '露水'))
p3 = Process(target=made, args=('ww', q, '串串'))
c1 = Process(target=spend, args=('吴三江', q))
c2 = Process(target=spend, args=('egon', q))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
print('主')
# 以上代码多用户下,三个用户都是在一个队列中取值,不能保证同时都结束for循环 ,所以有些消费者不能消费完就就结束掉了
import time , random
from multiprocessing import Queue, Process
def made(name, q, food):
for i in range(5):
res = '%s%s' % (food, i)
time.sleep(random.randint(1, 3))
q.put(res)
print('厨师%s生产了%s' % (name, res))
def spend(name, q):
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print('吃货%s吃了%s' % (name, res))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=made, args=('yaya', q, '包子'))
p2 = Process(target=made, args=('dd', q, '露水'))
p3 = Process(target=made, args=('ww', q, '串串'))
c1 = Process(target=spend, args=('吴三江', q))
c2 = Process(target=spend, args=('egon', q))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
print('主')
# 以上是等三个生产者子进程结束运行之后,由于两个消费者因为while循环而没有接收,
# 需要传值,不然就一直在等,而主进程和所以的子进程都是共用一个队列的,
# 所以在等生产者子进程结束后就给两个消费者传值,停止运行
import time, random
from multiprocessing import JoinableQueue, Process
def product(q, name, food):
for i in range(5):
res = '%s%s' % (food, i)
time.sleep(random.randint(1, 3))
q.put(res)
print('厨师%s生产了%s' % (name, res))
def client(q,name):
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print()
print('厨师%s生产了%s' % (name, res))
q.task_done() # 给q.join发送消息。
# 每循环一次,
# 走到这行代码就代表了队列中的一个值被取走了,
# 然后这一代码就是告诉q.join,q.join就会计算本来队列中所存的值再相减
# q.join每收到一次信息就减一,直到清零后就会自动跳出子进程的运行,
# 所以下面的q.join是等三个生产者都运行完后才计算队列中的值个数,
# 主要是计算消费者取值的次数,待消费者取值结束后也就是运行结束后就跳出
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=made, args=('yaya', q, '包子'))
p2 = Process(target=made, args=('dd', q, '露水'))
p3 = Process(target=made, args=('ww', q, '串串'))
c1 = Process(target=spend, args=('吴三江', q))
c2 = Process(target=spend, args=('egon', q))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.join() # 主进程等q结束,即q内数据被取干净了
print('主') # 运行这行代码就表示q.join()运行完了,
# 生产者正常死亡,消费者也消费完了
# 终极版本
import time,random
from multiprocessing import Process,JoinableQueue
def producer(name,food,q):
for i in range(3):
res='%s%s' %(food,i)
time.sleep(random.randint(1,3)) #模拟生产数据的时间
q.put(res)
print('厨师[%s]生产了<%s>' %(name,res))
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1,3)) #模拟处理数据的时间
print('吃货[%s]吃了<%s>' %(name,res))
q.task_done()
if __name__ == '__main__':
q=JoinableQueue()
# 生产者们
p1=Process(target=producer,args=('小Egon','泔水',q))
p2=Process(target=producer,args=('中Egon','屎包子',q))
p3=Process(target=producer,args=('大Egon','腰子汤',q))
# 消费者们
c1=Process(target=consumer,args=('刘清正',q))
c2=Process(target=consumer,args=('吴三江',q))
c1.daemon=True # 守护进程,代表消费者消费完后也应该跟着主进程死掉
c2.daemon=True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.join() # 主进程等q结束,即q内数据被取干净了
print('主')
线程理论
自己的理解(操作系统就是一个工厂,多个车间,每个车间就是进程,进程就是资源单位,主要就是开辟内存空间,而车间内都自带一条流水线,流水线就是线程,线程就是执行单位,就是比如修车功能,拆轮胎功能等,一个线程就是一个功能。一个进程中可以有多个线程,提高程序运行的执行效率,一个进程中的多个线程可以共用资源。)
1 什么是线程
进程其实一个资源单位,而进程内的线程才是cpu上的执行单位
线程其实指的就是代码的执行过程
2 为何要用线程
线程vs进程
2.1. 同一进程下的多个线程共享该进程内的资源
2.2. 创建线程的开销要远远小于进程
开启线程的两种方式
#开启线程的方式一:
from threading import Thread
import time
def task(name):
print('%s is running' %name)
time.sleep(2)
print('%s is done' %name)
if __name__ == '__main__':
t=Thread(target=task,args=('线程1',)) # 创建进程的multiprocessing类其实是复制threading的接口,所以创建进程和创建线程的方法基本一样
t.start()
print('主') #主线程是不存在的,线程之间部分主次,这是约定熟成的主线程
#开启线程的方式二:
from threading import Thread
import time
class Mythread(Thread):
def run(self):
print('%s is running' %self.name)
time.sleep(2)
print('%s is done' %self.name)
if __name__ == '__main__':
t=Mythread()
t.start()
print('主')
线程特性介绍
from threading import Thread
import time
n=100
def task():
global n
n=0
if __name__ == '__main__':
t=Thread(target=task)
t.start()
t.join()
print('主',n) #0 证明主线程和多个子线程共享进程内的所有资源
from threading import Thread
import time,os
def task():
print('%s is running' %os.getpid())
if __name__ == '__main__':
t=Thread(target=task)
t.start()
print('主',os.getpid()) # 获取这个线程所属进程的id
#
from threading import Thread,active_count,current_thread
import time,os
def task():
print('%s is running' % current_thread().name)
time.sleep(2)
if __name__ == '__main__':
t=Thread(target=task,)创建一个线程的默认名字thread-1代表是第一条线程,也可以自己取名name='momo'
t.start()
# t.join()
print('主', active_count()) # 查看进程存活的线程个数 ,如果没有创建线程,也会默认为1,因为一个进程中默认自带一个线程
print('主',current_thread().name) # 查看当前线程的父线程的名字
守护线程
# from threading import Thread
# import time
#
# def task(name):
# print('%s is running' %name)
# time.sleep(2)
# print('%s is done' %name)
#
# if __name__ == '__main__':
# t=Thread(target=task,args=('线程1',))
# t.daemon=True
# t.start()
# print('主')
from threading import Thread
from multiprocessing import Process
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == '__main__':
t1=Thread(target=foo)
t2=Thread(target=bar)
# t1=Process(target=foo)
# t2=Process(target=bar)
t1.daemon=True
t1.start()
t2.start()
print("main-------")
'''
123
main-------
456
end456
'''
'''
main-------
123
456
end456
'''
'''
main-------
456
end456
'''
线程互斥锁
from threading import Thread,Lock
import time
mutex=Lock()
n=100
def task():
global n
mutex.acquire()
temp=n
time.sleep(0.1)
n=temp-1
mutex.release()
if __name__ == '__main__':
t_l=[]
for i in range(100):
t=Thread(target=task)
t_l.append(t)
t.start()
for t in t_l:
t.join()
print(n)