zoukankan      html  css  js  c++  java
  • python 并发编程之多线程

    一、线程理论

    1.什么是线程

    多线程(即多个控制线程)的概念是,在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。

    所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

    2.进程与线程的区别

    • 同一进程内的多个线程共享该进程内的地址资源

    • 创建线程的开销要远小于创建进程的开销(创建一个进程,就是创建一个车间,涉及到申请空间,而且在该空间内建至少一条流水线,但创建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小)

    3.多线程应用举例

    开启一个字处理软件进程,该进程肯定需要办不止一件事情,比如监听键盘输入,处理文字,定时自动将文字保存到硬盘,这三个任务操作的都是同一块数据,因而不能用多进程。只能在一个进程里并发地开启三个线程,如果是单线程,那就只能是,键盘输入时,不能处理文字和自动保存,自动保存时又不能输入和处理文字。

    二、开启线程的两种方式

    方式一:

    import time, random
    from threading import Thread
    
    def task(name):
        print('%s is running' %name)
        time.sleep(random.randrange(1, 3))
        print('%s running end' %name)
    
    
    if __name__ == '__main__':
        t1 = Thread(target=task, args=('gudon', ))
        t1.start()
        print('主线程。。。')
        
        
    ---------------------------打印结果-----------------------------
    gudon is running
    主线程。。。
    gudon running end
    

    方式二:

    import time, random
    from threading import Thread
    
    class MyThread(Thread):
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run(self):
            print('%s is running' %self.name)
            time.sleep(random.randrange(1,3))
            print('%s running end' %self.name)
    
    
    if __name__ == "__main__":
        t1 = MyThread('Astro')
        t1.start()
        print('主线程......')
        
    ---------------------------打印结果-----------------------------
    Astro is running
    主线程......
    Astro running end
    

    multprocess 、threading 两个模块在使用方式上相似性很大

    二、进程与线程的区别

    1.开线程的开销远小于开进程的开销

    进程:

    import time, random
    from threading import Thread
    from multiprocessing import Process
    
    
    def task(name):
        print('%s is running' %name)
        time.sleep(random.randrange(1, 3))
        print('%s running end' %name)
    
    
    if __name__ == '__main__':
        p = Process(target=task, args=('Astro', ))
        p.start() # p.start ()将开启进程的信号发给操作系统后,操作系统要申请内存空间,让好拷贝父进程地址空间到子进程,开销远大于线程
        print('主...........')
        
    
    ---------------------------【进程】打印结果-----------------------------
    主...........
    Astro is running
    Astro running end
    

    线程:

    import time, random
    from threading import Thread
    from multiprocessing import Process
    
    def task(name):
        print('%s is running' %name)
        time.sleep(random.randrange(1, 3))
        print('%s running end' %name)
    
    
    if __name__ == '__main__':
        t1 = Thread(target=task, args=('Astro', ))
        t1.start() #几乎是t.start ()的同时就将线程开启了,线程的创建开销要小鱼进程创建的开销
        print('主...........')
        
    ---------------------------【线程】打印结果-----------------------------
    
    Astro is running
    主...........
    Astro running end
    

    2.同一进程内的多个线程共享该进程的地址空间

    from threading import Thread
    from multiprocessing import Process
    
    n = 100
    def task():
        global n
        n = 0
    
    if __name__ == "__main__":
        p1 = Process(target=task)
        p1.start()
        p1.join()
    
        print('主 %s' %n)  
        
    ---------------------------打印结果-----------------------------
    主 100
    

    子进程 p1 创建的时候,会把主进程中的数据复制一份,进程之间数据不会互相影响,所以子进程p1 中 n=0 后,只是子进程中的 n 改了,主进程的 n 不会受影响 ,即 进程之间地址空间是隔离的

    from threading import Thread
    from multiprocessing import Process
    
    n = 100
    def task():
        global n
        n = 0
    
    if __name__ == "__main__":
        t1 = Thread(target=task)
        t1.start()
        t1.join()
    
        print('主 %s' %n)
    
    ---------------------------打印结果-----------------------------
    主 0
    

    同一进程内的线程之间共享进程内的数据,所以为 0

    3. pid

    pid 就是 process id ,进程的id号。

    开多个进程,每个进程都有不同的pid

    from multiprocessing import Process, current_process
    from threading import Thread
    import os
    
    def task():
       # print('子进程...', current_process().pid)  # 也可以使用 os.getpid()或 current_process().pid 来查看当前进程的pid,os.getppid() 可以查看当前进程的父进程的pid
        print('子进程PID:%s  父进程的PID:%s' % (os.getpid(), os.getppid()))
    
    if __name__ == '__main__':
        p1 = Process(target=task)
        p1.start()
    
        print('主线程', current_process().pid)
        
        
    ---------------------------打印结果-----------------------------
    主线程 808
    子进程PID:7668  父进程的PID:808
    

    在主进程下开启多个线程,每个线程都跟主进程的pid一样

    from threading import Thread
    import os
    def task():
        print('子线程pid:',os.getpid())
    
    if __name__ == '__main__':
        t1 = Thread(target=task, )
        t1.start()
    
        print('主线程pid:', os.getpid())
    
    ---------------------------打印结果-----------------------------
    子线程pid: 9084
    主线程pid: 9084
    

    三、Thread对象的其他属性或方法

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    

    from threading import Thread, currentThread,active_count,enumerate
    import time
    def task():
        print('%s is running '%currentThread().getName())
        time.sleep(2)
        print('%s is done' %currentThread().getName())
    
    
    if __name__ == "__main__":
        t = Thread(target=task, )
        # t = Thread(target=task, name='子线程001')  # 也可以在这里改子线程名字
        t.start()
        # t.setName('子线程001')
        t.join()
        currentThread().setName('主线程')
        print(active_count())  # 返回正在运行的线程数量
        print(enumerate())  # [<_MainThread(主线程, started 6904)>]     默认为:[<_MainThread(MainThread, started 6904)>]
        
        
    ---------------------------打印结果-----------------------------
    子线程001 is running 
    子线程001 is done
    1
    [<_MainThread(主线程, started 6432)>]
    
    

    四、守护线程与互斥锁

    1.守护线程

    无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

    1、对主进程来说,运行完毕指的是主进程代码运行完毕

    主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
    

    2、对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

    主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
    

    from threading import Thread
    import time
    
    def sayHi(name):
        time.sleep(2)
        print('%s say Hello')
    
    if __name__ == '__main__':
        t = Thread(target=sayHi, args=('Astro', ))
        t.setDaemon(True)  # 设置为守护线程, t.daemon = True 也可以
        t.start()
    
        print('主线程')
        print(t.is_alive())
        
    ----------------------执行结果---------------------------
    主线程
    True
    

    t.start() 后,系统会马上创建出一条子线程,但是由于子线程中 time.sleep(2) ,2秒的时间对计算机来说已经很长了,所以在

    print('主线程')
    print(t.is_alive())
    

    执行后,主线程结束,子线程跟着就结束了,因为我们设置了该子线程为守护线程

    在看一个例子:

    def foo():
        print('foo runing..')
        time.sleep(1)
        print('foo end')
    
    def bar():
        print('bar running..')
        time.sleep(3)
        print('bar end')
    
    
    if __name__ == '__main__':
        t1 = Thread(target=foo, )
        t2 = Thread(target=bar, )
    
        t1.setDaemon(True)
        t1.start()
        t2.start()
    
        print('main.......')
        
    ----------------------执行结果---------------------------	
    foo runing..
    bar running..
    main.......
    foo end
    bar end
    
    

    t1 为守护进程,t1.start() 后,马上执行 foo函数,然后 time.sleep(1)

    此时 t2.start() 开启了线程,执行 bar函数后 time.sleep(3)

    3秒的时间内,执行了主线程的 print('main......') , 然后主线程任务已经完成,但是由于 子线程t2 还未执行完毕,t2 非守护线程,主线程还需要等待 非守护线程t2运行完毕后才能结束,所以在等待 t2结束的时间里,t1 线程执行完毕,t1只sleep 1秒,时间上足够t1 先执行了

    然后t2 三秒后执行自己剩下的部分

    2.互斥锁

    from threading import Thread, Lock
    import time
    
    n = 100
    def task(mutex):
        global n
        mutex.acquire()  # 加互斥锁
        temp = n
        time.sleep(0.1)
        n = temp - 1
        mutex.release()
    
    if __name__ == '__main__':
        mutex = Lock()
        t_list = []
        for i in range(100):
            t = Thread(target=task, args=(mutex, ))
            t_list.append(t)
            t.start()
    
        for t in t_list:
            t.join()
    
        print('main....',n)
     
    ----------------------执行结果---------------------------
    main.... 0
    

    如果不加互斥锁的情况下,得到的结果是 main.... 99

    因为,在循环中 t.start() 的时候,100个线程会立马创建出来,然后在函数中,100个线程的 temp都被赋值为了 100,所以 n = temp - 1 只是循环的被赋值为 99 而已

    另外,互斥锁只保证里局部的串行,与join 不一样

    五、GIL 全局解释器锁

    结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

    首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。>有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

    1.GIL介绍

    GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。

    可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。

    要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。例如python test.py,python aaa.py,python bbb.py会产生3个不同的python进程

    运行这段代码

    test.py

    import os, time
    print(os.getpid())  # 5132
    time.sleep(1000)
    
    

    然后cmd 查看一下 tasklist |findstr python

    结果:

    python.exe                    5132 Console                    1      9,568 K
    

    运行该程序,只会产生一个进程

    在一个python的进程内,不仅有test.py的主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程内.

    1、所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码)
    例如:test.py定义一个函数work(代码内容如下图),在进程内所有线程都能访问到work的代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。
    
    2、所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。
    

    综上:

    如果多个线程的target=work,那么执行流程是

    多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行

    解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码

    2. GIL与Lock

    一个问题:Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?

    首先,我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据

    然后,我们可以得出结论:保护不同的数据就应该加不同的锁。

    最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock,如下图

    分析:

    1、100个线程去抢GIL锁,即抢执行权限
    2、肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()
    3、极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL
    4、直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程
    

    代码:

    from threading import Thread,Lock
    import os,time
    def work():
        global n
        lock.acquire()
        temp=n
        time.sleep(0.1)
        n=temp-1
        lock.release()
    if __name__ == '__main__':
        lock=Lock()
        n=100
        l=[]
        for i in range(100):
            p=Thread(target=work)
            l.append(p)
            p.start()
        for p in l:
            p.join()
    
        print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全,不加锁则结果可能为99
    

    3.GIL 与 多线程

    有了GIL的存在,同一时刻同一进程中只有一个线程被执行

    进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,这是个问题!!!

    要解决这个问题,我们需要在几个点上达成一致:

    1、cpu到底是用来做计算的,还是用来做I/O的?
    
    2、多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能
    
    3、每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处
    

    一个工人相当于cpu,此时计算相当于工人在干活,I/O阻塞相当于为工人干活提供所需原材料的过程,工人干活的过程中如果没有原材料了,则工人干活的过程需要停止,直到等待原材料的到来。

    如果你的工厂干的大多数任务都要有准备原材料的过程(I/O密集型),那么你有再多的工人,意义也不大,还不如一个人,在等材料的过程中让工人去干别的活,

    反过来讲,如果你的工厂原材料都齐全,那当然是工人越多,效率越高

    结论:

    1、对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用
    2、当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地

    假设我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:

    方案一:开启四个进程
    方案二:一个进程下,开启四个线程
    

    单核情况下,分析结果:

    如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
    如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
    

    多核情况下,分析结果:

    如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
    如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜
    

    结论:

    现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。

    4.多线程性能测试

    如果并发的多个任务是计算密集型:多进程效率高

    from multiprocessing import Process
    from threading import Thread
    import time, os
    
    def work():
        res = 0
        for i in range(100000000):
            res *= i
    
    
    if __name__ == '__main__':
        l = []
        print(os.cpu_count()) # 4核
        start = time.time()
        for i in range(4):
            # p = Process(target=work)  # 进程: 10.46 s
            p = Thread(target=work)  # 线程: 20.67 s
            l.append(p)
            p.start()
    
        for p in l:
            p.join()
    
        stop = time.time()
        print('run time is %s' %(stop - start))
        
    
    

    如果并发的多个任务是I/O密集型:多线程效率高

    # 如果并发的多个任务是I/O密集型:多线程效率高
    from multiprocessing import Process
    from threading import Thread
    import time, os
    
    def work():
        time.sleep(2)
        print("^_^ ---> ")
    
    if __name__ == '__main__':
        l = []
        start = time.time()
        for i in range(400):
            # p = Process(target=work)  # 进程:30.68 ,大部分时间耗费在创建进程上
            p = Thread(target=work)  # 线程:2.04
            l.append(p)
            p.start()
    
        for p in l:
            p.join()
    
        stop = time.time()
        print('run time is %s' % (stop - start))
    

    尾巴:

    • 多线程用于IO密集型,如socket,爬虫,web
    • 多进程用于计算密集型,如金融分析

    六、死锁与递归锁

    死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。

    #互斥锁只能 acquire 一次
    from threading import Thread,Lock
    import time
    
    mutexA = Lock()
    mutexB = Lock()
    
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
    
            mutexB.acquire()
            print('%s 拿到了B锁' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            time.sleep(0.1)
    
            mutexA.acquire()
            print('%s 拿到了A锁' % self.name)
            mutexA.release()
    
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThread()
            t.start()
    
    ------打印结果----
    Thread-1 拿到了A锁
    Thread-1 拿到了B锁
    Thread-1 拿到了B锁
    Thread-2 拿到了A锁
    
    

    程序卡住了!

    分析: t.start() , 执行 f1() ,然后 Thread-1 拿到A锁,再拿到B锁,再释放B锁,再释放A锁

    接着 Thread-1 执行 f2() ,在 f2() 中拿到B锁,然后 睡了 0.1s , 0.1s 的时间里, Thread-2已经启动了

    Thread-2 这个是时候 执行 f1(), 在f1() 中拿到A锁,当Thread-2去拿B锁的时候

    这个时候 B锁在Thread-1 手里还没释放

    0.1s 过去了,Thread-1 要在 f2() 中去拿 A锁,哎呀,发现 A 锁刚刚被Thread-2 在 f1() 里拿到了

    导致了各自想要的锁都在对方手里,而自己现在又没法释放对方需要的锁,导致了死锁

    递归锁:

    归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

    # 递归锁 : 可以连续 require 多次, 内部有一个计数器,每acquire一次,计数器 +1, 每release一次 -1,只有计数器为0 时候,才能被其他线程抢到acquire
    from threading import Thread, RLock
    import time
    
    mutexA = mutexB = RLock()
    
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
    
            mutexB.acquire()
            print('%s 拿到了B锁' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            time.sleep(0.1)
    
            mutexA.acquire()
            print('%s 拿到了A锁' % self.name)
            mutexA.release()
    
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThread()
            t.start()
    
    ---执行结果---
    Thread-1 拿到了A锁
    Thread-1 拿到了B锁
    Thread-1 拿到了B锁
    Thread-1 拿到了A锁
    Thread-2 拿到了A锁
    Thread-2 拿到了B锁
    Thread-2 拿到了B锁
    Thread-2 拿到了A锁
    Thread-4 拿到了A锁
    Thread-4 拿到了B锁
    Thread-4 拿到了B锁
    Thread-4 拿到了A锁
    Thread-6 拿到了A锁
    Thread-6 拿到了B锁
    Thread-6 拿到了B锁
    Thread-6 拿到了A锁
    Thread-8 拿到了A锁
    Thread-8 拿到了B锁
    Thread-8 拿到了B锁
    Thread-8 拿到了A锁
    Thread-10 拿到了A锁
    Thread-10 拿到了B锁
    Thread-10 拿到了B锁
    Thread-10 拿到了A锁
    Thread-5 拿到了A锁
    Thread-5 拿到了B锁
    Thread-5 拿到了B锁
    Thread-5 拿到了A锁
    Thread-9 拿到了A锁
    Thread-9 拿到了B锁
    Thread-9 拿到了B锁
    Thread-9 拿到了A锁
    Thread-7 拿到了A锁
    Thread-7 拿到了B锁
    Thread-3 拿到了A锁
    Thread-3 拿到了B锁
    Thread-3 拿到了B锁
    Thread-3 拿到了A锁
    Thread-7 拿到了B锁
    Thread-7 拿到了A锁
    

    尾巴:

    递归锁可以连续acquire多次,而互斥锁只能acquire一次


    七、信号量、Event、定时器

    1. 信号量

    信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小

    from threading import Thread, Semaphore, currentThread
    import time, random
    sm = Semaphore(3)
    def task():
        # sm.acquire()
        # print('%s in' %currentThread().getName())
        # time.sleep(random.randint(1,3))
        # sm.release()
        with sm:
            print('%s in' %currentThread().getName())
            time.sleep(random.randint(1, 3))
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = Thread(target=task)
            t.start()
            
    --- 执行结果 ---
    Thread-1 in
    Thread-2 in
    Thread-3 in
    Thread-4 in
    Thread-5 in
    Thread-6 in
    Thread-8 in
    Thread-7 in
    Thread-9 in
    Thread-10 in
    

    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    2.Event

    线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    from threading import Event
    
    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False。
    
    

    例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

    from threading import Thread, Event
    import time
    
    event = Event()
    
    def student(name):
        print('学生%s 正在听课' %name)
        event.wait()
        # event.wait(2)  #wait 可以设置一个超时的时间,即 2s 后,里吗执行下面的语句,不用等待 event.set() 的信号了
        print('学生%s 下课' %name)
    
    def teacher(name):
        print('老师%s 正在讲课' %name)
        time.sleep(5)
        event.set() # 给上面的 event.wait() 发送一个信号
    
    if __name__ == '__main__':
        stu1 = Thread(target=student, args=('鸣人', ))
        stu2 = Thread(target=student, args=('佐助', ))
        stu3 = Thread(target=student, args=('小樱', ))
        t1 = Thread(target=teacher, args=('卡卡西', ))
    
        stu1.start()
        stu2.start()
        stu3.start()
        t1.start()
        
    --- 执行结果 ---
    学生鸣人 正在听课
    学生佐助 正在听课
    学生小樱 正在听课
    老师卡卡西 正在讲课  
    (5s 后打印下面的内容)
    学生小樱 下课
    学生佐助 下课
    学生鸣人 下课
    

    event.wait(2) #wait 可以设置一个超时的时间,即 2s 后,里吗执行下面的语句,不用等待 event.set() 的信号了

    from threading import Thread, Event, currentThread
    import time
    
    event = Event()
    
    def conn():
        n = 0
        while  not event.is_set(): # 当event 没有set 的时候
            if n == 3:
                print('%s try too many times' %currentThread().getName())
                return
            print('%s try %s' %(currentThread().getName(), n))
            event.wait(0.5)  # 设置超时时间
            n += 1
        print('%s is connected' %currentThread().getName())
    
    def check():
        print('%s is checking' %currentThread().getName())
        time.sleep(5)
        event.set()
    
    if __name__ == "__main__":
        for i in range(3):
            t = Thread(target=conn)
            t.start()
    
        t = Thread(target=check)
        t.start()
        
    --- 执行结果 ---
    Thread-1 try 0
    Thread-2 try 0
    Thread-3 try 0
    Thread-4 is checking
    Thread-3 try 1
    Thread-1 try 1
    Thread-2 try 1
    Thread-1 try 2
    Thread-2 try 2
    Thread-3 try 2
    Thread-1 try too many times
    Thread-2 try too many times
    Thread-3 try too many times
    
        
    

    3.定时器

    定时器,指定n秒后执行某操作

    from threading import Timer
    
    def task(name):
        print('%s is running...' %name)
    
    t = Timer(3, task, args=('Astro', ))
    t.start()
    
    --- 执行结果 ---
    3s 后打印
    Astro is running...
    
    from threading import Timer
    import random
    
    class Code:
        def __init__(self):
            self.make_cache()
    
        def make_cache(self, interval=5):
            self.cache = self.make_code()
            print(self.cache)
            self.t = Timer(interval, self.make_cache)
            self.t.start()
    
    
    
    
        def make_code(self, n=4):
            res = ''
            for i in range(n):
                s1 = str(random.randint(0, 9))
                s2 = chr(random.randint(65, 90)) # chr() 返回值是当前整数对应的ascii字符
                res += random.choice([s1, s2]) # random.choice() 返回随机项,传入一个数组,返回数组中随机一个元素;传入一个字符串,返回某个字符
            return res
    
        def check(self):
            while True:
                code = input('请输入验证码:').strip()
                if code.upper() == self.cache:
                    print('验证码正确')
                    self.t.cancel()
                    break
    
    
    c = Code()
    c.check()
    
    -- 打印结果 ---
    V1N6
    请输入验证码:v1n # 输入
    请输入验证码:B1TI
    b1ti # 输入
    验证码正确
    
    

    八、线程queue

    1. queue.Queue(maxsize=0) #队列:先进先出

    import queue
    
    q = queue.Queue(3) # 先进先出 队列
    q.put('one')
    q.put(2)
    q.put('three')
    # q.put(4) # 放第四个的时候会卡住
    # q.put(4, block=True) # 程序卡住
    # q.put(4,block=False) # 程序报错  queue.Full  == q.put_nowait(4)
    # q.put(4, block=True,timeout=3) # 3s内,队列里没有元素被取走,程序报错: queue.Full
    
    
    print(q.get())  # one
    print(q.get())  # 2
    print(q.get())  # three
    # print(q.get())  # 程序卡住
    # print(q.get(block=True))  # 程序卡住
    # print(q.get(block=False))  # 程序报错 queue.Empty
    # print(q.get_nowait())  # 程序报错 queue.Empty
    # print(q.get(block=True,timeout=3))  # 3s内,队列依然是empty,报错
    

    2. queue.LifoQueue(maxsize=0) 堆栈 后进先出

    import queue
    
    q = queue.LifoQueue(3)  # last in first out 后进先出  堆栈
    q.put(1)
    q.put(2)
    q.put(3)
    
    print(q.get())  # 3
    print(q.get())  # 2
    print(q.get())  # 1
    

    3. queue.PriorityQueue(maxsize=0) 优先级队列:存储数据时可设置优先级的队列

    import queue
    
    q = queue.PriorityQueue(3)  # 优先级队列
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((10, 'a'))
    q.put((40, 'b'))
    q.put((20, 'c'))
    
    print(q.get())  # (10, 'a')
    print(q.get())  # (20, 'c')
    print(q.get())  # (40, 'b')
    

    九、进程池 线程池

    1. 线程池与进程池

    在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,于是我们必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途,例如进程池,就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制

    基本方法:

    1、submit(fn, *args, **kwargs)
    异步提交任务
    
    2、map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    3、shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    4、result(timeout=None)
    取得结果
    
    5、add_done_callback(fn)
    回调函数
    

    进程池:

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time,random
    
    def task(name):
        print('name:%s pid:%s run...' %(name, os.getpid()))
        time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(4)
    
        for i in range(10):
            pool.submit(task, 'apple%s'%i)
            
        pool.shutdown(wait=True) # wait 默认为True ,类似join的效果,及所有进程执行完后,再执行主线程下面的代码
        print('主.....')
        
    
    ---执行结果----
    主.....
    name:apple0 pid:8312 run...
    name:apple1 pid:13256 run...
    name:apple2 pid:6088 run...
    name:apple3 pid:14296 run...
    
    name:apple4 pid:8312 run...
    name:apple5 pid:13256 run...
    name:apple6 pid:6088 run...
    name:apple7 pid:14296 run...
    
    name:apple8 pid:13256 run...
    name:apple9 pid:6088 run...
    
    

    可以看到, pid 就是 就是4个,进程池保证了 进程的数量为 4

    ​ ​

    线程池:

    线程池与进程池使用类似:

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import currentThread
    import os,time,random
    
    def task():
        print('name:%s pid:%s run...' %(currentThread().getName(), os.getpid()))
        time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(5)
    
        for i in range(10):
            pool.submit(task)
    
        pool.shutdown(wait=True) # wait 默认为True ,类似join的效果,及所有进程执行完后,再执行主线程下面的代码
        print('主.....')
    
    --- 执行结果 ---
    name:ThreadPoolExecutor-0_0 pid:528 run...
    name:ThreadPoolExecutor-0_1 pid:528 run...
    name:ThreadPoolExecutor-0_2 pid:528 run...
    name:ThreadPoolExecutor-0_3 pid:528 run...
    name:ThreadPoolExecutor-0_4 pid:528 run...
    
    name:ThreadPoolExecutor-0_2 pid:528 run...
    name:ThreadPoolExecutor-0_4 pid:528 run...
    
    name:ThreadPoolExecutor-0_1 pid:528 run...
    name:ThreadPoolExecutor-0_0 pid:528 run...
    name:ThreadPoolExecutor-0_3 pid:528 run...
    主.....
    
    
    

    保证了 5 个线程的数量。

    2. 同步调用 和 异步调用

    提交任务的两种方式:
    1、同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行

    from concurrent.futures import ThreadPoolExecutor
    import time, random
    
    
    def fly(name):
        print('%s is flying' %name)
        time.sleep(random.randint(3,5))
        res = random.randint(6, 13) * '#'
        return {'name': name, 'res':res}
    
    
    def weight(bird_data):
        name = bird_data['name']
        distance = len(bird_data['res'])
        print('%s 飞了 %s 米' %(name, distance))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(12)
    
        bird_data1 = pool.submit(fly, '青鸟').result()
        weight(bird_data1)
        bird_data2 = pool.submit(fly, '火烈鸟').result()
        weight(bird_data2)
        bird_data3 = pool.submit(fly, '鸽子').result()
        weight(bird_data3)
    
    --- 执行结果 ---
    青鸟 is flying
    
    青鸟 飞了 9 米
    火烈鸟 is flying
    
    火烈鸟 飞了 10 米
    鸽子 is flying
    
    鸽子 飞了 9 米
    

    2、异步调用:提交完任务后,不会原地等待任务执行完毕

    from concurrent.futures import ThreadPoolExecutor
    import time, random
    
    
    def fly(name):
        print('%s is flying' %name)
        time.sleep(random.randint(3,5))
        res = random.randint(6, 13) * '#'
        return {'name': name, 'res': res}
    
    
    def weight(bird_data):
        bird_data = bird_data.result()
        name = bird_data['name']
        distance = len(bird_data['res'])
        print('%s 飞了 %s 米' %(name, distance))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(12)
        #可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数
        pool.submit(fly, '青鸟').add_done_callback(weight)
        pool.submit(fly, '火烈鸟').add_done_callback(weight)
        pool.submit(fly, '鸽子').add_done_callback(weight)
    
    ---  执行结果   ---
    青鸟 is flying
    火烈鸟 is flying
    鸽子 is flying
    
    青鸟 飞了 6 米
    火烈鸟 飞了 12 米
    鸽子 飞了 9 米
    

    用法举例: 服务端建立线程池设置线程数为2,在客户端可以开启3个,则只有两个客户端可以正常运行,第三个想要正常运行需要停止另外两个中的一个才可以

    # 客户端
    from socket import *
    
    client = socket(AF_INET, SOCK_STREAM)
    client.connect(('127.0.0.1', 9969))
    
    while True:
        msg = input('>>:').strip()
        if not msg:continue
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))
    
    client.close()
    

    #服务端
    from socket import *
    from concurrent.futures import ThreadPoolExecutor
    
    def communicate(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except ConnectionResetError:
                break
    
        conn.close()
    
    def server(ip, port):
        server = socket(AF_INET, SOCK_STREAM)
        server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        server.bind((ip, port))
        server.listen(5)
    
        while True:
            conn, addr = server.accept()
            pool.submit(communicate, conn)
            # t = Thread(target=communicate, args=(conn,))
            # t.start()
    
        server.close()
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(2)
        server('127.0.0.1', 9969)
    

    3. map 方法

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    import os,time,random
    def task(n):
        print('%s is runing' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
    
        executor=ThreadPoolExecutor(max_workers=3)
    
        # for i in range(11):
        #     future=executor.submit(task,i)
    
        executor.map(task, range(1, 12))  # map取代了for+submit
    
  • 相关阅读:
    jax + php 写入数据库最简单实例
    JavaScript--水平幻灯片
    深入理解CSS3 gradient斜向线性渐变
    js基础-1
    html5 Canvas 如何自适应屏幕大小
    清除浮动塌陷
    spring MVC配置
    dom4j使用
    Visio绘制时序图
    Eclipse中引来的jar包乱码
  • 原文地址:https://www.cnblogs.com/friday69/p/9615654.html
Copyright © 2011-2022 走看看