1、Event
# Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化进行操作;
# 总结:使用同一个Event对象的标记flag,默认flag=False;谁wait就是等到flag变为True,或等到超时返回False,不限制等待的个数;
## 模拟实现老板雇佣一个工人生产5个杯子,老板一直等待工人的工作完成 ######
import threading import logging,time from threading import Event logging.basicConfig(level=10,format="%(asctime)s %(threadName)s %(thread)d %(message)s",datefmt="%Y/%m/%d-%H:%M:%S") def work(event:Event,n): logging.info("I'm working") cups = [] while True: logging.info('make 1') time.sleep(0.5) cups.append(1) if len(cups) >= n: event.set() # 完成任务后将event对象设置为True break logging.info("I' finished my job, cups={}".format(cups)) def boss(event:Event): logging.info("I'M boss waiting for you") event.wait() # boss线程阻塞,等待event对象变为True logging.info("GOOD job")
e=Event() b=threading.Thread(target=boss,args=(e,),name='boss') w=threading.Thread(target=work,args=(e,5),name='worker') b.start() w.start() ''' 运行结果: 2018/10/30-21:35:23 boss 14388 I'M boss waiting for you 2018/10/30-21:35:23 worker 15244 I'm working 2018/10/30-21:35:23 worker 15244 make 1 2018/10/30-21:35:23 worker 15244 make 1 2018/10/30-21:35:24 worker 15244 make 1 2018/10/30-21:35:24 worker 15244 make 1 2018/10/30-21:35:25 worker 15244 make 1 2018/10/30-21:35:25 worker 15244 I' finished my job, cups=[1, 1, 1, 1, 1] 2018/10/30-21:35:25 boss 14388 GOOD job '''
from threading import Thread,Event import logging logging.basicConfig(level=10,format="%(asctime)s %(threadName)s %(thread)d %(message)s", datefmt="%Y/%m/%d-%H:%M:%S") def do(e:Event,interval:int): while not e.wait(interval): logging.info('do smthing') e=Event() Thread(target=do,args=(e,3),name='do-thread').start() e.wait(10) # e.wait() 阻塞当前线程(当前线程为主线程),阻塞操作10秒,如果没有等到flag变为True,则当前线程继续往后执行 # e.set() # 如果e不设置为True,则do线程继续执行 print('MainThread is end')
2、Lock
# 锁,凡是存在资源争抢的地方都可以使用锁,从而保证只有一个使用者可以独享此资源;Lock为互斥锁;
2.1、为什么要加锁
# 由于线程之间的任务执行是cpu进行随机调度的,这就有可能一个线程执行了n条指令之后就被切换到别的线程,开始执行别的线程指令;当个多线程同时操作一个对象,如果没有对该对象上锁,会造成程序的执行结果不可预期,这就为“线程不安全”;
# 为了保证多个线程操作同一个对资源象时候,对象的数据安全与完整,则需要将资源对象家锁;
2.2、加锁与解锁
# Lock 锁,一旦线程获得锁,其他试图获取锁的线程将被阻塞;
# 加锁与解锁的方法:
①:使用try...finally语句保证锁的释放;
②:with上下文管理,锁对象支持上下文管理;
l=Lock() cups=[] e=Event() def boss(): logging.info("I'm boss,waiting for you") e.wait() print('good job,cups lens={}'.format(len(cups))) def worker(): logging.info("I'm working") e.wait(1) with l: while True: if len(cups) >= 100: logging.info('Finish JOB') e.set() break else: cups.append(1) b=Thread(target=boss,name='boss') b.start() for i in range(10): w=Thread(target=worker,name='worker-{}'.format(i)) w.start()
2.3、非阻塞锁
# 阻塞锁与非阻塞锁取决于在获取锁的时候是blocking=False还是True;
# 什么时候阻塞? 当一个线程成功获取锁之后,无论是阻塞或是非阻塞都不影响该线程的继续执行;但是如果此时另外一个线程也要获取该锁,该线程的阻塞与非阻塞就要看该线程获取锁的方式了;
def test(): print('2,----') l.acquire() print('3,~~~~~~~~~') l=Lock() l.acquire() print('1,=======') Thread(target=test).start() print('===MainThreadEnd====') ''' 主线程获得锁l.acquire()对象,主线程继续执行,打印1,=======; 继续执行到Thread的时候会启动一个工作线程,执行test函数; test函数内执行到l.acquire()的时候,由于主线程并没有释放锁,并且工作线程是l.acquire() 阻塞的方式获取锁,所以工作线程被阻塞; 如果以l.acquire(False) 非阻塞的方式获取锁,则工作线程继续往下执行打印print('3,~~~~~~~~~') '''
def worker(tasks): for task in tasks: time.sleep(0.001) if task.lock.acquire(False): # 获取锁则返回True, 此时如果acquire(True) 阻塞的方式,则只有1个工作线程工作,其他的5个线程都被阻塞,因为没有释放锁; logging.info('{} {} begin to start'.format(threading.current_thread(),task.name)) else: logging.info('{} {} is working'.format(threading.current_thread(),task.name)) class Task: def __init__(self,name): self.name=name self.lock=Lock() tasks=[Task('task-{}'.format(x)) for x in range(5)] # 构造5个任务,每个任务对象都有自己的一把锁 # 启动5个线程 for i in range(5): Thread(target=worker,args=(tasks,),name='worker-{}'.format(i)).start() ''' 阻塞锁acquire()运行结果: 2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-0 begin to start 2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-1 begin to start 2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-2 begin to start 2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-3 begin to start 2018/11/01-15:33:37 worker-0 10932 <Thread(worker-0, started 10932)> task-4 begin to start 非阻塞锁acquire(False)运行结果: 2018/11/01-15:34:31 worker-2 1704 <Thread(worker-2, started 1704)> task-0 begin to start 2018/11/01-15:34:31 worker-4 10856 <Thread(worker-4, started 10856)> task-0 is working 2018/11/01-15:34:31 worker-3 2608 <Thread(worker-3, started 2608)> task-0 is working 2018/11/01-15:34:31 worker-1 5560 <Thread(worker-1, started 5560)> task-0 is working 2018/11/01-15:34:31 worker-0 9468 <Thread(worker-0, started 9468)> task-0 is working 2018/11/01-15:34:31 worker-4 10856 <Thread(worker-4, started 10856)> task-1 begin to start 2018/11/01-15:34:31 worker-2 1704 <Thread(worker-2, started 1704)> task-1 is working 2018/11/01-15:34:31 worker-3 2608 <Thread(worker-3, started 2608)> task-1 is working 2018/11/01-15:34:31 worker-0 9468 <Thread(worker-0, started 9468)> task-1 is working 2018/11/01-15:34:31 worker-1 5560 <Thread(worker-1, started 5560)> task-1 is working 2018/11/01-15:34:31 worker-4 10856 <Thread(worker-4, started 10856)> task-2 begin to start .... '''
3、RLock
# RLock,可重入锁,是线程相关的锁;
# RLock本质上与Lock类似,也有阻塞和非阻塞的模式,但是RLock锁对象内部维护一个计数器count,在一个线程内可以多次成功获取,acquire()一次就count+=1;release()一次count-=1,release次数不能大于acquire次数;
# 其他线程要成功获取该锁,则需要该锁对象内部计数器count=0;
4、Condition
# 构造方法Condition(Lock=None),可以传入一个Lock或者RLock对象,默认是RLock;
# Condition用于生产者消费者模型中,解决了生产者消费者速度匹配的问题;采用了通知机制,非常有效率;
# 使用方式:
使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock,最好的方式是使用with进行上下文管理(自动加锁和解锁);
消费者wait,等待通知;
生产者生产完成,对消费者发送通知,可以使用notify或者notify_all 方法;
# 生产者消费者模型代码实现
class Dispather: def __init__(self): self.data=None self.event=Event() self.cond=Condition() def produce(self,total): for _ in range(total): data=random.randint(0,100) with self.cond: logging.info(data) self.data=data self.cond.notify_all() self.event.wait(1) # 模拟生产数据速度 self.event.set() # 数据生产完成将event设置为True def consume(self): while not self.event.is_set(): # 默认event.is_set()为False,循环不进入,等待event.set后开始进入循环; with self.cond: self.cond.wait() # 阻塞等待通知 logging.info('received {}'.format(self.data)) self.event.wait(0.5) # 模拟消费速度 d=Dispather() p=Thread(target=d.produce,args=(10,),name='producer') for i in range(2): c= Thread(target=d.consume,name='consumer-{}'.format(i)) c.start() p.start()