zoukankan      html  css  js  c++  java
  • 并发编程--一堆锁,GIL,同步异步,Event事件

    一堆锁

    死锁现象(*****

    ​ 死锁指的是,某个资源被占用之后,一直得不到释放,导致其他需要这个资源的线程进入阻塞状态

    • 产生死锁的情况

      1. 对同一把互斥锁,进行了多次加锁

      2. 一个共享资源,在访问时必须具备多把锁,但是这些锁被不同的线程或进程所持有,这样会导致相互等待对方释放,从而程序卡死

        解决方案:

        1. 按照相同的顺序进行抢锁
        2. 给抢锁加上超时,如果超时就放弃抢锁

    递归锁 RLock (了解)

    ​ 与普通的互斥锁相同点在于,在多线程之间有互斥的效果但是同一个线程中,递归锁可以多次加锁,执行多次acquire

    ​ 同一个线程必须保证 加锁的次数和解锁的次数相同 其它线程才能够抢到这把锁

    ​ RLock只是解决了代码逻辑上的错误导致的死锁,并不能解决多个锁造成的死锁问题

    # 同一把RLock 多次acquire
    #l1 = RLock()
    #l2 = l1
    
    # 不同的RLock 依然会锁死
    #l1 = RLock()
    #l2 = RLock()
    
    def task():
        l1.acquire()
        print(threading.current_thread().name,"拿到了筷子")
        time.sleep(0.1)
        l2.acquire()
        print(threading.current_thread().name, "拿到了盘子")
    
        print("吃饭")
        l1.release()
        l2.release()
    
    def task2():
        l2.acquire()
        print(threading.current_thread().name, "拿到了盘子")
    
        l1.acquire()
        print(threading.current_thread().name,"拿到了筷子")
    
        print("吃饭")
        l2.release()
        l1.release()
    
    t1 = Thread(target=task)
    t1.start()
    t2 = Thread(target=task2)
    t2.start()
    

    信号量 (了解)

    ​ 可以用来限制同时并发执行公共代码的线程数量

    ​ 如果限制数量为1,则和普通的互斥锁一样

    from threading import Thread, Semaphore, currentThread
    import time
    
    
    s = Semaphore(3)   # 一次只能有3个线程占用
    def task():
        s.acquire()
        time.sleep(1)
        print(currentThread().name)
        s.release()
        
    for i in range(10):
        Thread(target=task).start()
    

    ​ 注意:信号量不是用来解决安全问题的 而是用于限制最大的并发量

    GIL(*****

    什么时GIL锁

    '''
    In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
    '''
    
    '''
    在CPython中,全局解释器锁(global interpreter lock, GIL)是一个互斥体,它防止多个本机线程同时执行Python字节码。这个锁是必要的,主要是因为CPython的内存管理不是线程安全的。(然而,自从GIL存在以来,其他特性已经逐渐依赖于它强制执行的保证。)
    '''
    

    GIL本质是一把互斥锁,但GIL锁住的是解释器级别的数据

    自定义锁,锁的是解释器以外的共享资源,例如:硬盘上的文件 控制台,对于这种不属于解释器的数据资源就应该自己加锁处理

    为什么需要GIL锁

    ​ python程序本身只是一串字符,只有通过python解释器运行之后才具有意义,但是在一个python程序中python解释器只有一个,所有代码都是由它来执行的,当多个线程都需要执行代码时,就会产生线程安全问题。

    ​ 如果我们不开线程,是否就没有问题了?

    Cpython解释器与GC的问题

    GC线程又叫垃圾回收机制

    在python中,解释器会自动帮我们进行垃圾回收,这也是需要开启一个线程来进行的,也就是说,当我们没有开启子线程的时候,内部还是有多个线程在执行。所以这就会带来线程安全问题

    例如:

    ​ 线程a要定义一个变量 b,首先申请一块空内存,然后把数据装进去,此时变量b的引用计数为0,最后才会将引用计数加一

    ​ 但是,如果在刚把数据装进去的时候,CPU切换到GC线程,GC线程就会把变量b当作垃圾进行回收

    GIL锁带来的问题

    GIL锁本质还是一把互斥锁,所以就会降低程序运行效率

    • 没有解决办法,只有尽可能的避免GIL带来的影响

      1. 使用多线程实现并行,更好的利用多核CPU

      2. 对任务进行区分

        1. 计算密集型

          基本没有IO操作,大部分时间都是在进行计算,例如人脸识别,图像处理

          由于线程不能并行,应该使用多进程,将任务分给不同的CPU进行

        2. IO密集型

          计算任务非常少,大部分时间都在等待IO操作

          由于网络IO速度对比CPU处理速度非常慢,多线程并不会造成太大的影响,另外如有大量客户端连接服务,进程根本开不起来,只能用多线程

    多线程与多进程性能对比

    加锁主要就是为了解决线程安全问题,但这也导致了Cpython中,多线程只能并发而不能并行

    python的优势:

    1. python是一门语言,GIL是Cpython解释器的问题 ,Jpython,pypy 并没有

    2. 如果是单核CPU,GIL不会造成任何影响

    3. 由于目前大多数程序都是基于网络的,网络速度对比CPU是非常慢的,导致即使多核CPU也无法提高效率

    4. 对于IO密集型任务,不会有太大的影响

    5. 如果没有这把锁,我们程序猿将必须自己来解决安全问题

    性能测试 :

    1. 计算密集型任务

      from multiprocessing import Process
      from threading import  Thread
      import time
      # # 计算密集型任务
      
      def task():
          for i in range(100000000):
              1+1
      
      if __name__ == '__main__':
          start_time = time.time()
          ps = []
          for i in range(5):
              # p = Process(target=task)   # 4.420854806900024
              p = Thread(target=task)   # 20.043513298034668
              p.start()
              ps.append(p)
              
          for i in ps:i.join()
          print("共耗时:",time.time()-start_time)
      
    2. IO密集型

      from multiprocessing import Process
      from threading import  Thread
      import time
      # IO密集型任务
      
      def task():
          for i in range(100):
              with open(r"1.死锁现象.py", 'r', encoding="utf-8") as fr:
                  fr.read()
      
      if __name__ == '__main__':
          start_time = time.time()
      
          ps = []
          for i in range(10):
              # p = Process(target=task)   # 1.1040928363800049
              p = Thread(target=task)   # 0.09919905662536621
              p.start()
              ps.append(p)
      
          for i in ps:i.join()
          print("共耗时:",time.time()-start_time)
      

    进程池与线程池

    池是一个容器,可以用来装一些东西,但池也是有容量限制的,不可能无限制的装东西

    进程池和线程池就是用来装进程和线程的容器

    进程池和线程池的好处:

    1. 可以避免频繁的创建和销毁(进程/线程)来的资源开销
    2. 可以限制同时存在的线程数量,以保证服务器不会应为资源不足而导致崩溃
    3. 帮我们管理了线程的生命周期
    4. 管理了任务的分配

    进程池与线程池的使用

    1. 线程池

      from concurrent.futures import ThreadPoolExecutor
      from threading import enumerate,currentThread
      
      # 创建一个线程池   指定最多可以容纳两个线程
      pool = ThreadPoolExecutor(2)
      
      def task():
          print(currentThread().name)
      
      # 提交任务到池子中
      pool.submit(task)
      pool.submit(task)
      
      print(enumerate())
      
    2. 进程池

      import os
      import time
      from concurrent.futures import ProcessPoolExecutor
      
      # 创建一个进程池, 指定最多可以容纳两个线程
      def task():
          time.sleep(1)
          print(os.getpid())
      
      if __name__ == '__main__':
          pool = ProcessPoolExecutor(2)
          pool.submit(task)
          pool.submit(task)
          pool.submit(task)
      

    注意:如果进程不结束,池子里面的进程或线程也是一直存活的

    同步异步(*****

    **异步同步指的是提交任务的方式 **

    同步:是指提交任务后,必须需要等待任务完成之后才能进行下一个任务

    异步:是指提交任务后,不需要等待任务完成,继续进行下一个任务

    异步效率高于同步,但异步任务将导致一个问题,就是任务的发起方不知道任务何时处理完毕

    解决办法:

    1. 轮询:每隔一段时间询问一次

      ​ 结果:效率低,无法及时获取结果

    2. 异步回调:让任务的执行方主动通知

      ​ 结果:可以及时拿到任务的结果

    案例:

    from threading import Thread
    
    # 具体的任务
    def task(callback):
        print('run')
        for i in range(100000000):
            1 + 1
        callback(True)
        
    # 回调函数,参数为任务的结果
    def finished(res):
        if res:
            print('任务完成')
        else:
            print('任务失败')
            
            
    print('start...')
    t = Thread(target=task,args=(finished,))
    t.start()
    print('over')
    

    ​ 线程池中回调的使用

    from concurrent.futures import ThreadPoolExecutor
    
    def task(num):
        time.sleep(1)
        print(num)
        return "hello python"
    
    def callback(obj):
        print(obj.result())
    
    pool = ThreadPoolExecutor()
    res = pool.submit(task,123)
    res.add_done_callback(callback)
    print("over") 
    

    Event事件

    Event用于线程间状态同步,

    执行状态:指的是程序运行到哪一步,此时的状态

    执行结果:程序运行完了得到的结果

    如果我们要拿到执行结果,可以采用异步回调

    Event事件的本质就是一个标志,True or False

    Evevt里面包含了一个wait函数,可以阻塞当前线程,直到状态由False变为True

    from threading import Thread, Event
    import time
    
    
    e = Event()
    # is_bool = False
    
    def start_server():
        # global is_bool
        print('starting server.....')
        time.sleep(3)
        print('server started')
        # is_bool = True
        e.set()
    
    
    def connect_server():
        e.wait()
        if e.is_set():
            print('连接服务器成功')
        # while True:
        #     if is_bool:
        #         print('连接服务器成功')
        #         break
        #     else:
        #         print('连接服务器失败')
        # 
        #     time.sleep(0.1)
    
    
    t1 = Thread(target=start_server)
    t2 = Thread(target=connect_server)
    
    t1.start()
    t2.start()
    
  • 相关阅读:
    Python基础检测:20171105
    Python中斐波那契数列的四种写法
    Python中斐波那契数列的四种写法
    学习Python3:20171031
    学习Python3:20171031
    9.3 Trains and Evaluates the MNIST network using a feed dictionary
    9.3 Trains and Evaluates the MNIST network using a feed dictionary
    学习Python3:201701030
    学习Python3:201701030
    周末微光
  • 原文地址:https://www.cnblogs.com/Hades123/p/11158948.html
Copyright © 2011-2022 走看看