今日内容
一、进程的扩展
1.1IPC(进程之间的通信)
基于进程与进程之间的内存空间是隔离的所以进程与进程之间通信需要一块共享内存
1.1.1管道:
管道:linux系统中
tasklist | findstr xxx
ps aux | grep xxx
1.1.2队列(加锁的管道):
from multiprocessing import Queue
q = Queue(3) #设置队列中最多存3个值,超过最大值再存就会报错
q.put([1,2,3])
q.put("asdas")
q.put({"asd":123})
# 默认值为True:表示是超过会锁定
q.put(123,block=True,timeout=4) #timeout指的是锁定时间
print(q.get())
print(q.get())
print(q.get())
q.get(block=True,timeout=3)
1.2生产者与消费者模型
该模型有两种角色,一种是生产者,另一种是消费者
生产者负责产生数据,消费者负责取走数据进行处理
生产者与消费者通过队列通信
优点:解耦合,平衡了生产者的生产力与消费者的处理能力
#方式一:生产者全部生产完毕后再发送结束信号,消费者在接收到结束信号时结束进程
from multiprocessing import Process,Queue
import time,random
def producer(q,name,food):
for i in range(3):
res = "%s%s" %(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print(("%s 生产力 %s") % (name,res))
def consumer(q,name):
while True:
res = q.get()
if res is None:
break
time.sleep(random.randint(1,3))
print(("%s 吃了 %s") % (name,res))
if __name__ == '__main__':
q=Queue()
p1 = Process(target=producer,args=(q,"厨师1","包子"))
p2 = Process(target=producer,args=(q,"厨师2","馒头"))
p3 = Process(target=producer,args=(q,"厨师3","花卷"))
c1 = Process(target=consumer,args=(q,"xxx"))
c2 = Process(target=consumer,args=(q,"yyy"))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
print("主")
#方式二:设置守护进程,当消费者取出值会向生产者发送一次信号,当生产者产生的值全被取完之后才会结束进程,紧接着消费者作为守护进程也会结束
from multiprocessing import Process,JoinableQueue
import time,random
def producer(q,name,food):
for i in range(3):
res = "%s%s" %(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print(("%s 生产力 %s") % (name,res))
q.join()
def consumer(q,name):
while True:
res = q.get()
if res is None:
break
time.sleep(random.randint(1,3))
print(("%s 吃了 %s") % (name,res))
q.task_done()
if __name__ == '__main__':
q=JoinableQueue()
p1 = Process(target=producer,args=(q,"厨师1","包子"))
p2 = Process(target=producer,args=(q,"厨师2","馒头"))
p3 = Process(target=producer,args=(q,"厨师3","花卷"))
c1 = Process(target=consumer,args=(q,"xxx"))
c2 = Process(target=consumer,args=(q,"yyy"))
c1.daemon =True
c2.daemon = True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
二、线程
2.1线程介绍
线程:一个流水线的运行过程
进程内代码的运行过程
线程是一个执行单位,cpu执行的就是线程
进程是一个资源单位
线程与进程的区别:
1、同一进程下的多个线程共享该进程的内存资源
2、开启子线程的开销要远远小于开启子进程
2.2开启线程的两种方式:
#方式一:证明线程的运行速度很快
from threading import Thread,current_thread
import os
def task():
print("%s is running" % current_thread().name)
if __name__ == '__main__':
t = Thread(target=task)
t.start()
print("主线程",current_thread().name)
#证明一个进程内的线程资源共享
from threading import Thread
import os
n = 100
def task():
global n
n = 0
if __name__ == '__main__':
t = Thread(target=task)
t.start()
t.join()
print("主线程",n)
#方式二:
from threading import Thread
class Mythread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
print("%s is running" % self.name)
if __name__ == '__main__':
t = Mythread("线程1")
t.start()
2.3线程相关对象与方法:
from threading import Thread,current_thread,active_count,enumerate
import os
def task():
print("%s is running" % current_thread().name)
if __name__ == '__main__':
t = Thread(target=task)
t.start()
#print(t.is_alive()) #判断子线程是否存活
#print(active_count()) #查看活跃的线程
#print(enumerate()) #显示一个列表,里面放着所有线程对象
2.4守护线程:
#守护进程守护的是代码是否结束,守护线程守护的是线程的生命周期
from threading import Thread,current_thread
import os,time
def task(n):
print("%s is running" % current_thread().name)
time.sleep(n)
print("%s is end" % current_thread().name)
if __name__ == '__main__':
t1 = Thread(target=task,args=(3,))
t2 = Thread(target=task,args=(5,))
t3 = Thread(target=task,args=(100,))
t3.daemon = True #t3为守护线程,在主线程结束之后跟着结束
t1.start()
t2.start()
t3.start()
print("主线程",current_thread().name)
2.5线程的互斥锁
#为保证数据安全,不让所有线程同时修改数据,所以在修改数据时加上了一把锁
from threading import Thread,Lock
import os,time
n = 100
mutex = Lock()
def task():
global n
with mutex:
temp = n
time.sleep(0.1)
n = temp - 1
if __name__ == '__main__':
thread_l = []
for i in range(100):
t = Thread(target=task)
thread_l.append(t)
t.start()
for obj in thread_l:
obj.join()
print("主线程",n)