zoukankan      html  css  js  c++  java
  • [Python 多线程] Barrier (十一)

    Barrier

    栅栏,也叫屏障。可以想象成路障、道闸。

    Python 3.2引入的新功能。

    构造方法:

    threading.Barrier(parties, action=None, timeout=None)

    构建Barrier对象,parties 指定参与方数目,timeout是wait方法未指定时超时的默认值。

    n_waiting    当前在栅栏中等待的线程数

    parties        通过栅栏所需的线程数

    wait(timeout=None) 等待通过栅栏,返回0到线程数-1的整数(barrier_id),每个线程返回不同。如果wait方法设置了超时,并超时发送,栅栏将处于broken状态。

    例1:

    #Barrier 栅栏
    import threading,logging
    logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s")
    
    def work(barrier:threading.Barrier):
        logging.info("n_waiting = {}".format(barrier.n_waiting))   # 等待的线程数
        bid = barrier.wait()   # 参与者的id,返回0到线程数减1的数值
        logging.info("after barrier {}".format(bid))  # 栅栏之后
    
    barrier = threading.Barrier(3) # 3个参与者,每3个开闸放行,0,1,2  4,5,6
    
    for x in range(1,4):  # 所有参数者个数,4,5,6,10,15
        threading.Event().wait(1)
        threading.Thread(target=work,args=(barrier,),name="Barrier-{}".format(x)).start()
    
    运行结果:
    [-] Barrier-1 n_waiting = 0
    [-] Barrier-2 n_waiting = 1
    [-] Barrier-3 n_waiting = 2
    [-] Barrier-3 after barrier 2
    [-] Barrier-2 after barrier 1
    [-] Barrier-1 after barrier 0
    

      每一个进来就等待,不够3个就阻塞,直到够3个就开闸放行。

    Barrier实例的方法:

    broken  检测栅栏是否处于打破的状态,返回True或False

    abort()  将栅栏置于broken状态,等待中的线程或者调用等待方法的线程都会抛出threading.BrokenBarrieError异常,直到reset方法来恢复栅栏

    reset()  恢复栅栏,重新开始拦截

    例2:

    #Barrier 栅栏
    import threading,logging
    logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s")
    
    def work(barrier:threading.Barrier):
        logging.info("n_waiting = {}".format(barrier.n_waiting))
        try:
            bid = barrier.wait()
            logging.info("after barrier {}".format(bid))
        except threading.BrokenBarrierError:
            logging.info("Broken Barrier in {}".format(threading.current_thread()))
    
    barrier = threading.Barrier(3)
    
    for x in range(1,12): #12个
        if x == 3:
            barrier.abort() #有一个人坏了规矩
        elif x == 6:
            barrier.reset()
        threading.Event().wait(1)
        threading.Thread(target=work,args=(barrier,),name="Barrier-{}".format(x)).start()
    
    运行结果:
    [-] Barrier-1 n_waiting = 0 #0,1
    [-] Barrier-2 n_waiting = 1
    [-] Barrier-2 Broken Barrier in <Thread(Barrier-2, started 3124)>
    [-] Barrier-1 Broken Barrier in <Thread(Barrier-1, started 8036)>
    [-] Barrier-3 n_waiting = 0
    [-] Barrier-3 Broken Barrier in <Thread(Barrier-3, started 7428)>
    [-] Barrier-4 n_waiting = 0
    [-] Barrier-4 Broken Barrier in <Thread(Barrier-4, started 1828)>
    [-] Barrier-5 n_waiting = 0
    [-] Barrier-5 Broken Barrier in <Thread(Barrier-5, started 7416)>
    [-] Barrier-6 n_waiting = 0 #6,7,8
    [-] Barrier-7 n_waiting = 1
    [-] Barrier-8 n_waiting = 2
    [-] Barrier-8 after barrier 2
    [-] Barrier-7 after barrier 1
    [-] Barrier-6 after barrier 0
    [-] Barrier-9 n_waiting = 0  #9,10,11
    [-] Barrier-10 n_waiting = 1
    [-] Barrier-11 n_waiting = 2
    [-] Barrier-11 after barrier 2
    [-] Barrier-9 after barrier 0
    [-] Barrier-10 after barrier 1
    

      一共有12个参与者,依次开始,1和2处于等待状态,到达第3的时候,进入了broken状态,则直到第6个,才恢复栅栏,从6开始继续拦截,达到3个(6,7,8)就放行,9,10,11也放行。

    例3:

    wait方法

    #Barrier 栅栏
    import threading,logging
    logging.basicConfig(level=logging.INFO,format="[-] %(threadName)s %(message)s")
    
    def work(barrier:threading.Barrier,i:int):
        logging.info("n_waiting = {}".format(barrier.n_waiting))
        try:
            if i < 3:
                bid = barrier.wait(1)  #超时1秒就将栅栏置于broken状态,抛出异常后续语句不会执行
            else:
                if i == 6:
                    barrier.reset() #恢复栅栏
                bid = barrier.wait()
            # logging.info("broken status = {}".format(barrier.broken))  #是否处于broken状态
            logging.info("after barrier {}".format(bid))
        except threading.BrokenBarrierError:
            logging.info("Broken Barrier in {}".format(threading.current_thread()))
    
    barrier = threading.Barrier(3)
    
    for i in range(1,11): #10个
        threading.Event().wait(2) #强制延迟2秒,让出时间片
        threading.Thread(target=work,args=(barrier,i),name="Barrier-{}".format(i)).start()
    
    运行结果:
    [-] Barrier-1 n_waiting = 0
    [-] Barrier-1 Broken Barrier in <Thread(Barrier-1, started 3100)>
    [-] Barrier-2 n_waiting = 0
    [-] Barrier-2 Broken Barrier in <Thread(Barrier-2, started 8836)>
    [-] Barrier-3 n_waiting = 0
    [-] Barrier-3 Broken Barrier in <Thread(Barrier-3, started 8428)>
    [-] Barrier-4 n_waiting = 0
    [-] Barrier-4 Broken Barrier in <Thread(Barrier-4, started 1204)>
    [-] Barrier-5 n_waiting = 0
    [-] Barrier-5 Broken Barrier in <Thread(Barrier-5, started 1556)>
    [-] Barrier-6 n_waiting = 0
    [-] Barrier-7 n_waiting = 1
    [-] Barrier-8 n_waiting = 2
    [-] Barrier-8 after barrier 2
    [-] Barrier-7 after barrier 1
    [-] Barrier-6 after barrier 0
    [-] Barrier-9 n_waiting = 0
    [-] Barrier-10 n_waiting = 1
    阻塞中
    

      wait方法在等待超时1秒后,就强制将栅栏置于broken状态,直到第6个的时候才reset恢复,然后6,7,8放行,9,10,继续阻塞。如果此时有第11个,就会9,10,11放行。

    应用场景:

    并发初始化

    所有线程都必须初始化完成后,才能继续工作,例如运行前加载数据,检查,如果这些工作没完成就不能正常工作运行。

    10个线程做10种工作准备,每个线程负责一种工作,只有10个线程都完成后,才能继续工作,先完成的要等待后完成的线程。

    例如,启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作,这些工作可以齐头并进,不过只有都满足了,程序才能继续向后执行。假设数据库链接失败,则初始化工作失败,就要abort,栅栏broken,所有线程收到异常退出。

    工作量

    有10个计算任务,完成6个,就算工作完成。

    PPTBYG

  • 相关阅读:
    LightOJ 1422 Halloween Costumes(区间dp)
    zoj 3537 Cake(区间dp)
    POJ 2955 Brackets(区间dp)
    HDU 1058 Humble Numbers(dp)
    uva 10934 Dropping water balloons(转载)
    树形dp
    Manacher算法求回文半径
    poj-1236.network of schools(强连通分量 + 图的入度出度)
    hdu-2255.奔小康赚大钱(最大权二分匹配)
    poj-2289.jamies contact groups(二分答案 + 二分多重匹配)
  • 原文地址:https://www.cnblogs.com/i-honey/p/8076214.html
Copyright © 2011-2022 走看看