zoukankan      html  css  js  c++  java
  • Day037--Python--线程的其他方法,GIL, 线程事件,队列,线程池,协程

    1. 线程的一些其他方法

      threading.current_thread()  # 线程对象

      threading.current_thread().getName()  # 线程名称

      threading.current_thread().ident   # 当前线程ID

      threading.get_ident()  #  当前线程ID

      threading.enumerate()  # 连同主线程在内的正在运行的线程名称

      threading.active_count()  # 活跃的线程数

    from threading import Thread
    import threading
    import time
    
    
    def work():
        time.sleep(1)
        print('子线程', threading.get_ident())   # 当前线程 ID
        print('子线程', threading.current_thread().ident)   # 当前线程ID
        print('子线程', threading.current_thread().getName())  # 当前线程名称
    
    if __name__ == '__main__':
        t = Thread(target=work)
        t.start()
        # time.sleep(2)
        print('主线程', threading.current_thread())   # 主线程对象
        print('主线程', threading.current_thread().getName())  # 主线程名称
        print('主线程', threading.current_thread().ident)   # 当前线程ID
        print('主线程', threading.get_ident())   # 当前线程ID
    
        print(threading.enumerate())   # 连同主线程在内正在运行的线程
        print(threading.active_count())   # 目前正在活跃的线程数量
    View Code

    2. 线程事件

    from threading import Thread, Event
    
    e = Event()  # e的两种状态: False, True, 当事件对象的状态为False的时候, wait的地方会阻塞 
    
    # e.set()  # 将事件对象的状态改为True
    # e.clear()  # 将事件对象的状态改为False
    print('在这里等待')
    e.wait()
    print('好了')
    View Code 事件 

    3. 线程队列

      双向队列

    import queue
    # 先进先出 FIFO
    
    q = queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    q.put_nowait(4)  # 没有数据会报错
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get_nowait())
    View Code先进先出
    import queue
    # 后进先出 LIFO  Last in First out
    
    q = queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    q.put_nowait(4)  # 没有数据会报错
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get_nowait())
    View Code后进先出  Lifo

      优先级队列

    # 优先级队列
    
    import queue
    
    q = queue.PriorityQueue()
    q.put((20, 'b'))   # 元组第一个对象表示优先级, 第二个对象是放入队列的数据
    q.put((20, 'c'))
    q.put((20, 'a'))   # 先比较优先级, 优先级相同的按照数据ASCII码顺序排列
    q.put((0, 'b'))
    q.put((30, 'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    View Code 优先级队列
    import queue
    q = queue.PriorityQueue()
    q.put((20, 'abc'))   # 元组第一个对象表示优先级, 第二个对象是放入队列的数据
    q.put((20, 'acc'))   # 先比较优先级, 优先级相同的按照数据ASCII码顺序排列
    q.put((20, 'ac'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    View Code

    4.线程池 (重点)

      Python 标准模块: concurrent.futures

    早期的时候我们没有线程池,现在python提供了一个新的标准或者说内置的模块,这个模块里面提供了新的线程池和进程池,之前我们说的进程池是在multiprocessing里面的,现在这个在这个新的模块里面,
    他俩用法上是一样的。为什么要将进程池和线程池放到一起呢,是为了统一使用方式,使用threadPoolExecutor和ProcessPoolExecutor的方式一样,而且只要通过这个concurrent.futures导入就可以
    直接用他们两个了。

      multiprocessing VS. concurrent.futures

      线程池中的基本方法

    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    Both implement the same interface, which is defined by the abstract Executor class.
    
    #2 基本方法
    #submit(fn, *args, **kwargs)
    异步提交任务
    
    #map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    #shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    #result(timeout=None)
    取得结果
    
    #add_done_callback(fn)
    回调函数
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from threading import current_thread
    
    def func(n):
        time.sleep(1)
        print(n, current_thread().ident)
        return n**2
    
    if __name__ == '__main__':
        t_p = ThreadPoolExecutor(max_workers=4)  # 最大线程数量
        map_res = t_p.map(func, range(10))
        # print(map_res)
        # for i in map_res:
        #     print(i)
        print([i for i in map_res])   # 异步执行, map自带join功能
    View Code map
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from threading import current_thread
    import time
    
    def func(n):
        time.sleep(1)
        print(n, current_thread().ident)
        return n**2
    
    if __name__ == '__main__':
        t_p = ThreadPoolExecutor(max_workers=4)  # 此处切换成ProcessPoolExecutor就可以直接转换成进程, 下面都一样
        t_res_list = []
        for i in range(10): 
            res_obj = t_p.submit(func, i)   # 整合了创建线程和启动线程  异步提交了10个任务
            # res_obj.result()  # 和get一样会阻塞等待
            t_res_list.append(res_obj)
    
       t_p.shutdown()  # 等同于close() + join() 如果不加, res.result会四个四个地打印,加了就一次性打印
    for res in t_res_list: # 和进程池一样, 不用等到get阻塞,异步进行 print(res.result()) print('主线程结束')

       回调函数

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from threading import current_thread
    import time
    
    def func(n):
        # time.sleep(1)
        # print(n, current_thread().ident)
        return n**2
    
    def func2(n):
        # print('>>>>>>>', n)
        print('current_thread>>>', current_thread().getName())
        print('>>>>>>>', n.result())
    
    if __name__ == '__main__':
        t_p = ThreadPoolExecutor(max_workers=4)  # 此处切换成ProcessPoolExecutor就可以直接转换成进程, 下面都一样
        for i in range(10):
            res_obj = t_p.submit(func, i).add_done_callback(func2)
    
        t_p.shutdown()  # 等同于close() + join()
        print('主线程结束')
    View Code

     

    5.GIL 全局解释器锁 (重点)

    '''
    执行进程要调用python解释器, 开启一个python解释器进程. 代码写在硬盘上, 需要加载到内存中交给cpu运行. 要想运行,系统先开辟
    一块内存, 里面跑进程. 先把python解释器代码加载到内存,也是在这个进程中,此时这个进程中包含解释器和py文件, 将py文件的代码
    交给解释器解释(一堆字符串,相当于传参), 解释器解释,遇到函数解释器自动调用. 目前默认的解释器是CPython解释器. 此时解释器
    有两项事情要做:1. 编译器  2. 虚拟机 . 先经过编译器编译, 先编译成C语言的字节码, 生成.pyc文件, 然后虚拟机拿到c的字节码,
    输出机器码, 再配合操作系统,把这个进程扔给CPU去执行.  所有py文件里的线程都要去cpython解释器中去编译解释, CPython解释器
    中有一个GIL全局解释器锁, 导致同一时间同一进程内只有一个线程能够执行解释, 无法应用多核.
    '''

      在C语言写的python解释器中存在全局解释器锁, 由于全局解释器锁的存在, 在同一时间内, python解释器只能运行一个线程的代码, 这大大影响了python多线程的性能. 而这个解释器锁由于历史原因, 现在几乎无法消除.

      python GIL之所以会影响多线程等性能, 是因为再多线程的情况下, 只有当线程获得了一个全局锁的时候, 该线程代码才能运行, 而全局锁只有一个, 所以使用python多线程, 在同一时刻也只有一个线程在运行, 因此即使是在多核的情况下,也只能发挥出单核的性能.

      python线程切换:

      对于有IO操作的线程, 当一个线程执行IO操作时,python会强制收回GIL, 其他线程可以抢夺.

      对于计算密集型线程, python中会有一个执行指令的计数器, 当一个线程执行了一定数量的指令时, 该线程就会停止执行并让出当前的锁, 其他线程就可以抢夺GIL.

      参见: GIL锁简介、以及解决方案

      GIL锁与互斥锁的区别  

    # 线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来
    
    # join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高
    
        #1.多个线程去抢GIL锁,即抢执行权限
        #2. 肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()
        #3. 极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL
        #4.直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程 

      GIL锁保证只有一个线程可以进入cpu运行, 如果想用多核, 可以用多进程.

      多线程与多进程进行纯计算的效率对比

    from multiprocessing import Process
    from threading import Thread
    import time
    
    def func():
        num = 0
        for i in range(1, 10000000):
            num += i
    
    if __name__ == '__main__':
        p_s_t = time.time()
        p_list = []
        for i in range(10):
            p = Process(target=func)
            p.start()
            p_list.append(p)
        [p.join() for p in p_list]
        p_e_t = time.time()
        p_dif_t = p_e_t - p_s_t
    
        t_s_t = time.time()
        t_list = []
        for i in range(10):
            t = Thread(target=func,)
            t.start()
            t_list.append(t)
        [t.join() for t in t_list]
        t_e_t = time.time()
        t_dif_t = t_e_t - t_s_t
        print('多进程执行时间', p_dif_t)
        print('多线程执行时间', t_dif_t)
    View Code

      I/o密集型效率对比   当计算少, IO操作多的时候, 多线程有优势

    from multiprocessing import Process
    from threading import Thread
    import time
    
    def func():
        time.sleep(1)
        for i in range(10):
            print('
    %s' % i, end='')
    
    if __name__ == '__main__':
        p_s_t = time.time()
        p_list = []
        for i in range(10):
            p = Process(target=func)
            p.start()
            p_list.append(p)
        [p.join() for p in p_list]
        p_e_t = time.time()
        p_dif_t = p_e_t - p_s_t
    
        t_s_t = time.time()
        t_list = []
        for i in range(10):
            t = Thread(target=func,)
            t.start()
            t_list.append(t)
        [t.join() for t in t_list]
        t_e_t = time.time()
        t_dif_t = t_e_t - t_s_t
        print('多进程执行时间', p_dif_t)
        print('多线程执行时间', t_dif_t)
    View Code

    6.协程的认识

      协程: 单线程下实现并发

      并发: 伪并行, 遇到IO就切换, 单核下多个任务之间切换执行, 给你的效果就是貌似你的几个程序在同时执行, 提高效率.

         (任务切换+保存状态)

      并行: 多核cpu, 真正的同时执行

      串行: 一个任务执行完再执行下一个任务

      多线程多进程下的任务切换+保存状态是操作系统做的, 也要耗时. 为了提高效率, 出现协程.

           并发与串行效率对比

    # 并发执行的效率(在没有IO的情况下,两个纯计算的任务)
    import time
    
    
    def func1():
        num2 = 0
        for i in range(1000001):
            num2 += i
            yield
            # print('执行到下一个yield',current_thread().name)
    
    
    def func2():
        g = func1()
        next(g)
        sum = 0
        for i in range(1000000):
            # g.send(i)
            sum += i
            next(g)
    
    
    s_t = time.time()
    func2()
    print('协程的时间', time.time() - s_t)
    
    
    # 串行执行的效率
    def func3():
        num2 = 0
        for i in range(1000001):
            num2 += i
            # print('执行到下一个yield',current_thread().name)
    
    
    def func4():
        sum = 0
        for i in range(1000000):
            sum = sum + i
    
    
    s_t = time.time()
    func3()
    func4()
    print('串行的时间', time.time() - s_t)
    View Code

      此时串行比yield切换省时间

    # 串行
    import time
    def consumer(i):
        time.sleep(1)
        print('处理了数据:', i)
    
    def producer():
        for i in range(3):
            consumer(i)
            print('发送了数据:', i)
    
    start2 = time.time()
    producer()
    stop2 = time.time()
    print(stop2 - start2)
    View Code 串行

      通过生成器实现单线程下的并发, 没有实现IO切换, 比串行慢

    import time
    # 协程, 单线程下实现并发
    def consumer():
        while 1:
            x = yield
            time.sleep(1)
            print('处理了数据:', x)
    
    def producer():
        g = consumer()
        next(g)
        for i in range(3):
            g.send(i)
            print('发送了数据:', i)
    
    start = time.time()
    producer()
    stop = time.time()
    print(stop - start)
    View Code协程

    7.Greenlet

    import time
    from greenlet import greenlet
    
    def eat(name):
        print('%s eat 1' % name)
        time.sleep(1)
        g2.switch('taibai')
        print('%s eat 2' % name)
        g2.switch()
    
    def play(name):
        print('%s play 1' % name)
        time.sleep(1)
        g1.switch()
        print('%s play 2' % name)
    
    g1 = greenlet(eat)
    g2 = greenlet(play)
    
    g1.switch('taibai')
    
    # 也没有实现IO切换
    View Code

    8.Gevent

    9. condition 条件

      使得线程等待,只有满足某条件时,才释放n个线程

    import threading
     
    def run(n):
        con.acquire()
        con.wait()
        print("run the thread: %s" %n)
        con.release()
     
    if __name__ == '__main__':
     
        con = threading.Condition()
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            t.start()
     
        while True:
            inp = input('>>>')
            if inp == 'q':
                break
            con.acquire()
            con.notify(int(inp))
            con.release()
    View Code
    def condition_func():
    
        ret = False
        inp = input('>>>')
        if inp == '1':
            ret = True
    
        return ret
    
    
    def run(n):
        con.acquire()
        con.wait_for(condition_func)
        print("run the thread: %s" %n)
        con.release()
    
    if __name__ == '__main__':
    
        con = threading.Condition()
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            t.start()
    View Code
     

    10. 定时器 Timer

    import time
    from threading import Timer,current_thread #这里就不需要再引入Timer
    import threading
    def hello():
        print(current_thread().getName())
        print("hello, world")
        # time.sleep(3) #如果你的子线程的程序执行时间比较长,那么这个定时任务也会乱,当然了,主要还是看业务需求
    # t = Timer(10, hello)  #创建一个子线程去执行后面的函数
    # t.start()
    # after 10 seconds, "hello, world" will be printed
    for i in range(5):
        t = Timer(2, hello)
        t.start()
        time.sleep(3) # 这个是创建一个t用的时间是2秒,等待3秒后创建出来第二个.
    
    print(threading.active_count())
    print('主进程',current_thread().getName())
    View Code

    参见: https://www.cnblogs.com/clschao/articles/9684694.html#part_9

  • 相关阅读:
    css样式学习笔记
    Css教程玉女心经版本
    weblogic高级进阶之ssl配置证书
    weblogic高级进阶之查看日志
    weblogic之高级进阶JMS的应用
    【WebLogic使用】3.WebLogic配置jndi数据源
    shiro的helloworld
    尚硅谷spring 事物管理
    尚硅谷spring aop详解
    Spring Boot 2.x Redis多数据源配置(jedis,lettuce)
  • 原文地址:https://www.cnblogs.com/surasun/p/9869094.html
Copyright © 2011-2022 走看看