zoukankan      html  css  js  c++  java
  • python并发编程之多线程2死锁与递归锁,信号量等

    一、死锁现象与递归锁

    进程也是有死锁的

    所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,

    这些永远在互相等待的进程称为死锁进程

    如下就是死锁

    死锁-------------------
    from  threading import Thread,Lock,RLock
    import time
    mutexA = Lock()
    mutexB = Lock()
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            mutexA.acquire()
            print('33[33m%s 拿到A锁 '%self.name)
            mutexB.acquire()
            print('33[45%s 拿到B锁 '%self.name)
            mutexB.release()
            mutexA.release()
        def f2(self):
            mutexB.acquire()
            print('33[33%s 拿到B锁 ' % self.name)
            time.sleep(1)  #睡一秒就是为了保证A锁已经被别人那到了
            mutexA.acquire()
            print('33[45m%s 拿到B锁 ' % self.name)
            mutexA.release()
            mutexB.release()
    if __name__ == '__main__':
        for i in range(10):
            t = MyThread()
            t.start() #一开启就会去调用run方法
    
    死锁现象
    

      

    那么怎么解决死锁现象呢?

    解决方法,递归锁:在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。

    直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁

    mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,
    则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
    # 2.解决死锁的方法--------------递归锁
    from  threading import Thread,Lock,RLock
    import time
    mutexB = mutexA = RLock()
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            mutexA.acquire()
            print('33[33m%s 拿到A锁 '%self.name)
            mutexB.acquire()
            print('33[45%s 拿到B锁 '%self.name)
            mutexB.release()
            mutexA.release()
        def f2(self):
            mutexB.acquire()
            print('33[33%s 拿到B锁 ' % self.name)
            time.sleep(1)  #睡一秒就是为了保证A锁已经被别人拿到了
            mutexA.acquire()
            print('33[45m%s 拿到B锁 ' % self.name)
            mutexA.release()
            mutexB.release()
    if __name__ == '__main__':
        for i in range(10):
            t = MyThread()
            t.start() #一开启就会去调用run方法
    
    解决死锁
    

      

    二、信号量Semaphore(其实也是一把锁)

    Semaphore管理一个内置的计数器

    Semaphore与进程池看起来类似,但是是完全不同的概念。

    进程池:Pool(4),最大只能产生四个进程,而且从头到尾都只是这四个进程,不会产生新的。

    信号量:信号量是产生的一堆进程/线程,即产生了多个任务都去抢那一把锁

    from threading import Thread,Semaphore,currentThread
    import time,random
    sm = Semaphore(5) #运行的时候有5个人
    def task():
        sm.acquire()
        print('33[42m %s去洗手间'%currentThread().getName())
        time.sleep(random.randint(1,3))
        print('33[31m %s上完厕所走了'%currentThread().getName())
        sm.release()
    if __name__ == '__main__':
        for i in range(20):  #开了10个线程 ,这20人都要去洗手间
            t = Thread(target=task)
            t.start()
    
    Semaphore举例
    
    hread-1去洗手间
     Thread-2去洗手间
     Thread-3去洗手间
     Thread-4去洗手间
     Thread-5去洗手间
     Thread-3去完洗手间走了
     Thread-6去洗手间
     Thread-1去完洗手间走了
     Thread-7去洗手间
     Thread-2去完洗手间走了
     Thread-8去洗手间
     Thread-6去完洗手间走了
     Thread-5去完洗手间走了
     Thread-4去完洗手间走了
     Thread-9去洗手间
     Thread-10去洗手间
     Thread-11去洗手间
     Thread-9去完洗手间走了
     Thread-12去洗手间
     Thread-7去完洗手间走了
     Thread-13去洗手间
     Thread-10去完洗手间走了
     Thread-8去完洗手间走了
     Thread-14去洗手间
     Thread-15去洗手间
     Thread-12去完洗手间走了
     Thread-11去完洗手间走了
     Thread-16去洗手间
     Thread-17去洗手间
     Thread-14去完洗手间走了
     Thread-15去完洗手间走了
     Thread-17去完洗手间走了
     Thread-18去洗手间
     Thread-19去洗手间
     Thread-20去洗手间
     Thread-13去完洗手间走了
     Thread-20去完洗手间走了
     Thread-16去完洗手间走了
     Thread-18去完洗手间走了
     Thread-19去完洗手间走了
    
    运行结果
    

    三、Event

    如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    from threading import Event
    Event.isSet() #返回event的状态值
    Event.wait() #如果 event.isSet()==False将阻塞线程;
    Event.set() #设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    Event.clear() #恢复
    

    例如1.,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

    #首先定义两个函数,一个是连接数据库
    # 一个是检测数据库
    from threading import Thread,Event,currentThread
    import time
    e = Event()
    def conn_mysql():
        '''链接数据库'''
        count = 1
        while not e.is_set():  #当没有检测到时候
            if count >3: #如果尝试次数大于3,就主动抛异常
                raise ConnectionError('尝试链接的次数过多')
            print('33[45m%s 第%s次尝试'%(currentThread(),count))
            e.wait(timeout=1) #等待检测(里面的参数是超时1秒)
            count+=1
        print('33[44m%s 开始链接...'%(currentThread().getName()))
    def check_mysql():
        '''检测数据库'''
        print('33[42m%s 检测mysql...' % (currentThread().getName()))
        time.sleep(5)
        e.set()
    if __name__ == '__main__':
        for i  in range(3):  #三个去链接
            t = Thread(target=conn_mysql)
            t.start()
        t = Thread(target=check_mysql)
        t.start()
    
    详看

    2.例如2,红绿灯的例子

    from  threading import Thread,Event,currentThread
    import time
    e = Event()
    def traffic_lights():
        '''红绿灯'''
        time.sleep(5)
        e.set()
    def car():
        '''车'''
        print('33[42m %s 等绿灯33[0m'%currentThread().getName())
        e.wait()
        print('33[44m %s 车开始通行' % currentThread().getName())
    if __name__ == '__main__':
        for i in range(10):
            t = Thread(target=car)  #10辆车
            t.start()
        traffic_thread = Thread(target=traffic_lights)  #一个红绿灯
        traffic_thread.start()
    
    红绿灯

    四、定时器(Timer)

    指定n秒后执行某操作

    from threading import Timer
    def func(n):
        print('hello,world',n)
    t = Timer(3,func,args=(123,))  #等待三秒后执行func函数,因为func函数有参数,那就再传一个参数进去
    t.start()

    五、线程queue

    queue队列 :使用import queue,用法与进程Queue一样

    queue.Queue(maxsize=0) #先进先出

    # 1.队列-----------
    import queue
    q = queue.Queue(3) #先进先出
    q.put('first')
    q.put('second')
    q.put('third')
    print(q.get())
    print(q.get())
    print(q.get())

    queue.LifoQueue(maxsize=0)#先进后出

    # 2.堆栈----------
    q = queue.LifoQueue() #先进后出(或者后进先出)
    q.put('first')
    q.put('second')
    q.put('third')
    q.put('for')
    print(q.get())
    print(q.get())
    print(q.get())

    queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

    # ----------------
    '''3.put进入一个元组,元组的第一个元素是优先级
    (通常也可以是数字,或者也可以是非数字之间的比较)
    数字越小,优先级越高'''
    q = queue.PriorityQueue()
    q.put((20,'a'))
    q.put((10,'b'))  #先出来的是b,数字越小优先级越高嘛
    q.put((30,'c'))
    print(q.get())
    print(q.get())
    print(q.get())
    

      

    六、多线程性能测试

    1.多核也就是多个CPU
    (1)cpu越多,提高的是计算的性能
    (2)如果程序是IO操作的时候(多核和单核是一样的),再多的cpu也没有意义。
    2.实现并发
    第一种:一个进程下,开多个线程
    第二种:开多个进程
    3.多进程:
       优点:可以利用多核
       缺点:开销大
    4.多线程
       优点:开销小
       缺点:不可以利用多核
    5多进程和多进程的应用场景
       1.计算密集型:也就是计算多,IO少
         如果是计算密集型,就用多进程(如金融分析等)
       2.IO密集型:也就是IO多,计算少
         如果是IO密集型的,就用多线程(一般遇到的都是IO密集型的)
    

      

    下例子练习:
    # 计算密集型的要开启多进程
    from  multiprocessing import Process
    from threading import Thread
    import time
    def work():
        res = 0
        for i in range(10000000):
            res+=i
    if __name__ == '__main__':
        l = []
        start = time.time()
        for i in range(4):
            p = Process(target=work)  #1.9371106624603271  #可以利用多核(也就是多个cpu)
            # p  = Thread(target=work)  #3.0401737689971924
            l.append(p)
            p.start()
        for p in l:
            p.join()
        stop = time.time()
        print('%s'%(stop-start))
    
    计算密集型
    
    # I/O密集型要开启多线程
    from multiprocessing import Process
    from threading import Thread
    import time
    def work():
        time.sleep(3)
    if __name__ == '__main__':
        l = []
        start = time.time()
        for i in range(400):
            # p = Process(target=work)  #34.9549994468689   #因为开了好多进程,它的开销大,花费的时间也就长了
            p = Thread(target=work) #2.2151265144348145  #当开了多个线程的时候,它的开销小,花费的时间也小了
            l.append(p)
            p.start()
        for i in l :
            i.join()
        stop = time.time()
        print('%s'%(stop-start))
    
    I/O密集型
  • 相关阅读:
    使用NBU进行oracle异机恢复
    mycat偶尔会出现JVM报错double free or corruption并崩溃退出
    exp导出数据时丢表
    service_names配置不正确,导致dg创建失败
    XML概念定义以及如何定义xml文件编写约束条件java解析xml DTD XML Schema JAXP java xml解析 dom4j 解析 xpath dom sax
    HTTP协议简介详解 HTTP协议发展 原理 请求方法 响应状态码 请求头 请求首部 java模拟浏览器客户端服务端
    java集合框架容器 java框架层级 继承图结构 集合框架的抽象类 集合框架主要实现类
    【JAVA集合框架一 】java集合框架官方介绍 Collections Framework Overview 集合框架总览 翻译 javase8 集合官方文档中文版
    java内部类深入详解 内部类的分类 特点 定义方式 使用
    再谈包访问权限 子类为何不能使用父类protected方法
  • 原文地址:https://www.cnblogs.com/morgana/p/8491279.html
Copyright © 2011-2022 走看看