zoukankan      html  css  js  c++  java
  • python并发编程之多线程2------------死锁与递归锁,信号量等

    一、死锁现象与递归锁

    进程也是有死锁的

    所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,

    它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

    如下就是死锁

     1 死锁-------------------
     2 from  threading import Thread,Lock,RLock
     3 import time
     4 mutexA = Lock()
     5 mutexB = Lock()
     6 class MyThread(Thread):
     7     def run(self):
     8         self.f1()
     9         self.f2()
    10     def f1(self):
    11         mutexA.acquire()
    12         print('33[33m%s 拿到A锁 '%self.name)
    13         mutexB.acquire()
    14         print('33[45%s 拿到B锁 '%self.name)
    15         mutexB.release()
    16         mutexA.release()
    17     def f2(self):
    18         mutexB.acquire()
    19         print('33[33%s 拿到B锁 ' % self.name)
    20         time.sleep(1)  #睡一秒就是为了保证A锁已经被别人那到了
    21         mutexA.acquire()
    22         print('33[45m%s 拿到B锁 ' % self.name)
    23         mutexA.release()
    24         mutexB.release()
    25 if __name__ == '__main__':
    26     for i in range(10):
    27         t = MyThread()
    28         t.start() #一开启就会去调用run方法
    死锁现象

    那么怎么解决死锁现象呢?

    解决方法,递归锁:在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。

    直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁

    mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,
    则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
     1 # 2.解决死锁的方法--------------递归锁
     2 from  threading import Thread,Lock,RLock
     3 import time
     4 mutexB = mutexA = RLock()
     5 class MyThread(Thread):
     6     def run(self):
     7         self.f1()
     8         self.f2()
     9     def f1(self):
    10         mutexA.acquire()
    11         print('33[33m%s 拿到A锁 '%self.name)
    12         mutexB.acquire()
    13         print('33[45%s 拿到B锁 '%self.name)
    14         mutexB.release()
    15         mutexA.release()
    16     def f2(self):
    17         mutexB.acquire()
    18         print('33[33%s 拿到B锁 ' % self.name)
    19         time.sleep(1)  #睡一秒就是为了保证A锁已经被别人拿到了
    20         mutexA.acquire()
    21         print('33[45m%s 拿到B锁 ' % self.name)
    22         mutexA.release()
    23         mutexB.release()
    24 if __name__ == '__main__':
    25     for i in range(10):
    26         t = MyThread()
    27         t.start() #一开启就会去调用run方法
    解决死锁

    二、信号量Semaphore(其实也是一把锁)

    Semaphore管理一个内置的计数器

    Semaphore与进程池看起来类似,但是是完全不同的概念。

    进程池:Pool(4),最大只能产生四个进程,而且从头到尾都只是这四个进程,不会产生新的。

    信号量:信号量是产生的一堆进程/线程,即产生了多个任务都去抢那一把锁

     1 from threading import Thread,Semaphore,currentThread
     2 import time,random
     3 sm = Semaphore(5) #运行的时候有5个人
     4 def task():
     5     sm.acquire()
     6     print('33[42m %s上厕所'%currentThread().getName())
     7     time.sleep(random.randint(1,3))
     8     print('33[31m %s上完厕所走了'%currentThread().getName())
     9     sm.release()
    10 if __name__ == '__main__':
    11     for i in range(20):  #开了10个线程 ,这20人都要上厕所
    12         t = Thread(target=task)
    13         t.start()
    Semaphore举例
     1 hread-1上厕所
     2  Thread-2上厕所
     3  Thread-3上厕所
     4  Thread-4上厕所
     5  Thread-5上厕所
     6  Thread-3上完厕所走了
     7  Thread-6上厕所
     8  Thread-1上完厕所走了
     9  Thread-7上厕所
    10  Thread-2上完厕所走了
    11  Thread-8上厕所
    12  Thread-6上完厕所走了
    13  Thread-5上完厕所走了
    14  Thread-4上完厕所走了
    15  Thread-9上厕所
    16  Thread-10上厕所
    17  Thread-11上厕所
    18  Thread-9上完厕所走了
    19  Thread-12上厕所
    20  Thread-7上完厕所走了
    21  Thread-13上厕所
    22  Thread-10上完厕所走了
    23  Thread-8上完厕所走了
    24  Thread-14上厕所
    25  Thread-15上厕所
    26  Thread-12上完厕所走了
    27  Thread-11上完厕所走了
    28  Thread-16上厕所
    29  Thread-17上厕所
    30  Thread-14上完厕所走了
    31  Thread-15上完厕所走了
    32  Thread-17上完厕所走了
    33  Thread-18上厕所
    34  Thread-19上厕所
    35  Thread-20上厕所
    36  Thread-13上完厕所走了
    37  Thread-20上完厕所走了
    38  Thread-16上完厕所走了
    39  Thread-18上完厕所走了
    40  Thread-19上完厕所走了
    运行结果

    三、Event

    线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    from threading import Event
    Event.isSet() #返回event的状态值
    Event.wait() #如果 event.isSet()==False将阻塞线程;
    Event.set() #设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    Event.clear() #恢复
    

    例如1.,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

     1 #首先定义两个函数,一个是连接数据库
     2 # 一个是检测数据库
     3 from threading import Thread,Event,currentThread
     4 import time
     5 e = Event()
     6 def conn_mysql():
     7     '''链接数据库'''
     8     count = 1
     9     while not e.is_set():  #当没有检测到时候
    10         if count >3: #如果尝试次数大于3,就主动抛异常
    11             raise ConnectionError('尝试链接的次数过多')
    12         print('33[45m%s 第%s次尝试'%(currentThread(),count))
    13         e.wait(timeout=1) #等待检测(里面的参数是超时1秒)
    14         count+=1
    15     print('33[44m%s 开始链接...'%(currentThread().getName()))
    16 def check_mysql():
    17     '''检测数据库'''
    18     print('33[42m%s 检测mysql...' % (currentThread().getName()))
    19     time.sleep(5)
    20     e.set()
    21 if __name__ == '__main__':
    22     for i  in range(3):  #三个去链接
    23         t = Thread(target=conn_mysql)
    24         t.start()
    25     t = Thread(target=check_mysql)
    26     t.start()
    详看

    2.例如2,红绿灯的例子

     1 from  threading import Thread,Event,currentThread
     2 import time
     3 e = Event()
     4 def traffic_lights():
     5     '''红绿灯'''
     6     time.sleep(5)
     7     e.set()
     8 def car():
     9     ''''''
    10     print('33[42m %s 等绿灯33[0m'%currentThread().getName())
    11     e.wait()
    12     print('33[44m %s 车开始通行' % currentThread().getName())
    13 if __name__ == '__main__':
    14     for i in range(10):
    15         t = Thread(target=car)  #10辆车
    16         t.start()
    17     traffic_thread = Thread(target=traffic_lights)  #一个红绿灯
    18     traffic_thread.start()
    红绿灯

    四、定时器(Timer)

    指定n秒后执行某操作

    from threading import Timer
    def func(n):
        print('hello,world',n)
    t = Timer(3,func,args=(123,))  #等待三秒后执行func函数,因为func函数有参数,那就再传一个参数进去
    t.start()

    五、线程queue

    queue队列 :使用import queue,用法与进程Queue一样

    queue.Queue(maxsize=0) #先进先出

    1 # 1.队列-----------
    2 import queue
    3 q = queue.Queue(3) #先进先出
    4 q.put('first')
    5 q.put('second')
    6 q.put('third')
    7 print(q.get())
    8 print(q.get())
    9 print(q.get())
    View Code

    queue.LifoQueue(maxsize=0)#先进后出

    1 # 2.堆栈----------
    2 q = queue.LifoQueue() #先进后出(或者后进先出)
    3 q.put('first')
    4 q.put('second')
    5 q.put('third')
    6 q.put('for')
    7 print(q.get())
    8 print(q.get())
    9 print(q.get())
    View Code

    queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

     1 # ----------------
     2 '''3.put进入一个元组,元组的第一个元素是优先级
     3 (通常也可以是数字,或者也可以是非数字之间的比较)
     4 数字越小,优先级越高'''
     5 q = queue.PriorityQueue()
     6 q.put((20,'a'))
     7 q.put((10,'b'))  #先出来的是b,数字越小优先级越高嘛
     8 q.put((30,'c'))
     9 print(q.get())
    10 print(q.get())
    11 print(q.get())
    View Code

    六、多线程性能测试

    1.多核也就是多个CPU
    (1)cpu越多,提高的是计算的性能
    (2)如果程序是IO操作的时候(多核和单核是一样的),再多的cpu也没有意义。
    2.实现并发
    第一种:一个进程下,开多个线程
    第二种:开多个进程
    3.多进程:
    优点:可以利用多核
    缺点:开销大
    4.多线程
    优点:开销小
    缺点:不可以利用多核
    5多进程和多进程的应用场景
    1.计算密集型:也就是计算多,IO少
    如果是计算密集型,就用多进程(如金融分析等)
    2.IO密集型:也就是IO多,计算少
    如果是IO密集型的,就用多线程(一般遇到的都是IO密集型的)
    下例子练习:
     1 # 计算密集型的要开启多进程
     2 from  multiprocessing import Process
     3 from threading import Thread
     4 import time
     5 def work():
     6     res = 0
     7     for i in range(10000000):
     8         res+=i
     9 if __name__ == '__main__':
    10     l = []
    11     start = time.time()
    12     for i in range(4):
    13         p = Process(target=work)  #1.9371106624603271  #可以利用多核(也就是多个cpu)
    14         # p  = Thread(target=work)  #3.0401737689971924
    15         l.append(p)
    16         p.start()
    17     for p in l:
    18         p.join()
    19     stop = time.time()
    20     print('%s'%(stop-start))
    计算密集型
     1 # I/O密集型要开启多线程
     2 from multiprocessing import Process
     3 from threading import Thread
     4 import time
     5 def work():
     6     time.sleep(3)
     7 if __name__ == '__main__':
     8     l = []
     9     start = time.time()
    10     for i in range(400):
    11         # p = Process(target=work)  #34.9549994468689   #因为开了好多进程,它的开销大,花费的时间也就长了
    12         p = Thread(target=work) #2.2151265144348145  #当开了多个线程的时候,它的开销小,花费的时间也小了
    13         l.append(p)
    14         p.start()
    15     for i in l :
    16         i.join()
    17     stop = time.time()
    18     print('%s'%(stop-start))
    I/O密集型 

    七、python标准模块----concurrent.futures

  • 相关阅读:
    流处理 —— Spark Streaming中的Window操作
    Spring框架参考手册(4.2.6版本)翻译——第三部分 核心技术 6.10.8 提供带注解的限定符元数据
    Spring框架参考手册(4.2.6版本)翻译——第三部分 核心技术 6.10.7 为自动检测组件提供作用域
    Spring框架参考手册(4.2.6版本)翻译——第三部分 核心技术 6.10.6 给自动检测组件命名
    Spring框架参考手册(4.2.6版本)翻译——第三部分 核心技术 6.10.5 在组件中定义bean的元数据
    Spring框架参考手册(4.2.6版本)翻译——第三部分 核心技术 6.10.4 使用过滤器自定义扫描
    Spring框架参考手册(4.2.6版本)翻译——第三部分 核心技术 6.10.3 自动检测类和注册bean的定义
    Spring框架参考手册(4.2.6版本)翻译——第三部分 核心技术 6.10.2 元注解
    Spring框架参考手册(4.2.6版本)翻译——第三部分 核心技术 6.10.1 @Component和深层的构造型注解
    Spring框架参考手册(4.2.6版本)翻译——第三部分 核心技术 6.10 类路径扫描和被管理的组件
  • 原文地址:https://www.cnblogs.com/haiyan123/p/7454131.html
Copyright © 2011-2022 走看看