zoukankan      html  css  js  c++  java
  • 铁乐学python_Day42_线程-信号量事件条件

    铁乐学python_Day42_线程-信号量事件条件

    线程中的信号量

    同进程的一样,Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
    from threading import Thread,Semaphore
    import threading
    import time
    
    def func():
        sm.acquire()
        print('%s get sm' %threading.current_thread().getName())
        time.sleep(3)
        sm.release()
    
    if __name__ == '__main__':
        sm=Semaphore(5)
        for i in range(23):
            t=Thread(target=func)
            t.start()
    

    与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,
    而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程。

    线程中的事件

    同进程的一样,线程的一个关键特性是每个线程都是独立运行且状态不可预测。
    如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。
    为了解决这些问题,我们需要使用threading库中的Event对象。
    对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。
    在初始情况下,Event对象中的信号标志被设置为假。
    如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。
    一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。
    如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

    event.isSet():返回event的状态值;
    event.wait():如果 event.isSet()==False将阻塞线程;
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    event.clear():恢复event的状态值为False。

    例如,有多个工作线程尝试链接MySQL,要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,
    如果连接不成功,都会去尝试重新连接。可以采用threading.Event机制来协调各个工作线程的连接操作。

    例:模拟连接mysql数据库
    import threading
    import time,random
    from threading import Thread,Event
    
    def conn_mysql():
        count=1
        while not event.is_set():
            if count > 3:
                raise TimeoutError('链接超时')
            print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
            event.wait(0.5)
            count+=1
        print('<%s>链接成功' %threading.current_thread().getName())
    
    def check_mysql():
        print('33[45m[%s]正在检查mysql33[0m' % threading.current_thread().getName())
        time.sleep(random.randint(2,4))
        event.set()
    
    if __name__ == '__main__':
        event=Event()
        conn1=Thread(target=conn_mysql)
        conn2=Thread(target=conn_mysql)
        check=Thread(target=check_mysql)
    
        conn1.start()
        conn2.start()
        check.start()
    
    例:模拟连接数据库2
    import time
    import random
    from threading import Event, Thread
    
    # 模拟连接数据库
    def connect_db(e):
        count = 1
        while count < 4:
            print('尝试第%s次检测连接' % count)
            e.wait(0.5)
            # 如果不传参数会一直等到事件为True为止
            # 如果传参数 传一个时间参数
            count += 1
            if e.is_set():
                print('连接成功')
                break
        else:
            print('连接失败')
    
    
    def check_conn(e):
        '''检测数据库是否可以连接'''
        time.sleep(random.randint(1, 2))
        e.set()
    
    e = Event()
    Thread(target=check_conn, args=(e,)).start()
    Thread(target=connect_db, args=(e,)).start()
    
    # 当你要做一件事情是有前提(前置条件)的时候
    # 你就先去处理前提(前置条件)的问题 —— 前提处理好了,把状态设置成True
    # 来控制即将要做的事情可以开始了。
    
    运行效果如下:
    尝试第1次检测连接
    尝试第2次检测连接
    连接成功
    

    条件(Condition)

    使线程等待,当满足某条件时,才释放n个线程。

    Python提供的Condition对象提供了对复杂线程同步问题的支持。
    Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。
    线程首先acquire一个条件变量,然后判断一些条件。
    如果条件不满足则wait;
    如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,
    其他处于wait状态的线程接到通知后会重新判断条件。
    不断的重复这一过程,从而解决复杂的同步问题。

    例:线程设置条件
    import threading
    
    def run(n):
        con.acquire()
        con.wait()
        print("run the thread: %s" % n)
        con.release()
    
    if __name__ == '__main__':
    
        con = threading.Condition()
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            t.start()
    
        while True:
            inp = input('>>>')
            if inp == 'q':
                break
            con.acquire()
            con.notify(int(inp))
            con.release()
            print('****')
    
    运行如下:
    >>>1
    ****
    >>>run the thread: 0
    2
    ****
    >>>run the thread: 1
    run the thread: 2
    q
    
    例2:通过条件变量控制线程分批执行
    from threading import Condition, Thread
    
    def func(i, con):
        con.acquire()
        con.wait()
        print(i * '*')
        con.release()
    
    con = Condition()
    # 定义了范围从1到9
    for i in range(1, 10):
        Thread(target=func, args=(i, con)).start()
    while True:
        # 分批控制线程执行
        n = int(input('>>>'))
        if n == 0:
            break
        con.acquire()
        con.notify(n)
        con.release()
    
    运行效果如下:
    >>>1
    >>>*
    2
    >>>**
    ***
    3
    >>>******
    ****
    *****
    4
    >>>*******
    ********
    *********
    0
    
    以上,输入1则运行一次任务,1+2+3+4 10次己超出任务次数,完成后不会重复执行
    
    例:那朵花躲迷藏
    #!/usr/bin/env python
    # _*_ coding: utf-8 _*_
    # ____ Condition 条件变量
    # ____ 模拟那朵花中的面码捉迷藏对话
    
    import threading, time
    
    # 将躲迷藏中搜寻的角色这个类创造出来,且继承了线程Thread这个类
    class Seeker(threading.Thread):
    
        def __init__(self, cond, name):
            super(Seeker, self).__init__()
            self.cond = cond
            self.name = name
    
        def run(self):
            # 睡一秒,让躲猫猫的面码也运行起来,不然会阻塞住
            time.sleep(1)
            self.cond.acquire()
            print(self.name + ': (藏)好了吗?')
            self.cond.notify()
            self.cond.wait()
            print(self.name + ': (藏)好了吗?~~')
            self.cond.notify()
            self.cond.wait()
            print(self.name + ': 看到你了!面码!')
            self.cond.notify()
            self.cond.wait()
            self.cond.release()
            print(self.name + ": 谢谢你,面码~ ")
    
    # 再来是躲迷藏的面码
    class Hider(threading.Thread):
    
        def __init__(self, cond, name):
            super(Hider, self).__init__()
            self.cond = cond
            self.name = name
    
        def run(self):
            self.cond.acquire()
            self.cond.wait()
            # 释放对锁的占用,同时线程挂起来在这里,直到被notify
            print(self.name + ": 还没好哦~")
            self.cond.notify()
            self.cond.wait()
            print(self.name + ": (藏)好了哦~")
            self.cond.notify()
            self.cond.wait()
            self.cond.notify()
            self.cond.release()
            print(self.name + ": 阿,被看到了~")
    
    cond = threading.Condition()
    seeker = Seeker(cond, "仁太")
    hider = Hider(cond, "面码")
    seeker.start()
    hider.start()
    
    运行效果:交替进行的对话
    
    仁太: (藏)好了吗?
    面码: 还没好哦~
    仁太: (藏)好了吗?~~
    面码: (藏)好了哦~
    仁太: 看到你了!面码!
    面码: 阿,被看到了~
    仁太: 谢谢你,面码~ 
    

    无限循环的例子

    经典的生产者与消费者问题:假设有一群生产者(Producer)和一群消费者(Consumer)通过一个市场来交互产品。
    生产者的”策略“是如果市场上剩余的产品少于1000个,那么就生产100个产品放到市场上;
    而消费者的”策略“是如果市场上剩余产品的数量多余100个,那么就消费3个产品。
    用Condition解决生产者与消费者问题的代码如下:

    #!/usr/bin/env python
    # _*_ coding: utf-8 _*_
    # ____消费者与生产者模型,多线程,条件变量,无限循环
    
    import threading
    import time
    
    # 生产者
    class Producer(threading.Thread):
    
        def run(self):
            global count
            while True:
                if con.acquire():
                    # 当产品在市场超过1000个时开始等候
                    if count > 1000:
                        con.wait()
                    # 少于1000个则开始生产100个产品投入市场
                    else:
                        count = count + 100
                        msg = self.name + ' produce 100, count=' + str(count)
                        print(msg)
                        con.notify()
                    con.release()
                    time.sleep(1)
    
    
    # 消费者
    class Consumer(threading.Thread):
    
        def run(self):
            global count
            while True:
                # 当市场少于200个产品时,消费者等候
                if con.acquire():
                    if count < 200:
                        con.wait()
                    # 否则消费
                    else:
                        count = count - 30
                        msg = self.name + ' consume 30, count=' + str(count)
                        print(msg)
                        con.notify()
                    con.release()
                    time.sleep(1)
    
    # 初始产品为100个
    count = 100
    con = threading.Condition()
    
    
    def main():
        # 两个生产者
        for i in range(2):
            p = Producer()
            p.start()
        # 五个消费者
        for i in range(5):
            c = Consumer()
            c.start()
    
    
    if __name__ == '__main__':
        main()
    
    无限循环的部分运行效果:
    Thread-1 produce 100, count=200
    Thread-2 produce 100, count=300
    Thread-3 consume 30, count=270
    Thread-4 consume 30, count=240
    Thread-5 consume 30, count=210
    Thread-6 consume 30, count=180
    Thread-1 produce 100, count=280
    Thread-3 consume 30, count=250
    Thread-5 consume 30, count=220
    Thread-2 produce 100, count=320
    Thread-4 consume 30, count=290
    Thread-6 consume 30, count=260
    Thread-7 consume 30, count=230
    Thread-1 produce 100, count=330
    Thread-4 consume 30, count=300
    Thread-2 produce 100, count=400
    Thread-5 consume 30, count=370
    Thread-3 consume 30, count=340
    Thread-6 consume 30, count=310
    Thread-7 consume 30, count=280
    Thread-1 produce 100, count=380
    Thread-5 consume 30, count=350
    Thread-3 consume 30, count=320
    Thread-4 consume 30, count=290
    Thread-2 produce 100, count=390
    Thread-6 consume 30, count=360
    Thread-7 consume 30, count=330
    Thread-1 produce 100, count=430
    Thread-3 consume 30, count=400
    Thread-2 produce 100, count=500
    

    浅析区别

    • 信号量 semaphore 允许统一时刻n个线程执行这段代码
    • 事件 event 有一个内部的事件来控制wait的行为且控制的是所有的线程
    • 条件 condition 有一个内部的条件来控制wait的行为,可以逐个或者分批次的控制线程的走向

    end
    参考:http://www.cnblogs.com/Eva-J/articles/8306047.html

  • 相关阅读:
    tile38 复制配置
    The Guardian’s Migration from MongoDB to PostgreSQL on Amazon RDS
    tile38 一款开源的geo 数据库
    sqler sql 转rest api 的docker 镜像构建(续)使用源码编译
    sqler sql 转rest api javascript 试用
    sqler sql 转rest api redis 接口使用
    sqler sql 转rest api 的docker image
    sqler sql 转rest api 的工具试用
    apache geode 试用
    benthos v1 的一些新功能
  • 原文地址:https://www.cnblogs.com/tielemao/p/9078285.html
Copyright © 2011-2022 走看看