zoukankan      html  css  js  c++  java
  • 并发编程之多线程

    一 线程

    什么是线程?

      程序的执行路线,线程是cpu上的的执行单位。传统举例:我们把操作系统比喻为一个工厂,进程就是这个工厂中的车间,线程是属于工厂中的流水线。

    进程和线程的关系?

      1.进程中包含了运行程序需要的所有资源。

      2.进程是一个资源单位,线程是CPU的最小执行单位。

      3.每一个进程一旦被创建,就默认开启了一条线程,称之为主线程。

      4.一个进程可以包含多个进程

      5.进程包含线程,而线程依赖进程。

    进程与线程的区别?

      1.进程对于操作系统的资源消耗非常高,而线程相反非常低(比进程低1-100倍)。

      2.在同一个进程中,多个线程之间的资源是共享的,但是在多个进程中,进程之间的资源是不共享的。

    为什么要使用多线程?

      为了提高程序效率。

      可是多进程也可以提高程序效率啊,为什么不使用多进程,那是因为进程对操作系统的资源耗费非常高。

    多线程是如何提高程序的开发效率?

      多线程可以使cpu在一个进程内进行切换,从而提高cpu占用率。

    多线程的使用场景?

      当程序遇到IO操作的时候,可以使用多线程的方式提高程序的效率,但是当程序中全部都是计算任务时,使用多线程不仅无法提高程序的效率,甚至由于cpu切换耗费时间,而导致程序效率的降低。

    #1.实例化Thread类
    from threading import Thread
    
    def task():
        print("子线程 is running")
    
    if __name__ == '__main__':
        t = Thread(target=task)
        t.start()
        print("over1")
    实例化Thread类的方法使用多线程
    from threading import Thread
    class MyThread(Thread):
        def run(self):
            print("子进程 is running")
    if __name__ == '__main__':
        t = MyThread()
        t.start()
        print("over2")
    自定义继承Thread的类使用多线程
    from threading import Thread
    
    
    x = 100
    
    def task():
        print("run....")
        global x
        x = 0
    
    
    t = Thread(target=task)
    t.start()
    t.join()
    
    print(x)
    print("over")
    
    
    >>>:
    run....
    0
    over
    多线程的资源共享

    二 线程的其他方法

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    三 守护线程

    在之前我们提到过一个守护进程,无论是进程还是线程,都遵循:守护XXX会等待主XXX运行完毕后被销毁,需要强调的是:运行完毕并非终止运行。

    #1.对主进程来说,运行完毕指的是主进程代码运行完毕
    
    #2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

    详细解释:

    #1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
    
    #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
    """
        守护线程
        守护线程会在所有非守护线程结束后结束
            三个线程 分贝 皇帝 太子 和皇后
            如果把皇后设置为守护线程 那么皇后线程会在 太子和皇帝都死亡后死亡
        当所有非线程结束后 守护线程也跟着结束了
    
    
        进程 守护进程会在被守护进程死亡跟着死亡
        同一个进程 可以有多个守护线程
    
    
    
    """
    
    from threading import Thread
    
    
    import time
    
    def task():
        print("sub thread run....")
        time.sleep(3)
        print("sub thread over....")
    
    t = Thread(target=task)
    t.setDaemon(True)
    t.start()
    
    
    t = Thread(target=task)
    t.setDaemon(True)
    t.start()
    
    print("over!")
    
    
    >>>:
    sub thread run....
    sub thread run....
    over!
    守护线程

    守护线程的迷惑人的例子:

    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    
    t1=Thread(target=foo)
    t2=Thread(target=bar)
    
    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")
    
    >>>:
    123
    456
    main-------
    end123
    end456
    主线程运行结束是否结束问题

    上述运行结果的原因:t1是守护线程,t2是非守护线程,这两个线程与主线程公用一块内存空间,子线程的启动时间在主线程代码运行完毕之前,所以会首先执行t1与t2线程,打印出123,t1线程睡眠,此时cpu切换至t2线程,打印456,t2线程睡眠,cpu切换至主线程,主线程打印"main-------",此时其实主线程并未执行完毕,因为主线程执行完毕是指其他非守护线程运行完毕后,主线程才算运行完毕,因此,还会运行子线程中的代码,由于t1中睡眠的时间比t2中的时间短,所以守护线程t1也恰好运行完毕打印"end123",最后非守护线程t2打印"end456",此时非守护线程全部运行完毕,主线程此时运行完毕。

    四 线程中的互斥锁

    当多个进程或多个线程需要同时修改同一份数据时,可能会造成数据的错乱,所以必须得加锁。

    import time
    from threading import Thread,Lock
    
    lock =Lock()
    
    a = 100
    
    def task():
        lock.acquire()
        global a
        temp = a - 1
        time.sleep(0.01)
        a = temp
        lock.release()
    
    ts = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        ts.append(t)
    
    for t in ts:
        t.join()  #防止由于线程睡眠,导致不同线程都在睡眠阶段完成减法步骤,导致最终结果不正确。
    
    
    print(a)
    使用线程完成减法---锁的方法

    但是使用锁这个方法有一个缺点:同一时间的数据只能被一个线程使用,信号量便解决了这个问题。

    五 信号量

    什么是信号量?

      信号量其实也是一种锁,特点是可以设置一个数据可以同时被多个线程共享。

    信号量与普通锁的区别?

      1.普通锁一旦枷锁,则意味着多个线程中的共享数据在同一时间只能被一个线程使用。

      2.信号量可以让这个数据在同一时间能被多个线程使用

    使用场景?

      限制一个数据被同时访问的次数,保证程序的正常运行

    sem = Semaphore(3)
    
    def task():
        sem.acquire()
        print("%s run..." % current_thread())
        time.sleep(3)
        sem.release()
    
    
    for i in range(10):
        t = Thread(target=task)
        t.start()
    运行10个线程,但是信号量值为3

     六 GIL锁

    什么是GIL?

    官方解释:

    '''
    In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
    native threads from executing Python bytecodes at once. This lock is necessary mainly 
    because CPython’s memory management is not thread-safe. (However, since the GIL 
    exists, other features have grown to depend on the guarantees that it enforces.)
    '''
    
    释义:
    在CPython中,这个全局解释器锁,也称为GIL,是一个互斥锁,防止多个线程在同一时间执行Python字节码,这个锁是非常重要的,因为CPython的内存管理非线程安全的,很多其他的特性依赖于GIL,所以即使它影响了程序效率也无法将其直接去除。
    
    总结:
    在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势。
    在CPython中,GIL会把线程的并行变成串行,导致效率降低

    GIL本质是一把互斥锁,既然是互斥锁,所有互斥锁的本质都是一样的,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。可以肯定的一点:保护 不同的数据的安全,就应该加不同的锁。需要知道的是,解释器并不只有Cpython,还有PyPy,JPython等等。GIL也只存在在Cpython中,这并不是Python这门语言的问题,而是CPython解释器的问题!

    GIL带来的问题?

    首先明确一下GIL带来的问题:GIL会降低程序的整体效率。下面解释一步一步的来解释一下:

      首先明确执行一个py文件,分为三个步骤:

        1.运行python解释器

        2.由python解释器将硬盘中的代码加载到内存中

        3.python解释器代码解析代码,交给cpu执行

    其次需要明确的一点是每当执行一个py文件,就会立即启动一个python解释器。

    当执行一个test.py文件时内存结构如下:

    py文件中的内容本质上都是字符串,只有在被解释器解析时,才具备语法意义,解释器会将py代码翻译为当前系统支持的指令交给系统执行。

    当我一个进程中仅存在一条线程既主线程时,此时GIL锁的存在不会有任何影响(不考虑垃圾回收机制),因为没有其他线程与主线程抢数据,必然不会造成数据错乱。

    当进程中存在多个线程时,这些线程都需要共享解释器,为了避免共享带来饿数据竞争问题,于是就给解释器加上了互斥锁。由于互斥锁的特性,程序串行,保证数据安全,降低执行效率,GIL将使得程序整体效率降低。

    为什么需要GIL?

      在了解这个问题之前,我们需要先了解一个机制--垃圾回收机制(GC),在Python中进行编程时,程序员无需参与内存的管理工作,这是因为Python有自带的内存管理机制,简称GC。

      GC的工作原理:Python中内存管理使用的是引用计数,每个数都会被加上一个整形的计数器,表示这个数据被引用的次数,当这个数据被引用的此时为0时,该数据就变成了垃圾数据,当内存占用达到某个阈值时,GC会将其他线程挂起,然后执行垃圾清理操作(垃圾清理也是一个线程)。

    综上,如果多个线程的target = work,那么执行流程是多个线程先访问解释器的代码即拿到执行权限,然后将target代码交给解释器解析并且执行。处于同一个进程中的线程共享处于这个进程中的解释器,所以垃圾回收线程也会访问解释器,并且由解释器执行,那么这就可能会遭遇一个问题:对于同一数据100,可能线程1中有代码 x = 100,线程1执行时,定义变量x = 100,定义变量的步骤为第一步在内存中申请内存空间并且将100存进去,第二步将100的内存地址与变量名x进行绑定,这个时候就有可能发生当执行完第一步后,CPU切换到了GC线程,此时变量定义还未完成,GC线程发现数据100的引用计数为0就会把它当成垃圾数据清理掉,等CPU再次切换至线程1时,刚刚保存的数据100已经被清理了,导致无法正常定义变量。如下图:

    为了解决这个问题(他一些涉及到内存的操作同样可能产生问题问题),Cpython解释器就简单粗暴的为解释器加了一把互斥锁即GIL锁,如下图:

    加上GIL锁后,同一时间只能有一个线程使用解释器,保证了解释器数据的安全。

    GIL的加锁与解锁时机

      加锁的时机:在有线程调用解释器立即加锁

      解锁时机:1.当前线程遇到IO操作时,释放锁,此时CPU会切换至其他线程。

           2.当前线程执行时间超过设定值时释放,解释器会检测线程的执行时间,一旦执行时间达到了某个阈值,就会通知当前线程保存状态,CPU将会切换至其他线程,以此来保证数据安全。

    GIL性能的讨论

      GIL的优点:保证了CPython中的内存管理是线程安全的。

      GIL的缺点:互斥锁的特征使得多线程无法实现并发。

      那就有人就会说了,Python连多线程都实现不了,那么这个语言有什么用,垃圾玩意。我想说的是兄弟,你别急,听我继续说,大家皆座,勿6,基操。

      我辩解的原因:

      1.GIL仅进在Cpython解释器中存在,在其他的解释器中没有这个问题,因此这个问题并不是Python语言这门语言的缺点。

      2.单核CPU下,多线程本来就没真正的并行。

      3.在多核CPU下,运行效率的确比单核处理器高,但是当前应用程序大多都是基于网络的,程序的运行速度,不仅仅要考虑CPU的运行效率,也要考虑网络速度,但是网络速度远远比不上CPU的运算速度,这就意味着每次处理器在执行运算前都需要等待网络IO,这样一来多核优势也就没有那么明显了。

    任务1 从网络上下载一个网页,等待网络IO的时间为1分钟,解析网页数据花费,1秒钟
    
    任务2 将用户输入数据并将其转换为大写,等待用户输入时间为1分钟,转换为大写花费,1秒钟
    
    
    **单核CPU下:**1.开启第一个任务后进入等待。2.切换到第二个任务也进入了等待。一分钟后解析网页数据花费1秒解析完成切换到第二个任务,转换为大写花费1秒,那么总耗时为:1分+1秒+1秒 = 1分钟2秒
    
     **多核CPU下:**1.CPU1处理第一个任务等待1分钟,解析花费1秒钟。1.CPU2处理第二个任务等待1分钟,转换大写花费1秒钟。由于两个任务是并行执行的所以总的执行时间为1分钟+1秒钟 = 1分钟1秒
    
    可以发现,多核CPU对于总的执行时间提升只有1秒,但是这边的1秒实际上是夸张了,转换大写操作不可能需要1秒,时间非常短!
    上面的两个任务都是需要大量IO时间的,这样的任务称之为IO密集型,与之对应的是计算密集型即没有IO操作全都是计算任务。
    
       对于计算密集型任务,Python多线程的确比不上其他语言!为了解决这个弊端,Python推出了多进程技术,可以良好的利用多核处理器来完成计算密集任务。
    举个例子

    总结:

      1.单核下无论是IO密集还是计算密集型任务,GIL都不会产生任何影响。

      2.多核下对于IO密集型任务,GIL会有细微的影响,但是基本上可以忽略。

      3.Cpython中IO密集任务应该采用多线程,计算密集型任务应该采用多进程。

    from multiprocessing import Process
    from threading import Thread
    import time
    
    
    def task():
        for i in range(10000000):
            i += 1
    
    
    if __name__ == '__main__':
        start_time = time.time()
        # 多进程
        p1 = Process(target=task)
        p2 = Process(target=task)
        p3 = Process(target=task)
        p4 = Process(target=task)     #结果为: 2.4078094959259033
    
        # 多线程
        # p1 = Thread(target=task)
        # p2 = Thread(target=task)
        # p3 = Thread(target=task)
        # p4 = Thread(target=task)  #结果为: 2.7793614864349365
    
        p1.start()
        p2.start()
        p3.start()
        p4.start()
    
        p1.join()
        p2.join()
        p3.join()
        p4.join()
    
        print(time.time() - start_time)
    计算密集型的效率测试(多进程与多线程)
    from multiprocessing import Process
    from threading import Thread
    import time
    
    
    def task():
        with open(r"C:UsersAdministratorDesktopday35视频1.GIL锁.mp4",'rb')as f:
            f.read()
    
    
    if __name__ == '__main__':
        start_time = time.time()
        # 多进程
        p1 = Process(target=task)
        p2 = Process(target=task)
        p3 = Process(target=task)
        p4 = Process(target=task)     #结果为: 0.9506115913391113
    
        # 多线程
        # p1 = Thread(target=task)
        # p2 = Thread(target=task)
        # p3 = Thread(target=task)
        # p4 = Thread(target=task)  #结果为: 0.0015041828155517578
    
        p1.start()
        p2.start()
        p3.start()
        p4.start()
    
        p1.join()
        p2.join()
        p3.join()
        p4.join()
    
        print(time.time() - start_time)
    IO密集型效率测试

    七 自定义的线程锁与GIL的区别

    GIL保护的是解释器级别的数据安全,比如对象的引用计数,垃圾回收数据等。而自定义的线程锁是对线程中自己定义的数据的保护。

    from threading import Thread,Lock
    import time
    
    a = 0
    def task():
        global a
        temp = a
        time.sleep(0.01) 
        a = temp + 1
    
    t1 = Thread(target=task)
    t2 = Thread(target=task)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(a)
    未加上自定义锁的线程

    过程分析:

      1.线程1获得CPU执行权,并获取GIL锁执行代码,得到a的值为0后进入睡眠,释放GIL锁,切换CPU。

      2.线程2获得CPU执行权,并获取GIL锁执行代码,得到a值为0后进入睡眠,释放GIL锁,切换CPU。

      3.线程1睡醒后获取CPU执行权,并获取GIL锁执行代码,将temp的值+1后赋值给a,执行完毕,释放GIL锁,切换CPU。

      4.线程2睡醒后获取CPU执行权,并获取GIL锁执行代码,将temp的值+1后赋值给a,执行完毕,释放GIL锁,切换CPU。最终a的值是1,并不是我们想要的答案2。

    之所以出现这个问题,是因为两个线程在并发的执行同一段代码。解决方案就是加锁。

    from threading import Thread,Lock
    import time
    
    lock = Lock()
    a = 0
    def task():
        global a
        lock.acquire()
        temp = a
        time.sleep(0.01)
        a = temp + 1
        lock.release()
    
    t1 = Thread(target=task)
    t2 = Thread(target=task)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(a)
    加上自定义锁的线程

    过程分析: 

      1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL,不释放lock

      2.线程2获得CPU执行权,并获取GIL锁,尝试获取lock失败,无法执行,释放CPU并释放GIL

      3.线程1睡醒后获得CPU执行权,并获取GIL继续执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为1

      4.线程2获得CPU执行权,获取GIL锁,尝试获取lock成功,执行代码,得到a的值为1后进入睡眠,释放CPU并释放GIL,不释放lock

      5.线程2睡醒后获得CPU执行权,获取GIL继续执行代码 ,将temp的值1+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为2。

    总结

      1.GIL锁保护的是解释器级别的共享数据的安全。

      2.自定义锁保护的是多个线程之间的共享数据的数据安全。

    八 进程池与线程池

    什么是进程池/线程池?

      池表示一个容器,本质上就是一个存储进程/线程的列表。

    池子中存储线程还是进程?

      如果是IO密集型任务使用线程池,如果是计算密集型任务则使用进程池。

    为什么需要进程池/线程池?

      在很多情况下需要控制进程或线程的数量在一个合理的范围,例如TCP程序中,一个客户端对应一个线程,虽然线程的开销小,但肯定不能无限开,否则系统资源迟早被耗尽,解决的方法就是控制线程的数量。线程池就帮我们解决了这个问题。

    注意:线程/进程池不仅能帮我们控制线程/进程的数量,还帮我们完成了线程/进程的创建,销毁,以及人物的分配。

    from concurrent.futures import ProcessPoolExecutor
    import os,time
    pool = ProcessPoolExecutor(3) #设置进程池的数量为3
    
    def task():
        time.sleep(1)  #切换CPU
        print("进程号为%s is running"%os.getpid())
    
    
    if __name__ == '__main__':
        for i in range(10):
            pool.submit(task) #提交任务即创建进程
    
        #任务执行完成后不会被销毁
        time.sleep(2)
        for i in range(10):
            pool.submit(task)   #新的任务也会使用之前创建好的进程来执行
    
    
    >>>:
    进程号为43756 is running
    进程号为43704 is running
    进程号为43732 is running
    进程号为43756 is running
    进程号为43704 is running
    进程号为43732 is running
    进程号为43756 is running
    进程号为43704 is running
    进程号为43732 is running
    进程号为43756 is running
    进程号为43704 is running
    进程号为43732 is running
    进程号为43756 is running
    进程号为43704 is running
    进程号为43732 is running
    进程号为43756 is running
    进程号为43704 is running
    进程号为43732 is running
    进程号为43756 is running
    进程号为43704 is running
    进程池的使用
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time
    pool = ThreadPoolExecutor(3)
    
    
    def task():
        time.sleep(1) 
        print("线程号%s is running "%current_thread().name)
        
    
    for i in range(10):
        pool.submit(task)
    
    
    >>>:
    线程号ThreadPoolExecutor-0_0 is running 
    线程号ThreadPoolExecutor-0_1 is running 
    线程号ThreadPoolExecutor-0_2 is running 
    线程号ThreadPoolExecutor-0_1 is running 
    线程号ThreadPoolExecutor-0_0 is running 
    线程号ThreadPoolExecutor-0_2 is running 
    线程号ThreadPoolExecutor-0_0 is running 
    线程号ThreadPoolExecutor-0_2 is running 
    线程号ThreadPoolExecutor-0_1 is running 
    线程号ThreadPoolExecutor-0_2 is running 
    线程池的使用

    进程池线程池 与信号量的区别

      进程线程池中的进程线程一旦被创建,除非系统重启,池子中的进程线程就不会被销毁,也就是说,每一次运行的进程线程都是来自于进程池线程池的进程线程,而且从头到尾都只是开始创建的进程线程,不会产生新的

      信号量指的是产生一堆线程,也就是说每次执行的线程可能都是不一样的,只是每一次执行的线程个数受到限制而已。

    九 同步异步 - 阻塞非阻塞

    阻塞非阻塞指的是程序的运行状态

      阻塞:当程序执行过程中遇到了IO操作,在执行IO操作时,程序无法继续执行其他代码,称之为阻塞!

      非阻塞:程序正常运行没有遇到IO操作,或者通过某种方式使程序即使遇到了不会停在原地,还可以执行其他才操作,以提高CPU的占用率。

    同步-异步 指的是提交任务的方式

      同步调用:发起任务后必须在原地等待任务完成,才能继续执行。

      异步调用:发起任务后不用等待任务执行,可以立即开启执行其他操作。

      同步会有等待的效果但是这和阻塞是完全不同的,阻塞时程序会被剥夺CPU执行权,而同步调用则不会!

      异步效率高于同步,但是并不是所有的任务都是可以异步执行的,判断一个任务是否可以异步的条件是:任务发起方是否立即需要执行结果。

      同步不等于阻塞,异步不等于非阻塞。

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time
    
    pool = ThreadPoolExecutor(3)
    def task(i):
        time.sleep(0.01)
        print(current_thread().name,"working..")
        return i ** i
    
    if __name__ == '__main__':
        objs = []
        for i in range(3):
            res_obj = pool.submit(task,i) # 异步方式提交任务# 会返回一个对象用于表示任务结果
            objs.append(res_obj)
    
    # 该函数默认是阻塞的 会等待池子中所有任务执行结束后执行
    pool.shutdown(wait=True)
    
    # 从结果对象中取出执行结果
    for res_obj in objs:
        print(res_obj.result())
    print("over")
    程序中的异步调用并获取结果
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time
    
    pool = ThreadPoolExecutor(3)
    def task(i):
        time.sleep(0.01)
        print(current_thread().name,"working..")
        return i ** i
    
    if __name__ == '__main__':
        objs = []
        for i in range(3):
            res_obj = pool.submit(task,i) # 会返回一个对象用于表示任务结果
            print(res_obj.result()) #result是同步的一旦调用就必须等待 任务执行完成拿到结果
    print("over")
    程序中的同步调用并获取结果
  • 相关阅读:
    数据库中Schema(模式)概念的理解
    git错误处理
    mysql存储过程
    bunyan
    golang 小问题
    操作系统
    数据库优化
    内存控制
    MySQL优化2
    mysql优化1
  • 原文地址:https://www.cnblogs.com/846617819qq/p/10209448.html
Copyright © 2011-2022 走看看