zoukankan      html  css  js  c++  java
  • 第七章|7.2并发编程|多线程

    1、线程

      在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程

    线程顾名思义,就是一条流水线工作的过程(流水线的工作需要电源,电源就相当于cpu),而一条流水线必须属于一个车间,一个车间的工作过程是一个进程,车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一条流水线。

    所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

    多线程(即多个控制线程)的概念是,在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。

    进程就是把资源给隔离开

    进程与线程区别:

    每启动一个进程都会有一个线程;进程只是资源单位,并不能真正执行,进程内开的那个线程才是真正的运行单位;

    一个进程内起多个线程,跨部门之间的线程是不共享数据的,隔着进程的,同一进程内多个线程间是共享该进程内的地址资源的

    创建线程的开销要远小于创建进程的开销(创建一个进程,就是创建一个车间,涉及到申请空间,而且在该空间内建至少一条流水线,但创建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小)。

    多线程应用:

    开启一个字处理软件进程,该进程肯定需要办不止一件事情,比如监听键盘输入,处理文字,定时自动将文字保存到硬盘,这三个任务操作的都是同一块数据,因而不能用多进程。只能在一个进程里并发地开启三个线程,如果是单线程,那就只能是,键盘输入时,不能处理文字和自动保存,自动保存时又不能输入和处理文字。

    2、开启线程的两种方式

    # import time
    # import random
    # from threading import Thread
    #
    # def piao(name):
    #     print('%s piaoing' %name)
    #     time.sleep(random.randrange(1,5))
    #     print('%s piao end' %name)
    #
    # if __name__ == '__main__':
    #     t1=Thread(target=piao,args=('egon',))
    #     t1.start()
    #     print('主线程')
    #只要是开了个进程,只是开了个内存空间,其实它会自动创建个线程。上边一共有2个线程;
    
    
    import time
    import random
    from threading import Thread
    
    class MyThread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
    
        def run(self):
            print('%s piaoing' %self.name)
    
            time.sleep(random.randrange(1,5))
            print('%s piao end' %self.name)
    if __name__ == '__main__':
        t1=MyThread('egon')
        t1.start()
        print('')
    
    打印:
    egon piaoing
    主
    egon piao end
    #1开进程的开销远大于开线程
    import time
    from threading import Thread
    from multiprocessing import Process
    def piao(name):
        print("%s piaoing"%name)
        time.sleep(2)
        print("%s piao end"%name)
    if __name__ == "__main__":
        #p1 = Process(target=piao, args=('egon', )) #它要申请内存空间
        #p1.start()
        t1 = Thread(target=piao, args=('egon', ))
        t1.start()#信号发出以后,线程立马就起来了
        print('主线程')
    #打印: #如果是进程,先打印 主线程 再egon is piaoing egon is piao end

      egon is piaoing
      主线程
      egon is piao end

    #2同一个进程内多个线程共享该进程的地址空间
    from threading import Thread
    from multiprocessing import Process
    n = 100
    def task():
        global n
        n = 0
    if __name__ == "__main__":
        # p1 = Process(target=task, ) #它要申请内存空间
        # p1.start() #开一个子进程,会copy主进程的内存空间
        # p1.join()  #确保它执行完了,根本就不会走task函数里边的  打印出的是100
    
        t1 = Thread(target=task, )
        t1.start()#共享
        t1.join()
        print('主线程', n) #子进程改了,不影响主进程,改的是它自己内存空间的,先走子线程,再走主线程。打印的是0
    # 3、瞅一眼pid 
    from threading import Thread
    from multiprocessing import Process,current_process
    import os
    def task():
        # print(current_process().pid)  #查看线程id,7728  不能看父进程(用os)的
        print('子进程PID:%s  父进程的PID:%s' %(os.getpid(),os.getppid()))
    
    if __name__ == '__main__':
        p1=Process(target=task,)
        p1.start()
        # print('主进程',current_process().pid) #主进程 3088
        print('主进程',os.getpid())
    
    #打印
    主进程 3088
    子进程PID:7728  父进程的PID:3088
    from threading import Thread
    import os
    def task():
        print('子线程:%s' %(os.getpid())) #一个进程内的线程大家的地位是一样的
    if __name__ == '__main__':
        t1=Thread(target=task,)
        t1.start()
        print('主线程',os.getpid()) #这两个线程的同属于一个进程
    
    #打印:
    子线程:6220
    主线程 6220

    3、Thread对象的其他属性或方法

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    from threading import Thread,currentThread,active_count,enumerate
    import time
    def task():
        print('%s is ruuning' %currentThread().getName())  #打印: 子线程1 is ruuning
        time.sleep(2)
        print('%s is done' %currentThread().getName())
    if __name__ == '__main__':
        t=Thread(target=task,name='子线程1')
        t.start()
        # t.setName('儿子线程1') #设置名字  #打印: 儿子线程1 is done
        # t.join()
        # print(t.getName()) #t就是currentThread() #打印:儿子进程1
        # currentThread().setName('主线程')
        # print(t.isAlive())  #查看线程是否还活着,加了t.join就死掉了 False
    
        # print('主线程',currentThread().getName())  看下主线程用那个currentThread,查看当前线程名  打印 主线程 主线程
    
        # t.join()
        # print(active_count()) #活跃的线程数 只剩下主线程了,因为你join了 打印 1
        print(enumerate())  #把当前活跃的线程对象拿过来                   打印:[<_MainThread(主线程, started 6476)>]

     4、守护线程

    无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

    需要强调的是:运行完毕并非终止运行

    1对主进程来说,运行完毕指的是主进程代码运行完毕
    
    2对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
    
    1.1、主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
    
    2.1、主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
    from threading import Thread
    import time
    
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        # t.setDaemon(True) #必须在t.start()之前设置;两种守护线程设置方式,另外一种是下面
        t.daemon=True
        t.start() #造线程,立马就造出来了,睡2s就足够打印下面"主线程'了
    
        print('主线程') #2s把主线程都运行完了;主线程没有要等的了,然后就死掉了,守护进程跟着死,就不会打印上边那个say hello
        print(t.is_alive()) #打印这2s也足够它运行了
    
    打印:
    主线程
    True
    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    def bar():
        print(456)
        time.sleep(3) #t2非守护进程要等待3s,睡1s开到“end123”,再睡2s看到“end456”
        print("end456")
    if __name__ == '__main__':
        t1=Thread(target=foo)
        t2=Thread(target=bar)
        #一共3个线程;t1为守护线程,t2为非守护线程;主线程运行完就盯着非守护进程运行完,主线程才运行完
        t1.daemon=True
        t1.start()
        t2.start()
        print("main-------")
    
    打印:
    123
    456
    main-------
    end123
    end456

    5、互斥锁

    把并行变成串行;将同时运行的多个任务变成一个一个执行,牺牲了效率,保证了数据的安全;保护不同的数据就要加不同的锁;

    局部串行,只针对共享数据的部分修改,让它们串行;

    #mutex
    from threading import Thread,Lock
    import time
    n = 100
    def task():
        global n
        mutex.acquire() #1个线程起来先去抢一把锁,在它睡0.1s时,其他99个线程同时会去抢锁,等第一个执行完后抢得锁,这时候n=99,然后再来回循环
        temp = n
        time.sleep(0.1) #睡0.1s就足够其他99个线程启动运行了;都停在这睡之前都拿到了n=100了
        n = temp - 1  #这100个数据都改成了99,数据变得不安全了,得加把锁
        mutex.release()
    if __name__ == '__main__':
        mutex = Lock()
        t_l = []
        for i in range(100):
            t = Thread(target=task) #在1个进程里边开了100个线程,共享空间的
            t_l.append(t)
            t.start()
        for t in t_l:
            t.join()
        print('', n)
    
    打印: #这时候降低了效率,保证了数据的安全,如果不加锁,结果是 主 990

    6、GIL的基本概念

      同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。

    本质就是把互斥锁;启动一个py文件,就是启动了py解释器的一个进程;运行py程序

    对于cpython解释器:垃圾回收机制+定期开启销毁。

    对于cpython解释器要想用多核优势,就要开多个进程。

    在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

      GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。

    要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。例如python test.py,python aaa.py,python bbb.py会产生3个不同的python进程

    ############验证python test.py只会产生一个进程
    #test.py内容
    import os,time
    print(os.getpid())
    time.sleep(1000)
    
    #打开终端执行
    python3 test.py
    
    #在windows下查看
    tasklist |findstr python
    
    #在linux下下查看
    ps aux |grep python

    一个python的进程内,不仅有test.py的主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程内

    1、所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码)
    例如:test.py定义一个函数work(代码内容如下图),在进程内所有线程都能访问到work的代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。
    2、所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。

    如果多个线程的target=work,那么执行流程是:

    多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行。

    GIL与自定义互斥锁

    锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据;

    然后,我们可以得出结论:保护不同的数据就应该加不同的锁。

    GIL保护的是解释器级别跟垃圾回收机制有关的数据;

    mutex保护的是自己的数据;

    代码要想执行就是要给py解释器,用的C代码,解释器上加那个GIL锁;

    分析

    1、100个线程去抢GIL锁,即抢执行权限
    2、肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()
    3、极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL
    4、直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程

     GIL与多线程

     有了GIL的存在,同一时刻同一个进程内的多个线程,只能有一个出来执行;

     进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,

    1、cpu到底是用来做计算的,还是用来做I/O的?
    
    2、多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能
    
    3、每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处
    1、对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用
    2、当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地

    假设我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:

    方案一:开启四个进程
    方案二:一个进程下,开启四个线程
    

    单核情况下,分析结果:

    如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
    如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
    

    多核情况下,分析结果:

    如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
    如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜
    

    结论:

    现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。

    ##计算密集型,应该用多进程
    from multiprocessing import Process
    from threading import Thread
    import os,time
    def work():
        res=0
        for i in range(100000000):
            res*=i
    
    if __name__ == '__main__':
        l=[]
        print(os.cpu_count()) #本机为4核
        start=time.time()
        for i in range(4):
            p=Process(target=work) #多进程 耗时5s多 牺牲开进程的开销用上了多核优势
            #p=Thread(target=work) #多线程 耗时18s多
            l.append(p)
            p.start()
        for p in l:
            p.join()
        stop=time.time()
        print('run time is %s' %(stop-start))
    #I/O密集型用多线程
    from multiprocessing import Process
    from threading import Thread
    import threading
    import os,time
    def work():
        time.sleep(2)
        #print('===>')
    if __name__ == '__main__':
        l=[]
        print(os.cpu_count()) #本机为4核
        start=time.time()
        for i in range(400):
            #p=Process(target=work) #耗时12s多,大部分时间耗费在创建进程上
            p=Thread(target=work) #耗时2s多 ,消耗的就是来回切的时间
            l.append(p)
            p.start()
        for p in l:
            p.join()
        stop=time.time()
        print('run time is %s' %(stop-start))
    多线程用于IO密集型,如socket,爬虫,web
    多进程用于计算密集型,如金融分析

    7、死锁与递归锁

    死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

    # 死锁
    from threading import Thread,Lock
    import time
    mutexA=Lock()
    mutexB=Lock()
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
    
            mutexB.acquire()  #线程1拿到了B锁,还没人跟它抢,最多其他线程都去抢A锁
            print('%s 拿到了B锁' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            time.sleep(0.1)
    
            mutexA.acquire()  #线程1执行f2,拿到B锁后,又去拿A锁,但这个时候A锁在第二个进程里边拿着呢
            print('%s 拿到了A锁' % self.name)
            mutexA.release()
    
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    
    打印:卡那了
    Thread-1 拿到了A锁
    Thread-1 拿到了B锁
    Thread-1 拿到了B锁
    Thread-2 拿到了A锁
    # 互斥锁只能acquire一次
    # from threading import Thread,Lock
    #
    # mutexA=Lock()
    #
    # mutexA.acquire()
    # mutexA.release()

    如何解决呢,用递归锁

    递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次

    # 递归锁:可以连续acquire多次,每acquire一次计数器+1,只有计数为0时,其他线程才能被抢到acquire
    from threading import Thread,RLock
    import time
    
    mutexB=mutexA=RLock()
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
    
            mutexB.acquire()  #这个时候计数器为2,其他进程都不能跟它抢
            print('%s 拿到了B锁' %self.name)
            mutexB.release()
    
            mutexA.release()
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            time.sleep(7)
    
            mutexA.acquire()
            print('%s 拿到了A锁' % self.name)
            mutexA.release()
    
            mutexB.release()
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    打印:
    Thread-1 拿到了A锁
    Thread-1 拿到了B锁
    Thread-1 拿到了B锁
    Thread-1 拿到了A锁
    Thread-2 拿到了A锁
    Thread-2 拿到了B锁
    Thread-2 拿到了B锁
    Thread-2 拿到了A锁
    Thread-4 拿到了A锁
    Thread-4 拿到了B锁
    Thread-4 拿到了B锁
    Thread-4 拿到了A锁
    Thread-6 拿到了A锁
    Thread-6 拿到了B锁
    Thread-6 拿到了B锁
    Thread-6 拿到了A锁
    Thread-8 拿到了A锁
    Thread-8 拿到了B锁
    Thread-8 拿到了B锁
    Thread-8 拿到了A锁
    Thread-10 拿到了A锁
    Thread-10 拿到了B锁
    Thread-10 拿到了B锁
    Thread-10 拿到了A锁
    Thread-5 拿到了A锁
    Thread-5 拿到了B锁
    Thread-5 拿到了B锁
    Thread-5 拿到了A锁
    Thread-9 拿到了A锁
    Thread-9 拿到了B锁
    Thread-9 拿到了B锁
    Thread-9 拿到了A锁
    Thread-7 拿到了A锁
    Thread-7 拿到了B锁
    Thread-7 拿到了B锁
    Thread-7 拿到了A锁
    Thread-3 拿到了A锁
    Thread-3 拿到了B锁
    Thread-3 拿到了B锁
    Thread-3 拿到了A锁

    8、信号亮

     信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小。

    from threading import Thread, Semaphore, currentThread
    import time, random
    sm = Semaphore(3) #有3个人可以抢到
    def task():
        # sm.acquire()
        # print('%s in'%currentThread().getName())
        # sm.release()
         #加锁也可以用一个上下文管理的方式如下
        with sm:
            print('%s in'%currentThread().getName())
            time.sleep(random.randint(1,3))
    if __name__ == '__main__':
        for i in range(10):
            t = Thread(target = task)
            t.start()
    
    打印
    Thread-1 in
    Thread-2 in
    Thread-3 in
    Thread-4 in
    Thread-5 in
    Thread-6 in
    Thread-7 in
    Thread-8 in
    Thread-9 in
    Thread-10 in

    9、Event事件

      线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

    from threading import Event
    
    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False。
    #应用场景
    
    from threading import Thread, Event
    import time
    event = Event() ###event.wait() #一直在那等着;直到等到event.set()才结束了
    def student(name):
        print('学生%s 正在听课 '%name)
        event.wait(2) #可以设置个超时时间,过了3s即使没有给我发set信号我也可以接着干其他的
        print('学生%s 课间活动 '%name)
    def teacher(name):
        print('老师%s 正在授课 '%name)
        time.sleep(8)
        event.set()
    if __name__ == '__main__':
        stu1 = Thread(target=student, args=('kris', ))
        stu2 = Thread(target=student, args=('alex', ))
        stu3 = Thread(target=student, args=('alen', ))
        t1=Thread(target=teacher,args=('egon',))
    
        stu1.start()
        stu2.start()
        stu3.start()
        t1.start()
    
    打印:
    学生kris 正在听课 
    学生alex 正在听课 
    学生alen 正在听课 
    老师egon 正在授课 
    学生alex 课间活动 
    学生alen 课间活动 
    学生kris 课间活动
    from threading import Thread,Event,currentThread
    import time
    event=Event()
    
    def conn(): #尝试链接,检测是否链接成功;event事件,一个等,一个唤醒
        n=0
        while not event.is_set(): #循环的发请求
            if n == 3:
                print('%s try too many times' %currentThread().getName())
                return
            print('%s try %s' %(currentThread().getName(),n))
            event.wait(0.5) #等5s时间尝试,
            n+=1
    
        print('%s is connected' %currentThread().getName())
    
    def check(): #检测服务端是否正常运行
        print('%s is checking' %currentThread().getName())
        time.sleep(5)
        event.set()
    
    if __name__ == '__main__':
        for i in range(3):
            t=Thread(target=conn)
            t.start()
        t=Thread(target=check)
        t.start()
    打印:
    Thread-1 try 0
    Thread-2 try 0
    Thread-3 try 0
    Thread-4 is checking
    Thread-2 try 1
    Thread-3 try 1
    Thread-1 try 1
    Thread-1 try 2
    Thread-2 try 2
    Thread-3 try 2
    Thread-2 try too many times
    Thread-3 try too many times
    Thread-1 try too many times

    10、定时器

    定时器,指定n秒后执行某操作

    from threading import Timer
    
    def task(name):
        print('hello %s' %name)
    t=Timer(5,task,args=('egon',))
    t.start()
    
    打印:
    hello egon
    from threading import Timer
    import random
    class Code:
        def __init__(self):
            self.make_cache()  #最开始实例化的时候就先拿到一个验证码;
    
        def make_cache(self, interval=9): #做缓存功能
            self.cache = self.make_code()
            print(self.cache)
            self.t = Timer(interval, self.make_cache )
            self.t.start()
        def make_code(self, n=4): #一个随机字符串
            res = ''
            for i in range(n):
                s1 = str(random.randint(0, 9))#转成str为了拼接字符串
                s2 = chr(random.randint(65, 90)) #拿到字母,对应ascii表里边的
                res += random.choice([s1, s2])
            return res
        def check(self):
            while True:
                code = input('请输入你的验证码>>>:').strip() #你输不对我就隔5s刷新一下
                if code.upper() == self.cache:
                    print('验证码输入正确')
                    self.t.cancel()
                    break
    obj = Code()
    obj.check()

    11、线程queue

    进程里边的queue是多个进程之间共享数据,解决处理共享锁的问题;

    ###基本用法
    
    import queue
    q=queue.Queue(3) #先进先出->队列
    
    q.put('first')
    q.put(2)
    q.put('third')
    # q.put(4)
    # q.put(4,block=False) #等同于 q.put_nowait(4) #不等待
    # q.put(4,block=True,timeout=3) #默认为block=True;等3s
    
    print(q.get())
    print(q.get())
    print(q.get())
    # print(q.get(block=False)) #q.get_nowait()
    # print(q.get_nowait())
    
    # print(q.get(block=True,timeout=3))
    ##跟上边用法一样
    import queue
    q=queue.LifoQueue(3) #后进先出->堆栈
    q.put('first')
    q.put(2)
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    打印:
    third
    2
    first
    import queue
    q=queue.PriorityQueue(3) #优先级队列 数字越小优先级越高
    
    q.put((10,'one'))
    q.put((40,'two'))
    q.put((30,'three'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    打印:
    (10, 'one')
    (30, 'three')
    (40, 'two')

    多线程实现并发的套接字通信

    把服务端原来串行的多个任务放到不同的线程里边去,彼此不互相影响;

    #基于线程池实现
    ##控制了最大并发数,保证了服务端不会无限制的去开启线程
    ##服务端
    from socket import *
    from concurrent.futures import ThreadPoolExecutor
    def communicate(conn):
        while True:
            try:
                data=conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    def server(ip,port):
        server = socket(AF_INET, SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
    
        while True:
            conn, addr = server.accept()
            pool.submit(communicate,conn)
        server.close()
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(2) #最多开启2个线程,那么客户端也就最多开启2个了
        server('127.0.0.1', 8081)
    #服务端
    from socket import *
    from threading import Thread
    
    def communicate(conn):  #通信
        while True:
            try:
                data=conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    def server(ip,port):  #负责建链接
        server = socket(AF_INET, SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
        while True:
            conn, addr = server.accept() #建链接,建成功一次;立马起个线程;建好链接就给你干通信的活
            t=Thread(target=communicate,args=(conn,)) #就像专门聘请个服务员来服务你
            t.start()
        server.close()
    
    if __name__ == '__main__':
        server('127.0.0.1', 8081) #主线程,相当于招待人员
    #客户端
    
    
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8081))
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
    
    client.close()

    问题是机器不能无限制的起线程;

    12、进程池线程池

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    #计算密集型多核优势的情况下应该用进程;I/O密集型的应该用线程
    import os,time,random
    def task(name):
        print('name:%s pid:%s run' %(name,os.getpid()))
        time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        pool=ProcessPoolExecutor(4) #先把池子造好,指定最多放4个进程;不指定就是cpu的核字
        #pool=ThreadPoolExecutor(5) #在一个进程里边PID是一样的
    
        for i in range(10):
            pool.submit(task,'egon%s' %i) #把这10个任务全部丢给池子,没有阻塞
            #异步调用,提交完任务不用等着任务执行拿到结果,只负责提交完了立马走
        pool.shutdown(wait=True) #执行join操作;默认值就是True #等任务提交结束,把入口关了
        print('')
    ##线程池
    
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import currentThread
    import os,time,random
    
    def task():
        print('name:%s pid:%s run' %(currentThread().getName(),os.getpid()))
        time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(5)
    
        for i in range(10):
            pool.submit(task,)
        pool.shutdown(wait=True)
        print('')

    异步调用和回调机制

      可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数

    #1、同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行
    from concurrent.futures import ThreadPoolExecutor
    import time
    import random
    
    def la(name):
        print('%s is laing' %name)
        time.sleep(random.randint(3,5))
        res=random.randint(7,13)*'#'
        return {'name':name,'res':res}
    
    def weigh(shit):
        name=shit['name']
        size=len(shit['res'])
        print('%s 拉了 《%s》kg' %(name,size))
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(13)
    
        shit1=pool.submit(la,'alex').result() #往池子里提交任务,拿到结果
        weigh(shit1) #称重
    
        shit2=pool.submit(la,'wupeiqi').result()
        weigh(shit2)
    
        shit3=pool.submit(la,'yuanhao').result()
        weigh(shit3)
    
    打印:
    alex is laing
    alex 拉了 《13》kg
    wupeiqi is laing
    wupeiqi 拉了 《13》kg
    yuanhao is laing
    yuanhao 拉了 《10》kg
    #2、异步调用:提交完任务后,不地等待任务执行完毕,
    from concurrent.futures import ThreadPoolExecutor
    import time
    import random
    
    def la(name):
        print('%s is laing' %name)
        time.sleep(random.randint(3,5))
        res=random.randint(7,13)*'#'
        return {'name':name,'res':res}
    
    
    def weigh(shit):
        shit=shit.result()
        name=shit['name']
        size=len(shit['res'])
        print('%s 拉了 《%s》kg' %(name,size))
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(13)
    
        pool.submit(la,'alex').add_done_callback(weigh) #绑定一个回调函数
    
        pool.submit(la,'wupeiqi').add_done_callback(weigh)
    
        pool.submit(la,'yuanhao').add_done_callback(weigh)
    
    打印:
    alex is laing
    wupeiqi is laing
    yuanhao is laing
    alex 拉了 《11》kg
    wupeiqi 拉了 《7》kg
    yuanhao 拉了 《8》kg

    加上回调机制可以实现结构耦合。

    阻塞是进程运行的一种状态,碰到I/O了进程就会阻塞,剥夺cpu的执行权限;

    同步就是阻塞??遇到阻塞了程序就要在原地等着,同步调用也是提交完任务在原地等着啊,同步调用它只是一种提交任务的方式,比如纯计算型的也要等啊它是没有I/O的,它提交完任务之后根本不会考虑是计算型的还是I/O型的。

    小练习

    浏览器本质就是套接字客户端;在程序中模拟浏览器用request模块

    # import requests
    # response = requests.get('http://www.cnblogs.com/linhaifeng') #去目标站点下载个文件
    # print(response.text)
    
    
    from concurrent.futures import ThreadPoolExecutor
    import requests
    import time
    
    def get(url):
        print('GET %s' %url)
        response=requests.get(url)
        time.sleep(3)
        return {'url':url,'content':response.text}
    
    
    def parse(res):  #并发下载,谁下载好了谁就去触发解析功能
        res=res.result()
        print('%s parse res is %s' %(res['url'],len(res['content'])))
    
    
    if __name__ == '__main__':
        urls=[
            'http://www.cnblogs.com/linhaifeng',
            'https://www.python.org',
            'https://www.openstack.org',
        ]
    
        pool=ThreadPoolExecutor(2) #线程池,I/O密集型
    
        for url in urls:
            pool.submit(get,url).add_done_callback(parse) #异步提交;回调函数
    
    打印:
    GET http://www.cnblogs.com/linhaifeng
    GET https://www.python.org
    http://www.cnblogs.com/linhaifeng parse res is 16320
    GET https://www.openstack.org
    https://www.python.org parse res is 49273
    https://www.openstack.org parse res is 64050
    
    
  • 相关阅读:
    1245. Tree Diameter
    771. Jewels and Stones
    830. Positions of Large Groups
    648. Replace Words
    647. Palindromic Substrings
    435. Non-overlapping Intervals
    646. Maximum Length of Pair Chain
    645. Set Mismatch
    242. Valid Anagram
    438. Find All Anagrams in a String
  • 原文地址:https://www.cnblogs.com/shengyang17/p/8926187.html
Copyright © 2011-2022 走看看