前情回顾
1. multiprocessing模块 创建进程
基本创建进程: Process(target,args,kwargs,name)
start()
join([timeout])
进程对象属性:p.name p.pid p.daemon p.is_alive()
自定义进程类:1. 继承Process,重写__init__
2. 重写run方法
进程池: 大量进程事件需要较多进程处理,
此时进程池可以避免进程频繁创建销毁带来的系统消耗。
Pool() 创建进程池对象
apply_async() 将事件使用进程池执行,async(异步)
close() 关闭进程池,不能再添加新的事件了
join() 回收进程池
map() 使用迭代对象为指定函数传参后,加入进程池
2. 进程间通信
【1】管道 : Pipe() fd.recv() fd.send()
【2】消息队列:Queue() q.get()获取消息 q.put() 存入消息
q.full() q.empty() q.qsize() q.close()
1 import os 2 from multiprocessing import Process 3 4 filename = './he.jpg' 5 6 #获取文件大小 7 size = os.path.getsize(filename) 8 9 #复制上半部分 10 def top(): 11 f = open(filename,'rb') 12 n = size // 2 13 fw = open("half_top.jpg",'wb') 14 while True: 15 if n < 1024: 16 data = f.read(n) 17 fw.write(data) 18 break 19 data = f.read(1024) 20 fw.write(data) 21 n -=1024 22 f.close() 23 fw.close() 24 25 #复制下半部分 26 def boot(): 27 f = open(filename,'rb') 28 fw = open('half_bottom.jpg','wb') 29 f.seek(size//2,0) 30 31 while True: 32 data = f.read(1024) 33 if not data: 34 break 35 fw.write(data) 36 f.close() 37 fw.close() 38 39 t = Process(target = top) 40 b = Process(target = boot) 41 t.start() 42 b.start() 43 t.join() 44 b.join()
************************************************************************************************************
一.进程间通信(续)
1.共享内存
【1】通信原理:在内存中开辟一块空间,进程可以写入内容和读取内容完成通信,
但是每次写入内容会覆盖之前的内容
【2】实现方法:
from multiprocessing import Value,Array
obj = Value(ctype,data)
功能:开辟共享内存
参数:ctype 表示共享内存空间类型'i'整 'f'浮点 'c'字符
data 共享内存空间初始数据
返回值:共享内存对象
obj.value 对该属性的修改或查看即对共享内存的读写
obj = Array(ctype,data)
功能:开辟共享内存空间
参数:ctype 表示共享内存数据类型
data 整数 则表示开辟空间的大小,
其他数据 开辟空间存放的初始化数据
返回值:共享内存对象
Array共享内存读写:通过遍历obj可以得到的每个值,直接可以通过索引号修改任意值。
1 from multiprocessing import Process,Value 2 import time 3 import random 4 5 #创共享内存 6 money = Value('i',5000) 7 8 #操作共享内存 9 def man(): 10 for i in range(30): 11 time.sleep(0.2) 12 money.value += random.randint(1,1000) 13 14 def girl(): 15 for i in range(30): 16 time.sleep(0.16) 17 money.value -= random.randint(100,900) 18 19 m = Process(target=man) 20 g = Process(target=girl) 21 m.start() 22 g.start() 23 m.join() 24 g.join() 25 26 print("一月余额:",money.value)
1 from multiprocessing import Process,Array 2 import time 3 4 #创建共享内存 5 #shm = Array('i',[1,2,3,4,5]) 6 7 #创建共享内存,指定开辟空间大小 8 # shm =Array('i',6) 9 10 11 #存如字符串 12 shm = Array('c',b'Hello') 13 14 15 def fun(): 16 for i in shm: 17 print(i) 18 # shm[2]=3000 19 shm[0] = b'h' 20 21 p = Process(target = fun) 22 p.start() 23 p.join() 24 25 for i in shm: 26 print(i,end=" ") 27 print(shm.value)#打印字符串
1 from multiprocessing import Process,Array 2 import time 3 4 #创建共享内存 5 #shm = Array('i',[1,2,3,4,5]) 6 7 #创建共享内存,指定开辟空间大小 8 # shm =Array('i',6) 9 10 11 #存如字符串 12 shm = Array('c',b'Hello') 13 14 15 def fun(): 16 for i in shm: 17 print(i) 18 # shm[2]=3000 19 shm[0] = b'h' 20 21 p = Process(target = fun) 22 p.start() 23 p.join() 24 25 for i in shm: 26 print(i,end=" ") 27 print(shm.value)#打印字符串
1 from multiprocessing import Process,Array 2 import time 3 4 5 #创建共享内存,指定开辟空间大小 6 shm =Array('i',6) 7 8 9 10 def fun(): 11 for i in shm: 12 print(i) 13 shm[2]=3000 14 15 16 p = Process(target = fun) 17 p.start() 18 p.join() 19 20 for i in shm: 21 print(i,end=" ") 22 print()
2.信号量(信号灯集)
[1]通信原理:给定一个数量对多个进程可见。多个进程都可以操作该数量增减,并根据数量决定自己的行为
[2]实现方法
from multiprocessing import Semaphore
sem = Semaphore(num)
功能:创建信号量对象
参数:信号量的初始值
返回值:信号量对象
sem.acqire() 将信号量减1 当信号量为0时阻塞
sem.release() 将信号量+1
sem.get_value()获取信号量数量
补充:当在父进程中创建套接字,进程间通信对象,文件对象,子进程从父进程获取这些对象时,那么对对象的操作
会有属性的相互关联影响。如果在各自进程中单独创建这些对象,则各自互不影响。
1 from multiprocessing import Semaphore,Process 2 from time import sleep 3 import os 4 #创建信号量 5 sem = Semaphore(3) 6 7 def fun(): 8 print("%d 想执行事件"%os.getpid()) 9 #想执行事件必须得到信号资源 10 sem.acquire() 11 print("%s 抢到了一个信号量,可以执行操作"%os.getpid()) 12 sleep(3) 13 print("%d执行完事件再增加信号量"%os.getgid()) 14 sem.release() 15 jobs=[] 16 for i in range(5): 17 p = Process(target = fun) 18 jobs.append(p) 19 p.start() 20 21 for i in jobs: 22 i.join() 23 24 print(sem.get_value())
二.线程编程(Thread)
1.什么是线程
【1】线程被称为轻量级的进程
【2】线程也可以使用计算机多核资源,是多任务编程方式
【3】线程是系统分配内核的最小单元
【4】线程可以理解为进程的分支任务
2.线程特征
【1】一个进程中可以包含多个线程
【2】线程也是一个运行行为,消耗计算机资源
【3】一个进程中的所有线程共享这个进程的资源
【4】多个线程之间的运行互不影响各自运行
【5】线程的创建和销毁消耗资源远小于进程
【6】各个线程有自己的ID等特征
3.创建线程threading模块
【1】创建线程对象
from threading import Thread
t = Thread()
功能:创建线程对象
参数:target 绑定线程对象
args 元组 给线程函数位置传参
kwargs 字典 给线程函数键值传参
【2】启动线程
t.start()
【3】回收线程
t.join([timeout])
4.线程对象属性:
t.name 线程名称
t.setName() 设置线程名称
t.getName() 获取线程名称
t.is_alive() 查看线程是否在生命周期
t.daemon 设置主线程和分支线程的 关系
t.setDaemon() 设置daemon属性值
t.isDaemon() 查看daemon属性值
*daemon为True时主线程退出分支线程也退出。
要在start前设置,通常不和join一起使用
1 import threading 2 from time import sleep 3 import os 4 5 a=1 6 #线程函数 7 def music(): 8 global a 9 print("a=",a) 10 for i in range(5): 11 sleep(2) 12 print("播放鱼",os.getpid()) 13 a = 10000 14 15 #创建线程对象 16 t = threading.Thread(target = music) 17 t.start() 18 19 #主线程运行任务 20 for i in range(3): 21 sleep(3) 22 print("《大哥》",os.getpid()) 23 t.join() 24 print("Main thread a:",a)
1 from threading import Thread 2 from time import sleep 3 4 #含有参数的线程函数 5 def fun(sec,name): 6 print("线程函数传参") 7 sleep(sec) 8 print("%s 线程执行完毕"%name) 9 10 #创建多个线程 11 threads = [] 12 for i in range(5): 13 t = Thread(target = fun,args=(2,),kwargs={'name':'T%d'%i}) 14 threads.append(t) 15 t.start() 16 17 for i in threads: 18 i.join()
1 from threading import Thread 2 from time import sleep 3 4 def fun(): 5 sleep(3) 6 print("线程属性测试") 7 8 # t = Thread(target = fun) 9 t = Thread(target = fun,name = 'Tarena') 10 11 12 13 #线程名称 14 t.setName('Tedu')#设置完会覆盖之间前的名称 15 print('Thread name:',t.name) 16 print("Thread name:",t.getName()) 17 18 19 #设置daemon为True 20 t.setDaemon(True) 21 t.start() 22 23 #查看线程的生命周期 24 print("alive:",t.is_alive()) 25 t.join()
5.自定义线程类:
【1】创建步骤
1. 继承Thread类
2.重写__init__方法添加自己的属性,使用super加载父类属性
3.重写run方法
【2】使用方法
1.实例化对象
2.调用start自动执行run方法
3.调用join回收线程
1 from threading import Thread 2 from time import sleep,ctime 3 4 class MyThread(Thread): 5 def __init__(self,target=None,args=(),kwargs={},name='Tedu'): 6 super().__init__() 7 self.target = target 8 self.args = args 9 self.kwargs = kwargs 10 self.name = name 11 12 def run(self): 13 self.target(*self.args,**self.kwargs) 14 def player(sec,song): 15 for i in range(2): 16 print("playing %s:%s"%(song,ctime())) 17 sleep(sec) 18 19 t = MyThread(target = player,args=(3,),kwargs={'song':'凉凉'},name='happy') 20 21 t.start() 22 t.join()
三.线程通信
【1】通信方法:线程间使用全局变量间进行通信
【2】共享资源争夺
1.共享资源:多个进程或者线程都可以操作的资源成为公共资源,
对共享资源的操作代码段称为临界区。
2.影响:对共享资源的而无需操作可能会代来数据的混乱,或者操作错误。
此时往往需要同步互斥机制协调操作顺序。
【3】同步互斥机制
同步:同步是一种协作关系,为完成操作,多进程或者线程间形成一种协调,
按照必要的步骤有序执行操作。
互斥:是一种制约关系资源,当一个进程或者线程占有资源是会进行加锁处理,
此时其他进程线程就无法操作该资源,直到解锁后才能操作。
【4】线程同步互斥方法
1.线程Event
from threading import Event
e = Event() 创建线程event对象
e.wait([timeout]) 阻塞等待e被set
e.set() 设置e,使wait结束阻塞
e.clear() 使e回到未被设置的状态
e.is_set()查看当前e是否被设置
1 from threading import Event 2 3 #创建事件对象 4 e = Event() 5 6 e.set()#设置e 7 e.clear()#清楚set状态 8 print(e.is_set())#查看是否被设置 9 e.wait(3)#超时时间 10 print("-------------")
1 from threading import Thread,Event 2 from time import sleep 3 4 s = None#全局变量,用于通信 5 e = Event() 6 7 def foo(): 8 sleep(0.1) 9 print("Foo 前来拜山头") 10 global s 11 s = "天王盖地虎" 12 e.set()#设置e 13 14 f = Thread(target = foo) 15 f.start() 16 17 #主线程验证口令 18 print("说对口令为自己人") 19 e.wait()#添加阻塞 20 if s == '天王盖地虎': 21 print("是自己人") 22 else: 23 print("不是自己人") 24 25 f.join()
2.线程锁 Lock
from threading import Lock
lock = Lock()#创建锁对象
lock.acqire()#上锁 如果lock已经上锁在调用会阻塞
lock.release()#解锁
with lock: 上锁
...
...
with代码块结束自动解锁
1 from threading import Thread,Lock 2 3 a = b = 0 4 lock = Lock()#锁对象 5 #一个线程 6 def value(): 7 while True: 8 lock.acquire() 9 if a!=b: 10 print("a=%d,b=%d"%(a,b)) 11 lock.release() 12 13 t = Thread(target=value) 14 t.start() 15 #另一个线程 16 while True: 17 while True: 18 with lock: 19 a +=1 20 b +=1 21 22 t.join()
作业:1.对比进程线程的特点区别
2.做单进程, 多进程, 多线程的效率测试
执行10遍 创建10个进程 创建10个线程
每个执行一遍 每个执行一遍