目录
一 .进程互斥锁
并发变串行
让并发变成串行,牺牲了执行效率,保证了数据的安全在程序并发执行时,需要修改数据时使用.
运行时出现全部购票成功的情况(余票为1),是因为并发编程,每个子进程都获得res为1,每个都以自己的res执行get,最后成功购票,可以在p.start后面加上p.join使其变为串行,前面结束才能运行后面的
from multiprocessing import Process
import json,time
def search():
'''查询余票'''
time.sleep(1)
with open('db.txt','r',encoding='utf-8') as f:
res = json.load(f)
print(f"还剩{res.get('count')}")
def get():
'''购票'''
with open('db.txt','r',encoding='utf-8') as f:
res = json.load(f)
time.sleep(1) # 模拟网络io延迟
if res['count'] > 0:
res['count'] -= 1
with open('db.txt', 'w', encoding='utf-8') as f:
json.dump(res,f)
print('抢票成功')
time.sleep(1)
else:
print('车票已经售罄')
def task():
search()
get()
if __name__ == '__main__':
for i in range(10):
p =Process(target=task)
p.start()
# 加上join运行结果正确
p.join()
lock
进程锁/互斥锁
使第一个进程进去第二个必须等待结束才能进,把锁住的代码变成了串行.
lock.acquire()
加锁, lock.release()
解锁
# 设置进程串行,进程锁/互斥锁,使第一个进程进去第二个必须等待结束才能进
from multiprocessing import Process
import json,time
from multiprocessing import Lock
def search():
time.sleep(1)
with open('db.txt','r',encoding='utf-8') as f:
res = json.load(f)
print(f"还剩{res.get('count')}")
def get():
with open('db.txt','r',encoding='utf-8') as f:
res = json.load(f)
time.sleep(1) # 模拟网络io
if res['count'] > 0:
res['count'] -= 1
with open('db.txt', 'w', encoding='utf-8') as f:
json.dump(res,f)
print('抢票成功')
time.sleep(1)
else:
print('车票已经售罄')
def task(lock):
search()
# lock = Lock() # 写在主进程时为了让子进程拿到同一把锁
# 锁住
lock.acquire()
get() # 同一时间只能一个进程执行get()
# 释放锁
lock.release()
if __name__ == '__main__':
lock = Lock() # 写在主进程时为了让子进程拿到同一把锁
for i in range(10):
p =Process(target=task,args=(lock,)) # 将lock当做参数传入
p.start()
总结
进程锁: 是把锁住的代码变成了串行
join: 是把所有的子进程变成了串行
二 .IPO机制
实现进程间的通信:队列和管道
1.管道
pipe: 基于共享的内存空间
2.队列
Queue: pipe+锁 from multiprocessing import Queue
队列:先进先出
相当于内存中的产生一个队列空间,可以存放多个数据,但数据的顺序是由先进去的排在前面
堆栈:先进后出
1.Queue()
调用队列类,实例化对象q
q = Queue(5) #若传参队列中可以放5个数据
q = Queue() #若不传参,队列中可以存放无限大的数据,前提是硬件跟得上
2.put()
添加数据,若队列中的数据满了放值会阻塞
from multiprocessing import Queue
q = Queue(2)
q.put('山')
q.put('水')
q.put([1,2,3]) # 阻塞住
========================================================
from multiprocessing import Queue
q = Queue(2)
# block=true 是默认会阻塞, timeout = 2 等待时间超时2s
q.put('风',block=True,timeout=2)
q.put('风',block=True,timeout=2)
# 如果满了会进行等待,但最多等待2s否则报错
q.put('风',block=True,timeout=2)
3.q.get()
遵循先进先出
from multiprocessing import Queue
q = Queue(2)
q.put('山')
q.put('水')
q.put([1,2,3])
print(q.get()) # 山
print(q.get()) # 水
print(q.get()) # [1, 2, 3]
print(q.get()) # 默认存值没有就会阻塞在这
4.empty()
判断队列是否为空
print(q.empty()) # False
5.q.get_nowait()
获取数据,队列中若没有则会报错
print(q.get_nowait())
6.q.put_nowait()
添加数据 若队列满了, 则会报错
q.put_nowait(6)
7.q.full()
判断队列是否满
print(q.full()) # True
3.进程间通信
进程间数据是相互隔离的,若想实现进程间通信,可以利用队列.
可以利用进程的队列,创建数据以及获取数据(Queue不适合传大文件)
from multiprocessing import Process,Queue
def test1(q):
data = f'你好啊,赛利亚'
# 将数据data传入队列中
q.put(data)
print('进程1开始添加数据到队列中...')
def test2(q):
data = q.get()
print(f'进程2从队列中获取数据[{data}]')
if __name__ == '__main__':
# 获得队列的对象
q = Queue()
# 获取进程对象,并将队列对象传入
p1 = Process(target=test1,args=(q,))
p2 = Process(target=test2,args=(q,))
# 启动进程1,2
p1.start()
p2.start()
'''
进程1开始添加数据到队列中...
进程2从队列中获取数据[你好啊,赛利亚]
'''
三 . 生产者消费者模型
生产者:生产数据的
消费者:使用数据的
生产者 --- 队列(容器) --- 消费者
通过队列,生产者把数据添加进去,消费者从队列中获取(不适合传大文件)
生产者可以不停的生产,并可以存放在容器队列中,消费者也可以不停的取,没有因生产者的生产效率低下而等待(一种设计思想)
简单终结生产版
from multiprocessing import Queue,Process
def producer(q,name,food):
'''生产者'''
for i in range(10):
print(f'{name}生产出{food}数据{i}')
# 传输的消息
res = f'{food}{i}'
# 放入队列中
q.put(res)
# 循环结束,传入q结束生产
q.put('q')
def consumer(q,name):
'''消费者'''
# 消费者不停的获取
while True:
# 从队列中获取
res = q.get()
if res == 'q':
break
print(f'{name}消费使用了{res}数据')
if __name__ == '__main__':
q = Queue() # 创建队列
p1 = Process(target=producer,args=(q,'小明','电影')) # 传入子进程中
p2 = Process(target=consumer,args=(q,'小红'))
p1.start()
p2.start()
加强版
from multiprocessing import Queue,Process
def producer(q,name,food):
'''生产者'''
for i in range(3):
print(f'{name}生产出{food}数据{i}')
# 传输的消息
res = food,i
# 放入队列中
q.put(res)
def consumer(q,name):
'''消费者'''
# 消费者不停的获取
while True:
# 从队列中获取
res = q.get()
# 判断是否有数据
if not res:
break
print(f'{name}消费使用了{res}数据')
if __name__ == '__main__':
q = Queue() # 创建队列
p1 = Process(target=producer,args=(q,'小明','电影')) # 传入子进程中
p2 = Process(target=producer,args=(q,'小黑','书籍')) # 传入子进程中
p3 = Process(target=producer,args=(q,'小亮','动漫')) # 传入子进程中
c1 = Process(target=consumer,args=(q,'小红'))
c2 = Process(target=consumer,args=(q,'小绿'))
p1.start()
p2.start()
p3.start()
# 将c1,2设置为守护者模式
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
# join 设置子程序结束后主程序再结束
p3.join()
print('主')
四 .线程
1.定义
线程与进程都是虚拟单位,目的是为了更好的描述某种事物
进程:资源单位
进程是程序的分配资源的最小单元;一个程序可以有多个进程,但只有一个主进程;进程由程序、数据集、控制器三部分组成。
线程:执行单位
线程是程序最小的执行单元;一个进程可以有多个线程,但是只有一个主线程;线程切换分为两种:一种是I/O切换,一种是时间切换
开启一个进程一定会有一个线程,线程才是真正执行单位
2.为什么要使用线程
节省内存资源
开启进程
-
开辟一个名称空间,每开启一个进程都会占用一份内存资源
-
会自带一个线程
开启线程
- 一个线程可以开启多个线程,
- 线程的开销远小于进程,
注意: 线程不能实现并行,线程只能实现并发.
3.线程的创建
from threading import Thread
调用线程
线程不需要在if __name__ == '__main__':
下启动也可以执行.
方式1
实例化线程对象
from threading import Thread
import time
def task():
print('线程开启')
time.sleep(1)
print('线程结束')
# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
# 实例化线程t
t = Thread(target=task)
t.start()
方式2
创建类
from threading import Thread
import time
class MyTh(Thread):
def run(self):
print('线程开启')
time.sleep(1)
print('线程结束')
if __name__ == '__main__':
t = MyTh()
t.start()
4.属性方法
主进程像是工厂,子进程是车间,线程则是流水线
from threading import current_thread
导入属性方法
1.current_thread.name()
获得线程号
from threading import Thread
import time
from threading import current_thread
def task():
print(f'线程开启{current_thread().name}')
time.sleep(1)
print(f'线程结束{current_thread().name}')
# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
for i in range(3):
# 实例化线程t
t = Thread(target=task)
t.start()
2.isAlive
判断线程是否存活
def task():
print(f'线程开启{current_thread().name}')
time.sleep(1)
print(f'线程结束{current_thread().name}')
# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
# 实例化线程t
t = Thread(target=task)
print(t.isAlive()) # false
t.start()
print(t.isAlive()) # True
print(t.is_alive()) # True
3. .daemon = True
守护的是进程的运程周期
from threading import Thread
import time
def task():
print('线程开启')
time.sleep(0.1)
print('线程结束')
if __name__ == '__main__':
# 实例化线程t
t = Thread(target=task)
t.daemon = True
t.start()
print('主')
'''# 主进程结束线程即结束
线程开启
主'''
4.线程互斥锁
线程之间的数据是共享的
# 每个线程都会执行task修改操作,但每个人都是并发的,所以使用加锁进来串行操作
from threading import Thread
import time
'''
线程之间数据是共享的.
'''
from threading import Thread, Lock
import time
mutex = Lock()
n = 100
def task(i):
print(f'线程{i}启动...')
global n
mutex.acquire()
temp = n
time.sleep(0.1) # 一共等待10秒
n = temp-1
print(n)
mutex.release()
if __name__ == '__main__':
t_l=[]
for i in range(100):
t = Thread(target=task, args=(i, ))
t_l.append(t)
t.start()
for t in t_l:
t.join()
# 100个线程都是在100-1
print(n)