zoukankan      html  css  js  c++  java
  • eventlet的学习

    转自:http://bingotree.cn/?p=281

    官方网站:http://eventlet.net/

    之前小秦我写了篇python中协程和yield的文章,这里小秦我再总结一下eventlet中比较重要的几个知识点。

    1.安装方法:

    1
    [root@COMPUTE02 ~]# pip install eventlet

    2.基础知识及优点
    eventlet的核心是协程(也叫做green thread)。
    协程的好处是没有线程开销来的大(比如切换代价很小)。同时协程由于调度都由开发者自己决定,所以对lock的需求就很低了。

    3.网络编程模型
    网络变成模型有两种:同步模型和异步模型
    同步模型就是一个请求来了后,给一个线程,这个线程单独的去处理这个请求。对于一些read/write方法如果资源没有就绪的话就阻塞在那里等待。优点是代码简单清晰,缺点是效率低下。
    异步模型就是通过epoll/select/poll这些方法,由一个主线程去轮询,查看那个请求的资源就绪了,就绪的话就调用相关的回调函数去进行处理。有点是效率较高,缺点是代码复杂。
    而通过协程,我们可以使用同步模型的方法写出异步模型效率的代码。

    4.API解释
    Greenthread Spawn大类:
    eventlet.spawn(func, *args, **kw):
    生成一个协程运行对于的func方法。这个会返回greenthread.GreenThread,调用者可以通过greenthread.GreenThread来获取这个协程的信息。

    eventlet.spawn_n(func, *args, **kw):
    作用和eventlet.spawn一样,但是不会返回greenthread.GreenThread。速度比spawn要快些。

    eventlet.spawn_after(seconds, func, *args, **kw):
    和spawn的功能一样,但是会在seconds指定的秒后才会生成协程去运行func的代码。如果想取消运行,可以在返回的greenthread.GreenThread中调用cancel方法。

    Greenthread Control大类:
    eventlet.sleep(seconds=0):
    暂停当前的协程,使之睡眠一段时间。这个方法会把cpu时间让给其它协程。

    class eventlet.GreenPool:
    一个用于控制并行的pool。通过这个pool可以指定运行的协程的上限,这样有助于控制资源的消耗。

    class eventlet.GreenPile:
    代表了一系列的工作。

    class eventlet.Queue:
    用于不同的协程间的通信。

    class eventlet.Timeout:
    用于给某个对象增加一个超时的行为。

    Patching Functions大类:
    eventlet.import_patched(modulename, *additional_modules, **kw_additional_modules):
    加载某个被绿化的公共模块。

    eventlet.monkey_patch(all=True, os=False, select=False, socket=False, thread=False, time=False):
    对于那些没有被绿化的模块,可以通过这个把这些模块中使用的相关公共模块绿化。

    Network Convenience Functions大类:
    eventlet.connect(addr, family=2, bind=None):
    用于获取客户端的连接

    eventlet.listen(addr, family=2, backlog=50):
    用于监听信息。

    eventlet.wrap_ssl(sock, *a, **kw):
    将一个普通socket转成一个ssl的socket。

    eventlet.serve(sock, handle, concurrency=1000)?:
    当请求来的时候,生成一个协程,通过handle对请求做出处理。调用这个方法后会的阻塞的,除非你把它放在一个spwan中的协程中运行。

    class eventlet.StopServe:
    用于退出serve的异常。

    5.例子
    5.1 Client Pattern:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
     
    import eventlet
    from eventlet.green import urllib2
     
    def fetch(url):
        return urllib2.urlopen(url).read()
     
    pool = eventlet.GreenPool()
    for body in pool.imap(fetch, urls):
        print "got body", len(body)

    这里建立了一个pool,可以猜测imap方法把urls中的每个url都调用了一个fetch方法去处理,并且这些都会建立独立的协程。每个协程 在read请求的时候,会的把cpu时间交给eventlet manager,同时把自己的socket端口注册到类似于select这类轮询方法中。然后eventlet manager发现某个协程等待的数据到达后,就会把cpu交给它,这个协程处理完数据后就会用yield返回数据,之后则把cpu时间继续交给 eventlet manager。

    5.2 Server Pattern

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import eventlet
     
    def handle(client):
        while True:
            c = client.recv(1)
            if not c: break
            client.sendall(c)
     
    server = eventlet.listen(('0.0.0.0', 6000))
    pool = eventlet.GreenPool(10000)
    while True:
        new_sock, address = server.accept()
        pool.spawn_n(handle, new_sock)

    这里也是通过一个pool限制资源的使用。当每个请求来的时候通过spawn_n方法把对这个请求的handle方法放到独立的协程中去处理。而handle中的recv这些方法都是被绿化过的,所以如果读取不到数据这些方法就会把cpu时间交出来给别的协程使用。

    5.3 Dispatch Pattern

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import eventlet
    feedparser = eventlet.import_patched('feedparser')
     
    pool = eventlet.GreenPool()
     
    def fetch_title(url):
        d = feedparser.parse(url)
        return d.feed.get('title', '')
     
    def app(environ, start_response):
        pile = eventlet.GreenPile(pool)
        for url in environ['wsgi.input'].readlines():
            pile.spawn(fetch_title, url)
        titles = ' '.join(pile)
        start_response('200 OK', [('Content-type', 'text/plain')])
        return [titles]

    这里的pile是个用于保存协程结果的东东,并且是个迭代器。一般可以用pile来获取相关的协程的结果。

    6.Hub
    这个Hub就是小秦我上面讲到的eventlet manager。其实际上就是一个loop的循环,不停的看有没有那些协程可以给它CPU时间了或者哪些定时器可以生效了。在eventlet中这个的实 现有下面几种:epolls,poll,selects,pyevent。可以通过eventlet.hubs.use_hub(hub=None)来决 定使用哪种hub。hub的那个loop所在的协程也叫做main greenlet。
    Hub只有在第一次IO操作的时候才会建立。

    7.eventlet.event.Event类
    event设queue差不多,但是有两个不同:
    1.调用send不会交出自己的cpu时间
    2.send只能被调用一次

    event主要用于在不同协程间传递返回值。比如我协程A需要等协程B做了某件事后的结果,那么我协程A可以建立了一个event evt,然后调用evt.wait()就会开始等待。协程B把事情做好后运行evt.send(XXX)(注意,由于都在一个线程中,所以获取这个evt 甚至不需要锁),这个时候协程A的evt.wait()代码就可以往下运行了,并且Hub会把相关的结果给它。
    比如这个官网上的例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    >>> from eventlet import event
    >>> import eventlet
    >>> evt = event.Event()
    >>> def baz(b):
    ...     evt.send(b + 1)
    ...
    >>> _ = eventlet.spawn_n(baz, 3)
    >>> evt.wait()
    4

    下面这个例子也不错:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    >>> from eventlet import event
    >>> import eventlet
    >>> evt = event.Event()
    >>> def waiter():
    ...     print 'about to wait'
    ...     result = evt.wait()
    ...     print 'waited for', result
    >>> _ = eventlet.spawn(waiter)
    >>> eventlet.sleep(0)
    about to wait
    >>> evt.send('a')
    >>> eventlet.sleep(0)
    waited for a

    另外可以把一个异常发送给在wait的event,如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    >>> from eventlet import event
    >>> evt = event.Event()
    >>> evt.send_exception(RuntimeError())
    >>> evt.wait()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "eventlet/event.py", line 120, in wait
        current.throw(*self._exc)
    RuntimeError

    如果要把trace也返回,那么:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    >>> import sys
    >>> evt = event.Event()
    >>> try:
    ...     raise RuntimeError()
    ... except RuntimeError:
    ...     evt.send_exception(*sys.exc_info())
    ...
    >>> evt.wait()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "eventlet/event.py", line 120, in wait
        current.throw(*self._exc)
      File "<stdin>", line 2, in <module>
    RuntimeError

    8.backdoor
    参考http://blog.csdn.net/ssdxiao/article/details/17759483中的例子:
    主要用于获取某个长时间运行的进程的状态。其原理是这样的:在程序的代码中,我专门运行一个协程,这个协程一般不会被调度到,所以不会影响程序的正常运行。这个协程中跑了一个backdoor_server,比如下面的这行代码:

    1
    eventlet.spawn(backdoor.backdoor_server,eventlet.listen(('localhost', 3000)), locals=backdoor_locals)

    这里的backdoor_locals是一个字典,key是某个字符串,而value就是对应的方法:

    1
    2
    3
    4
    backdoor_locals = {'exit': _dont_use_this,
                        'quit': _dont_use_this,
                        'off':turn_off_printing,
    }

    由于这个是协程,所以我们的backdoor_server完全有能力修改这个程序的内存中的变量。当这个程序运行起来后,我通过telnet的方 法可以连上这个程序,然后可以通过执行backdoor_locals中指定的三个方法exit,quit,off来控制我们程序的运行行为(比如修改某 个内存中变量的值)。

    来看一个简单的例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    from eventlet import backdoor
    import eventlet
     
    def _funca():
        print "abc"
        return "123"
     
    backdoor_locals = {'funca': _funca}
     
    eventlet.spawn(backdoor.backdoor_server, eventlet.listen(('localhost', 3000)),locals=backdoor_locals)
     
    while True:
        print "aaa"
        eventlet.sleep(1)

    当这个程序运行后,我在另一个终端上执行下面的命令就可以看到对应的结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    [root@COMPUTE02 ~]# telnet 127.0.0.1 3000
    Trying 127.0.0.1...
    Connected to 127.0.0.1.
    Escape character is '^]'.
    Python 2.6.6 (r266:84292, Sep  4 2013, 07:46:00)
    [GCC 4.4.7 20120313 (Red Hat 4.4.7-3)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    (InteractiveConsole)
    >>> funca()
    abc
    '123'
    >>>

    可以看到,print或return都会的把输出返回到telnet的client端。

    9.eventlet.timeout.Timeout类
    用于当指定的协程运行时间超过timeout中指定的时间时,生成一个异常。如:

    1
    2
    3
    4
    5
    >>> Timeout(0.1)
    >>> eventlet.sleep(0.2)
    Traceback (most recent call last):
     ...
    Timeout: 0.1 seconds

    或者下面这个例子:

    1
    2
    3
    4
    5
    6
    7
    data = None
    with Timeout(5, False):
        data = mysock.makefile().readline()
    if data is None:
        ... # 5 seconds passed without reading a line
    else:
        ... # a line was read within 5 seconds
  • 相关阅读:
    ASP.NET Core依赖注入(DI)
    SQLSERVER 创建索引视图注意事项
    Git的基本使用方法(0基础小白也能看懂)
    并发系列64章(异步编程二)第三章
    并发系列64章(异步编程)第二章
    并发系列64章(并发概要)第一章
    SQL Server配置邮件服务器
    SQL Server常用函数及命令
    SQL Server将一段字符串根据特定分隔符转换成一个表变量
    SQL Server双机热备之发布、订阅实现实时同步
  • 原文地址:https://www.cnblogs.com/zmlctt/p/4208928.html
Copyright © 2011-2022 走看看