zoukankan      html  css  js  c++  java
  • 并发编程之协程

    1.异步回调

    回调函数指的是a交给b一个任务,b在执行完成之后回过头调用了a的一个函数,称之为回调。

    为什么需要回调函数?因为需要获取异步任务的结果,但是又不应该阻塞,这时回调函数就可以提高效率,高效的获取任务的结果。

    什么时候使用回调函数?通常异步任务都会和回调函数一起使用。

    使用方式:使用add_done_callback()函数给Future对象绑定一个回调函数

    注:在多进程中回调函数是交给主进程来执行,而在多线程中,回调函数是谁有空谁执行(不可能是主线程)。

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import requests, re
    from threading import current_thread
    
    
    # 爬虫
    # 1.从目标站点下载网页数据,本质就是HTML格式字符串
    # 2.用re正则表达式,提取出你所需要的数据
    def get_data(url):
        print('%s向%s发起请求' % (current_thread().name, url))
        res = requests.get(url)
        print('%s请求%s成功' % (current_thread().name, url))
        return res
    
    
    def parser(obj):
        res = obj.result()
        htm = res.content.decode('utf-8')
        l = re.findall('href=(.*?com)', htm)
        print('%s解析成功,共%s个连接' % (current_thread().name, len(l)))
    
    
    pool = ThreadPoolExecutor(2)
    
    url = ['https://www.baidu.com',
           'https://www.jd.com',
           'https://www.python.org',
           'https://www.baidu.com',
           'https://www.cnblogs.com', ]
    for i in url:
        obj = pool.submit(get_data, i)  
        obj.add_done_callback(parser)  # 回调parser函数,并会将obj传入。
    
    # ThreadPoolExecutor-0_0向https://www.baidu.com发起请求
    # ThreadPoolExecutor-0_1向https://www.jd.com发起请求
    # ThreadPoolExecutor-0_1请求https://www.jd.com成功
    # ThreadPoolExecutor-0_1解析成功,共148个连接
    # ThreadPoolExecutor-0_1向https://www.python.org发起请求
    # ThreadPoolExecutor-0_0请求https://www.baidu.com成功
    # ThreadPoolExecutor-0_0解析成功,共13个连接
    # ThreadPoolExecutor-0_0向https://www.baidu.com发起请求
    # ThreadPoolExecutor-0_0请求https://www.baidu.com成功
    # ThreadPoolExecutor-0_0解析成功,共13个连接
    # ThreadPoolExecutor-0_0向https://www.cnblogs.com发起请求
    # ThreadPoolExecutor-0_0请求https://www.cnblogs.com成功
    # ThreadPoolExecutor-0_0解析成功,共153个连接
    # ThreadPoolExecutor-0_1请求https://www.python.org成功
    # ThreadPoolExecutor-0_1解析成功,共55个连接
    多线程异步回调

    2.线程队列

    线程队列与进程队列的区别:进程队列可以被多进程共享,而线程中的队列就是一个普通的容器,不能进程共享。

    线程队列三种方式:

    from queue import Queue, LifoQueue, PriorityQueue
    from threading import Thread
    
    # 1.先进先出
    q1 = Queue()
    q1.put(1)
    q1.put(2)
    q1.put(3)
    print(q1.get())
    print(q1.get())
    print(q1.get())
    
    # 2.先进后出(堆栈)
    q2 = LifoQueue()
    
    q2.put(1)
    q2.put(2)
    q2.put(3)
    
    print(q2.get())
    print(q2.get())
    print(q2.get())
    
    # 优先级队列
    q3 = PriorityQueue()
    
    q3.put((2, 1))  # 第一个参数设置优先级,第二个参数传入数据
    q3.put((1, 2))  # 取出顺序是由小到大,优先级可以是数字或者字符,只要能比较大小即可
    q3.put((3, 3))
    
    print(q3.get()[-1])
    print(q3.get()[-1])
    print(q3.get()[-1])

    3.线程事件(Event)

    事件:指的是用于协调多个线程工作的,当一个线程要执行某个操作,需要获取另一个线程的状态。

    线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

    import time
    from threading import Thread
    from threading import Event
    
    # 使用变量类完成多线程协作
    # is_boot = False
    # def start():
    #     global is_boot
    #     print("正在启动服务器......")
    #     time.sleep(5)
    #     print("服务器启动成功!")
    #     is_boot = True
    #
    # def connect():
    #     while True:
    #         if is_boot:
    #             print("连接服务器成功!")
    #             break
    #         else:
    #             print("连接服务器失败!")
    #         time.sleep(0.5)
    #
    #
    # Thread(target=start).start()
    # Thread(target=connect).start()
    变量类多线程协作
    from threading import Thread, Event
    import time
    
    e = Event()  # 默认值为False
    
    
    def start():
        print('服务器启动。。')
        time.sleep(3)
        print('服务器启动成功')
        e.set()  # 就是把事件的值设置为True
    
    
    def connect():
        for i in range(3):
            print('等待服务器启动。。')
            e.wait(0.5)  # 可以设置等待时间,等待e.set()执行,也就是将事件的值设置为True
            if e.is_set():
                print('连接成功。。。')
                break
            else:
                print('连接失败。。。')
        else:
            print('放弃连接')
    Thread(target=start).start()
    Thread(target=connect).start()
    # 服务器启动。。
    # 等待服务器启动。。
    # 连接失败。。。
    # 等待服务器启动。。
    # 连接失败。。。
    # 等待服务器启动。。
    # 连接失败。。。
    # 放弃连接
    # 服务器启动成功
    Event完成多线程协作
    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False

    4.单线程实现并发

    并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那么就可以实现单线程并发。

    python中的生成器就具备这样的特点,每次调用next都会执行回到生成器函数中执行代码,者意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着会自动保存执行状态

    import time
    
    
    def func1():
        a = 1
        while True:
            a += 1
            print(a)
            print('func1 run')
            yield
    
    
    def func2():
        g = func1()  # 获取生成器
        while True:
            print('func2 run')
            time.sleep(2)
            next(g)  # 执行生成器中的代码
    
    
    func2()

    虽然并发实现,但是还有问题,看一下代码:

    import time
    
    # 生成器执行纯计算任务
    def func1():
        a = 0
        for i in range(10000000):
            a += i
            yield
    def func2():
        g = func1()  # 获取生成器
        b = 0
        for i in range(10000000):
            b += i
            next(g)  # 执行生成器中的代码
    start = time.time()
    func2()
    print('执行时间为 :%s' % (time.time()-start))
    
    # 执行时间为 :2.2250561714172363
    
    
    # 普通调用函数执行纯计算任务
    def func1():
        a = 0
        for i in range(10000000):
            a += i
    def func2():
        b = 0
        for i in range(10000000):
            b += i
    start = time.time()
    func1()
    func2()
    print('执行时间为 :%s' % (time.time()-start))
    
    # 执行时间为 :1.0411033630371094

    可以看到对于纯计算任务而言,单线程并发反而使执行效率下降了一半左右,所以这样的方案对于纯计算任务而言使没有必要的,在上述代码中,使用yield来切换使得代码结构非常混乱,所以就引入了greenlet模块,专门对yield进行了封装。

    greenlet模块

    import greenlet
    import time
    def task1():
        print('task1 run')
        time.sleep(5)  # 等待5秒
        g2.switch()  # 切换到task2任务中
        print('task1 run')
    
    def task2():
        print('task2 run')
        g1.switch()  # 切换到task1任务中
    
    g1 = greenlet.greenlet(task1)
    g2 = greenlet.greenlet(task2)
    
    g1.switch()  # 相当于开启一个线程
    
    # task1 run
    # task2 run
    # task1 run

    该模块简化了yield复杂的代码结构,实现了单线程下多任务并发,但是无论直接使用yield,还是greenlet模块都不能检测I/O操作,遇到i/o时同样进程阻塞状态,所以此时的并发没有任何意义。现在我们需要一种方案既可以检测I/O又能够实现单线程并发,于是引入了gevent模块。

    5.协程

    协程:是单线程下的并发,就是在应用程序中控制多个任务的切换+保存状态,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

    需要强调的是:

    1.python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到I/O或执行时间过长,就会被迫交出CPU执行权限,切换其他线程运行)

    2.单线程内开启协程,一旦遇到I/O,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(非I/O操作的切换与效率无关)

    对比操作系统控制线程的切换,用户在单线程内控制协程的切换

    优点:

    1.协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

    2.单线程内就可以实现并发的效果,最大限度利用CPU

    缺点:

    1.协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程。

    2.协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程。

    6.gevent模块

    gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

    在Python中使用Gevent模块来实现协程,它可以在多个任务之间进行切换,并且可以自己检测I/O。

    import gevent
    def task1():
        print('task1 run')
        gevent.sleep(3)
        print('task1 run')
    
    def task2():
        print('task2 run')
    
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)
    
    # g1.join()
    # g2.join()
    gevent.joinall([g1, g2])  # 开启所有协程

    上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,

    而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

    from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

    或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

    from gevent import monkey
    import gevent
    import time
    monkey.patch_all()  
    def task1():
        print('task1 run')
        time.sleep(3)
        print('task1 run')
    
    def task2():
        print('task2 run')
    
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)
    
    # g1.join()
    # g2.join()
    gevent.joinall([g1, g2])  # 开启所有协程

    需要注意:

    1.协程执行时要想使任务执行则必须对协程对象调用join函数

    2.有多个任务时,随便调用哪一个的join都会并发的执行所有任务,但是需要注意如果一个存在io的任务没有被join该任务将无法正常执行完毕

    3.monkey补丁的原理是把原始的阻塞模块替换为修改后的非阻塞模块,即偷梁换柱,来实现IO自定切换,所以打补丁的位置一定要放到导入阻塞模块之前

     

  • 相关阅读:
    PAT 甲级 1132 Cut Integer (20 分)
    AcWing 7.混合背包问题
    AcWing 9. 分组背包问题
    AcWing 5. 多重背包问题 II
    AcWing 3. 完全背包问题
    AcWing 4. 多重背包问题
    AcWing 2. 01背包问题
    AcWing 875. 快速幂
    AcWing 874. 筛法求欧拉函数
    AcWing 873. 欧拉函数
  • 原文地址:https://www.cnblogs.com/wangke0917/p/10220813.html
Copyright © 2011-2022 走看看