zoukankan      html  css  js  c++  java
  • 并发编程---死锁||递归锁---信号量---Event事件---定时器

    死锁

    互斥锁:Lock(),互斥锁只能acquire一次

    递归锁:  RLock(),可以连续acquire多次,每acquire一次计数器+1,只有计数为0时,才能被抢到acquire

    # 死锁
    from threading import Thread,Lock
    import time
    
    mutexA = Lock()
    mutexB = Lock()
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
    
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            mutexB.release()
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            time.sleep(0.1)
    
            mutexA.acquire()
            print('%s 拿到了A锁' % self.name)
            mutexB.release()
            mutexA.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    '''
    打印结果:
    Thread-1 拿到了A锁
    Thread-1 拿到了B锁
    Thread-1 拿到了B锁
    Thread-2 拿到了A锁
    '''
    死锁
    #互斥锁只能acquire一次
    from threading import Thread,Lock
    mutexA = Lock()
    mutexA.acquire()
    mutexA.release()
    
    #递归锁:可以连续acquire多次,每acquier一次计数器就+1,只有计数为0时,才能被其他线程强到
    from threading import Thread,RLock
    import time
    
    mutexB = mutexA = RLock()
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
    
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            mutexB.release()
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            time.sleep(2)
    
            mutexA.acquire()
            print('%s 拿到了A锁' % self.name)
            mutexB.release()
            mutexA.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    '''
    打印结果:
    Thread-1 拿到了A锁
    Thread-1 拿到了B锁
    Thread-1 拿到了B锁
    Thread-1 拿到了A锁
    Thread-2 拿到了A锁
    Thread-2 拿到了B锁
    Thread-2 拿到了B锁
    Thread-2 拿到了A锁
    Thread-4 拿到了A锁
    Thread-4 拿到了B锁
    Thread-4 拿到了B锁
    Thread-4 拿到了A锁
    Thread-6 拿到了A锁
    Thread-6 拿到了B锁
    Thread-6 拿到了B锁
    Thread-6 拿到了A锁
    Thread-8 拿到了A锁
    Thread-8 拿到了B锁
    Thread-8 拿到了B锁
    Thread-8 拿到了A锁
    Thread-10 拿到了A锁
    Thread-10 拿到了B锁
    Thread-10 拿到了B锁
    Thread-10 拿到了A锁
    Thread-5 拿到了A锁
    Thread-5 拿到了B锁
    Thread-5 拿到了B锁
    Thread-5 拿到了A锁
    Thread-9 拿到了A锁
    Thread-9 拿到了B锁
    Thread-9 拿到了B锁
    Thread-9 拿到了A锁
    Thread-7 拿到了A锁
    Thread-7 拿到了B锁
    Thread-7 拿到了B锁
    Thread-7 拿到了A锁
    Thread-3 拿到了A锁
    Thread-3 拿到了B锁
    Thread-3 拿到了B锁
    Thread-3 拿到了A锁
    '''
    递归锁

    信号量

    信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行

    信号量同一时间可以有5个任务拿到锁去执行

    信号量:同一时间有多个线程在进行

    from threading import Thread,Semaphore,currentThread
    import time,random
    
    sm=Semaphore(3) #厕所的坑
    
    def task():
        # sm.acquire()
        # print('%s in' %currentThread.getName())
        # sm.release()
        with sm:
            print('%s in' %currentThread().getName())
            time.sleep(random.randint(2,3))
    
    if __name__ == '__main__':
        for i in range(10):
            t=Thread(target=task)
            t.start()
    '''
    打印结果:
    Thread-1 in
    Thread-2 in
    Thread-3 in
    
    Thread-4 in
    
    Thread-5 in
    Thread-6 in
    
    Thread-7 in
    
    Thread-9 in
    Thread-8 in
    Thread-10 in
    '''
    信号量

    Event事件

    event.isSet(): 返回event的状态值
    event.wait():如果 event.isSet()==False将阻塞线程
    event.set():  设置event的状态值为True,所有阻塞池的线程激活进入就绪状态,等待操作系统调度
    event.clear():  恢复event的状态值为False
    from threading import Thread,Event
    import time
    
    event = Event()
    # event.wait() 在原地等,指导执行到event.set()
    # event.set() 等待结束
    
    def student(name):
        print('学生%s 正在听课' %name)
        event.wait(2)
        print('学生%s 课间活动' %name)
    
    def teacher(name):
        print('老师%s 正在授课' %name)
        time.sleep(5)
        event.set()
    
    if __name__ == '__main__':
        stu1 = Thread(target=student,args=('alex',))
        stu2 = Thread(target=student,args=('yang',))
        stu3 = Thread(target=student,args=('hang',))
        tea1 = Thread(target=teacher,args=('ding',))
    
        stu1.start()
        stu2.start()
        stu3.start()
        tea1.start()
    '''
    打印结果:
    学生alex 正在听课
    学生yang 正在听课
    学生hang 正在听课
    老师ding 正在授课
    学生hang 课间活动
    学生yang 课间活动
    学生alex 课间活动
    '''
    
    
    #设置连接的超时时间
    from threading import Thread,Event,currentThread
    import time
    
    event = Event()
    
    def conn():
        n=0
        while not event.is_set():
            if n == 3:
                print('%s try too many' %currentThread().getName())
                return
            print('%s try %s' %(currentThread().getName(),n))
            event.wait(0.5)
            n+=1
    
    
        print('%s is connecting' %currentThread().getName())
    
    def check():
        print('%s is checking' %currentThread().getName())
        time.sleep(5)
        event.set()
    
    if __name__ == '__main__':
        for i in range(3):
            t=Thread(target=conn)
            t.start()
        t=Thread(target=check)
        t.start()
    '''
    打印结果:
    Thread-1 try 0
    Thread-2 try 0
    Thread-3 try 0
    Thread-4 is checking
    Thread-1 try 1
    Thread-3 try 1
    Thread-2 try 1
    Thread-1 try 2
    Thread-3 try 2
    Thread-2 try 2
    Thread-1 try too many
    Thread-3 try too many
    Thread-2 try too many
    '''
    Event

    定时器

    定时器:隔一段时间,执行一个任务,每启动一个定时器,就等于启动一个线程

    t=Timer(5,task,args=('egon',))

    t.start()

    t.cancel()

    # from  threading import Timer
    #
    # def task(name):
    #     print('hello %s' %name)
    #
    # t=Timer(5,task,args=('egon',))
    # t.start()
    
    #验证码
    from threading import Timer
    import random
    
    class Code:
        def __init__(self):
            self.make_cache()
    
        def make_cache(self,interval=60):
            self.cache = self.make_code()
            print(self.cache)
            self.t = Timer(interval,self.make_cache)
            self.t.start()
    
        def make_code(self,n=4):
            res=''
            for i in range(n):
                s1 = str(random.randint(0,9))
                s2 = chr(random.randint(65,90))
                res+=random.choice([s1,s2])
            return res
    
        def check(self):
            while True:
                code = input('输入你的验证码>>: ').strip()
                if code.upper() == self.cache:
                    print('验证码输入正确')
                    self.t.cancel()
                    break
    
    obj=Code()
    obj.check()
    
    '''
    打印结果:
    X095
    输入你的验证码>>: X095
    验证码输入正确
    '''
    定时器
  • 相关阅读:
    开源项目
    测试面试话题8:测试人员如何让开发少写bug?
    其他
    接口平台
    001接口概念
    python3PIL模块实现图片加文字/小图片水印
    python3实现url编码/解码
    python3实现读取Excel进行接口自动化测试
    常用正则表达式
    Python3实现简单的接口性能测试
  • 原文地址:https://www.cnblogs.com/Mryang123/p/8910367.html
Copyright © 2011-2022 走看看