zoukankan      html  css  js  c++  java
  • GIL锁 同步异步 Event事件

    一 GIL锁

    官方解释:
    '''
    In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
    native threads from executing Python bytecodes at once. This lock is necessary mainly 
    because CPython’s memory management is not thread-safe. (However, since the GIL 
    exists, other features have grown to depend on the guarantees that it enforces.)
    '''
    
    释义:
    在CPython中,这个全局解释器锁,也称为GIL,是一个互斥锁,防止多个线程在同一时间执行Python字节码,这个锁是非常重要的,因为CPython的内存管理非线程安全的,很多其他的特性依赖于GIL,所以即使它影响了程序效率也无法将其直接去除
    
    总结:
    在CPython中,GIL会把线程的并行变成串行,导致效率降低
    

    二 GIL带来的问题

    1. GIL是什么?

      解释:GIL,叫做全局解释锁,加到了解释器上,并且是一把互斥锁

    2. 为什么需要这把锁?

      主要是线程安全问题,因为python程序本质就是一堆字符串,所以运行一个python程序时,必须要开启一个解释器,但是在一个python解释器中只有一个,所有代码都要交给他来解释执行,当有多个线程都要执行代码时就会产生线程安全问题,那不开起子线程是不是就没有这个问题了? 当然不是的 ,

    3. cpython解释器与GC的问题

      在使用Python中进行编程时,程序员无需参与内存的管理工作,这是因为Python有自带的内存管理机制,简称GC。那么GC与GIL有什么关联?

      要搞清楚这个问题,需先了解GC的工作原理,Python中内存管理使用的是引用计数,每个数会被加上一个整型的计数器,表示这个数据被引用的次数,当这个整数变为0时则表示该数据已经没有人使用,成了垃圾数据。

      当内存占用达到某个阈值时,GC会将其他线程挂起,然后执行垃圾清理操作,垃圾清理也是一串代码,也就需要一条线程来执行。

      简单的来说 python会自动帮我们处理垃圾,清扫垃圾也是一堆代码,也需要开启一个线程来执行,也就是说就算程序没有自己开线程,内部也有多个线程,这个时候GC线程与我们程序中的线程就会产生安全问题

      GIL是一把互斥锁,互斥锁将导致效率降低,具体表现是在cpython 即便开启了多线程,而且CPU也是多核的,却无法并行执行任务,因为解释器只有一个,同一时间只能有一个任务在执行

    4. 如何解决这个问题?

      由于底层的问题,目前还没有办法解决 ,只能尽可能的避免GIL锁影响我们的效率.

      1. 使用多线程能够实现并行,从而更好的利用多核CPU

      2. 对任务进行区分

        任务可以分为两类

        1. 计算密集型 基本没有IO 大部分时间都在计算 例如 人脸识别 图像处理 由于多线程不能并行,应该使用多进程将任务分配给不同的CPU核心

        2. IO密集型 计算任务非常少,大部分时间都在等待IO操作

          由于网络IO速度对比CPU处理速度非常慢,多线程并不会造成太大的影响

          另外如果有大量的客户端连接服务,进程根本开不起来 只能用多线程

    5. GIL与自定义锁的区别?

      GIL锁住的是解释器级别的数据

      自定义锁,锁的是解释器以外的共享资源 例如:硬盘上的文件.控制台

      对于这种不属于解释器的数据资源就应该自己加锁处理

      学完了GIL锁就应该知道你的任务是什么类型的就应该使用什么方式来处理,才能尽肯能提高效率**

    6. GIL的加锁与解锁时机?

      • 加锁的时机:在调用解释器时立即加锁
      • 解锁时机:①当前线程遇到了IO时释放 ②当前线程执行时间超过设定值时释放
    7. 关于性能的讨论?

      之所以加锁是为了解决线程安全问题

      由于有了锁,导致Cpython中多线程不能并行只能并发

      但是我们不能因此否认python

      1. python是一门语言,GIL是Cpython解释器的问题,还有Jpython,pypy

      2. 如果是单核CPU,GIL不会造成任何影响

      3. 由于目前大多数程序都是基于网络的,网络速度对比CPU是非常慢的,导致即使使用多核CPU也是无法提高效率的

      4. 对于IO密集型任务,不会有太大的影响

      5. 如果没有这把锁,我们程序员将必须自己来解决安全问题

        以下代码为性能测试

        from multiprocessing import Process
        from threading import  Thread
        import time
        # # 计算密集型任务
        #
        # def task():
        #     for i in range(100000000):
        #         1+1
        #
        #
        # if __name__ == '__main__':
        #     start_time = time.time()
        #
        #     ps = []
        #     for i in range(5):
        #         p = Process(target=task)
        #         # p = Thread(target=task)
        #         p.start()
        #         ps.append(p)
        #
        #     for i in ps:i.join()
        #
        #     print("共耗时:",time.time()-start_time)
        
        # 多进程胜
        
        
        # IO密集型任务
        
        def task():
            for i in range(100):
                with open(r"1.死锁现象.py",encoding="utf-8") as f:
                    f.read()
        
        if __name__ == '__main__':
            start_time = time.time()
        
            ps = []
            for i in range(10):
                p = Process(target=task)
                # p = Thread(target=task)
                p.start()
                ps.append(p)
        
            for i in ps:i.join()
            print("共耗时:",time.time()-start_time)
        
        # 多线程胜
        

    信号量

    • 可以限制同时并发执行公共代码的线程数量

    • 如果限制数量为1,则与普通互斥锁没有区别

    • 注意:信号量不是用来解决安全问题 的,而是用于限制最大的并发量

      from threading import Semaphore,currentThread,Thread
      import time
      
      s = Semaphore(5)
      
      def task():
          s.acquire()
          time.sleep(1)
          print(currentThread().name)
          s.release()
      
      for i in range(10):
          Thread(target=task).start()
      

    线程池与进程池

    • 什么是进程线程池?

      池表示一个容器,本质上就是一个存储进程或线程的列表

    • 池子中存储线程还是进程?

      如果是IO密集型任务使用线程池,如果是计算密集型任务则使用进程池

    • 为什么需要进程线程池

      在很多情况下需要控制进程或线程的数量在一个合理的范围,例如TCP程序中,一个客户端对应一个线程,虽然线程的开销小,但肯定不能无限的开,否则系统资源迟早被耗尽,解决的办法就是控制线程的数量。

      线程/进程池不仅帮我们控制线程/进程的数量,还帮我们完成了线程/进程的创建,销毁,以及任务的分配

      import os
      import time
      from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
      from threading import activeCount,enumerate,currentThread
      
      # # 创建一个线程池   指定最多可以容纳两个线程
      # pool = ThreadPoolExecutor(20)
      #
      # def task():
      #     print(currentThread().name)
      #
      # # 提交任务到池子中
      # pool.submit(task)
      # pool.submit(task)
      #
      # print(enumerate())
      
      # 进程池的使用
      
      def task():
          time.sleep(1)
          print(os.getpid())
      
      
      if __name__ == '__main__':
          pool = ProcessPoolExecutor(2)
          pool.submit(task)
          pool.submit(task)
          pool.submit(task)
      

    同步异步

    程序的两种状态:阻塞和非阻塞

    阻塞:当程序执行过程中遇到了IO操作,在执行IO操作时,程序无法继续执行其他代码,称为阻塞!

    非阻塞:程序在正常运行没有遇到IO操作,或者通过某种方式使程序即时遇到了也不会停在原地,还可以执行其他操作,以提高CPU的占用率

    同步-异步 指的是提交任务的方式

    同步指调用:发起任务后必须在原地等待任务执行完成,才能继续执行

    异步指调用:发起任务后必须不用等待任务执行,可以立即开启执行其他操作

    同步会有等待的效果但是这和阻塞是完全不同的,阻塞时程序会被剥夺CPU执行权,而同步调用则不会!

    很明显异步调用效率更高,但是任务的执行结果如何获取呢?

    程序中的异步调用并获取结果方式1:

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time
    
    pool = ThreadPoolExecutor(3)
    def task(i):
        time.sleep(0.01)
        print(current_thread().name,"working..")
        return i ** i
    
    if __name__ == '__main__':
        objs = []
        for i in range(3):
            res_obj = pool.submit(task,i) # 异步方式提交任务# 会返回一个对象用于表示任务结果
            objs.append(res_obj)
    
    # 该函数默认是阻塞的 会等待池子中所有任务执行结束后执行
    pool.shutdown(wait=True)
    
    # 从结果对象中取出执行结果
    for res_obj in objs:
        print(res_obj.result())
    print("over")
    

    程序中的异步调用并行并获取结果方式2:

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time
    
    pool = ThreadPoolExecutor(3)
    def task(i):
        time.sleep(0.01)
        print(current_thread().name,"working..")
        return i ** i
    
    if __name__ == '__main__':
        objs = []
        for i in range(3):
            res_obj = pool.submit(task,i) # 会返回一个对象用于表示任务结果
            print(res_obj.result()) #result是同步的一旦调用就必须等待 任务执行完成拿到结果
    print("over")
    

    异步回调

    1. 什么是异步回调

      异步回调指的是:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数

    2. 为什么需要异步回调

      之前在使用线程池或进程池提交任务时,如果想要处理任务的执行结果则必须调用result函数或是shutdown函数,而它们都是是阻塞的,会等到任务执行完毕后才能继续执行,这样一来在这个等待过程中就无法执行其他任务,降低了效率,所以需要一种方案,即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调

    3. 异步回调的使用

      先来看一个案例:

      在编写爬虫程序时,通常都是两个步骤:

      1.从服务器下载一个网页文件

      2.读取并且解析文件内容,提取有用的数据

      按照以上流程可以编写一个简单的爬虫程序

      要请求网页数据则需要使用到第三方的请求库requests可以通过pip或是pycharm来安装,在pycharm中点击settings->解释器->点击+号->搜索requests->安装

      import requests,re,os,random,time
      from concurrent.futures import ProcessPoolExecutor
      
      def get_data(url):
          print("%s 正在请求%s" % (os.getpid(),url))
          time.sleep(random.randint(1,2))
          response = requests.get(url)
          print(os.getpid(),"请求成功 数据长度",len(response.content))
          #parser(response) # 3.直接调用解析方法  哪个进程请求完成就那个进程解析数据  强行使两个操作耦合到一起了
          return response
      
      def parser(obj):
          data = obj.result()
          htm = data.content.decode("utf-8")
          ls = re.findall("href=.*?com",htm)
          print(os.getpid(),"解析成功",len(ls),"个链接")
      
      if __name__ == '__main__':
          pool = ProcessPoolExecutor(3)
          urls = ["https://www.baidu.com",
                  "https://www.sina.com",
                  "https://www.python.org",
                  "https://www.tmall.com",
                  "https://www.mysql.com",
                  "https://www.apple.com.cn"]
          # objs = []
          for url in urls:
              # res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发
              # parser(res)
      
              obj = pool.submit(get_data,url) # 
              obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合
              # objs.append(obj)
              
          # pool.shutdown() # 2.等待所有任务执行结束在统一的解析
          # for obj in objs:
          #     res = obj.result()
          #     parser(res)
          # 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析
          # 2.解析任务变成了串行,
      

      总结:异步回调使用方法就是在提交任务后得到一个Futures对象,调用对象的add_done_callback来指定一个回调函数,

      如果把任务比喻为烧水,没有回调时就只能守着水壶等待水开,有了回调相当于换了一个会响的水壶,烧水期间可用作其他的事情,等待水开了水壶会自动发出声音,这时候再回来处理。水壶自动发出声音就是回调。

      注意:

      1. 使用进程池时,回调函数都是主进程中执行执行
      2. 使用线程池时,回调函数的执行线程是不确定的,哪个线程空闲就交给哪个线程
      3. 回调函数默认接收一个参数就是这个任务对象自己,再通过对象的result函数来获取任务的处理结果

    Event事件

    什么是事件

    事件表示在某个时间发生了某个事情的通知信号,用于线程间协同工作。

    因为不同线程之间是独立运行的状态不可预测,所以一个线程与另一个线程间的数据是不同步的,当一个线程需要利用另一个线程的状态来确定自己的下一步操作时,就必须保持线程间数据的同步,Event就可以实现线程间同步

    Event介绍

    Event象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    可用方法:

    event.isSet():返回event的状态值;
    event.wait():将阻塞线程;知道event的状态为True
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    event.clear():恢复event的状态值为False。
    

    使用案例:

    # 在链接mysql服务器前必须保证mysql已经启动,而启动需要花费一些时间,所以客户端不能立即发起链接 需要等待msyql启动完成后立即发起链接
    from threading import Event,Thread
    import time
    
    boot = False
    def start():
        global boot
        print("正正在启动服务器.....")
        time.sleep(5)
        print("服务器启动完成!")
        boot = True
        
    def connect():
        while True:
            if boot:
                print("链接成功")
                break
            else:
                print("链接失败")
            time.sleep(1)
    
    Thread(target=start).start()
    Thread(target=connect).start()
    Thread(target=connect).start()
    

    使用Event改造后:

    from threading import Event,Thread
    import time
    
    e = Event()
    def start():
        global boot
        print("正正在启动服务器.....")
        time.sleep(3)
        print("服务器启动完成!")
        e.set()
    
    def connect():
        e.wait()
        print("链接成功")
        
    Thread(target=start).start()
    Thread(target=connect).start()
    Thread(target=connect).start()
    

    增加需求,每次尝试链接等待1秒,尝试次数为3次

    from threading import Event,Thread
    import time
    
    e = Event()
    def start():
        global boot
        print("正正在启动服务器.....")
        time.sleep(5)
        print("服务器启动完成!")
        e.set()
    
    def connect():
        for i in range(1,4):
            print("第%s次尝试链接" % i)
            e.wait(1)
            if e.isSet():
                print("链接成功")
                break
            else:
                print("第%s次链接失败" % i)
        else:
            print("服务器未启动!")
    
    Thread(target=start).start()
    Thread(target=connect).start()
    # Thread(target=connect).start()
    
  • 相关阅读:
    springboot整合swagger2+跨域问题
    springboot整合日志+多环境配置+热部署
    springboot整合多数据源以及多数据源中的事务处理
    springboot整合jsp
    springboot整合freemarker
    SpringBoot的全局异常处理
    python之多进程记录
    使用python批量造测试数据
    python之global用法
    Jenkins构建从github上克隆时,报Host key verification failed.
  • 原文地址:https://www.cnblogs.com/bladecheng/p/11142114.html
Copyright © 2011-2022 走看看