21 interest=0.05 22 count=amount+amount*interest 23 24 self.withdraw(count) 25 26 27 def transfer(_from, to, amount): 28 29 #锁不可以加在这里 因为其他的线程执行的其它方法在不加锁的情况下数据同样是不安全的 30 _from.withdraw(amount) 31 to.deposit(amount) 32 33 alex = Account('alex',1000) 34 yuan = Account('yuan',1000) 35 36 t1=threading.Thread(target = transfer, args = (alex,yuan, 100)) 37 t1.start() 38 39 t2=threading.Thread(target = transfer, args = (yuan,alex, 200)) 40 t2.start() 41 42 t1.join() 43 t2.join() 44 45 print('>>>',alex.balance) 46 print('>>>',yuan.balance)
6.条件变量同步(Condition)
有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,还提供了 wait()、notify()、notifyAll()方法。
lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传人锁,对象自动创建一个RLock()。
- wait():条件不满足时调用,线程会释放锁并进入等待阻塞;
- notify():条件创造后调用,通知等待池激活一个线程;
- notifyAll():条件创造后调用,通知等待池激活所有线程。
示例代码:
1 import threading,time 2 from random import randint 3 class Producer(threading.Thread): 4 def run(self): 5 global L 6 while True: 7 val=randint(0,100) 8 print('生产者',self.name,":Append"+str(val),L) 9 if lock_con.acquire(): 10 L.append(val) 11 lock_con.notify() 12 lock_con.release() 13 time.sleep(3) 14 class Consumer(threading.Thread): 15 def run(self): 16 global L 17 while True: 18 lock_con.acquire() 19 if len(L)==0: 20 lock_con.wait() 21 print('消费者',self.name,":Delete"+str(L[0]),L) 22 del L[0] 23 lock_con.release() 24 time.sleep(0.25) 25 26 if __name__=="__main__": 27 28 L=[] 29 lock_con=threading.Condition() 30 threads=[] 31 for i in range(5): 32 threads.append(Producer()) 33 threads.append(Consumer()) 34 for t in threads: 35 t.start() 36 for t in threads: 37 t.join()
7.同步条件(Event)
条件同步和条件变量同步差不多意思,只是少了锁功能,因为条件同步设计于不访问共享资源的条件环境。event=threading.Event():条件环境对象,初始值 为False;
- event.isSet():返回event的状态值;
- event.wait():如果 event.isSet()==False将阻塞线程;
- event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
- event.clear():恢复event的状态值为False。
例子1:
1 import threading,time 2 class Boss(threading.Thread): 3 def run(self): 4 print("BOSS:今晚大家都要加班到22:00。") 5 event.isSet() or event.set() 6 time.sleep(5) 7 print("BOSS:<22:00>可以下班了。") 8 event.isSet() or event.set() 9 class Worker(threading.Thread): 10 def run(self): 11 event.wait() 12 print("Worker:哎……命苦啊!") 13 time.sleep(0.25) 14 event.clear() 15 event.wait() 16 print("Worker:OhYeah!") 17 if __name__=="__main__": 18 event=threading.Event() 19 threads=[] 20 for i in range(5): 21 threads.append(Worker()) 22 threads.append(Boss()) 23 for t in threads: 24 t.start() 25 for t in threads: 26 t.join()
例子2:
1 import threading,time 2 import random 3 def light(): 4 if not event.isSet(): 5 event.set() #wait就不阻塞 #绿灯状态 6 count = 0 7 while True: 8 if count < 10: 9 print(' 33[42;1m--green light on--- 33[0m') 10 elif count <13: 11 print(' 33[43;1m--yellow light on--- 33[0m') 12 elif count <20: 13 if event.isSet(): 14 event.clear() 15 print(' 33[41;1m--red light on--- 33[0m') 16 else: 17 count = 0 18 event.set() #打开绿灯 19 time.sleep(1) 20 count +=1 21 def car(n): 22 while 1: 23 time.sleep(random.randrange(10)) 24 if event.isSet(): #绿灯 25 print("car [%s] is running.." % n) 26 else: 27 print("car [%s] is waiting for the red light.." %n) 28 if __name__ == '__main__': 29 event = threading.Event() 30 Light = threading.Thread(target=light) 31 Light.start() 32 for i in range(3): 33 t = threading.Thread(target=car,args=(i,)) 34 t.start()
8.信号量
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
例子:
1 import threading,time 2 class myThread(threading.Thread): 3 def run(self): 4 if semaphore.acquire(): 5 print(self.name) 6 time.sleep(5) 7 semaphore.release() 8 if __name__=="__main__": 9 semaphore=threading.Semaphore(5) 10 thrs=[] 11 for i in range(100): 12 thrs.append(myThread()) 13 for t in thrs: 14 t.start()
9.队列Queue
使用队列方法:
1 创建一个“队列”对象 2 import Queue 3 q = Queue.Queue(maxsize = 10) 4 Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。 5 6 将一个值放入队列中 7 q.put(10) 8 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为 9 1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。 10 11 将一个值从队列中取出 12 q.get() 13 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。 14 15 Python Queue模块有三种队列及构造函数: 16 1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 17 2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize) 18 3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 19 20 此包中的常用方法(q = Queue.Queue()): 21 q.qsize() 返回队列的大小 22 q.empty() 如果队列为空,返回True,反之False 23 q.full() 如果队列满了,返回True,反之False 24 q.full 与 maxsize 大小对应 25 q.get([block[, timeout]]) 获取队列,timeout等待时间 26 q.get_nowait() 相当q.get(False) 27 非阻塞 q.put(item) 写入队列,timeout等待时间 28 q.put_nowait(item) 相当q.put(item, False) 29 q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 30 q.join() 实际上意味着等到队列为空,再执行别的操作
例子集合:

