zoukankan      html  css  js  c++  java
  • 并发编程-多线程

    一.什么是线程

    一条流水线的工作流程(程序中负责执行的东西叫做线程,或者叫进程内部的执行序列),操作系统能够运算调度的最小单位。

    进程是资源单位,线程是执行单位

    二.线程和进程的区别

    1.开启进程的开销非常大,比开启线程的开销大很多

    2.开启进程的速度慢,开启线程的速度快

    3.进程之间通过队列等方式实现通信,线程可以共享进程中的数据

    1.线程是程序执行的最小单位,进程是操作系统分配资源的最小单位
    2.一个标准的线程由线程id,当前指令指针、寄存器和堆栈组成

    三.线程的应用

    一个文本编辑器:

    1.输入文字

    2.在屏幕上显示

    3.保存在磁盘中

    针对以上:1.在一个进程中开启多个线程。  2.开启多个进程。

    四.开启线程的两种方式

    图解:进程和线程的执行(p.start(),t.start())都是给操作系统发信号,然后操作系统调用cpu来执行进程中的线程(进程是资源单位,线程是执行单位)。

    线程的开启和进程没什么区别,只是引入模块不同

    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('太白',))
        t.start()           # 发送信号,开启线程的速度很快,所以线程先执行
        print('主线程'
    方式一
    from threading import Thread
    import time
    class Sayhi(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            time.sleep(2)
            print('%s say hello' % self.name)
    
    
    if __name__ == '__main__':
        t = Sayhi('egon')
        t.start()
        print('主线程')
    方式二

    五.线程vs进程的代码对比

     

    线程与进程的对比(理论)

    1:开启进程的开销非常大,要比开启线程的开销大很多
    2:开启线程的速度非常快,要比开启进程的开销快几十倍上百倍
    3:线程与线程之间可以共享数据,而进程与进程之间要借用队列等方法通信
          

    线程的应用

    1并发:一个cpu看起来像是同时执行多个任务
    2单个进程下开启三个线程,并发的执行任务(相对好一点,分场景)
    3开启三个进程并发的执行任务
    from threading import Thread
    from multiprocessing import Process
    import time
    #多线程
    def task(name):
        print(f'{name} is running')
        time.sleep(1)
        print(f'{name} is gone')
    
    if __name__ == '__main__':
        t1=Thread(target=task,args=('liyr',))
        t1.start()
        print('==主线程')
    #打印结果
    # liyr is running
    # ==主线程
    # liyr is gone
    
    #多进程(永远是主进程先打印)
    def task(name):
        print(f'{name} is running')
        time.sleep(1)
        print(f'{name} is gone')
    
    if __name__ == '__main__':
        t1=Process(target=task,args=('liyr',))
        t1.start()
        print('==主线程')
    #打印结果
    # ==主线程
    # liyr is running
    # liyr is gone
    进程和线程开启速度对比
    from threading import Thread
    from multiprocessing import Process
    import time,os
    
    # 多线程
    def task(name):
        print(os.getpid())
    
    if __name__ == '__main__':
        t1=Thread(target=task,args=('liyr',))
        t2=Thread(target=task,args=('liyr',))
        t1.start()
        t2.start()
        print(f'==主线程:{os.getpid()}')
    #打印结果
    # 15692
    # 15692
    # ==主线程:15692
    
    #多进程(子进程依赖于主进程)
    def task(name):
        print(os.getpid())
        print(f'=主线程:{os.getppid()}')
    
    if __name__ == '__main__':
        t1=Process(target=task,args=('liyr',))
        t2=Process(target=task,args=('liye',))
        t1.start()
        t2.start()
        print(f'==主线程:{os.getpid()}')
    #打印结果
    # ==主线程:8496
    # 15600
    # 14712
    # =主线程:8496
    # =主线程:8496
    进程与线程pid对比
    from threading import Thread
    
    x = 3
    def task(name):
        global x
        x = 100
    
    if __name__ == '__main__':
        t1 = Thread(target=task, args=('liyr',))
        t1.start()
        t1.join()
        print(f'==主线程:{x}')
    #同一个进程内的资源对于这个进程的多个线程来说是共享的
    同一进程内线程资源共享

    六.守护线程

    对于主线程来说,运行完毕指的是主线程所在进程内所有非守护线程全部运行完毕,主线程才运行完毕

    如果守护线程的生命周期小于其他线程,则会先结束,否则要等待其他非守护线程和主线程结束之后结束

    from threading import Thread
    import time
    def task(name):
        print(f'{name} is running')
        time.sleep(3)
        print(f'{name} is gone')
    
    if __name__ == '__main__':
        t1=Thread(target=task,args=('li',))
        # t2=Thread(target=task,args=('liyr',))
        t1.daemon=True
        t1.start()#线程的开启速度要比进程快很多
        # t2.start()
        print('==主线程')
        # 守护线程 等待非守护子线程以及主线程结束之后,结束.
        
        
    from threading import Thread
    import time
    
    def foo():
        print(123)  # 1
        time.sleep(1)
        print("end123")  # 4
    
    def bar():
        print(456)  # 2
        time.sleep(3)
        print("end456")  # 5
    
    
    t1=Thread(target=foo)
    t2=Thread(target=bar)
    
    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")  # 3
    '''
    123
    456
    main-------
    end123
    end456
    '''
    示例

    七.线程的其他方法

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    from threading import Thread
    from threading import currentThread
    from threading import enumerate
    from threading import activeCount
    def task(name):
        print(666)
    
    
    if __name__ == '__main__':
        t1 = Thread(target=task, args=('liyr',),name='呵呵呵')
        t1.start()
        print(t1.isAlive())#判断线程是否活着
        t1.setName('哈哈哈')#设置一个线程名
        print(t1.getName())#获取线程名
        print(t1.name)#获取线程名(重要)
        print(currentThread())#返回当前的线程对象
        print(enumerate())#获取当前进程的所有的线程对象列表
        print(activeCount())#获取当前活跃的线程数量(重要)
        print(f'==主线程')
    示例

    八.互斥锁

    1.互斥锁

    互斥锁是控制同步带来的竞争,保证数据的安全性的一种措施

    from threading import Thread
    from threading import Lock
    import time
    import random
    x = 100
    
    def task(lock):
    
        lock.acquire()
        # time.sleep(random.randint(1,2))
        global x
        temp = x
        time.sleep(0.01)
        temp = temp - 1
        x = temp
        lock.release()
    
    
    if __name__ == '__main__':
        mutex = Lock()
        l1 = []
        for i in range(100):
            t = Thread(target=task,args=(mutex,))
            l1.append(t)
            t.start()
    
        time.sleep(3)
        print(f'主线程{x}')
    示例

    2.死锁

    死锁是指两个线程(进程)在运行过程中由于争夺资源而造成的一种阻塞状态。

    产生死锁的原因:1.资源竞争 2.进程间推进顺序不合法

    如图:线程A持有锁,线程B持有锁b,下一步线程A等待获取锁b,线程B等待获取锁a,两者都未释放,造成死锁现象。

    from threading import Thread,Lock
    import time
    lock_A = Lock()
    lock_B = Lock()
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            lock_A.acquire()
            print(f"{self.name}抢到了A锁")
            lock_B.acquire()
            print(f"{self.name}抢到了B锁")
            lock_B.release()
            print(f"{self.name}释放了B锁")
            lock_A.release()
            print(f"{self.name}释放了A锁")
        def f2(self):
            lock_B.acquire()
            print(f"{self.name}抢到了B锁")
            time.sleep(0.1)
            lock_A.acquire()
            print(f"{self.name}抢到了A锁")
            lock_A.release()
            print(f"{self.name}释放了A锁")
            lock_B.release()
            print(f"{self.name}释放了B锁")
    if __name__ == '__main__':
        for i in range(3):
            t = MyThread()
            t.start()
    示例

    3.递归锁

    递归锁可以解决死锁现象,当业务需要多个锁时,优先考虑递归锁.指向同一把锁:lock_a = lock_b = Rlock()

    1
    mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
    from threading import Thread,RLock
    import time
    
    lock_A = lock_B = RLock()
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            lock_A.acquire()
            print(f"{self.name}抢到了A锁")
            lock_B.acquire()
            print(f"{self.name}抢到了B锁")
            lock_B.release()
            print(f"{self.name}释放了B锁")
            lock_A.release()
            print(f"{self.name}释放了A锁")
        def f2(self):
            lock_B.acquire()
            print(f"{self.name}抢到了B锁")
            time.sleep(0.1)
            lock_A.acquire()
            print(f"{self.name}抢到了A锁")
            lock_A.release()
            print(f"{self.name}释放了A锁")
            lock_B.release()
            print(f"{self.name}释放了B锁")
    if __name__ == '__main__':
        for i in range(3):
            t = MyThread()
            t.start()
    示例

    4.信号量

    信号量也是一把锁,用来控制并发数量

    1
    2
    3
    4
    5
    6
    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1
    调用release() 时内置计数器+1
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
     
    与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程
    from threading import Thread, Semaphore, current_thread
    import time
    import random
    sem = Semaphore(5)
    
    def task():
        sem.acquire()
    
        print(f'{current_thread().name} 厕所ing')
        time.sleep(random.randint(1,3))
    
        sem.release()
    
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=task,)
            t.start()
    
    #同一时间内,只有指定数目的线程数运行任务,当其中的某几个运行完了,立马有相应的线程再来执行。
    示例

    九.GIL全局锁(the Global Interpreter Lock)

    1.GIL全局锁

    首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念,Python完全可以不依赖于GIL。

    GIL本质就是一把互斥锁,互斥锁的本质都一样,都是将并发变成串行,保证数据的安全。Jpython,pypy都没有GIL锁。

    复制代码
    1.为什么使用GIL全局锁
        1.当时都是单核时代,而且cpu价格非常贵.
        2.如果不加全局解释器锁, 开发Cpython解释器的程序员就会在源码内部各种主动加锁,解锁,非常麻烦,各种死锁现象等等.他为了省事儿,直接进入解释器时给线程加一个锁.
    
    2.优缺点
        优点:保证了Cpython解释器的数据资源的安全
        缺点:单个进程的多线程不能使用多核
    3.GIL全局锁是针对单个进程中线程的,同一时间内,只能有一个线程获取这把锁。
    4.所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码
    5.一遇到IO,GIL锁就释放。
    6.为什么有了GIL锁还要自己加锁,GIL锁不是已经保证了同一进程下线程是串行吗?
      这个问题主要还是IO,一旦线程中有了IO,如果不加锁,GIL会自动释放,此时又有其他线程抢到,那么线程执行的任务中数据就不安全。
    复制代码

    如图:python解释器分为虚拟机和编译器两部分,一个线程中的代码作为参数传给python解释器,由编译器将其转为c语言识别的字节码,然后由虚拟机将字节码转为机器语言,操作系统调用cpu来执行。

    from threading import Thread
    import time
    
    x = 100
    def task():
        global x
        temp = x
        x = temp - 1
    
    if __name__ == '__main__':
        for i in range(100):
            t = Thread(target=task,)
            t.start()
        print(x)
    
    #打印结果:0
    #因为GIL锁,同一时间内只有一个线程执行任务,所以这是串行
    示例1
    from threading import Thread
    import time
    
    x = 100
    def task():
        global x
        temp = x
        time.sleep(0.001)
        x = temp - 1
    
    if __name__ == '__main__':
        for i in range(100):
            t = Thread(target=task,)
            t.start()
        print(x)
    
    # 打印结果:94,95...不定,因为有阻塞(time.sleep()模仿),在0.001s之内可能有好几个线程拿到x值,剩下的再进行减1
    示例2

    2.GIL和Lock锁的区别

    1
    2
    3
    4
    5
    6
    7
    相同点:
        都是互斥锁
    不同点:
        1.GIL保护python解释器内部数据资源的安全
        2.GIL上锁、释放锁无需手动操作
        3.自己代码中定义的互斥锁保护进程中的资源数据的安全.
        4.自己定义的互斥锁必须自己手动上锁,释放锁.

    3.验证计算密集型IO密集型的效率

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    #分析:
    我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
    方案一:开启四个进程
    方案二:一个进程下,开启四个线程
     
    #单核情况下,分析结果:
      如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
      如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
     
    #多核情况下,分析结果:
      如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
      如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜
     
      
    #结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
    # import os
    # print(os.cpu_count())
    
    # 计算密集型用多进程
    # from multiprocessing import Process
    # import time
    # def task():
    #     i = 0
    #     for j in range(100000000):
    #         i+=1
    #
    # if __name__ == '__main__':
    #     l = []
    #     start_time = time.time()
    #     for i in range(4):
    #         p = Process(target=task)
    #         l.append(p)
    #         p.start()
    #     for j in l:
    #         j.join()
    #     print(time.time() - start_time)
    
    
    # 计算密集型用多线程
    # from threading import Thread
    # import time
    # def task():
    #     i = 0
    #     for j in range(100000000):
    #         i+=1
    #
    # if __name__ == '__main__':
    #     l = []
    #     start_time = time.time()
    #     for i in range(4):
    #         p = Thread(target=task)
    #         l.append(p)
    #         p.start()
    #     for j in l:
    #         j.join()
    #     print(time.time() - start_time)
    
    
    # io密集型用多进程
    
    # from multiprocessing import Process
    # import time
    # def task():
    #     time.sleep(3)
    # if __name__ == '__main__':
    #     l = []
    #     start_time = time.time()
    #     for i in range(4):
    #         p = Process(target=task)
    #         l.append(p)
    #         p.start()
    #     for j in l:
    #         j.join()
    #     print(time.time() - start_time)
    
    
    # io密集型用多线程
    
    # from threading import Thread
    # import time
    # def task():
    #     time.sleep(3)
    #
    # if __name__ == '__main__':
    #     l = []
    #     start_time = time.time()
    #     for i in range(4):
    #         p = Thread(target=task)
    #         l.append(p)
    #         p.start()
    #     for j in l:
    #         j.join()
    #     print(time.time() - start_time)
    示例

    十.多线程实现socket通信

    from threading import Thread
    import socket
    phone = socket.socket()
    phone.bind(('127.0.0.1',8888))
    phone.listen(4)
    
    def communite(conn):
        while 1:
            try:
                ret = conn.recv(1024)
                print(ret.decode("utf-8"))
                msg = input(">>>")
                conn.send(msg.encode("utf-8"))
            except Exception:
                break
        conn.close()
    while 1:
        conn,addr = phone.accept()
        t = Thread(target=communite,args=(conn,))
        t.start()
        # communite(conn)
    phone.close()
    服务端
    import socket
    client = socket.socket()
    client.connect(('127.0.0.1',8888))
    while 1:
        msg = input(">>>")
        client.send(msg.encode("utf-8"))
        ret = client.recv(1024)
        print(ret.decode("utf-8"))
    client.close()
    客户端

    十一..进程池、线程池

    我们不能无休止的添加线程(进程),这样需要用到线程池(进程池),是容纳的最多的线程数(进程数)。

    复制代码
    1.提交任务的两种方式:
        同步调用(提交任务,等待任务执行完再继续向下执行)
        异步调用(提交任务完成,就立马向下执行,不再等待。一般和回调配合使用,异步处理IO多的,回调处理IO少的)
    2.任务执行的三种方式:
        阻塞:(程序运行时,遇到IO,程序挂起,cpu被切走)
            阻塞
        非阻塞:(程序没有遇到IO,或遇到IO但通过某种手段让程序继续执行)
            就绪
            执行
    3.回调函数:按顺序接受每个任务的结果,进行下一步的处理。
    复制代码
    复制代码
    concurrent.futures 模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor:进程池:提供异步调用
    
    基本方法:
    1.submit()      #异步提交任务
    2.map(func,*iterables,timeout=None)
      取代for循环的submit操作
    3.shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作
        wait=True,等待池内所有任务执行完毕回收资源再继续
        wait=False,立即返回,并不会等到池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    4.result()   #取得submit的结果
    5.add_done_callback(fn)   #回调函数
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    import os
    import time
    import random
    
    # print(os.cpu_count())
    def task(n):
        print(f'{os.getpid()} 接客')
        time.sleep(random.randint(1,3))
    
    
    if __name__ == '__main__':
    
        # 开启进程池  (并行(并行+并发))
        # p = ProcessPoolExecutor()  # 默认不写,进程池里面的进程数与cpu个数相等
        #
        # # p.submit(task,1)
        # # p.submit(task,1)
        # # p.submit(task,1)
        # # p.submit(task,1)
        # # p.submit(task,1)
        # # p.submit(task,1)
        # # p.submit(task,1)
        # for i in range(20):
        #     p.submit(task,i)
        #
        # 开启线程池  (并发)
        t = ThreadPoolExecutor()  # 默认不写, cpu个数*5 线程数
        # t = ThreadPoolExecutor(100)  # 100个线程
    
        for i in range(20):
            t.submit(task,i)
    示例1
    import time
    import os
    import threading
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    def func(n):
        time.sleep(2)
        print('%s打印的:'%(threading.get_ident()),n)
        return n*n
    tpool = ThreadPoolExecutor(max_workers=5) #默认一般起线程的数据不超过CPU个数*5
    # tpool = ProcessPoolExecutor(max_workers=5) #进程池的使用只需要将上面的ThreadPoolExecutor改为ProcessPoolExecutor就行了,其他都不用改
    #异步执行
    t_lst = []
    for i in range(5):
        t = tpool.submit(func,i) #提交执行函数,返回一个结果对象,i作为任务函数的参数 def submit(self, fn, *args, **kwargs):  可以传任意形式的参数
        t_lst.append(t)  #
        # print(t.result())
        #这个返回的结果对象t,不能直接去拿结果,不然又变成串行了,可以理解为拿到一个号码,等所有线程的结果都出来之后,我们再去通过结果对象t获取结果
    tpool.shutdown() #起到原来的close阻止新任务进来 + join的作用,等待所有的线程执行完毕
    print('主线程')
    for ti in t_lst:
        print('>>>>',ti.result()
    示例2
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import threading
    import os,time,random
    def task(n):
        print('%s is runing' %threading.get_ident())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
    
        executor=ThreadPoolExecutor(max_workers=3)
    
        # for i in range(11):
        #     future=executor.submit(task,i)
    
        s = executor.map(task,range(1,5)) #map取代了for+submit
        print([i for i in s])
    map示例

    十二:阻塞,非阻塞,异步,同步

    阻塞(站在程序运行的角度):程序遇到IO立马会停止(挂起),CPU马上切换,等到IO结束之后,再执行

    非阻塞(站在程序运行的角度):程序没有IO或者遇到IO后可以通过某种手段让CPU去执行其他的任务,尽可能的占用CPU(协程)

    异步(站在程序运行的角度):所有的任务同时发出,我就继续执行下一个代码,等结果
    #所有的任务同时发出,我就继续执行下一个代码,等结果
    #异步执行任务的方式一:将所有的任务结果统一回收
    
    # 异步调用返回值如何接收? 未解决.
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import random
    import os
    
    def task(i):
        print(f'{os.getpid()}开始任务')
        time.sleep(random.randint(1,3))
        print(f'{os.getpid()}任务结束')
        return i
    if __name__ == '__main__':
    
        # 异步调用
        pool = ProcessPoolExecutor()
        l1 = []
        for i in range(10):
            obj = pool.submit(task,i)
            l1.append(obj)
    
        pool.shutdown(wait=True)
        # shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后,在执行. 有点类似与join.
        # shutdown: 在上一个进程池没有完成所有的任务之前,不允许添加新的任务.
        # 一个任务是通过一个函数实现的,任务完成了他的返回值就是函数的返回值.
        print(l1)
        for i in l1:
            print(i.result())
        print('===主')
    # 统一回收结果: 我不能马上收到任何一个已经完成的任务的返回值,我只能等到所有的任务全部结束统一回收.
    异步示例

    同步(站在程序运行的角度):任务发布出去之后,自任务开始运行,直到这个任务最终结束之后,给我一个结果,我再发布下个任务

    #任务发布出去之后,自任务开始运行,直到这个任务最终结束之后,给我一个结果,我再发布下个任务
    
    # 2. 同步调用
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import random
    import os
    
    def task(i):
        print(f'{os.getpid()}开始任务')
        time.sleep(random.randint(1,3))
        print(f'{os.getpid()}任务结束')
        return i
    if __name__ == '__main__':
    
        # 同步调用
        pool = ProcessPoolExecutor()
        for i in range(10):
            obj = pool.submit(task,i)
            # obj是一个动态对象,返回的当前的对象的状态,有可能运行中,可能(就绪阻塞),还可能是结束了.
            # obj.result() 必须等到这个任务完成后,返回了结果之后,在执行下一个任务.
            print(f'任务结果:{obj.result()}')
    
        pool.shutdown(wait=True)
        # shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后,在执行. 有点类似与join.
        # shutdown: 在上一个进程池没有完成所有的任务之前,不允许添加新的任务.
        # 一个任务是通过一个函数实现的,任务完成了他的返回值就是函数的返回值.
        print('===主')
    同步示例

    异步+回调函数(不是万能的)

    import  requests
    from concurrent.futures import ProcessPoolExecutor
    import  os,time,random
    
    def get(url):
        respones=requests.get(url)
        print(f'{os.getpid()}正在爬取{url}的数据')
        time.sleep(random.randint(1,3))
        if respones.status_code==200:
            return respones.text
    
    def parse(obj):
        print(f'处理结果{len(obj.result())}')
    
    
    if __name__ == '__main__':
        url_list=[
            'http://www.luckincoffee.com/?bd_vid=6839995028662130211',
            'https://www.cnblogs.com/jin-xin/p/9076242.html?tdsourcetag=s_pctim_aiomsg',
            'https://www.cnblogs.com/jin-xin/articles/7459977.html',
            'http://investor.luckincoffee.com/'
        ]
        p=ProcessPoolExecutor(4)
        for i in url_list:
            obj=p.submit(get,i)
            obj.add_done_callback(parse)#增加一个回调函数
        p.shutdown(wait=True)
        print('')
        
    #回调函数是主进程帮助你实现的,回调函数帮你进行分析任务,明确了进程的任务,只有一个网络爬取
    #分析任务:回调函数执行了,实现了对函数的解耦(回调函数是串行的)
    #极值情况:如果回调函数是IO任务,那么由于你的回调函数是主进程做的,所以有可能影响效率
    
    #如果多个任务,多进程多线程的处理的IO任务
        1:剩下的任务,非IO阻塞,就用异步+回调
        2:剩下的任务,IO<多个任务的IO,就用异步+回调
        3:身下的任务,IO>=多个任务的IO,就用第二种方式或者两个线程进程池
    '''
    线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码, 并发执行,
    当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,你这个线程继续去处理其他任务.
    如果进程池+回调: 回调函数由主进程去执行.
    如果线程池+回调: 回到函数由空闲的线程去执行.
    '''
    异步回调示例

    十三.Event事件

    1
    2
    3
    4
    5
    6
    7
    1.Event事件是不同线程之间的同步对象
    2.相当于一个全局变量(在一个线程中设置,另一个线程中使用),一般用来两个线程之间的协调工作
     
    event.isSet():返回event的状态值;
    event.wait():如果 event.isSet()==False将阻塞线程;
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    event.clear():恢复event的状态值为False

     

    # 版本二: 事件event
    '''
    from threading import Thread
    from threading import current_thread
    from threading import Event
    import time
    
    event = Event()
    def check():
        print(f'{current_thread().name} 监测服务器是否开启...')
        time.sleep(3)
        print(event.is_set())
        event.set()
        print(event.is_set())
        print('服务器已经开启...')
    
    def connect():
    
        print(f'{current_thread().name} 等待连接...')
        # event.wait()  # 阻塞 直到 event.set() 方法之后
        event.wait(1)  # 只阻塞1秒,1秒之后如果还没有进行set 直接进行下一步操作.
        print(f'{current_thread().name} 连接成功...')
    
    t1 = Thread(target=check,)
    t2 = Thread(target=connect,)
    t1.start()
    t2.start()
    '''
    #练习
    
    一个线程监测服务器是否开始,另个一线程判断如果开始了,则显示连接成功,此线程只尝试连接3次,1s 一次,如果超过3次,还没有连接成功,则显示连接失败.
    '''
    from threading import Thread
    from threading import current_thread
    from threading import Event
    import time
    
    event = Event()
    def check():
        print(f'{current_thread().name} 监测服务器是否开启...')
        time.sleep(4)
        event.set()
        print('服务器已经开启...')
    
    def connect():
        count = 1
        while not event.is_set():
            if count == 4:
                print('连接次数过多,已断开')
                break
            event.wait(1)
            print(f'{current_thread().name} 尝试连接{count}次')
            count += 1
        else:
            print(f'{current_thread().name} 连接成功...')
    
    t1 = Thread(target=check,)
    t2 = Thread(target=connect,)
    t1.start()
    t2.start()
    '''
    ============================================================
    from threading import Thread,current_thread
    import time
    from threading import Event
    e=Event()#默认为False
    
    def task():
        print(f'{current_thread().name}正在监测服务器是否正常')
        time.sleep(3)
        e.set()#改成True
    
    def task1():
        print(f'{current_thread().name}正在尝试链接服务器')
        e.wait()#轮询监测event是都为True,当其为true,继续执行下一行代码,阻塞
        #e.wait(1)设置超时时间,如果event改成了true,代码继续执行,如果超过一秒种,代码继续执行
        print(f'{current_thread().name}服务器链接成功')
    
    if __name__ == '__main__':
        t=Thread(target=task)
        t.start()
        for i in range(4):
            t1=Thread(target=task1)
            t1.start()
    示例

    十四.线程队列

    进程之间通过队列(from multiprocessing import Queue)下的Queue通信,那么线程之间亦可以通过队列通信

    class queue.Queue(maxsize=0) #先进先出

    import queue
    q=queue.Queue()
    q.put(1)
    q.put(2)
    q.put(3)
    q.put(4)
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    ------------------------------------
    from threading import Thread
    import queue
    def task1(q):
        while 1:
            print(q.get())                    
    if __name__ == '__main__':
        q = queue.Queue()
        for i in range(3):
            q.put(i)
        t1 = Thread(target=task1,args=(q,))
        t1.start()
    
    #打印结果  0  1   2
    队列(先进先出)

    class queue.LifoQueue(maxsize=0) #last in fisrt out

    q=queue.LifoQueue()
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    print(q.get())
    -------------------------------
    from threading import Thread
    import queue
    def task1(q):
        while 1:
            print(q.get())
    if __name__ == '__main__':
        q = queue.LifoQueue()
        for i in range(3):
            q.put(i)
        t1 = Thread(target=task1,args=(q,))
        t1.start()
    
    
    #打印结果   2   1  0
    堆栈 LIFO 先进后出

    class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

    q=queue.PriorityQueue()
    q.put((10,'垃圾消息'))
    q.put((5,'一般消息'))
    q.put((0,'紧急消息'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    --------------------------------
    from threading import Thread
    import queue
    def task1(q):
        while 1:
            print(q.get())
    if __name__ == '__main__':
        q = queue.PriorityQueue()
        for i in range(3):
            q.put(i)
        # q.put(-10)
        t1 = Thread(target=task1,args=(q,))
        t1.start()
    
    #打印结果: -10 0 1 2
    
    from threading import Thread
    import queue
    def task1(q):
        while 1:
            print(q.get())
    if __name__ == '__main__':
        q = queue.PriorityQueue()
        q.put((-1,5))
        q.put((22,10))
        q.put((2,20))
        t1 = Thread(target=task1,args=(q,))
        t1.start()
    
    
    #打印结果:
    (-1, 5)
    (2, 20)
    (22, 10)
    优先级队列(数字越低,优先级越高)
  • 相关阅读:
    JavaWeb学习篇之----自定义标签&&JSTL标签库详解
    JavaWeb学习篇之----EL表达式详解
    JavaWeb学习篇之----Jsp详解
    JavaWeb学习篇之----Servlet过滤器Filter和监听器
    JavaWeb学习篇之----Session&&Cookie
    JavaWeb学习篇之----容器Request详解
    JavaWeb学习篇之----浏览器缓存问题详解
    JavaWeb学习篇之----容器Response详解
    JavaWeb学习篇之----HTTP协议详解
    factory工厂模式之抽象工厂AbstractFactory
  • 原文地址:https://www.cnblogs.com/luckinlee/p/11621062.html
Copyright © 2011-2022 走看看