zoukankan      html  css  js  c++  java
  • GIL 线程池与进程池 同步与异步

    1.GIL   全局解释器锁     只存在于cPython中,其他解释器中没有

      释以:在cpython中它是一种互斥锁是为了防止多个线程在同一时间执行python字节码,这个锁是非常重要的,因为cpython的内存管理是非线程安全的,而且很多已经存在的代码需要依赖这个锁,所以即使它影响了程序效率也无法将其去除。

      优点:保证了cpython中内存管理是线程安全的

      缺点:使得多线程无法并行

      注:非线程安全:多个线程访问统一个资源,会有问题;

        线程安全:多个线程访问统一个资源,不会有问题,与之相反

      相关:用cPython的原因:1.c编译过得结果可以直接被计算机识别;最重要的是c有大量现成的库(算法,通讯),cpython可以无缝连接c的任何现成代码

                  2,大量的应用程序都是IO密集,

    2.GIL带来的问题:会使程序整体效率降低

      由于互斥锁的特性,会导致程序串行,多线程不能并行,为保证数据安全,故而加锁,因此降低执行效率

    3.解决方案       分区任务类型  

      如果是io密集型:使用多线程,因为io多的话,时间会大量花在io等待上,对此并行对耗时无提升

    ```python
    from multiprocessing import Process
    from threading import Thread
    import time
    def task():
        with open("test.txt",encoding="utf-8") as f:
            f.read()
    if __name__ == '__main__':
        start_time = time.time()
        # 多进程
        # p1 = Process(target=task)
        # p2 = Process(target=task)
        # p3 = Process(target=task)
        # p4 = Process(target=task)
    
        # 多线程
        p1 = Thread(target=task)
        p2 = Thread(target=task)
        p3 = Thread(target=task)
        p4 = Thread(target=task)
    
        p1.start()
        p2.start()
        p3.start()
        p4.start()
    
        p1.join()
        p2.join()
        p3.join()
        p4.join()
    
        print(time.time()-start_time)
    ```

      如果是计算密集型,要看有没有多核处理器了,有的话用多进程        需要注意并行的任务不能太多 开启进程非常消耗资源

    from multiprocessing import Process
    from threading import Thread
    import time
    
    def task():
        for i  in range(10000000):
            i += 1
    
    if __name__ == '__main__':
        start_time = time.time()
        # 多进程
        # p1 = Process(target=task)
        # p2 = Process(target=task)
        # p3 = Process(target=task)
        # p4 = Process(target=task)
    
        # 多线程
        p1 = Thread(target=task)
        p2 = Thread(target=task)
        p3 = Thread(target=task)
        p4 = Thread(target=task)
    
        p1.start()
        p2.start()
        p3.start()
        p4.start()
    
        p1.join()
        p2.join()
        p3.join()
        p4.join()
        
        print(time.time()-start_time)

    4.加锁解锁时机

    加锁:在调用解释器时立即加锁

    解锁:该线程任务结束时,   遇到io时 ,他使用解释器程过长,超过设定值时  

    5.GIL锁与自定义锁的关系

      首先他们都是互斥锁,为什么有了gil还需要自己加锁呢,是因为gil是加在解释器上的锁,只能锁住届时其内部的资源,故而我们自己开起的资源得自己加锁

      例如: 我们自己开启了一个json文件,多个线程要同时访问, 如果第一个线程写入数据写到一半,执行超时了,另一个线程过来接着写,就会产生问题,

    6.线程池与进程池            池:容器

      定义:装线程与进程的一种容器

      使用时机:如果是io密集型用线程池,如果是计算密集型用进程池

      作用:1.控制线程与进程的数量保证了系统的稳定     

            注:信号量中实现制同时并发多少,但线程已经全都建完了

         2.自动管理线程的开启与销毁

         3.自动分配任务给空闲的人

      使用:1.创建池子

         2.submit  提交任务

         3.pool.shutdown()等待所有任务全部完毕销毁所有线程后关闭线程池,关闭后就不能提交新任务了

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time,os
    
    # 创建进程池,指定最大进程数为3,此时不会创建进程,不指定数量时,默认为CPU和核数
    pool = ProcessPoolExecutor(3)
    
    def task():
        time.sleep(1)
        print(os.getpid(),"working..")
    
    if __name__ == '__main__':
        for i in range(10):
            pool.submit(task) # 提交任务时立即创建进程
    
        # 任务执行完成后也不会立即销毁进程
        time.sleep(2)
    
        for i in range(10):
            pool.submit(task) #再有新任务是 直接使用之前已经创建好的进程来执行
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread,active_count
    import time,os
    
    # 创建进程池,指定最大线程数为3,此时不会创建线程,不指定数量时,默认为CPU和核数*5
    pool = ThreadPoolExecutor(3)
    print(active_count()) # 只有一个主线
    
    def task():
        time.sleep(1)
        print(current_thread().name,"working..")
    
    if __name__ == '__main__':
        for i in range(10):
            pool.submit(task) # 第一次提交任务时立即创建线程
    
        # 任务执行完成后也不会立即销毁
        time.sleep(2)
    
        for i in range(10):
            pool.submit(task) #再有新任务时 直接使用之前已经创建好的线程来执行

    7.同步与异步

      在并发症中经常提及的概念

    1.阻塞与非阻塞(就绪或运行)      程序的运行状态     

     阻塞:当程序执行过程中遇到了IO操作,在执行IO操作时,程序无法继续执行其他代码,称为阻塞!

     非阻塞:程序在正常运行状态

    2.并发与并行                多任务处理方式

     并发:多个任务看起来像同时运行,本质是切换加保存状态

     并行:真正的同时进行中,必须具备多核处理器

    3.同步与异步           任务提交(执行)方式

       同步:是指发起任务后,必须在原地等待,直到任务完成拿到结果             同步  i=阻塞           卡住i=阻塞

      注:默认情况为同步的   同步会有等待的效果但是这和阻塞是完全不同的,阻塞时程序会被剥夺CPU执行权,而同步调用则不会

       异步:发起任务无需等待结果,可以继续等待其他代码                  异步i=非阻塞

      注 :异步任务必须依赖并发与并行,在py中通过多线程或多进程

        异步效率明显高于同步

    #=======================程序中的异步调用并获取结果方式一
    from
    concurrent.futures import ThreadPoolExecutor from threading import current_thread import time pool = ThreadPoolExecutor(3) def task(i): time.sleep(0.01) print(current_thread().name,"working..") return i ** i if __name__ == '__main__': objs = [] for i in range(3): res_obj = pool.submit(task,i) # 异步方式提交任务# 会返回一个对象用于表示任务结果 objs.append(res_obj) # 该函数默认是阻塞的 会等待池子中所有任务执行结束后执行 pool.shutdown(wait=True) # 从结果对象中取出执行结果 for res_obj in objs: print(res_obj.result()) print("over")
    #=======================程序中的异步调用并获取结果方式二
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time
    
    pool = ThreadPoolExecutor(3)
    def task(i):
        time.sleep(0.01)
        print(current_thread().name,"working..")
        return i ** i
    
    if __name__ == '__main__':
        objs = []
        for i in range(3):
            res_obj = pool.submit(task,i) # 会返回一个对象用于表示任务结果
            print(res_obj.result()) #result是同步的一旦调用就必须等待 任务执行完成拿到结果
    print("over")

    8.异步回调

      异步任务的问题:任务完成后产生的返回值,任务发起方何时去取

      解决方案:异步回调

      定义:是指在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数

        eg:如果把任务比喻为烧水,没有回调时就只能守着水壶等待水开,有了回调相当于换了一个会响的水壶,烧水期间可用作其他的事情,等待水开了水壶会自动发出声音,这时候再回来处理。水壶自动发出声音就是回调

      用处:即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调

      使用:就是在提交任务后得到一个futures对象,调用对象的add_done_callback来指定一个回调函数

    import requests,re,os,random,time
    from concurrent.futures import ProcessPoolExecutor
    
    def get_data(url):
        print("%s 正在请求%s" % (os.getpid(),url))
        time.sleep(random.randint(1,2))
        response = requests.get(url)
        print(os.getpid(),"请求成功 数据长度",len(response.content))
        #parser(response) # 3.直接调用解析方法  哪个进程请求完成就那个进程解析数据  强行使两个操作耦合到一起了
        return response
    
    def parser(obj):
        data = obj.result()
        htm = data.content.decode("utf-8")
        ls = re.findall("href=.*?com",htm)
        print(os.getpid(),"解析成功",len(ls),"个链接")
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(3)
        urls = ["https://www.baidu.com",
                "https://www.sina.com",
                "https://www.python.org",
                "https://www.tmall.com",
                "https://www.mysql.com",
                "https://www.apple.com.cn"]
        # objs = []
        for url in urls:
            # res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发
            # parser(res)
    
            obj = pool.submit(get_data,url) # 
            obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合
            # objs.append(obj)
            
        # pool.shutdown() # 2.等待所有任务执行结束在统一的解析
        # for obj in objs:
        #     res = obj.result()
        #     parser(res)
        # 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析
        # 2.解析任务变成了串行,

    9.通常异步任务都会绑定一个回调函数,用来处理任务结果

     

      1.在进程池中回调函数是在父进程中执行,因为任务由父进程发起,所以结果也应交给父进程

      2.在线程池中回调函数在子进程中执行,因为线程之间数据共享

      3.如果你的任务结果需要交给父进程来处理,那建议回调函数,回调函数会自动将数据返回给父进程,不需要自己处理IPC

  • 相关阅读:
    windows live sync, mesh, skydrive
    忘记SQL SERVER密码的解决
    处理ObjectDataSource调用中DAL层中的异常
    C#中获取应用程序路径的方法(集合)
    datatable复制一行数据到本表
    [Yii Framework] yii中如何在查询的时候使用数据库函数
    [Yii Framework] yii的路由配置
    [Yii Framework] 已经定义的命名空间常量
    [Yii Framework] yii中关于filter
    [Yii Framework] yii中如何不加载layout
  • 原文地址:https://www.cnblogs.com/wyf20190411-/p/10981946.html
Copyright © 2011-2022 走看看