1 #例子1: 2 import threading,queue 3 from time import sleep 4 from random import randint 5 class Production(threading.Thread): 6 def run(self): 7 while True: 8 r=randint(0,100) 9 q.put(r) 10 print("生产出来%s号包子"%r) 11 sleep(1) 12 class Proces(threading.Thread): 13 def run(self): 14 while True: 15 re=q.get() 16 print("吃掉%s号包子"%re) 17 if __name__=="__main__": 18 q=queue.Queue(10) 19 threads=[Production(),Production(),Production(),Proces()] 20 for t in threads: 21 t.start() 22 23 #例子2 24 import time,random 25 import queue,threading 26 q = queue.Queue() 27 def Producer(name): 28 count = 0 29 while count <20: 30 time.sleep(random.randrange(3)) 31 q.put(count) 32 print('Producer %s has produced %s baozi..' %(name, count)) 33 count +=1 34 def Consumer(name): 35 count = 0 36 while count <20: 37 time.sleep(random.randrange(4)) 38 if not q.empty(): 39 data = q.get() 40 print(data) 41 print(' 33[32;1mConsumer %s has eat %s baozi... 33[0m' %(name, data)) 42 else: 43 print("-----no baozi anymore----") 44 count +=1 45 p1 = threading.Thread(target=Producer, args=('A',)) 46 c1 = threading.Thread(target=Consumer, args=('B',)) 47 p1.start() 48 c1.start() 49 50 #例子3 51 #实现一个线程不断生成一个随机数到一个队列中(考虑使用Queue这个模块) 52 # 实现一个线程从上面的队列里面不断的取出奇数 53 # 实现另外一个线程从上面的队列里面不断取出偶数 54 55 import random,threading,time 56 from queue import Queue 57 #Producer thread 58 class Producer(threading.Thread): 59 def __init__(self, t_name, queue): 60 threading.Thread.__init__(self,name=t_name) 61 self.data=queue 62 def run(self): 63 for i in range(10): #随机产生10个数字 ,可以修改为任意大小 64 randomnum=random.randint(1,99) 65 print ("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)) 66 self.data.put(randomnum) #将数据依次存入队列 67 time.sleep(1) 68 print ("%s: %s finished!" %(time.ctime(), self.getName())) 69 70 #Consumer thread 71 class Consumer_even(threading.Thread): 72 def __init__(self,t_name,queue): 73 threading.Thread.__init__(self,name=t_name) 74 self.data=queue 75 def run(self): 76 while 1: 77 try: 78 val_even = self.data.get(1,5) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒 79 if val_even%2==0: 80 print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)) 81 time.sleep(2) 82 else: 83 self.data.put(val_even) 84 time.sleep(2) 85 except: #等待输入,超过5秒 就报异常 86 print ("%s: %s finished!" %(time.ctime(),self.getName())) 87 break 88 class Consumer_odd(threading.Thread): 89 def __init__(self,t_name,queue): 90 threading.Thread.__init__(self, name=t_name) 91 self.data=queue 92 def run(self): 93 while 1: 94 try: 95 val_odd = self.data.get(1,5) 96 if val_odd%2!=0: 97 print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)) 98 time.sleep(2) 99 else: 100 self.data.put(val_odd) 101 time.sleep(2) 102 except: 103 print ("%s: %s finished!" % (time.ctime(), self.getName())) 104 break 105 #Main thread 106 def main(): 107 queue = Queue() 108 producer = Producer('Pro.', queue) 109 consumer_even = Consumer_even('Con_even.', queue) 110 consumer_odd = Consumer_odd('Con_odd.',queue) 111 producer.start() 112 consumer_even.start() 113 consumer_odd.start() 114 producer.join() 115 consumer_even.join() 116 consumer_odd.join() 117 print ('All threads terminate!') 118 119 if __name__ == '__main__': 120 main() 121 122 #注意:列表是线程不安全的 123 #例子4 124 import threading,time 125 126 li=[1,2,3,4,5] 127 128 def pri(): 129 while li: 130 a=li[-1] 131 print(a) 132 time.sleep(1) 133 try: 134 li.remove(a) 135 except: 136 print('----',a) 137 138 t1=threading.Thread(target=pri,args=()) 139 t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()
分类: python高级