线程threeding: + join
练习:
1 #!/usr/bin/env python3 2 #-*- coding:utf-8 -*- 3 ''' 4 Administrator 5 2018/8/10 6 ''' 7 8 import threading 9 from time import time,sleep,asctime,ctime 10 11 def music(name): 12 for i in range(2): 13 print("......music %s .. %s is doing %s "%(i,name,ctime())) 14 sleep(2) 15 print(".on mygod.....music %s.. %s is end %s" %(i,name,ctime())) 16 def move(name): 17 for i in range(2): 18 19 print("......move%s .. %s is doing %s "%(i,name,ctime())) 20 sleep(3) 21 print(".on mygod..%s...move .. %s is end %s" %(i,name,ctime()) ) 22 23 threads=[] 24 t1=threading.Thread(target=music,args=("七里香",)) 25 threads.append(t1) 26 t2=threading.Thread(target=move,args=("我不是药神",)) 27 threads.append(t2) 28 29 if __name__=="__main__": 30 for t in threads: 31 t.start() 32 t.join() 33 34 35 print("all is end.")
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6 def __init__(self,num): 7 threading.Thread.__init__(self) 8 self.num = num 9 10 def run(self):#定义每个线程要运行的函数 11 12 print("running on number:%s" %self.num) 13 14 time.sleep(3) 15 16 if __name__ == '__main__': 17 18 t1 = MyThread(1) 19 t2 = MyThread(2) 20 t1.start() 21 t2.start()
1 import threading 2 from time import ctime,sleep 3 import time 4 5 def music(func): 6 for i in range(2): 7 print ("Begin listening to %s. %s" %(func,ctime())) 8 sleep(4) 9 print("end listening %s"%ctime()) 10 11 def move(func): 12 for i in range(2): 13 print ("Begin watching at the %s! %s" %(func,ctime())) 14 sleep(5) 15 print('end watching %s'%ctime()) 16 17 threads = [] 18 t1 = threading.Thread(target=music,args=('七里香',)) 19 threads.append(t1) 20 t2 = threading.Thread(target=move,args=('阿甘正传',)) 21 threads.append(t2) 22 23 if __name__ == '__main__': 24 25 for t in threads: 26 # t.setDaemon(True) 27 t.start() 28 # t.join() 29 # t1.join() 30 t2.join()########考虑这三种join位置下的结果? 31 print ("all over %s" %ctime())
setDaemon(True):
将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦
1 #!/usr/bin/env python3 2 #-*- coding:utf-8 -*- 3 ''' 4 Administrator 5 2018/8/10 6 ''' 7 8 import threading 9 from time import time,sleep,asctime,ctime 10 11 def music(name): 12 for i in range(2): 13 print("......music %s .. %s is doing %s "%(i,name,ctime())) 14 sleep(2) 15 print(".on mygod.....music %s.. %s is end %s" %(i,name,ctime())) 16 def move(name): 17 for i in range(2): 18 19 print("......move%s .. %s is doing %s "%(i,name,ctime())) 20 sleep(3) 21 print(".on mygod..%s...move .. %s is end %s" %(i,name,ctime()) ) 22 23 threads=[] 24 t1=threading.Thread(target=music,args=("七里香",)) 25 threads.append(t1) 26 t2=threading.Thread(target=move,args=("我不是药神",)) 27 threads.append(t2) 28 29 if __name__=="__main__": 30 for t in threads: 31 t.setDaemon(True)#守护线程 32 t.start() 33 # t.join() 34 print("all is end. %s"%ctime())
1 #!/usr/bin/env python3 2 #-*- coding:utf-8 -*- 3 ''' 4 Administrator 5 2018/8/10 6 ''' 7 8 import threading 9 from time import time,sleep,asctime,ctime 10 11 def music(name): 12 for i in range(2): 13 print("......music %s .. %s is doing %s "%(i,name,ctime())) 14 sleep(2) 15 print(".on mygod.....music %s.. %s is end %s" %(i,name,ctime())) 16 def move(name): 17 for i in range(2): 18 19 print("......move%s .. %s is doing %s "%(i,name,ctime())) 20 sleep(3) 21 print(".on mygod..%s...move .. %s is end %s" %(i,name,ctime()) ) 22 23 threads=[] 24 t1=threading.Thread(target=music,args=("七里香",)) 25 threads.append(t1) 26 t2=threading.Thread(target=move,args=("我不是药神",)) 27 threads.append(t2) 28 29 if __name__=="__main__": 30 t2.setDaemon(True) # 守护线程 31 for t in threads: 32 # t.setDaemon(True)#守护线程 33 t.start() 34 # t.join() 35 print("all is end. %s"%ctime()) 36 37 """ 38 ......music 0 .. 七里香 is doing Fri Aug 10 12:21:46 2018 39 ......move0 .. 我不是药神 is doing Fri Aug 10 12:21:46 2018 40 all is end. Fri Aug 10 12:21:46 2018 41 .on mygod.....music 0.. 七里香 is end Fri Aug 10 12:21:48 2018 42 ......music 1 .. 七里香 is doing Fri Aug 10 12:21:48 2018 43 .on mygod..0...move .. 我不是药神 is end Fri Aug 10 12:21:49 2018 44 ......move1 .. 我不是药神 is doing Fri Aug 10 12:21:49 2018 45 .on mygod.....music 1.. 七里香 is end Fri Aug 10 12:21:50 2018 46 .on mygod..1...move .. 我不是药神 is end Fri Aug 10 12:21:52 2018 47 48 -----------------------t2.setDaemon(True)---------------------------------------------- 49 ......music 0 .. 七里香 is doing Fri Aug 10 12:20:30 2018 50 ......move0 .. 我不是药神 is doing Fri Aug 10 12:20:30 2018 51 all is end. Fri Aug 10 12:20:30 2018 52 .on mygod.....music 0.. 七里香 is end Fri Aug 10 12:20:32 2018 53 ......music 1 .. 七里香 is doing Fri Aug 10 12:20:32 2018 54 .on mygod..0...move .. 我不是药神 is end Fri Aug 10 12:20:33 2018 55 ......move1 .. 我不是药神 is doing Fri Aug 10 12:20:33 2018 56 .on mygod.....music 1.. 七里香 is end Fri Aug 10 12:20:34 2018 #### t1结束后, 程序就立即结束 57 58 """
join():
在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
其它方法
1 thread 模块提供的其他方法: 2 # threading.currentThread(): 返回当前的线程变量。 3 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 4 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。 5 # 除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法: 6 # run(): 用以表示线程活动的方法。 7 # start():启动线程活动。 8 # join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。 9 # isAlive(): 返回线程是否活动的。 10 # getName(): 返回线程名。 11 # setName(): 设置线程名。
2018-08-1013:17:35
- GIL 导致python没有办法真正利用系统的多核CUP
同步锁
ThreadLocal
1 import time,threading 2 3 4 5 6 num=100 7 def addNum(): 8 global num 9 # num-=1 10 temp=num 11 print("ok",num) 12 num=temp-1 13 14 15 thread_list=[] 16 for i in range(100): 17 t=threading.Thread(target=addNum) 18 t.start() 19 thread_list.append(t) 20 21 for t in thread_list: 22 t.join() 23 24 25 print("final results",num)
1 import time,threading 2 3 start=time.time() 4 num=100 5 r=threading.Lock() 6 7 def addNum(): 8 global num 9 # num-=1 10 r.acquire() #加锁 11 temp=num 12 time.sleep(0.1) 13 num=temp-1 14 r.release() #解锁 15 16 17 thread_list=[] 18 for i in range(100): 19 t=threading.Thread(target=addNum) 20 t.start() 21 thread_list.append(t) 22 23 for t in thread_list: 24 t.join() 25 26 27 print("final results",num) 28 end=time.time() 29 print(end-start)
在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦:
1 def process_student(name): 2 std = Student(name) 3 # std是局部变量,但是每个函数都要用它,因此必须传进去: 4 do_task_1(std) 5 do_task_2(std) 6 7 def do_task_1(std): 8 do_subtask_1(std) 9 do_subtask_2(std) 10 11 def do_task_2(std): 12 do_subtask_2(std) 13 do_subtask_2(std)
每个函数一层一层调用都这么传参数那还得了?用全局变量?也不行,因为每个线程处理不同的Student
对象,不能共享。
如果用一个全局dict
存放所有的Student
对象,然后以thread
自身作为key
获得线程对应的Student
对象如何?
1 global_dict = {} 2 3 def std_thread(name): 4 std = Student(name) 5 # 把std放到全局变量global_dict中: 6 global_dict[threading.current_thread()] = std 7 do_task_1() 8 do_task_2() 9 10 def do_task_1(): 11 # 不传入std,而是根据当前线程查找: 12 std = global_dict[threading.current_thread()] 13 ... 14 15 def do_task_2(): 16 # 任何函数都可以查找出当前线程的std变量: 17 std = global_dict[threading.current_thread()]
...
这种方式理论上是可行的,它最大的优点是消除了std
对象在每层函数中的传递问题,但是,每个函数获取std
的代码有点丑。
有没有更简单的方式?
ThreadLocal
应运而生,不用查找dict
,ThreadLocal
帮你自动做这件事:
1 import threading 2 3 # 创建全局ThreadLocal对象: 4 local_school = threading.local() 5 6 def process_student(): 7 # 获取当前线程关联的student: 8 std = local_school.student 9 print('Hello, %s (in %s)' % (std, threading.current_thread().name)) 10 11 def process_thread(name): 12 # 绑定ThreadLocal的student: 13 local_school.student = name 14 process_student() 15 16 t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') 17 t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') 18 t1.start() 19 t2.start() 20 t1.join() 21 t2.join()
执行结果:
Hello, Alice (in Thread-A) Hello, Bob (in Thread-B)
全局变量local_school
就是一个ThreadLocal
对象,每个Thread
对它都可以读写student
属性,但互不影响。你可以把local_school
看成全局变量,但每个属性如local_school.student
都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal
内部会处理。
可以理解为全局变量local_school
是一个dict
,不但可以用local_school.student
,还可以绑定其他变量,如local_school.teacher
等等。
ThreadLocal
最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
小结
一个ThreadLocal
变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal
解决了参数在一个线程中各个函数之间互相传递的问题。
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每个线程中都获取这个全局变量 6 # num-=1 7 8 temp=num 9 print('--get num:',num ) 10 #time.sleep(0.1) 11 num =temp-1 #对此公共变量进行-1操作 12 13 14 num = 100 #设定一个共享变量 15 thread_list = [] 16 for i in range(100): 17 t = threading.Thread(target=addNum) 18 t.start() 19 thread_list.append(t) 20 21 for t in thread_list: #等待所有线程执行完毕 22 t.join() 23 24 print('final num:', num )
注意:
1: why num-=1没问题呢?这是因为动作太快(完成这个动作在切换的时间内)
2: if sleep(1),现象会更明显,100个线程每一个一定都没有执行完就进行了切换,我们说过sleep就等效于IO阻塞,1s之内不会再切换回来,所以最后的结果一定是99.
多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?
有同学会想用join呗,但join会把整个线程给停住,造成了串行,失去了多线程的意义,而我们只需要把计算(涉及到操作公共数据)的时候串行执行。
我们可以通过同步锁来解决这种问题
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每个线程中都获取这个全局变量 6 # num-=1 7 lock.acquire() 8 temp=num 9 print('--get num:',num ) 10 #time.sleep(0.1) 11 num =temp-1 #对此公共变量进行-1操作 12 lock.release() 13 14 num = 100 #设定一个共享变量 15 thread_list = [] 16 lock=threading.Lock() 17 18 for i in range(100): 19 t = threading.Thread(target=addNum) 20 t.start() 21 thread_list.append(t) 22 23 for t in thread_list: #等待所有线程执行完毕 24 t.join() 25 26 print('final num:', num )
问题解决,但
请问:同步锁与GIL的关系?
Python的线程在GIL的控制之下,线程之间,对整个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制———用户级互斥。内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。
但是如果你有个操作比如 x += 1,这个操作需要多个bytecodes操作,在执行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。
线程死锁和递归锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。下面是一个死锁的例子:
1 import threading,time 2 3 class myThread(threading.Thread): 4 def doA(self): 5 lockA.acquire() 6 print(self.name,"gotlockA",time.ctime()) 7 time.sleep(3) 8 lockB.acquire() 9 print(self.name,"gotlockB",time.ctime()) 10 lockB.release() 11 lockA.release() 12 13 def doB(self): 14 lockB.acquire() 15 print(self.name,"gotlockB",time.ctime()) 16 time.sleep(2) 17 lockA.acquire() 18 print(self.name,"gotlockA",time.ctime()) 19 lockA.release() 20 lockB.release() 21 def run(self): 22 self.doA() 23 self.doB() 24 if __name__=="__main__": 25 26 lockA=threading.Lock() 27 lockB=threading.Lock() 28 threads=[] 29 for i in range(5): 30 threads.append(myThread()) 31 for t in threads: 32 t.start() 33 for t in threads: 34 t.join()#等待线程结束,后面再讲。
解决办法:使用递归锁,将
lockA=threading.Lock() lockB=threading.Lock()<br data-filtered="filtered">#--------------<br data-filtered="filtered">lock=threading.RLock()
为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
应用
1 import time 2 3 import threading 4 5 class Account: 6 def __init__(self, _id, balance): 7 self.id = _id 8 self.balance = balance 9 self.lock = threading.RLock() 10 11 def withdraw(self, amount): 12 13 with self.lock: 14 self.balance -= amount 15 16 def deposit(self, amount): 17 with self.lock: 18 self.balance += amount 19 20 21 def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的场景 22 23 with self.lock: 24 interest=0.05 25 count=amount+amount*interest 26 27 self.withdraw(count) 28 29 30 def transfer(_from, to, amount): 31 32 #锁不可以加在这里 因为其他的其它线程执行的其它方法在不加锁的情况下数据同样是不安全的 33 _from.withdraw(amount) 34 35 to.deposit(amount) 36 37 38 39 alex = Account('alex',1000) 40 yuan = Account('yuan',1000) 41 42 t1=threading.Thread(target = transfer, args = (alex,yuan, 100)) 43 t1.start() 44 45 t2=threading.Thread(target = transfer, args = (yuan,alex, 200)) 46 t2.start() 47 48 t1.join() 49 t2.join() 50 51 print('>>>',alex.balance) 52 print('>>>',yuan.balance)
死锁现象及其解决方案
1 import threading 2 3 class Account: 4 def __init__(self,name,money,r): 5 self.name=name 6 self.balance=money 7 8 9 10 def withdraw(self,num): 11 r.acquire() 12 self.balance-=num 13 r.release() 14 def repay(self,num): 15 r.acquire() 16 self.balance+=num 17 r.release() 18 def abc(self,num): 19 r.acquire() 20 #当出现以下这种情况的时候,就可能会出现死锁现象。此时就需要使用 这个解决方案 21 self.withdraw() 22 self.balance+=num 23 r.release() 24 def transer(_from,to,count): 25 26 # r.acquire() 27 _from.withdraw(count) 28 to.repay(count) 29 # r.release() 30 r=threading.RLock() 31 a1=Account("alex",1000,r) 32 a2=Account("xiaohu",2000,r) 33 print("%s账户余额:%s"%(a1.name,a1.balance)) 34 35 36 print("%s账户余额:%s"%(a2.name,a2.balance)) 37 38 t1=threading.Thread(target=transer,args=(a1,a2,100)) 39 t2=threading.Thread(target=transer,args=(a2,a1,400)) 40 41 t1.start() 42 t2.start() 43 t1.join() 44 t2.join() 45 print("%s账户余额:%s"%(a1.name,a1.balance)) 46 47 48 print("%s账户余额:%s"%(a2.name,a2.balance))
结果:
条件变量同步(Condition)
有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,还提供了 wait()、notify()、notifyAll()方法。
lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传人锁,对象自动创建一个RLock()。
wait():条件不满足时调用,线程会释放锁并进入等待阻塞;
notify():条件创造后调用,通知等待池激活一个线程;
notifyAll():条件创造后调用,通知等待池激活所有线程。
1 import threading,time 2 from random import randint 3 class Producer(threading.Thread): 4 def run(self): 5 global L 6 while True: 7 val=randint(0,100) 8 print('生产者',self.name,":Append"+str(val),L) 9 if lock_con.acquire(): 10 L.append(val) 11 lock_con.notify() 12 lock_con.release() 13 time.sleep(3) 14 class Consumer(threading.Thread): 15 def run(self): 16 global L 17 while True: 18 lock_con.acquire() 19 if len(L)==0: 20 lock_con.wait() 21 print('消费者',self.name,":Delete"+str(L[0]),L) 22 del L[0] 23 lock_con.release() 24 time.sleep(0.25) 25 26 if __name__=="__main__": 27 28 L=[] 29 lock_con=threading.Condition() 30 threads=[] 31 for i in range(5): 32 threads.append(Producer()) 33 threads.append(Consumer()) 34 for t in threads: 35 t.start() 36 for t in threads: 37 t.join()
同步条件(Event)
条件同步和条件变量同步差不多意思,只是少了锁功能,因为条件同步设计于不访问共享资源的条件环境。event=threading.Event():条件环境对象,初始值 为False;
event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
实例1:
1 import threading,time 2 class Boss(threading.Thread): 3 def run(self): 4 print("BOSS:今晚大家都要加班到22:00。") 5 event.isSet() or event.set() 6 time.sleep(5) 7 print("BOSS:<22:00>可以下班了。") 8 event.isSet() or event.set() 9 class Worker(threading.Thread): 10 def run(self): 11 event.wait() 12 print("Worker:哎……命苦啊!") 13 time.sleep(0.25) 14 event.clear() 15 event.wait() 16 print("Worker:OhYeah!") 17 if __name__=="__main__": 18 event=threading.Event() 19 threads=[] 20 for i in range(5): 21 threads.append(Worker()) 22 threads.append(Boss()) 23 for t in threads: 24 t.start() 25 for t in threads: 26 t.join()
实例2:
1 import threading,time 2 import random 3 def light(): 4 if not event.isSet(): 5 event.set() #wait就不阻塞 #绿灯状态 6 count = 0 7 while True: 8 if count < 10: 9 print('