zoukankan      html  css  js  c++  java
  • Python(多线程threading模块)

    day27

    参考:http://www.cnblogs.com/yuanchenqi/articles/5733873.html

    CPU像一本书,你不阅读的时候,你室友马上阅读,你准备阅读的时候,你室友记下他当时页码,等下次你不读的时候开始读。

    多个线程竞争执行。

    进程:A process can have one or many threads.一个进程有多个线程。

    一个线程就是一堆指令集合。

    线程和进程是同样的东西。

    线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。

     1 import time
     2 import threading
     3 
     4 begin = time.time()
     5 def foo(n):
     6     print('foo%s'%n)
     7     time.sleep(1)
     8 
     9 def bar(n):
    10     print('bar%s'%n)
    11     time.sleep(2)
    12 
    13 
    14 # foo()
    15 # bar()
    16 # end = time.time()
    17 
    18 #并发,两个线程竞争执行
    19 t1 = threading.Thread(target = foo, args =(1,) )
    20 t2 = threading.Thread(target = bar, args =(2,) )
    21 t1.start()
    22 t2.start()
    23 
    24 t1.join()#t1,t2执行完再往下执行
    25 t2.join()
    26 #t1,t2同时执行
    27 end = time.time()
    28 
    29 
    30 print(end - begin)
    并发,两个线程竞争执行

    执行结果:

    foo1
    bar2
    2.002244710922241
    
    Process finished with exit code 0

    IO密集型任务函数(以上为IO密集型)计算效率会被提高,可用多线程

    计算密集型任务函数(以下为计算密集型)改成C语言

     1 import time
     2 import threading
     3 begin = time.time()
     4 
     5 def add(n):
     6     sum = 0
     7     for i in range(n):
     8         sum += i
     9     print(sum)
    10 
    11 # add(50000000)
    12 # add(80000000)
    13 
    14 #并发,两个线程竞争执行
    15 t1 = threading.Thread(target = add, args =(50000000,) )
    16 t2 = threading.Thread(target = add, args =(80000000,) )
    17 t1.start()
    18 t2.start()
    19 
    20 t1.join()#t1,t2执行完再往下执行
    21 t2.join()
    22 end = time.time()
    23 
    24 print(end - begin)

    计算密集型中用并发计算效率并没有提高。

    计算效率并没有提高。

    GIL

    In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once.

    在同一时刻,只能有一个线程。

    使之有多个进程就可以解决(如果三个线程无法同时进行,那么把它们分到三个进程里面去,用于解决GIL问题,实现并发)。

     线程与进程的区别:

    1. Threads share the address space of the process that created it; processes have their own address space.
    2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
    3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
    4. New threads are easily created; new processes require duplication of the parent process.
    5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
    6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

    threading_test.py

     1 import threading
     2 from time import ctime,sleep
     3 import time
     4 
     5 def music(func):
     6     for i in range(2):
     7         print ("Begin listening to %s. %s" %(func,ctime()))
     8         sleep(1)
     9         print("end listening %s"%ctime())
    10 
    11 def move(func):
    12     for i in range(2):
    13         print ("Begin watching at the %s! %s" %(func,ctime()))
    14         sleep(5)
    15         print('end watching %s'%ctime())
    16 
    17 threads = []
    18 t1 = threading.Thread(target=music,args=('七里香',))
    19 threads.append(t1)
    20 t2 = threading.Thread(target=move,args=('阿甘正传',))
    21 threads.append(t2)
    22 
    23 if __name__ == '__main__':
    24 
    25     for t in threads:#线程加到了列表中
    26         # t.setDaemon(True)
    27         t.start()
    28         # t.join()
    29     # t1.join()
    30     #t2.join()########考虑这三种join位置下的结果?
    31     print ("all over %s" %ctime())
    32 
    33 #一共执行10秒

    一共只执行10秒,因为是同时执行,看哪个时间长。2*5s

    执行结果:

    Begin listening to 七里香. Fri Nov  2 16:43:09 2018
    all over Fri Nov  2 16:43:09 2018
    Begin watching at the 阿甘正传! Fri Nov  2 16:43:09 2018
    end listening Fri Nov  2 16:43:10 2018
    Begin listening to 七里香. Fri Nov  2 16:43:10 2018
    end listening Fri Nov  2 16:43:11 2018
    end watching Fri Nov  2 16:43:14 2018
    Begin watching at the 阿甘正传! Fri Nov  2 16:43:14 2018
    end watching Fri Nov  2 16:43:20 2018
    
    Process finished with exit code 0

    join

     1 import threading
     2 from time import ctime,sleep
     3 import time
     4 
     5 def music(func):
     6     for i in range(2):
     7         print ("Begin listening to %s. %s" %(func,ctime()))
     8         sleep(2)
     9         print("end listening %s"%ctime())
    10 
    11 def move(func):
    12     for i in range(2):
    13         print ("Begin watching at the %s! %s" %(func,ctime()))
    14         sleep(3)
    15         print('end watching %s'%ctime())
    16 
    17 threads = []
    18 t1 = threading.Thread(target=music,args=('七里香',))
    19 threads.append(t1)
    20 t2 = threading.Thread(target=move,args=('阿甘正传',))
    21 threads.append(t2)
    22 
    23 if __name__ == '__main__':
    24 
    25     for t in threads:#线程加到了列表中
    26         # t.setDaemon(True)
    27         t.start()
    28         #t.join() #变成了串行  t1已经执行完了,但是t2阻塞了,其中t为t2
    29     t1.join() #all over在第四秒就会被打印,因为t1四秒执行完,不再阻塞,而t2还在执行
    30     #t2.join()########考虑这三种join位置下的结果?
    31     print ("all over %s" %ctime())
    32 
    33 #一共执行6秒

    t1.join,t1执行完才能到下一步,所以4秒后才能print ("all over %s" %ctime())

    t2.join,t2执行结束才能到下一步,所以6秒后才能print ("all over %s" %ctime())

    如果将t.join()放到for循环中,即和串行一样先执行t1,再执行t2。

     

    setDeamon

     1 import threading
     2 from time import ctime,sleep
     3 import time
     4 
     5 def music(func):
     6     for i in range(2):
     7         print ("Begin listening to %s. %s" %(func,ctime()))
     8         sleep(2)
     9         print("end listening %s"%ctime())
    10 
    11 def move(func):
    12     for i in range(2):
    13         print ("Begin watching at the %s! %s" %(func,ctime()))
    14         sleep(3)
    15         print('end watching %s'%ctime())
    16 
    17 threads = []
    18 t1 = threading.Thread(target=music,args=('七里香',))
    19 threads.append(t1)
    20 t2 = threading.Thread(target=move,args=('阿甘正传',))
    21 threads.append(t2)
    22 
    23 if __name__ == '__main__':
    24 
    25     t2.setDaemon(True)
    26     for t in threads:#线程加到了列表中
    27         #t.setDaemon(True)
    28         t.start()
    29 
    30     print ("all over %s" %ctime())
    31 
    32 #主线程只会等待没设定的子线程t1,t2被设定setDaemon
    33 #t1已经执行完(4s),但是t2还没执行完,和主线程一起退出
    32 #主线程只会等待没设定的子线程t1,t2被设定setDaemon
    33 #t1已经执行完(4s),但是t2还没执行完,和主线程一起退出

    执行结果:
    Begin listening to 七里香. Fri Nov  2 17:33:29 2018
    Begin watching at the 阿甘正传! Fri Nov  2 17:33:29 2018
    all over Fri Nov  2 17:33:29 2018
    end listening Fri Nov  2 17:33:31 2018
    Begin listening to 七里香. Fri Nov  2 17:33:31 2018
    end watching Fri Nov  2 17:33:32 2018
    Begin watching at the 阿甘正传! Fri Nov  2 17:33:32 2018
    end listening Fri Nov  2 17:33:33 2018
    
    Process finished with exit code 0

    4秒就结束。

     

     print属于主线程!


    继承式调用
     1 import threading
     2 import time
     3 
     4 
     5 class MyThread(threading.Thread):#继承
     6     def __init__(self, num):
     7         threading.Thread.__init__(self)
     8         self.num = num
     9 
    10     def run(self):  # 定义每个线程要运行的函数
    11 
    12         print("running on number:%s" % self.num)
    13 
    14         time.sleep(3)
    15 if __name__ == '__main__':
    16     t1 = MyThread(1)
    17     t2 = MyThread(2)
    18     t1.start()
    19     t2.start()

    同步锁

     1 import time
     2 import threading
     3 
     4 def addNum():
     5     global num #在每个线程中都获取这个全局变量
     6 
     7     temp = num
     8 
     9     time.sleep(0.0001)#在前一次还没执行完,就开始减1
    10     num =temp-1 #对此公共变量进行-1操作
    14 
    15 num = 100  #设定一个共享变量
    16 thread_list = []
    17 r = threading.Lock()#同步锁
    18 for i in range(100):
    19 
    20     t = threading.Thread(target=addNum)
    21     t.start()
    22     thread_list.append(t)
    23 
    24 for t in thread_list: #等待所有线程执行完毕
    25     t.join()
    26 
    27 print('final num:', num )#有join所有执行完再输出

    执行结果:

    final num: 47
    
    Process finished with exit code 0

    最终结果不是0的原因:由于有sleep的原因,100个减一操作几乎同时进行,前一次还在sleep没进行减法运算,全局变量就被后一次线程进行减法运算。

    正常情况:100-1=99,99-1=98........1-1 = 0。

    有sleep:100-1=99(还没减),全局变量100被拿走,进行下一线程的运算100-1=99,造成最后结果不为0;

    解决方法:同步锁,使数据运算部分变成了串行。

     1 import time
     2 import threading
     3 
     4 def addNum():
     5     global num #在每个线程中都获取这个全局变量
     6     #num -= 1
     7 
     8     r.acquire()#同步锁,又变成串行
     9     temp = num
    10     #print('--get num:',num )
    11     time.sleep(0.0001)#在前一次还没执行完,就开始减1
    12     num =temp-1 #对此公共变量进行-1操作
    13     r.release()
    14     #只是将以上的部分变成了串行
    15 
    16     print('ok')
    17     #将不是数据的部分内容不放到锁中,100个线程同时拿到ok,这部分将不是串行,而是并发
    18 
    19 
    20 num = 100  #设定一个共享变量
    21 thread_list = []
    22 r = threading.Lock()#同步锁
    23 for i in range(100):
    24 
    25     t = threading.Thread(target=addNum)
    26     t.start()
    27     thread_list.append(t)
    28 
    29 for t in thread_list: #等待所有线程执行完毕
    30     t.join()
    31 
    32 print('final num:', num )#有join所有执

    锁中的部分变成了串行,只有运行结束才进入下一线程。

    但是锁外面的部分print('ok')还是并发的,100个线程同时拿到ok。

    死锁和递归锁

    在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。

     1 import threading,time
     2 
     3 class myThread(threading.Thread):
     4     def doA(self):
     5         lockA.acquire()
     6         print(self.name,"gotlockA",time.ctime())
     7         time.sleep(3)#以上部分被A锁住
     8 
     9         lockB.acquire()#下面的也锁住
    10         print(self.name,"gotlockB",time.ctime())
    11 
    12         lockB.release()#释放后,执行doB
    13         lockA.release()
    14 
    15     def doB(self):
    16         #在此过程中,第二个线程进入,因为A,B已经被释放
    17         lockB.acquire()#有锁,正常输出,由于第二个进入,所以A(第二个线程),B(第一个线程)几乎同时获取
    18         #但是之后第一个线程想要获取A锁的时候,A锁已经被第二个线程占着,造成死锁.
    19         print(self.name,"gotlockB",time.ctime())
    20         time.sleep(2)
    21 
    22         lockA.acquire()
    23         print(self.name,"gotlockA",time.ctime())#没有被打印,反而第二个线程的被打印了
    24 
    25         lockA.release()
    26         lockB.release()
    27 
    28     def run(self):
    29         self.doA()
    30         self.doB()
    31 
    32 if __name__=="__main__":
    33 
    34     lockA=threading.Lock()#两个锁
    35     lockB=threading.Lock()
    36 
    37     #lock = threading.RLock()#该锁可以多次获取,多次acquire和release
    38     threads=[]
    39     for i in range(5):#5个线程
    40         threads.append(myThread())
    41     for t in threads:
    42         t.start()
    43     for t in threads:
    44         t.join()#等待线程结束,后面再讲。

    执行结果:

    Thread-1 gotlockA Sat Nov  3 13:40:48 2018
    Thread-1 gotlockB Sat Nov  3 13:40:51 2018
    Thread-1 gotlockB Sat Nov  3 13:40:51 2018
    Thread-2 gotlockA Sat Nov  3 13:40:51 2018

    以上程序卡住不能运行,doA运行完锁A锁B都释放,准备运行doB,休眠2秒后,获取锁A,此时由于线程锁都被释放,可以进入其他线程,如进入线程二,同时也获取锁A,两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。

    解决方法:

     1 import threading,time
     2 
     3 class myThread(threading.Thread):
     4     def doA(self):
     5         lock.acquire()
     6         print(self.name,"gotlockA",time.ctime())
     7         time.sleep(3)#以上部分被A锁住
     8 
     9         lock.acquire()#下面的也锁住
    10         print(self.name,"gotlockB",time.ctime())
    11 
    12         lock.release()#释放后,执行doB
    13         lock.release()
    14 
    15     def doB(self):
    16         #在此过程中,第二个线程进入,因为A,B已经被释放
    17         lock.acquire()#有锁,正常输出,由于第二个进入,所以A(第二个线程),B(第一个线程)几乎同时获取
    18         #但是之后第一个线程想要获取A锁的时候,A锁已经被第二个线程占着,造成死锁.
    19         print(self.name,"gotlockB",time.ctime())
    20         time.sleep(2)
    21 
    22         lock.acquire()
    23         print(self.name,"gotlockA",time.ctime())#没有被打印,反而第二个线程的被打印了
    24 
    25         lock.release()
    26         lock.release()
    27 
    28     def run(self):
    29         self.doA()
    30         self.doB()
    31 
    32 if __name__=="__main__":
    33 
    34     # lockA=threading.Lock()#两个锁
    35     # lockB=threading.Lock()
    36 
    37     lock = threading.RLock()#该锁可以多次获取,多次acquire和release
    38     threads=[]
    39     for i in range(5):#5个线程
    40         threads.append(myThread())
    41     for t in threads:
    42         t.start()
    43     for t in threads:
    44         t.join()#等待线程结束,后

    lock = threading.RLock(),该锁可以重用,只用lock。不用lockA,lockB。

    执行结果:

    Thread-1 gotlockA Sat Nov  3 13:48:04 2018
    Thread-1 gotlockB Sat Nov  3 13:48:07 2018
    Thread-1 gotlockB Sat Nov  3 13:48:07 2018
    Thread-1 gotlockA Sat Nov  3 13:48:09 2018
    Thread-3 gotlockA Sat Nov  3 13:48:09 2018
    Thread-3 gotlockB Sat Nov  3 13:48:12 2018
    Thread-4 gotlockA Sat Nov  3 13:48:12 2018
    Thread-4 gotlockB Sat Nov  3 13:48:15 2018
    Thread-4 gotlockB Sat Nov  3 13:48:15 2018
    Thread-4 gotlockA Sat Nov  3 13:48:17 2018
    Thread-2 gotlockA Sat Nov  3 13:48:17 2018
    Thread-2 gotlockB Sat Nov  3 13:48:20 2018
    Thread-2 gotlockB Sat Nov  3 13:48:20 2018
    Thread-2 gotlockA Sat Nov  3 13:48:22 2018
    Thread-5 gotlockA Sat Nov  3 13:48:22 2018
    Thread-5 gotlockB Sat Nov  3 13:48:25 2018
    Thread-3 gotlockB Sat Nov  3 13:48:25 2018
    Thread-3 gotlockA Sat Nov  3 13:48:27 2018
    Thread-5 gotlockB Sat Nov  3 13:48:27 2018
    Thread-5 gotlockA Sat Nov  3 13:48:29 2018

    信号量(Semaphore)

    信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

    计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

    BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

     1 import threading,time
     2 class myThread(threading.Thread):
     3     def run(self):
     4         if semaphore.acquire():
     5             print(self.name)
     6             time.sleep(3)
     7             semaphore.release()
     8 if __name__=="__main__":
     9     semaphore=threading.Semaphore(5)
    10     thrs=[]
    11     for i in range(23):
    12         thrs.append(myThread())
    13     for t in thrs:
    14         t.start()

     执行结果:

    Thread-1
    Thread-2
    Thread-3
    Thread-4
    Thread-5
    Thread-6
    Thread-7
    Thread-8
    Thread-9
    Thread-10
    Thread-11
    Thread-12
    Thread-13
    Thread-14
    Thread-15
    Thread-16
    Thread-17
    Thread-18
    Thread-19
    Thread-20
    Thread-21
    Thread-22
    Thread-23
    
    Process finished with exit code 0

    一次同时输出五个,最后一次输出三个。

    条件变量同步(Condition)

    线程间通信的作用

     1 import threading,time
     2 from random import randint
     3 class Producer(threading.Thread):
     4     def run(self):
     5         global L#一屉
     6         while True:
     7             val=randint(0,100)
     8             print('生产者',self.name,":Append"+str(val),L)
     9 
    10             if lock_con.acquire():#锁 ,与lock_con.acquire()一样
    11                 L.append(val)#做包子,从后面加
    12                 lock_con.notify()#通知wait,激活wait
    13                 lock_con.release()
    14             time.sleep(3)
    15 class Consumer(threading.Thread):
    16     def run(self):
    17         global L
    18         while True:
    19                 lock_con.acquire()
    20                 if len(L)==0:#没包子
    21                     lock_con.wait()#wait阻塞
    22 
    23                 print('消费者',self.name,":Delete"+str(L[0]),L)
    24                 del L[0]#从前面吃
    25                 lock_con.release()
    26                 time.sleep(0.1)
    27 
    28 if __name__=="__main__":
    29 
    30     L=[]
    31     lock_con=threading.Condition()#条件变量的锁
    32     threads=[]
    33     for i in range(5):#启动五个人在做包子,5个线程
    34         threads.append(Producer())
    35     threads.append(Consumer())#
    36     for t in threads:
    37         t.start()
    38     for t in threads:
    39         t.join()

    当一屉中有包子的时候,notify激活waiting,添加包子,和吃包子时有线程锁。

    同步条件(Event)

     1 import threading,time
     2 
     3 class Boss(threading.Thread):
     4     def run(self):
     5         print("BOSS:今晚大家都要加班到22:00。")
     6         event.isSet() or event.set()#set()设为true
     7         time.sleep(5)
     8         print("BOSS:<22:00>可以下班了。")
     9         event.isSet() or event.set()
    10 
    11 class Worker(threading.Thread):
    12     def run(self):
    13         event.wait()#等待老板决定,阻塞
    14         print("Worker:哎……命苦啊!")
    15         #event.clear()  # 标志位 False 等老板说可以下班, 设为true
    16         time.sleep(1)
    17         event.clear()#标志位 False 等老板说可以下班, 设为true
    18         event.wait()#等老板说别的 ,设为true后
    19         print("Worker:OhYeah!") #print Oh,Yeah
    20 
    21 if __name__=="__main__":
    22     event=threading.Event()
    23     threads=[]
    24     for i in range(5):#五个worker
    25         threads.append(Worker())
    26     threads.append(Boss())#一个老板
    27     for t in threads:
    28         t.start()
    29     for t in threads:
    30         t.join()

    boss说完后,5个worker马上能有反应。boss输出后,even.set(),标志位变为True,worker中的event.wait()才能停止阻塞。之后还需将标志位设为False,即event.clear()。

    再次等待boss说完话后even.set()将标志位变为True,worker再次发言。

    执行结果:

    BOSS:今晚大家都要加班到22:00。
    Worker:哎……命苦啊!
    Worker:哎……命苦啊!
    Worker:哎……命苦啊!
    Worker:哎……命苦啊!
    Worker:哎……命苦啊!
    BOSS:<22:00>可以下班了。
    Worker:OhYeah!
    Worker:OhYeah!
    Worker:OhYeah!
    Worker:OhYeah!
    Worker:OhYeah!
    
    Process finished with exit code 0
     
  • 相关阅读:
    OCP-1Z0-053-200题-125题-155
    OCP-1Z0-053-200题-127题-154答案貌似都不对?
    OCP-1Z0-053-200题-128题-281
    OCP-1Z0-053-200题-129题-153
    OCP-1Z0-053-200题-130题-288
    OCP-1Z0-053-200题-131题-152
    OCP-1Z0-053-200题-132题-272
    OCP-1Z0-053-200题-133题-151
    OCP-1Z0-053-200题-134题-4
    OCP-1Z0-053-200题-135题-150
  • 原文地址:https://www.cnblogs.com/112358nizhipeng/p/9896292.html
Copyright © 2011-2022 走看看