zoukankan      html  css  js  c++  java
  • Python之并发编程(六)死锁与递归锁、信号量

    并发编程之多线程2--死锁与递归锁,信号量等

    1. 死锁现象与递归锁

      • 进程也是有死锁的:

        所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,

        它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,

        如下就是死锁

        • 死锁现象:

          死锁-------------------
          from  threading import Thread,Lock,RLock
          import time
          mutexA = Lock()
          mutexB = Lock()
          class MyThread(Thread):
              def run(self):
                  self.f1()
                  self.f2()
              def f1(self):
                  mutexA.acquire()
                  print('33[33m%s 拿到A锁 '%self.name)
                  mutexB.acquire()
                  print('33[45%s 拿到B锁 '%self.name)
                  mutexB.release()
                  mutexA.release()
              def f2(self):
                  mutexB.acquire()
                  print('33[33%s 拿到B锁 ' % self.name)
                  time.sleep(1)  #睡一秒就是为了保证A锁已经被别人那到了
                  mutexA.acquire()
                  print('33[45m%s 拿到B锁 ' % self.name)
                  mutexA.release()
                  mutexB.release()
          if __name__ == '__main__':
              for i in range(10):
                  t = MyThread()
                  t.start() #一开启就会去调用run方法
          
          死锁现象
          
      • 那么怎么解决死锁现象呢?

        解决方法,递归锁:在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

        这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。

        直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁

      • mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,<br>则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
        
      • 解决死锁

      • # 2.解决死锁的方法--------------递归锁
        from  threading import Thread,Lock,RLock
        import time
        mutexB = mutexA = RLock()
        class MyThread(Thread):
            def run(self):
                self.f1()
                self.f2()
            def f1(self):
                mutexA.acquire()
                print('33[33m%s 拿到A锁 '%self.name)
                mutexB.acquire()
                print('33[45%s 拿到B锁 '%self.name)
                mutexB.release()
                mutexA.release()
            def f2(self):
                mutexB.acquire()
                print('33[33%s 拿到B锁 ' % self.name)
                time.sleep(1)  #睡一秒就是为了保证A锁已经被别人拿到了
                mutexA.acquire()
                print('33[45m%s 拿到B锁 ' % self.name)
                mutexA.release()
                mutexB.release()
        if __name__ == '__main__':
            for i in range(10):
                t = MyThread()
                t.start() #一开启就会去调用run方法
        
        解决死锁
        
    2. 信号量Semaphore(其实也是一把锁)

      1. Semaphore管理一个内置的计数器

      2. Semaphore与进程池看起来类似,但是是完全不同的概念。

        • 进程池:Pool(4),最大只能产生四个进程,而且从头到尾都只是这四个进程,不会产生新的。
        • 信号量:信号量是产生的一堆进程/线程,即产生了多个任务都去抢那一把锁
      3. Semaphore示例:

        from threading import Thread,Semaphore,currentThread
        import time,random
        sm = Semaphore(5) #运行的时候有5个人
        def task():
            sm.acquire()
            print('33[42m %s上厕所'%currentThread().getName())
            time.sleep(random.randint(1,3))
            print('33[31m %s上完厕所走了'%currentThread().getName())
            sm.release()
        if __name__ == '__main__':
            for i in range(20):  #开了5个线程 ,这20人都要上厕所
                t = Thread(target=task)
                t.start()
        
        Semaphore举例
        
      4. 结果:

        hread-1上厕所
         Thread-2上厕所
         Thread-3上厕所
         Thread-4上厕所
         Thread-5上厕所
         Thread-3上完厕所走了
         Thread-6上厕所
         Thread-1上完厕所走了
         Thread-7上厕所
         Thread-2上完厕所走了
         Thread-8上厕所
         Thread-6上完厕所走了
         Thread-5上完厕所走了
         Thread-4上完厕所走了
         Thread-9上厕所
         Thread-10上厕所
         Thread-11上厕所
         Thread-9上完厕所走了
         Thread-12上厕所
         Thread-7上完厕所走了
         Thread-13上厕所
         Thread-10上完厕所走了
         Thread-8上完厕所走了
         Thread-14上厕所
         Thread-15上厕所
         Thread-12上完厕所走了
         Thread-11上完厕所走了
         Thread-16上厕所
         Thread-17上厕所
         Thread-14上完厕所走了
         Thread-15上完厕所走了
         Thread-17上完厕所走了
         Thread-18上厕所
         Thread-19上厕所
         Thread-20上厕所
         Thread-13上完厕所走了
         Thread-20上完厕所走了
         Thread-16上完厕所走了
         Thread-18上完厕所走了
         Thread-19上完厕所走了
        
        运行结果
        
    3. GIL全局解释器锁:

      • 定义:GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。

      • 运行py文件内部执行过程

        计算机有4个cpu,运行py文件

        首先开辟一个进程空间,线程执行将python解释器和文件加载进去,python解释器包含编译器和虚拟机,

        文件通过编译器转义成字节码,再通过虚拟机转义成机器码,然后与操作系统去执行该线程

        Cpython规定,同一时刻只允许一个线程进入解释器

        为什么加锁?

        1. 当时都是单核时代,而且cpu价格非常贵

        2. 如果不加全局锁,开发Cpython解释器程序员就会在源码内部各种加锁,解锁,非常麻烦,各种死锁现象等等,为了省事直接加了线程锁

        3. 优点:保证了Cpython解释器的数据资源安全

          缺点:单个进程的多线程不能利用多核

        4. 单个进程的多线程可以并发,但是不能利用多核,不能并行,多个进程可以并发,并行

    4. IO密集型和计算密集型

      1. IO密集型

      当电脑是三核是执行多个任务,并行执行,遇到阻塞就等待,所以耗费时间比较长

      当单核cpu执行多个任务,遇见阻塞,就执行非IO任务,效率较高

      IO密集型适合单进程多线程

      1. 计算密集型

        当电脑是三核的执行多个任务,没有阻塞就执行,效率较高

        当电脑是单核的执行多个人来,来回切换执行,例如打开QQ和微信 cpu需要来回切换执行

    5. GIL与lock锁的区别

      • 相同点:都是同种锁,互斥锁
      • 不同点:
        • GIL锁全局解释器锁,保护解释器内部资源的数据安全
        • GIL锁上锁,释放无序手动操作
        • 自己代码中定义的互斥锁保护进程中的资源数据安全
        • 自己定义的互斥锁必须自己手动上锁,释放锁
    6. 验证计算密集型和IO密集型的效率

      • 计算密集型

        • 多进程单线程运行速度

          from threading import Thread
          from multiprocessing import Process
          import time
          import random
          
          #计算密集型:单个进程的多线程和多个进程并发并行
          def task():
              count=0
              for i in range(10000000):
                  count+=1
          if __name__ == '__main__':
              #多进程的并发并行
              start_time=time.time()
              l1=[]
              for i in range(4):
                  p=Process(target=task,)
                  l1.append(p)
                  p.start()
              for p in l1:
                  p.join()
              print(f'{time.time()-start_time}')#1.3144586086273193
          
        • 单进程多线程运行速度

          from threading import Thread
          from multiprocessing import Process
          import time
          import random
          
          def task():
              count = 0
              for i in range(10000000):
                  count += 1
          
          
          if __name__ == '__main__':
              #单进程多线程
              start_time=time.time()
              l1=[]
              for i in range(4):
                  p=Thread(target=task,)
                  l1.append(p)
                  p.start()
              for p in l1:
                  p.join()
              print(f'{time.time()-start_time}')#2.4723618030548096
          
      • IO密集型

        • 多进程单单线程运行速度

          #IO密集型:单进程的多线程并发 和 多个进程的并发并行
          from multiprocessing import Process
          import time
          import random
          def task():
              count=0
              time.sleep(random.randint(1,3))
              count+=1
          if __name__ == '__main__':
              start_time=time.time()
              l1=[]
              for i in range(50):
                  p=Process(target=task,)
                  l1.append(p)
                  p.start()
              for p in l1:
                  p.join()
          
              print(f'{time.time()-start_time}')#4.954715013504028
          
        • 单进程多线程运行速度

          from threading import Thread
          from multiprocessing import Process
          import time
          import random
          def task():
              count=0
              time.sleep(random.randint(1,3))
              count+=1
          if __name__ == '__main__':
              start_time=time.time()
              l1=[]
              for i in range(50):
                  p=Thread(target=task,)
                  l1.append(p)
                  p.start()
              for p in l1:
                  p.join()
              print(f'{time.time()-start_time}')#3.013162136077881
          
    7. 多线程实现socke套接字通信

      • 服务端:

        import socket
        from threading import Thread
        
        def communicate(conn,addr):
            while 1:
                try:
                    from_client_data=conn.recv(1024)
                    print(f'来自客户端{addr}得消息:{from_client_data.decode("utf-8")}')
                    to_client_data=input(">>>").strip()
                    conn.send(to_client_data.encode('utf-8'))
                except Exception:
                    break
            conn.close()
        
        def _accept():
            server=socket.socket()
            server.bind(('127.0.0.1',8848))
            server.listen(5)
            while 1:
                conn,addr=server.accept()
                t=Thread(target=communicate,args=(conn,addr))
                t.start()
        if __name__ == '__main__':
            _accept()
        
      • 客户端:

        import socket
        
        client=socket.socket()
        client.connect(('127.0.0.1',8848))
        
        while 1:
            to_server_data=input(">>>>").strip()
            client.send(to_server_data.encode('utf-8'))
            from_server_date=client.recv(1024)
            print(f'来自客户端的消息{from_server_date.decode("utf-8")}')
        client.close()
        
  • 相关阅读:
    C#开源实现MJPEG流传输
    EntityFramework中使用Repository装饰器
    Lambda应用设计模式
    Lambda表达式的前世今生
    那些年黑了你的微软BUG
    敏捷软件开发揭秘
    SVN previous operation has not finished
    NodeJS+Express开发web,为什么中文显示为乱码
    使用Visual Studio 调试断点不起作用的问题解决办法 调试Revit CAD 不能进入断点
    openFileDialog的Filter属性设置
  • 原文地址:https://www.cnblogs.com/zhangdadayou/p/11431960.html
Copyright © 2011-2022 走看看