zoukankan      html  css  js  c++  java
  • 并发编程之多线程

    并发编程之多线程

    一 threading模块介绍

    multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍

    官网链接:https://docs.python.org/3/library/threading.html?highlight=threading#

    二 开启线程的两种方式

    img

    [复制代码](javascript:void(0)

    #方式一
    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('太白',))
        t.start()
        print('主线程')
    

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    #方式二
    from threading import Thread
    import time
    class Sayhi(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            time.sleep(2)
            print('%s say hello' % self.name)
    
    
    if __name__ == '__main__':
        t = Sayhi('egon')
        t.start()
        print('主线程')
    

    [复制代码](javascript:void(0)

    img

    三、 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    from multiprocessing import Process
    import os
    
    def work():
        print('hello')
    
    if __name__ == '__main__':
        #在主进程下开启线程
        t=Thread(target=work)
        t.start()
        print('主线程/主进程')
        '''
        打印结果:
        hello
        主线程/主进程
        '''
    
        #在主进程下开启子进程
        t=Process(target=work)
        t.start()
        print('主线程/主进程')
        '''
        打印结果:
        主线程/主进程
        hello
        '''
    谁的开启速度快
    

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    from multiprocessing import Process
    import os
    
    def work():
        print('hello',os.getpid())
    
    if __name__ == '__main__':
        #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
        t1=Thread(target=work)
        t2=Thread(target=work)
        t1.start()
        t2.start()
        print('主线程/主进程pid',os.getpid())
    
        #part2:开多个进程,每个进程都有不同的pid
        p1=Process(target=work)
        p2=Process(target=work)
        p1.start()
        p2.start()
        print('主线程/主进程pid',os.getpid())
    瞅一瞅pid
    

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    from  threading import Thread
    from multiprocessing import Process
    import os
    def work():
        global n
        n=0
    
    if __name__ == '__main__':
        # n=100
        # p=Process(target=work)
        # p.start()
        # p.join()
        # print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100
    
    
        n=1
        t=Thread(target=work)
        t.start()
        t.join()
        print('主',n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据
    同一进程内的线程共享该进程的数据?
    

    [复制代码](javascript:void(0)

    小练习

      socket相关练习

    img

    [复制代码](javascript:void(0)

    #_*_coding:utf-8_*_
    #!/usr/bin/env python
    import multiprocessing
    import threading
    
    import socket
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    
    def action(conn):
        while True:
            data=conn.recv(1024)
            print(data)
            conn.send(data.upper())
    
    if __name__ == '__main__':
    
        while True:
            conn,addr=s.accept()
    
    
            p=threading.Thread(target=action,args=(conn,))
            p.start()
    
    多线程并发的socket服务端
    

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    #_*_coding:utf-8_*_
    #!/usr/bin/env python
    
    
    import socket
    
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.connect(('127.0.0.1',8080))
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
    
        s.send(msg.encode('utf-8'))
        data=s.recv(1024)
        print(data)
    
    客户端
    

    [复制代码](javascript:void(0)

      一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    msg_l=[]
    format_l=[]
    def talk():
        while True:
            msg=input('>>: ').strip()
            if not msg:continue
            msg_l.append(msg)
    
    def format_msg():
        while True:
            if msg_l:
                res=msg_l.pop()
                format_l.append(res.upper())
    
    def save():
        while True:
            if format_l:
                with open('db.txt','a',encoding='utf-8') as f:
                    res=format_l.pop()
                    f.write('%s\n' %res)
    
    if __name__ == '__main__':
        t1=Thread(target=talk)
        t2=Thread(target=format_msg)
        t3=Thread(target=save)
        t1.start()
        t2.start()
        t3.start()
    

    [复制代码](javascript:void(0)

    四 线程相关的其他方法

    [复制代码](javascript:void(0)

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    import threading
    from multiprocessing import Process
    import os
    
    def work():
        import time
        time.sleep(3)
        print(threading.current_thread().getName())
    
    
    if __name__ == '__main__':
        #在主进程下开启线程
        t=Thread(target=work)
        t.start()
    
        print(threading.current_thread().getName())
        print(threading.current_thread()) #主线程
        print(threading.enumerate()) #连同主线程在内有两个运行的线程
        print(threading.active_count())
        print('主线程/主进程')
    
        '''
        打印结果:
        MainThread
        <_MainThread(MainThread, started 140735268892672)>
        [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
        主线程/主进程
        Thread-1
        '''
    

    [复制代码](javascript:void(0)

    主线程等待子线程结束

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.start()
        t.join()
        print('主线程')
        print(t.is_alive())
        '''
        egon say hello
        主线程
        False
        '''
    

    [复制代码](javascript:void(0)

    五 守护线程

    无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

    需要强调的是:运行完毕并非终止运行

    #1.对主进程来说,运行完毕指的是主进程代码运行完毕
    
    #2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
    

    详细解释:

    #1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
    
    #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
    

    [复制代码](javascript:void(0)

    复制代码

    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.setDaemon(True) #必须在t.start()之前设置
        t.start()
    
        print('主线程')
        print(t.is_alive())
        '''
        主线程
        True
        '''
    

    复制代码

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    
    t1=Thread(target=foo)
    t2=Thread(target=bar)
    
    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")
    
    迷惑人的例子
    

    [复制代码](javascript:void(0)

    六 互斥锁(同步锁)

    多线程的同步锁与多进程的同步锁是一个道理,就是多个线程抢占同一个数据(资源)时,我们要保证数据的安全,合理的顺序。

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    import time
    
    x = 100
    def task():
        global x
        temp = x
        time.sleep(0.1)
        temp -= 1
        x = temp
    
    
    
    if __name__ == '__main__':
        t_l1 = []
        for i in range(100):
            t = Thread(target=task)
            t_l1.append(t)
            t.start()
    
        for i in t_l1:
            i.join()
        print(f'主{x}')
    

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    from threading import Lock
    import time
    
    x = 100
    lock = Lock()
    
    def task():
        global x
        lock.acquire()
        temp = x
        time.sleep(0.1)
        temp -= 1
        x = temp
        lock.release()
    
    
    if __name__ == '__main__':
        t_l1 = []
        for i in range(100):
            t = Thread(target=task)
            t_l1.append(t)
            t.start()
    
        for i in t_l1:
            i.join()
        print(f'主{x}')
    
    

    [复制代码](javascript:void(0)

    七 死锁现象与递归锁

    进程也有死锁与递归锁,进程的死锁和递归锁与线程的死锁递归锁同理。

    所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    from threading import Lock
    import time
    
    lock_A = Lock()
    lock_B = Lock()
    
    
    class MyThread(Thread):
    
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            lock_A.acquire()
            print(f'{self.name}拿到A锁')
    
            lock_B.acquire()
            print(f'{self.name}拿到B锁')
            lock_B.release()
    
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f'{self.name}拿到B锁')
            time.sleep(0.1)
    
            lock_A.acquire()
            print(f'{self.name}拿到A锁')
            lock_A.release()
    
            lock_B.release()
    
    if __name__ == '__main__':
        for i in range(3):
            t = MyThread()
            t.start()
    
        print('主....')
    
    

    [复制代码](javascript:void(0)

    解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    from threading import RLock
    import time
    
    lock_A = lock_B = RLock()
    
    
    class MyThread(Thread):
        
        def run(self):
            self.f1()
            self.f2()
        
        def f1(self):
            lock_A.acquire()
            print(f'{self.name}拿到A锁')
            
            lock_B.acquire()
            print(f'{self.name}拿到B锁')
            lock_B.release()
            
            lock_A.release()
        
        def f2(self):
            lock_B.acquire()
            print(f'{self.name}拿到B锁')
            time.sleep(0.1)
            
            lock_A.acquire()
            print(f'{self.name}拿到A锁')
            lock_A.release()
            
            lock_B.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThread()
            t.start()
        
        print('主....')
        
    
    

    [复制代码](javascript:void(0)

    八 信号量Semaphore

    同进程的一样

    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

    img

    [复制代码](javascript:void(0)

    from threading import Thread
    from threading import Semaphore
    from threading import current_thread
    import time
    import random
    
    sem = Semaphore(5)
    
    def go_public_wc():
        sem.acquire()
        print(f'{current_thread().getName()} 上厕所ing')
        time.sleep(random.randint(1,3))
        sem.release()
    
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=go_public_wc)
            t.start()
    
    

    [复制代码](javascript:void(0)

    九 Python GIL(Global interpreter Lock)

    首先,一些语言(java、c++、c)是支持同一个进程中的多个线程是可以应用多核CPU的,也就是我们会听到的现在4核8核这种多核CPU技术的牛逼之处。那么我们之前说过应用多进程的时候如果有共享数据是不是会出现数据不安全的问题啊,就是多个进程同时一个文件中去抢这个数据,大家都把这个数据改了,但是还没来得及去更新到原来的文件中,就被其他进程也计算了,导致数据不安全的问题啊,所以我们是不是通过加锁可以解决啊,多线程大家想一下是不是一样的,并发执行就是有这个问题。但是python最早期的时候对于多线程也加锁,但是python比较极端的(在当时电脑cpu确实只有1核)加了一个GIL全局解释锁,是解释器级别的,锁的是整个线程,而不是线程里面的某些数据操作,每次只能有一个线程使用cpu,也就说多线程用不了多核,但是他不是python语言的问题,是CPython解释器的特性,如果用Jpython解释器是没有这个问题的,Cpython是默认的,因为速度快,Jpython是java开发的,在Cpython里面就是没办法用多核,这是python的弊病,历史问题,虽然众多python团队的大神在致力于改变这个情况,但是暂没有解决。(这和解释型语言(python,php)和编译型语言有关系吗???待定!,编译型语言一般在编译的过程中就帮你分配好了,解释型要边解释边执行,所以为了防止出现数据不安全的情况加上了这个锁,这是所有解释型语言的弊端??)

      img

        但是有了这个锁我们就不能并发了吗?当我们的程序是偏计算的,也就是cpu占用率很高的程序(cpu一直在计算),就不行了,但是如果你的程序是I/O型的(一般你的程序都是这个)(input、访问网址网络延迟、打开/关闭文件读写),在什么情况下用的到高并发呢(金融计算会用到,人工智能(阿尔法狗),但是一般的业务场景用不到,爬网页,多用户网站、聊天软件、处理文件),I/O型的操作很少占用CPU,那么多线程还是可以并发的,因为cpu只是快速的调度线程,而线程里面并没有什么计算,就像一堆的网络请求,我cpu非常快速的一个一个的将你的多线程调度出去,你的线程就去执行I/O操作了,

      详细的GIL锁介绍:https://www.cnblogs.com/jin-xin/articles/11232225.html

    十 GIL锁与Lock的关系

    [复制代码](javascript:void(0)

    GIL VS Lock
    
        机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 
    
     首先我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据
    
        然后,我们可以得出结论:保护不同的数据就应该加不同的锁。
    
     最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock
    
    过程分析:所有线程抢的是GIL锁,或者说所有线程抢的是执行权限
    
      线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock。。。这就导致了串行运行的效果
    
      既然是串行,那我们执行
    
      t1.start()
    
      t1.join
    
      t2.start()
    
      t2.join()
    
      这也是串行执行啊,为何还要加Lock呢,需知join是等待t1所有的代码执行完,相当于锁住了t1的所有代码,而Lock只是锁住一部分操作共享数据的代码。
    
    

    复制代码

    [复制代码](javascript:void(0)

    详解:

    [复制代码](javascript:void(0)

    因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,  这可以说是Python早期版本的遗留问题。
    
    

    [复制代码](javascript:void(0)

    十一 Event(事件)

    同进程的一样

    线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    [复制代码](javascript:void(0)

    复制代码

    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False。
    
    

    复制代码

    [复制代码](javascript:void(0)

    img

    例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

    十二 条件Condition(了解)

    使得线程等待,只有满足某条件时,才释放n个线程,看一下大概怎么用就可以啦~~

    img

    [复制代码](javascript:void(0)

    import time
    from threading import Thread,RLock,Condition,current_thread
    
    def func1(c):
        c.acquire(False) #固定格式
        # print(1111)
    
        c.wait()  #等待通知,
        time.sleep(3)  #通知完成后大家是串行执行的,这也看出了锁的机制了
        print('%s执行了'%(current_thread().getName()))
    
        c.release()
    
    if __name__ == '__main__':
        c = Condition()
        for i in range(5):
            t = Thread(target=func1,args=(c,))
            t.start()
    
        while True:
            num = int(input('请输入你要通知的线程个数:'))
            c.acquire() #固定格式
            c.notify(num)  #通知num个线程别等待了,去执行吧
            c.release()
    
    #结果分析: 
    # 请输入你要通知的线程个数:3
    # 请输入你要通知的线程个数:Thread-1执行了 #有时候你会发现的你结果打印在了你要输入内容的地方,这是打印的问题,没关系,不影响
    # Thread-3执行了
    # Thread-2执行了
    
    示例代码
    
    

    [复制代码](javascript:void(0)

    十三 定时器(了解)

    定时器,指定n秒后执行某个操作,这个做定时任务的时候可能会用到。

    img

    [复制代码](javascript:void(0)

    from threading import Thread,Event
    import threading
    import time,random
    def conn_mysql():
        count=1
        while not event.is_set():
            if count > 3:
                raise TimeoutError('链接超时')
            print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
            event.wait(0.5)
            count+=1
        print('<%s>链接成功' %threading.current_thread().getName())
    
    
    def check_mysql():
        print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
        time.sleep(random.randint(2,4))
        event.set()
    if __name__ == '__main__':
        event=Event()
        conn1=Thread(target=conn_mysql)
        conn2=Thread(target=conn_mysql)
        check=Thread(target=check_mysql)
    
        conn1.start()
        conn2.start()
        check.start()
    
    

    [复制代码](javascript:void(0)

    十四 线程队列

      线程之间的通信我们列表行不行呢,当然行,那么队列和列表有什么区别呢?

      queue队列 :使用import queue,用法与进程Queue一样

      queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

    • class queue.Queue(maxsize=0) #先进先出

    img

    [复制代码](javascript:void(0)

    import queue #不需要通过threading模块里面导入,直接import queue就可以了,这是python自带的
    #用法基本和我们进程multiprocess中的queue是一样的
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    # q.put_nowait() #没有数据就报错,可以通过try来搞
    print(q.get())
    print(q.get())
    print(q.get())
    # q.get_nowait() #没有数据就报错,可以通过try来搞
    '''
    结果(先进先出):
    first
    second
    third
    '''
    
    先进先出示例代码
    
    

    [复制代码](javascript:void(0)

      class queue.LifoQueue(maxsize=0) #last in fisrt out

    img

    [复制代码](javascript:void(0)

    import queue
    
    q=queue.LifoQueue() #队列,类似于栈,栈我们提过吗,是不是先进后出的顺序啊
    q.put('first')
    q.put('second')
    q.put('third')
    # q.put_nowait()
    
    print(q.get())
    print(q.get())
    print(q.get())
    # q.get_nowait()
    '''
    结果(后进先出):
    third
    second
    first
    '''
    
    先进后出示例代码
    
    

    [复制代码](javascript:void(0)

      class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

    img

    [复制代码](javascript:void(0)

    import queue
    
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((-10,'a'))
    q.put((-5,'a'))  #负数也可以
    # q.put((20,'ws'))  #如果两个值的优先级一样,那么按照后面的值的acsii码顺序来排序,如果字符串第一个数元素相同,比较第二个元素的acsii码顺序
    # q.put((20,'wd'))
    # q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典
    # q.put((20,('w',1)))  #优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,可以是元祖,也是通过元素的ascii码顺序来排序
    
    q.put((20,'b'))
    q.put((20,'a'))
    q.put((0,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    '''
    
    优先级队列示例代码
    
    

    [复制代码](javascript:void(0)

    这三种队列都是线程安全的,不会出现多个线程抢占同一个资源或数据的情况。

    十五 Python标准模块--concurrent.futures

    到这里就差我们的线程池没有讲了,我们用一个新的模块给大家讲,早期的时候我们没有线程池,现在python提供了一个新的标准或者说内置的模块,这个模块里面提供了新的线程池和进程池,之前我们说的进程池是在multiprocessing里面的,现在这个在这个新的模块里面,他俩用法上是一样的。

    为什么要将进程池和线程池放到一起呢,是为了统一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一样,而且只要通过这个concurrent.futures导入就可以直接用他们两个了

    [复制代码](javascript:void(0)

    复制代码

    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    Both implement the same interface, which is defined by the abstract Executor class.
    
    #2 基本方法
    #submit(fn, *args, **kwargs)
    异步提交任务
    
    #map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    #shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    #result(timeout=None)
    取得结果
    
    #add_done_callback(fn)
    回调函数
    
    

    复制代码

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    import time
    import os
    import threading
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    def func(n):
        time.sleep(2)
        print('%s打印的:'%(threading.get_ident()),n)
        return n*n
    tpool = ThreadPoolExecutor(max_workers=5) #默认一般起线程的数据不超过CPU个数*5
    # tpool = ProcessPoolExecutor(max_workers=5) #进程池的使用只需要将上面的ThreadPoolExecutor改为ProcessPoolExecutor就行了,其他都不用改
    #异步执行
    t_lst = []
    for i in range(5):
        t = tpool.submit(func,i) #提交执行函数,返回一个结果对象,i作为任务函数的参数 def submit(self, fn, *args, **kwargs):  可以传任意形式的参数
        t_lst.append(t)  #
        # print(t.result())
        #这个返回的结果对象t,不能直接去拿结果,不然又变成串行了,可以理解为拿到一个号码,等所有线程的结果都出来之后,我们再去通过结果对象t获取结果
    tpool.shutdown() #起到原来的close阻止新任务进来 + join的作用,等待所有的线程执行完毕
    print('主线程')
    for ti in t_lst:
        print('>>>>',ti.result())
    
    # 我们还可以不用shutdown(),用下面这种方式
    # while 1:
    #     for n,ti in enumerate(t_lst):
    #         print('>>>>', ti.result(),n)
    #     time.sleep(2) #每个两秒去去一次结果,哪个有结果了,就可以取出哪一个,想表达的意思就是说不用等到所有的结果都出来再去取,可以轮询着去取结果,因为你的任务需要执行的时间很长,那么你需要等很久才能拿到结果,通过这样的方式可以将快速出来的结果先拿出来。如果有的结果对象里面还没有执行结果,那么你什么也取不到,这一点要注意,不是空的,是什么也取不到,那怎么判断我已经取出了哪一个的结果,可以通过枚举enumerate来搞,记录你是哪一个位置的结果对象的结果已经被取过了,取过的就不再取了
    
    #结果分析: 打印的结果是没有顺序的,因为到了func函数中的sleep的时候线程会切换,谁先打印就没准儿了,但是最后的我们通过结果对象取结果的时候拿到的是有序的,因为我们主线程进行for循环的时候,我们是按顺序将结果对象添加到列表中的。
    # 37220打印的: 0
    # 32292打印的: 4
    # 33444打印的: 1
    # 30068打印的: 2
    # 29884打印的: 3
    # 主线程
    # >>>> 0
    # >>>> 1
    # >>>> 4
    # >>>> 9
    # >>>> 16
    
    ThreadPoolExecutor的简单使用
    
    

    [复制代码](javascript:void(0)

    ProcessPoolExecutor的使用:

    只需要将这一行代码改为下面这一行就可以了,其他的代码都不用变
    tpool = ThreadPoolExecutor(max_workers=5) #默认一般起线程的数据不超过CPU个数*5
    # tpool = ProcessPoolExecutor(max_workers=5)
    
    你就会发现为什么将线程池和进程池都放到这一个模块里面了,用法一样
    
    

    img

    [复制代码](javascript:void(0)

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import threading
    import os,time,random
    def task(n):
        print('%s is runing' %threading.get_ident())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
    
        executor=ThreadPoolExecutor(max_workers=3)
    
        # for i in range(11):
        #     future=executor.submit(task,i)
    
        s = executor.map(task,range(1,5)) #map取代了for+submit
        print([i for i in s])
    
    map的使用
    
    

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    import time
    import os
    import threading
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    def func(n):
        time.sleep(2)
        return n*n
    
    def call_back(m):
        print('结果为:%s'%(m.result()))
    
    tpool = ThreadPoolExecutor(max_workers=5)
    t_lst = []
    for i in range(5):
        t = tpool.submit(func,i).add_done_callback(call_back)
    
    回调函数简单应用
    
    

    [复制代码](javascript:void(0)

    img

    [复制代码](javascript:void(0)

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def parse_page(res):
        res=res.result()
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
    
        # p=Pool(3)
        # for url in urls:
        #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
        # p.close()
        # p.join()
    
        p=ProcessPoolExecutor(3)
        for url in urls:
            p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
    
    回调函数的应用,需要你自己去练习的
    
    

    [复制代码](javascript:void(0)

  • 相关阅读:
    Socket基础一
    MyBatisPlus【目录】
    MyBatis(十一)扩展:自定义类型处理器
    MyBatis(十一)扩展:批量操作
    MyBatis(十一)扩展:存储过程
    MyBatis(十一)扩展:分页插件PageHelper
    MyBatis(十)插件 4
    09月07日总结
    09月06日总结
    09月03日总结
  • 原文地址:https://www.cnblogs.com/ciquankun/p/11430538.html
Copyright © 2011-2022 走看看