zoukankan      html  css  js  c++  java
  • 并发编程之协程

    1.异步回调

    回调函数指的是a交给b一个任务,b在执行完成之后回过头调用了a的一个函数,称之为回调。

    为什么需要回调函数?因为需要获取异步任务的结果,但是又不应该阻塞,这时回调函数就可以提高效率,高效的获取任务的结果。

    什么时候使用回调函数?通常异步任务都会和回调函数一起使用。

    使用方式:使用add_done_callback()函数给Future对象绑定一个回调函数

    注:在多进程中回调函数是交给主进程来执行,而在多线程中,回调函数是谁有空谁执行(不可能是主线程)。

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import requests, re
    from threading import current_thread
    
    
    # 爬虫
    # 1.从目标站点下载网页数据,本质就是HTML格式字符串
    # 2.用re正则表达式,提取出你所需要的数据
    def get_data(url):
        print('%s向%s发起请求' % (current_thread().name, url))
        res = requests.get(url)
        print('%s请求%s成功' % (current_thread().name, url))
        return res
    
    
    def parser(obj):
        res = obj.result()
        htm = res.content.decode('utf-8')
        l = re.findall('href=(.*?com)', htm)
        print('%s解析成功,共%s个连接' % (current_thread().name, len(l)))
    
    
    pool = ThreadPoolExecutor(2)
    
    url = ['https://www.baidu.com',
           'https://www.jd.com',
           'https://www.python.org',
           'https://www.baidu.com',
           'https://www.cnblogs.com', ]
    for i in url:
        obj = pool.submit(get_data, i)  
        obj.add_done_callback(parser)  # 回调parser函数,并会将obj传入。
    
    # ThreadPoolExecutor-0_0向https://www.baidu.com发起请求
    # ThreadPoolExecutor-0_1向https://www.jd.com发起请求
    # ThreadPoolExecutor-0_1请求https://www.jd.com成功
    # ThreadPoolExecutor-0_1解析成功,共148个连接
    # ThreadPoolExecutor-0_1向https://www.python.org发起请求
    # ThreadPoolExecutor-0_0请求https://www.baidu.com成功
    # ThreadPoolExecutor-0_0解析成功,共13个连接
    # ThreadPoolExecutor-0_0向https://www.baidu.com发起请求
    # ThreadPoolExecutor-0_0请求https://www.baidu.com成功
    # ThreadPoolExecutor-0_0解析成功,共13个连接
    # ThreadPoolExecutor-0_0向https://www.cnblogs.com发起请求
    # ThreadPoolExecutor-0_0请求https://www.cnblogs.com成功
    # ThreadPoolExecutor-0_0解析成功,共153个连接
    # ThreadPoolExecutor-0_1请求https://www.python.org成功
    # ThreadPoolExecutor-0_1解析成功,共55个连接
    多线程异步回调

    2.线程队列

    线程队列与进程队列的区别:进程队列可以被多进程共享,而线程中的队列就是一个普通的容器,不能进程共享。

    线程队列三种方式:

    from queue import Queue, LifoQueue, PriorityQueue
    from threading import Thread
    
    # 1.先进先出
    q1 = Queue()
    q1.put(1)
    q1.put(2)
    q1.put(3)
    print(q1.get())
    print(q1.get())
    print(q1.get())
    
    # 2.先进后出(堆栈)
    q2 = LifoQueue()
    
    q2.put(1)
    q2.put(2)
    q2.put(3)
    
    print(q2.get())
    print(q2.get())
    print(q2.get())
    
    # 优先级队列
    q3 = PriorityQueue()
    
    q3.put((2, 1))  # 第一个参数设置优先级,第二个参数传入数据
    q3.put((1, 2))  # 取出顺序是由小到大,优先级可以是数字或者字符,只要能比较大小即可
    q3.put((3, 3))
    
    print(q3.get()[-1])
    print(q3.get()[-1])
    print(q3.get()[-1])

    3.线程事件(Event)

    事件:指的是用于协调多个线程工作的,当一个线程要执行某个操作,需要获取另一个线程的状态。

    线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

    import time
    from threading import Thread
    from threading import Event
    
    # 使用变量类完成多线程协作
    # is_boot = False
    # def start():
    #     global is_boot
    #     print("正在启动服务器......")
    #     time.sleep(5)
    #     print("服务器启动成功!")
    #     is_boot = True
    #
    # def connect():
    #     while True:
    #         if is_boot:
    #             print("连接服务器成功!")
    #             break
    #         else:
    #             print("连接服务器失败!")
    #         time.sleep(0.5)
    #
    #
    # Thread(target=start).start()
    # Thread(target=connect).start()
    变量类多线程协作
    from threading import Thread, Event
    import time
    
    e = Event()  # 默认值为False
    
    
    def start():
        print('服务器启动。。')
        time.sleep(3)
        print('服务器启动成功')
        e.set()  # 就是把事件的值设置为True
    
    
    def connect():
        for i in range(3):
            print('等待服务器启动。。')
            e.wait(0.5)  # 可以设置等待时间,等待e.set()执行,也就是将事件的值设置为True
            if e.is_set():
                print('连接成功。。。')
                break
            else:
                print('连接失败。。。')
        else:
            print('放弃连接')
    Thread(target=start).start()
    Thread(target=connect).start()
    # 服务器启动。。
    # 等待服务器启动。。
    # 连接失败。。。
    # 等待服务器启动。。
    # 连接失败。。。
    # 等待服务器启动。。
    # 连接失败。。。
    # 放弃连接
    # 服务器启动成功
    Event完成多线程协作
    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False

    4.单线程实现并发

    并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那么就可以实现单线程并发。

    python中的生成器就具备这样的特点,每次调用next都会执行回到生成器函数中执行代码,者意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着会自动保存执行状态

    import time
    
    
    def func1():
        a = 1
        while True:
            a += 1
            print(a)
            print('func1 run')
            yield
    
    
    def func2():
        g = func1()  # 获取生成器
        while True:
            print('func2 run')
            time.sleep(2)
            next(g)  # 执行生成器中的代码
    
    
    func2()

    虽然并发实现,但是还有问题,看一下代码:

    import time
    
    # 生成器执行纯计算任务
    def func1():
        a = 0
        for i in range(10000000):
            a += i
            yield
    def func2():
        g = func1()  # 获取生成器
        b = 0
        for i in range(10000000):
            b += i
            next(g)  # 执行生成器中的代码
    start = time.time()
    func2()
    print('执行时间为 :%s' % (time.time()-start))
    
    # 执行时间为 :2.2250561714172363
    
    
    # 普通调用函数执行纯计算任务
    def func1():
        a = 0
        for i in range(10000000):
            a += i
    def func2():
        b = 0
        for i in range(10000000):
            b += i
    start = time.time()
    func1()
    func2()
    print('执行时间为 :%s' % (time.time()-start))
    
    # 执行时间为 :1.0411033630371094

    可以看到对于纯计算任务而言,单线程并发反而使执行效率下降了一半左右,所以这样的方案对于纯计算任务而言使没有必要的,在上述代码中,使用yield来切换使得代码结构非常混乱,所以就引入了greenlet模块,专门对yield进行了封装。

    greenlet模块

    import greenlet
    import time
    def task1():
        print('task1 run')
        time.sleep(5)  # 等待5秒
        g2.switch()  # 切换到task2任务中
        print('task1 run')
    
    def task2():
        print('task2 run')
        g1.switch()  # 切换到task1任务中
    
    g1 = greenlet.greenlet(task1)
    g2 = greenlet.greenlet(task2)
    
    g1.switch()  # 相当于开启一个线程
    
    # task1 run
    # task2 run
    # task1 run

    该模块简化了yield复杂的代码结构,实现了单线程下多任务并发,但是无论直接使用yield,还是greenlet模块都不能检测I/O操作,遇到i/o时同样进程阻塞状态,所以此时的并发没有任何意义。现在我们需要一种方案既可以检测I/O又能够实现单线程并发,于是引入了gevent模块。

    5.协程

    协程:是单线程下的并发,就是在应用程序中控制多个任务的切换+保存状态,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

    需要强调的是:

    1.python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到I/O或执行时间过长,就会被迫交出CPU执行权限,切换其他线程运行)

    2.单线程内开启协程,一旦遇到I/O,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(非I/O操作的切换与效率无关)

    对比操作系统控制线程的切换,用户在单线程内控制协程的切换

    优点:

    1.协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

    2.单线程内就可以实现并发的效果,最大限度利用CPU

    缺点:

    1.协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程。

    2.协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程。

    6.gevent模块

    gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

    在Python中使用Gevent模块来实现协程,它可以在多个任务之间进行切换,并且可以自己检测I/O。

    import gevent
    def task1():
        print('task1 run')
        gevent.sleep(3)
        print('task1 run')
    
    def task2():
        print('task2 run')
    
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)
    
    # g1.join()
    # g2.join()
    gevent.joinall([g1, g2])  # 开启所有协程

    上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,

    而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

    from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

    或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

    from gevent import monkey
    import gevent
    import time
    monkey.patch_all()  
    def task1():
        print('task1 run')
        time.sleep(3)
        print('task1 run')
    
    def task2():
        print('task2 run')
    
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)
    
    # g1.join()
    # g2.join()
    gevent.joinall([g1, g2])  # 开启所有协程

    需要注意:

    1.协程执行时要想使任务执行则必须对协程对象调用join函数

    2.有多个任务时,随便调用哪一个的join都会并发的执行所有任务,但是需要注意如果一个存在io的任务没有被join该任务将无法正常执行完毕

    3.monkey补丁的原理是把原始的阻塞模块替换为修改后的非阻塞模块,即偷梁换柱,来实现IO自定切换,所以打补丁的位置一定要放到导入阻塞模块之前

     

  • 相关阅读:
    自定义checkbox样式
    自定义select样式
    jsonp
    I/O复用 poll简介
    DOS和DDOS攻击
    TCP状态转换图解析
    Makefile入门
    I/O复用select 使用简介
    替换文本内容
    share memory
  • 原文地址:https://www.cnblogs.com/wangke0917/p/10220813.html
Copyright © 2011-2022 走看看