zoukankan      html  css  js  c++  java
  • Python的并发并行[1] -> 线程[3] -> 多线程的同步控制

    多线程的控制方式


    目录

    1. 唤醒单个线程等待
    2. 唤醒多个线程等待
    3. 条件函数等待
    4. 事件触发标志
    5. 函数延迟启动
    6. 设置线程障碍

    1 唤醒单个线程等待

    Condition类相当于一把高级的锁,可以进行一些复杂的线程同步控制。一般Condition内部都有一把内置的锁对象(默认为RLock),对于Condition的使用主要有以下步骤:

    1. 建立两个线程对象,及Condition对象;
    2. 线程1首先获取Condition的锁权限,acquire();
    3. 线程1执行需要完成的任务后,调用等待wait(),此时,线程1会阻塞挂起,出让内置锁的控制权(即Condition可被其他线程acquire);
    4. 线程2对Condition内置锁的权限进行获取acquire(),当线程2执行需要完成的任务后,会调用唤醒,notify(),但此时线程1并不会立即被唤醒继续执行,而是要等待线程2释放Condition的内置锁release()之后,线程1才会唤醒;
    5. 线程2释放锁release()之后,线程1自动重新对锁进行占有,直到线程1完成所有任务后,再执行释放锁release(),从而结束。
     1 from threading import Thread, Condition
     2 import time
     3 cond = Condition()
     4 
     5 class Sleeper(Thread):
     6     def __init__(self):
     7         super(Sleeper, self).__init__()
     8 
     9     def run(self):
    10         print('Sleeper gets into room, starts sleeping')
    11         cond.acquire()
    12         print('Sleeper is sleeping, waiting for wakeup')
    13         cond.wait()
    14         print('Sleeper is waked up')
    15         cond.release()
    16         print('Sleeper out of room')
    17 
    18 
    19 class Waker(Thread):
    20     def __init__(self):
    21         super(Waker, self).__init__()
    22 
    23     def run(self):
    24         print('Waker waiting sleeper getting into room and sleeping...')
    25         time.sleep(1)
    26         cond.acquire()
    27         print('Waker gets into the room')
    28         cond.notify()
    29         print('Waker trying to wake up sleeper')
    30         time.sleep(3)
    31         print('Waker finished wake up, leave the room')
    32         cond.release()
    33 
    34 sleeper = Sleeper()
    35 waker = Waker()
    36 sleeper.start()
    37 waker.start()

    上面的代码中,首先导入所需的模块,生成Condition的实例,之后派生两个线程模拟sleeper和waker,其中sleeper会先对Condition进行获取权限,随后进入等待,而waker会在sleeper进入等待后,获取Condition的权限,然后尝试唤醒sleeper,随后释放Condition锁,将占有权归还,sleeper收到归还的权限后,调用release进行释放。

    运行得到结果

    Sleeper gets into room, starts sleeping  
    Sleeper is sleeping, waiting for wakeup  
    Waker waiting sleeper getting into room and sleeping...  
    Waker gets into the room  
    Waker trying to wake up sleeper  
    Waker finished wake up, leave the room  
    Sleeper is waked up  
    Sleeper out of room  

    从输出的结果中可以看到,waker在notify了sleeper之后,等待了3秒,而这三秒内,sleeper并未被彻底唤醒,而是等待waker的release()之后,sleeper才被真正唤醒继续执行。

    2 唤醒多个线程等待

    当有多个线程进入Condition的条件等待时,可以使用两种方式进行唤醒,第一种是利用等数量的线程唤醒,第二种是利用notify_all()函数唤醒所有线程。

    下面的例子对比了两种唤醒方式

     1 from threading import Thread, Condition
     2 import random
     3 import time
     4 cond = Condition()
     5 
     6 class Sleeper(Thread):
     7     def __init__(self):
     8         super(Sleeper, self).__init__()
     9         self.num = self.name[-1]
    10         print('Sleeper_%s ready' % self.num)
    11 
    12     def run(self):
    13         print('Sleeper_%s gets into room, starts sleeping' % self.num)
    14         cond.acquire()
    15         print('Sleeper_%s is sleeping, waiting for wakeup' % self.num)
    16         cond.wait()
    17         print('Sleeper_%s is waked up' % self.num)
    18         cond.release()
    19         print('Sleeper_%s out of room' % self.num)
    20 
    21 
    22 class Waker(Thread):
    23     def __init__(self, all=False):
    24         super(Waker, self).__init__()
    25         self.all = all
    26         self.num = self.name[-1]
    27         print('Waker_%s ready' % self.num)
    28 
    29     def run(self):
    30         print('Waker_%s waiting sleeper getting into room and sleeping...' % self.num)
    31         time.sleep(1)
    32         cond.acquire()
    33         print('Waker_%s gets into the room' % self.num)
    34         if self.all:
    35             cond.notify_all()
    36         else:
    37             cond.notify()
    38         print('Waker_%s trying to wake up sleeper' % self.num)
    39         time.sleep(3)
    40         print('Waker_%s finished wake up, leave the room' % self.num)
    41         cond.release()
    42 
    43 if __name__ == '__main__':
    44     print('-------Order Wake-------')
    45     sleepers = []
    46     wakers = []
    47     for i in range(3):
    48         sleepers.append(Sleeper())
    49     for i in range(3):
    50         wakers.append(Waker())
    51     
    52     random.shuffle(sleepers)
    53     random.shuffle(wakers)
    54     
    55     for s in sleepers:
    56         s.start()
    57     for s in wakers:
    58         s.start()
    59     for s in sleepers:
    60         s.join()
    61     
    62     print('-------All Wake-------')
    63     sleepers = []
    64     for i in range(3):
    65         sleepers.append(Sleeper())
    66     waker = Waker(all=True)
    67 
    68     random.shuffle(sleepers)
    69 
    70     for s in sleepers:
    71         s.start()
    72     waker.start()

    上面的代码中,在导入所需模块后,定义Sleeper和Waker类,并让他们拥有各自名字,在主函数中首先利用单个唤醒的方式去唤醒所有线程,随后再利用全部唤醒的方式唤醒所有线程,其中随机数模块用于重新排序Sleeper。

    运行得到结果

    -------Order Wake-------  
    Sleeper_1 ready  
    Sleeper_2 ready  
    Sleeper_3 ready  
    Waker_4 ready  
    Waker_5 ready  
    Waker_6 ready  
    Sleeper_1 gets into room, starts sleeping  
    Sleeper_1 is sleeping, waiting for wakeup  
    Sleeper_3 gets into room, starts sleeping  
    Sleeper_3 is sleeping, waiting for wakeup  
    Sleeper_2 gets into room, starts sleeping  
    Sleeper_2 is sleeping, waiting for wakeup  
    Waker_4 waiting sleeper getting into room and sleeping...  
    Waker_6 waiting sleeper getting into room and sleeping...  
    Waker_5 waiting sleeper getting into room and sleeping...  
    Waker_6 gets into the room  
    Waker_6 trying to wake up sleeper  
    Waker_6 finished wake up, leave the room  
    Waker_5 gets into the room  
    Waker_5 trying to wake up sleeper  
    Waker_5 finished wake up, leave the room  
    Sleeper_1 is waked up  
    Sleeper_1 out of room  
    Waker_4 gets into the room  
    Waker_4 trying to wake up sleeper  
    Waker_4 finished wake up, leave the room  
    Sleeper_3 is waked up  
    Sleeper_3 out of room  
    Sleeper_2 is waked up  
    Sleeper_2 out of room  
    -------All Wake-------  
    Sleeper_7 ready  
    Sleeper_8 ready  
    Sleeper_9 ready  
    Waker_0 ready  
    Sleeper_9 gets into room, starts sleeping  
    Sleeper_9 is sleeping, waiting for wakeup  
    Sleeper_7 gets into room, starts sleeping  
    Sleeper_7 is sleeping, waiting for wakeup  
    Sleeper_8 gets into room, starts sleeping  
    Sleeper_8 is sleeping, waiting for wakeup  
    Waker_0 waiting sleeper getting into room and sleeping...  
    Waker_0 gets into the room  
    Waker_0 trying to wake up sleeper  
    Waker_0 finished wake up, leave the room  
    Sleeper_9 is waked up  
    Sleeper_9 out of room  
    Sleeper_7 is waked up  
    Sleeper_7 out of room  
    Sleeper_8 is waked up  
    Sleeper_8 out of room  
    View Code

    从输出的结果中可以看出,

    1. Sleeper按顺序生成,按乱序启动;
    2. Waker的单次唤醒,首先唤醒最先进入等待的线程,而不是最先生成的,即唤醒的顺序是按照acquire后进入wait的顺序先后进行唤醒(出让锁控制权的顺序);
    3. Waker的notify_all函数能够一次唤醒所有等待线程,顺序与单个唤醒相同。

    3 条件函数等待

    利用Condition类中的wait_for函数可以实现一种条件等待,当等待的函数返回值为真的时候,线程会被唤醒并继续执行。查看wait_for源码可以看出,调用wait_for函数之后,会对传入的函数进行调用,若返回结果为True,则返回True,或返回结果为False,则进入循环中,通过对时间限制的判断,最终阻塞在wait函数上,直到超时后再次获取传入函数的返回值,若依旧False,则会由于超时判断而退出循环。

    下面的例子中定义了一个等待类,以及alarm函数,waiter会在启动后调用条件等待,等待函数为alarm,当alarm返回为真后,继续函数。

     1 from threading import Thread, Condition
     2 import time
     3 cond = Condition()
     4 
     5 class Waiter(Thread):
     6     def __init__(self, alarm):
     7         super(Waiter, self).__init__()
     8         self.alarm = alarm
     9         print('Waiter ready')
    10 
    11     def run(self):
    12         cond.acquire()
    13         print('Waiter waiting for alarm...')
    14         cond.wait_for(self.alarm, timeout=1)
    15         print('Waiter received alarm')
    16         cond.release()
    17         print('Waiter completed')
    18 
    19 def alarm():
    20     print('Sending alarm...')
    21     time.sleep(5)
    22     print('alarm sent')
    23     return True
    24 
    25 waiter = Waiter(alarm)
    26 waiter.run()

    运行得到结果

    Waiter ready  
    Waiter waiting for alarm...  
    Sending alarm...  
    alarm sent  
    Waiter received alarm  
    Waiter completed  

    从输出的结果可以看出,waiter也会等待alarm函数结束返回结果后再继续进行

    4 事件触发标志

    Event与Condition类似,实质上是一个对内置Flag监测的事件标志触发类,在生成的Event实例内,会由一个内置Flag,初始状态为False,当Event类调用wait函数时,会查看内置Flag的状态,若为True则不会阻塞,若为False则调用内置Condition类的wait函数等待唤醒。

    通过对源码的查看可以看出,实质上

    1. Event的wait函数是利用Flag以及Condition的wait函数实现;
    2. Event的set函数是利用Condition的with(acquire, release)获得控制,并将Flag置为Ture,同时notify_all();
    3. Event的clear函数同样是利用Condition的with获得控制,然后将标志Flag置为False。 

    下面的例子利用了set, wait, clear函数实现了一个传球模拟。

     1 from threading import Thread, Event
     2 import time
     3 
     4 evt = Event()
     5 class Mate_1(Thread):
     6     def run(self):
     7         print('Mate_1 is running') 
     8         time.sleep(1)
     9         print('Mate_1 got the ball')
    10         time.sleep(1)
    11         evt.set()
    12         print('Mate_1 pass the ball to others')
    13         evt.clear()
    14         evt.wait()
    15         time.sleep(1)
    16         print('Mate_1 got the ball again')
    17         time.sleep(1)
    18         print('Mate_1 makes a goal')
    19         evt.set()
    20 
    21 class Mate_2(Thread):
    22     def run(self):
    23         print('Mate_2 is running')
    24         if evt.is_set():
    25             evt.clear()
    26         evt.wait()
    27         time.sleep(1)
    28         print('Mate_2 got the ball')
    29         time.sleep(1)
    30         evt.set()
    31         print('Mate_2 pass the ball to others')
    32         time.sleep(1)
    33         evt.clear()
    34         evt.wait()
    35         time.sleep(1)
    36         print('Mate_2 hugs Mate_1 for congratulation')
    37 if __name__ == '__main__':
    38     m = Mate_1()
    39     n = Mate_2()
    40     m.start()
    41     n.start()

    上面的代码中,首先建立事件类实例,派生两个球员子线程,球员1会在开始后1秒收到足球,控球1秒后通过evt.set()唤醒另一个线程,模拟将球传出的过程,随后清除标志进入等待。此时球员2原本处于wait状态,收到set将内置Flag重置的信号后,被唤醒,表明自己接到球,控球1秒后再set传回,自己进入等待,球员1接到信号再次接球并得分后通知球员2,球员2则在收到进球信号后表示庆祝。

    运行得到结果

    Mate_1 is running  
    Mate_2 is running  
    Mate_1 got the ball  
    Mate_1 pass the ball to others  
    Mate_2 got the ball  
    Mate_2 pass the ball to others  
    Mate_1 got the ball again  
    Mate_1 makes a goal  
    Mate_2 hugs Mate_1 for congratulation 

    结果中可以看出,以 set 方法模拟传球,使两个线程间互相配合执行。

    函数延迟启动

    利用Timer类可以实现对函数的延时启动,以及在未启动前取消启动的操作

     1 from threading import Timer
     2 from time import ctime, sleep
     3 
     4 def func():
     5     print('Current time is', ctime(), 'Hello world')
     6 
     7 timex = Timer(3, func)
     8 print('Current time is', ctime())
     9 timex.start()
    10 timex.join()
    11 
    12 timex = Timer(3, func)
    13 print('Current time is', ctime())
    14 timex.start()
    15 sleep(1)
    16 timex.cancel()
    17 print('End')

    运行得到结果

    Current time is Thu Aug  3 16:22:19 2017  
    Current time is Thu Aug  3 16:22:22 2017 Hello world  
    Current time is Thu Aug  3 16:22:22 2017  
    End  

    从结果中可以看出,延时启动3秒起到了作用,而第二次的函数则在调用前被取消了

    6 设置线程障碍

     对于多线程来说,可以利用Barrier类实现对指定数量的线程进行阻碍,直到线程数量达到指定值后,同时释放所有的线程。

     1 from threading import Thread, Barrier
     2 import random
     3 import time
     4 COUNT = 0
     5 
     6 def action():
     7     print('ACTION')
     8 
     9 def foo():
    10     global COUNT
    11     time.sleep(random.randint(1, 7))
    12     print('Barrier_%d waiting' % COUNT)
    13     COUNT += 1
    14     barr.wait()
    15     print('Hello, world', COUNT)
    16 
    17 barr = Barrier(7, action=action, timeout=20)
    18 
    19 threads = []
    20 for i in range(7):
    21     threads.append(Thread(target=foo))
    22 for t in threads:
    23     t.start()
    24 for t in threads:
    25     t.join()
    26 
    27 print('Pass barrier')

    第 1-15 行,导入所需模块后,定义一个执行函数,用于越过障碍时执行,再定义一个线程函数,用于线程执行。线程执行函数会在进入后对全局变量+1,随后进入阻塞状态。

    第 16 行,设置障碍上限,且线程数量与障碍上限相同。若此处线程数量大于障碍数,则跨过障碍后,多出的两个障碍会一直阻塞。可考虑使用reset函数重置计数,达到循环。

    运行得到结果

    Barrier_0 waiting  
    Barrier_1 waiting  
    Barrier_2 waiting  
    Barrier_3 waiting  
    Barrier_4 waiting  
    Barrier_5 waiting  
    Barrier_6 waiting  
    ACTION  
    Hello, world 7  
    Hello, world 7  
    Hello, world 7  
    Hello, world 7  
    Hello, world 7  
    Hello, world 7  
    Hello, world 7  
    Pass barrier  

    从输出的结果可以看到,当障碍数量达到上限的时候,会运行执行函数,随后唤醒所有的线程。

    相关阅读


    1. 基本概念

    2. threading 模块

    3. 多线程的建立

    4. 锁与信号量

    参考链接


    《Python 核心编程 第3版》

  • 相关阅读:
    PostgreSQL数据库中的常见错误
    postgresql相关命令
    Linux系统查看公网IP地址
    TCP/IP TIME_WAIT状态原理
    TCP连接状态详解及TIME_WAIT过多的解决方法
    让你提升命令行效率的 Bash 快捷键 [完整版]
    linux 如何显示一个文件的某几行(中间几行)
    linux中内核的一个不错的参数somaxconn
    Linux crontab 实现每秒执行
    Linux tar This does not look like a tar archive
  • 原文地址:https://www.cnblogs.com/stacklike/p/8159230.html
Copyright © 2011-2022 走看看