zoukankan      html  css  js  c++  java
  • 38 协程 greenlet模块实现并发 Gevent

    异步回调

    ### 什么是异步回调

    异步回调指的是:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数

    ### 为什么需要异步回调

    之前在使用线程池或进程池提交任务时,如果想要处理任务的执行结果则必须调用result函数或是shutdown函数,而它们都是是阻塞的,会等到任务执行完毕后才能继续执行,这样一来在这个等待过程中就无法执行其他任务,降低了效率,所以需要一种方案,即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调

    ### 异步回调的使用

    先来看一个案例:

    在编写爬虫程序时,通常都是两个步骤:

    ​ 1.从服务器下载一个网页文件

    ​ 2.读取并且解析文件内容,提取有用的数据

    按照以上流程可以编写一个简单的爬虫程序

    要请求网页数据则需要使用到第三方的请求库requests可以通过pip或是pycharm来安装,在pycharm中点击settings->解释器->点击+号->搜索requests->安装

    ```python
    import requests,re,os,random,time
    from concurrent.futures import ProcessPoolExecutor

    def get_data(url):
    print("%s 正在请求%s" % (os.getpid(),url))
    time.sleep(random.randint(1,2))
    response = requests.get(url)
    print(os.getpid(),"请求成功 数据长度",len(response.content))
    #parser(response) # 3.直接调用解析方法 哪个进程请求完成就那个进程解析数据 强行使两个操作耦合到一起了
    return response

    def parser(obj):
    data = obj.result()
    htm = data.content.decode("utf-8")
    ls = re.findall("href=.*?com",htm)
    print(os.getpid(),"解析成功",len(ls),"个链接")

    if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    urls = ["https://www.baidu.com",
    "https://www.sina.com",
    "https://www.python.org",
    "https://www.tmall.com",
    "https://www.mysql.com",
    "https://www.apple.com.cn"]
    # objs = []
    for url in urls:
    # res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发
    # parser(res)

    obj = pool.submit(get_data,url) #
    obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合
    # objs.append(obj)

    # pool.shutdown() # 2.等待所有任务执行结束在统一的解析
    # for obj in objs:
    # res = obj.result()
    # parser(res)
    # 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析
    # 2.解析任务变成了串行,
    ```

    总结:异步回调使用方法就是在提交任务后得到一个Futures对象,调用对象的add_done_callback来指定一个回调函数,

    如果把任务比喻为烧水,没有回调时就只能守着水壶等待水开,有了回调相当于换了一个会响的水壶,烧水期间可用作其他的事情,等待水开了水壶会自动发出声音,这时候再回来处理。水壶自动发出声音就是回调。

    注意:

    1. 使用进程池时,回调函数都是主进程中执行执行
    2. 使用线程池时,回调函数的执行线程是不确定的,哪个线程空闲就交给哪个线程
    3. 回调函数默认接收一个参数就是这个任务对象自己,再通过对象的result函数来获取任务的处理结果

     线程队列

    1.Queue 先进先出队列

    与多进程中的Queue使用方式完全相同,区别仅仅是不能被多进程共享。

    ```python
    q = Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get(timeout=1))
    print(q.get(timeout=1))
    print(q.get(timeout=1))
    ```

    2.LifoQueue 后进先出队列

    该队列可以模拟堆栈,实现先进后出,后进先出

    ```python
    lq = LifoQueue()

    lq.put(1)
    lq.put(2)
    lq.put(3)

    print(lq.get())
    print(lq.get())
    print(lq.get())
    ```

    3.PriorityQueue 优先级队列

    该队列可以为每个元素指定一个优先级,这个优先级可以是数字,字符串或其他类型,但是必须是可以比较大小的类型,取出数据时会按照从小到大的顺序取出

    ```python
    pq = PriorityQueue()
    # 数字优先级
    pq.put((10,"a"))
    pq.put((11,"a"))
    pq.put((-11111,"a"))

    print(pq.get())
    print(pq.get())
    print(pq.get())
    # 字符串优先级
    pq.put(("b","a"))
    pq.put(("c","a"))
    pq.put(("a","a"))

    print(pq.get())
    print(pq.get())
    print(pq.get())
    ```

    .线程事件Event

    ### 什么是事件

    事件表示在某个时间发生了某个事情的通知信号,用于线程间协同工作。

    因为不同线程之间是独立运行的状态不可预测,所以一个线程与另一个线程间的数据是不同步的,当一个线程需要利用另一个线程的状态来确定自己的下一步操作时,就必须保持线程间数据的同步,Event就可以实现线程间同步

    ### Event介绍

    Event象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    可用方法:

    ```python
    event.isSet():返回event的状态值;
    event.wait():将阻塞线程;知道event的状态为True
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    event.clear():恢复event的状态值为False。
    ```

    使用案例:

    ```python
    # 在链接mysql服务器前必须保证mysql已经启动,而启动需要花费一些时间,所以客户端不能立即发起链接 需要等待msyql启动完成后立即发起链接
    from threading import Event,Thread
    import time

    boot = False
    def start():
    global boot
    print("正正在启动服务器.....")
    time.sleep(5)
    print("服务器启动完成!")
    boot = True

    def connect():
    while True:
    if boot:
    print("链接成功")
    break
    else:
    print("链接失败")
    time.sleep(1)

    Thread(target=start).start()
    Thread(target=connect).start()
    Thread(target=connect).start()
    ```

    使用Event改造后:

    ```python
    from threading import Event,Thread
    import time

    e = Event()
    def start():
    global boot
    print("正正在启动服务器.....")
    time.sleep(3)
    print("服务器启动完成!")
    e.set()

    def connect():
    e.wait()
    print("链接成功")

    Thread(target=start).start()
    Thread(target=connect).start()
    Thread(target=connect).start()
    ```

    增加需求,每次尝试链接等待1秒,尝试次数为3次

    ```python
    from threading import Event,Thread
    import time

    e = Event()
    def start():
    global boot
    print("正正在启动服务器.....")
    time.sleep(5)
    print("服务器启动完成!")
    e.set()

    def connect():
    for i in range(1,4):
    print("第%s次尝试链接" % i)
    e.wait(1)
    if e.isSet():
    print("链接成功")
    break
    else:
    print("第%s次链接失败" % i)
    else:
    print("服务器未启动!")

    Thread(target=start).start()
    Thread(target=connect).start()
    # Thread(target=connect).start()
    ```





    单线程实现并发
    并发:指的是多个任务同时发生,看起来好像是同时都在进行

    并行:指的是多个任务真正的同时进行

    如果一个线程能够检测IO操作并且将其设置为非阻塞,并自动切换到其他任务就可以提高CPU的利用率,指的就是在单线程下实现并发。
    并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那就可以实现单线程并发
    python中的生成器就具备这样一个特点,每次调用next都会回到生成器函数中执行代码,这意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着生成器会自动保存执行状态!
    利用生成器来实现并发执行:
    def task1():
    while True:
    yield
    print("task1 run")

    def task2():
    g = task1()
    while True:
    next(g)
    print("task2 run")
    task2()
     两个计算任务一个采用生成器切换并发执行  一个直接串行调用
    import time
    def task1():
    a = 0
    for i in range(10000000):
    a += i
    yield

    def task2():
    g = task1()
    b = 0
    for i in range(10000000):
    b += 1
    next(g)
    s = time.time()
    task2()
    print("并发执行时间",time.time()-s)

    # 单线程下串行执行两个计算任务 效率反而比并发高 因为并发需要切换和保存
    def task1():
    a = 0
    for i in range(10000000):
    a += i
    def task2():
    b = 0
    for i in range(10000000):
    b += 1
    s = time.time()
    task1()
    task2()
    print("串行执行时间",time.time()-s)
    ```
    可以看到对于纯计算任务而言,单线程并发反而使执行效率下降了一半左右,所以这样的方案对于纯计算任务而言是没有必要的

    协程

    协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。

    协程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

    协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:

    1.可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
    
    2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

    优点如下:

    1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    2. 单线程内就可以实现并发的效果,最大限度地利用cpu

    缺点如下:

    1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
    2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

    总结协程特点:

    1. 必须在只有一个单线程里实现并发
    2. 修改共享数据不需加锁
    3. 用户程序里自己保存多个控制流的上下文栈
    4. 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

    可以看到对于纯计算任务而言,单线程并发反而使执行效率下降了一半左右,所以这样的方案对于纯计算任务而言是没有必要的
    greenlet模块实现并发

    ```python
    def task1(name):
    print("%s task1 run1" % name)
    g2.switch(name) # 切换至任务2
    print("task1 run2")
    g2.switch() # 切换至任务2

    def task2(name):
    print("%s task2 run1" % name)
    g1.switch() # 切换至任务1
    print("task2 run2")

    g1 = greenlet.greenlet(task1)
    g2 = greenlet.greenlet(task2)
    g1.switch("jerry") # 为任务传参数
    ```

    现在我们需要一种方案 即可检测IO 又能够实现单线程并发,于是gevent闪亮登场
    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
    #用法
    g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
    
    g2=gevent.spawn(func2)
    
    g1.join() #等待g1结束
    
    g2.join() #等待g2结束
    
    #或者上述两步合作一步:gevent.joinall([g1,g2])
    
    g1.value#拿到func1的返回值
    遇到IO阻塞时会自动切换任务
    # gevent 不具备检测IO的能力  需要为它打补丁  打上补丁之后就能检测IO
    # 注意补丁一定打在最上面 必须保证导入模块前就打好补丁
    from gevent import monkey
    monkey.patch_all()

    from threading import current_thread
    import gevent,time


    def task1():
    print(current_thread(),1)
    print("task1 run")
    # gevent.sleep(3)
    time.sleep(3)
    print("task1 over")

    def task2():
    print(current_thread(),2)
    print("task2 run")
    print("task2 over")

    # spawn 用于创建一个协程任务
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)

    # 任务要执行,必须保证主线程没挂 因为所有协程任务都是主线在执行 ,必须调用join来等待协程任务
    # g1.join()
    # g2.join()
    # 理论上等待执行时间最长的任务就行 , 但是不清楚谁的时间长 可以全部join

    gevent.joinall([g1,g2])
    print("over")

    gevent.sleep(3)模拟的是gevent可以识别的io阻塞,

    而time.sleep(3)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

    from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

     

  • 相关阅读:
    母牛的故事
    实现图的邻接矩阵和邻接表的存储
    各个位数和,找最终和为个位数
    排序5之归并排序
    排序2之冒泡与选择排序
    神奇的魔方
    关于SaveChanges
    ADO.NET Entity Framework 4.0 Self Tracking Entity
    EF4.0自跟踪实体使用小结
    ADO.NET Entity Framework 4.0 新特性
  • 原文地址:https://www.cnblogs.com/komorebi/p/10986972.html
Copyright © 2011-2022 走看看