zoukankan      html  css  js  c++  java
  • [Python 多线程] Semaphore、BounedeSemaphore (十二)

     Semaphore

    信号量,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。

    方法:

    Semaphore(value=1)                            构造方法。value小于0,抛ValueError异常。默认为1。

    acquire(blocking=True,timeout=None)  获取信号量,计数器减1,获取成功返回True。

    release()                                               释放信号量,计数器加1。

    计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞。

     举例:

    图书馆有三本书,三本都被借走(acquire)之后,其他人想看,就得等别人还回来(阻塞),有人还回来(release)一本后,就有一个人可以拿到这本书,其他人仍然得等归还。

    #Semaphore 信号量,借还
    import threading,logging,time
    DATEFMT="%H:%M:%S"
    FORMAT = "[%(asctime)s]	 [%(threadName)s,%(thread)d] %(message)s"
    logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)
    
    def work(s:threading.Semaphore):
        logging.info('in sub thread')
        logging.info(s.acquire())
        logging.info('sub thread oevr')
    
    s = threading.Semaphore(3)
    logging.info(s.acquire())
    logging.info(s.acquire())
    logging.info(s.acquire())
    
    threading.Thread(target=work,args=(s,)).start()
    time.sleep(2)
    
    logging.info(s.acquire(False))  #不阻塞
    logging.info((s.acquire(timeout=3))) #3秒超时会阻塞
    
    logging.info('release')
    s.release()
    
    
    运行结果:
    [08:48:43]	 [MainThread,8840] True
    [08:48:43]	 [MainThread,8840] True
    [08:48:43]	 [MainThread,8840] True
    [08:48:43]	 [Thread-1,6212] in sub thread
    [08:48:45]	 [MainThread,8840] False
    [08:48:48]	 [MainThread,8840] False
    [08:48:48]	 [MainThread,8840] release
    [08:48:48]	 [Thread-1,6212] True
    [08:48:48]	 [Thread-1,6212] sub thread oevr
    

      这个例子只起了一个线程,如果多起几个,当release还回来的数小于阻塞的线程数时,程序就会一直处于阻塞状态,直到全部relase。

     应用举例:

    因为资源有限,且开启一个连接成本高,所以,使用连接池。

    一个简单的连接池(例子):

    连接池应该有容量(value总数),也应该工厂方法可以获取连接,能够把不用的连接归还,供其他使用者使用。

    #一个简单的连接池
    import threading,logging,time
    DATEFMT="%H:%M:%S"
    FORMAT = "[%(asctime)s]	 [%(threadName)s,%(thread)d] %(message)s"
    logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)
    
    
    class Conn:
        def __init__(self,name):
            self.name = name
    
    class Pool:
        def __init__(self,count=3):
            self.count = count
            #连接池容器
            self.pool = [self._connect('conn-{}'.format(x)) for x in range(self.count)]
    
    
        def _connect(self,conn_name):
            return Conn(conn_name)
    
        def get_conn(self):
            # if len(self.pool) > 0:
            return self.pool.pop() #从尾部弹出一个
    
        def return_conn(self,conn:Conn):
            self.pool.append(conn)
    
    pool = Pool(3)
    print(pool.pool)
    pool.get_conn()
    pool.get_conn()
    pool.get_conn()
    pool.get_conn() #第4个
    
    print('End Main')
    
    运行结果:
    [<__main__.Conn object at 0x00000211BBEBC160>, <__main__.Conn object at 0x00000211BBEBC1D0>, <__main__.Conn object at 0x00000211BBEBC240>]
    Traceback (most recent call last):
      File "C:/python/test.py", line 34, in <module>
        pool.get_conn()
      File "C:/python/test.py", line 24, in get_conn
        return self.pool.pop() #从尾部弹出一个
    IndexError: pop from empty list
    

      当连接池中已经没有可用连接时,再获取就会抛异常 IndexError:pop from empty list。

    那就加个判断,只在池中连接数量大于0的时候才可以获取连接:

    #修改get_conn函数
        def get_conn(self):
            if len(self.pool) > 0:
                return self.pool.pop() #从尾部弹出一个
    

      这样在连接池为空时,就不会抛异常了。

    这个连接池的例子如果使用多线程,这个get_conn()方法是线程不安全的,有可能其它线程看到池中还有一个连接,正准备获取,其它线程也看到了,也准备获取连接,就会抛异常。再或者,都在向池中加连接的时候,也可能会多加。

    这个问题可以用锁Lock来解决, 在获取连接和加连接时,加锁解锁;也可以使用semaphore信号量来解决。

    使用信号量对上例进行修改:

    #使用semaphore信号量修改连接池
    import threading,logging,time,random
    DATEFMT="%H:%M:%S"
    FORMAT = "[%(asctime)s]	 [%(threadName)s,%(thread)d] %(message)s"
    logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)
    
    
    class Conn:
        def __init__(self,name):
            self.name = name
    
        def __repr__(self):
            return self.name
    
    class Pool:
        def __init__(self,count=3):
            self.count = count
            #连接池容器
            self.pool = [self._connect('conn-{}'.format(x)) for x in range(self.count)]
            self.semaphore = threading.Semaphore(self.count)
    
    
        def _connect(self,conn_name):
            #返回一个连接名
            return Conn(conn_name)
    
        def get_conn(self):
            #从池中拿走一个连接
            # if len(self.pool) > 0:
            self.semaphore.acquire(timeout=5) #-1,获取连接,最大5秒超时时间,与后面随机秒数相对应
            data = self.pool.pop() #从尾部弹出一个
            return data
    
        def return_conn(self,conn:Conn):
            #向池中添加一个连接
            self.pool.append(conn)
            self.semaphore.release()  # 先加入池中再信号量+1
            return len(self.pool)
    
    
    pool = Pool(3)
    
    def worker(pool:Pool):
        conn = pool.get_conn()
        logging.info(conn)
        #模拟使用了资源一段时间(随机1-4秒),然后归还
        threading.Event().wait(timeout=random.randint(1,4))
        pool.return_conn(conn)
    
    for i in range(6):
        threading.Thread(target=worker,name="worker-{}".format(i),args=(pool,)).start()
    
    print('End Main')
    
    运行结果:
    [10:34:12]	 [worker-0,5264] conn-2
    [10:34:12]	 [worker-1,7420] conn-1
    [10:34:12]	 [worker-2,2612] conn-0
    End Main
    [10:34:13]	 [worker-3,3972] conn-1 #归还以后又可以获取连接
    [10:34:14]	 [worker-4,8172] conn-2
    [10:34:15]	 [worker-5,11192] conn-1
    

      上例中模拟获取连接以后使用了1-4秒钟,没有拿到资源的最多阻塞5秒钟,当连接使用结束归还后,阻塞的线程就又重新获取到连接。

    问题:

    1) 没有使用信号量就release的情况:

    import threading
    
    s = threading.Semaphore(3)
    print(s.__dict__)
    
    
    
    
    def work(s:threading.Semaphore):
        s.release()
    
    for i in range(3):
        threading.Thread(target=work,args=(s,)).start()
        print(s.__dict__)
    
    运行结果:
    {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 3}
    {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 2}
    {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 3}
    {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 4}
    {'_cond': <Condition(<unlocked _thread.lock object at 0x00000219202973A0>, 0)>, '_value': 5}
    

      没有acquire信号量时,就release的情况,结果导致了信号量的内置倒计数器的值增加,这样就超出了最大值。

    解决办法:

    使用BoundedSemaphore类:

    BoundedSemaphore,继承自Semaphore类。边界绑定,有界的信号量,不允许使用release超过初始值的范围,否则,抛ValueError异常。

    #BoundedSemaphore边界绑定
    import threading
    
    s = threading.BoundedSemaphore(3)
    print(s.__dict__)
    
    s.acquire()
    print(s.__dict__)
    
    def work(s:threading.BoundedSemaphore):
        s.release()
    
    for i in range(3):
        threading.Thread(target=work,args=(s,)).start()
        print(s.__dict__)
    
    运行结果:
    {'_value': 3, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3}
    {'_value': 2, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3}
    {'_value': 3, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3}
    {'_value': 3, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3}
    {'_value': 3, '_cond': <Condition(<unlocked _thread.lock object at 0x000001A42DDF73A0>, 0)>, '_initial_value': 3}
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "C:/python/test.py", line 11, in work
        s.release()
    ValueError: Semaphore released too many times
    

      使用BoundedSemaphore就可以控制不会多归还。

  • 相关阅读:
    VS2012 窗口布局, update1 和 英语语言包 离线安装方法
    C/C++ 笔记
    MFC学习笔记
    理解虚基类、虚函数与纯虚函数的概念
    ffmpeg使用
    gif制作 & word2007插入gif
    WIN8电脑开机怎么进入安全模式啊?
    vs2012 win8 64 visual assistX 中文 ??
    21 获取文件大小的方法
    20 线程中添加超时的方法
  • 原文地址:https://www.cnblogs.com/i-honey/p/8078518.html
Copyright © 2011-2022 走看看