zoukankan      html  css  js  c++  java
  • An Introduction to Asynchronous Programming and Twisted (3)

    Part 11: Your Poetry is Served

    A Twisted Poetry Server

    Now that we’ve learned so much about writing clients with Twisted, let’s turn around and re-implement our poetry server with Twisted too. And thanks to the generality of Twisted’s abstractions, it turns out we’ve already learned almost everything we need to know.

    class PoetryProtocol(Protocol):
     
        def connectionMade(self):
            self.transport.write(self.factory.poem)
            self.transport.loseConnection()
    
    class PoetryFactory(ServerFactory):
        
        protocol = PoetryProtocol
        def __init__(self, poem):
            self.poem = poem
    
    def main():
        options, poetry_file = parse_args()
        poem = open(poetry_file).read()
        factory = PoetryFactory(poem)
     
        from twisted.internet import reactor
        port = reactor.listenTCP(options.port or 0, factory,
                                 interface=options.iface)
      
        reactor.run()

    可见server和client基本原理上是一致的, reactor loop侦听事件, 事件到达时使用protocol去处理, factory用于管理protocol, 继承自ServerFactory.

    Part 12: A Poetry Transformation Server

    这节中实现一个复杂些的server, 根据client发送不同的请求, 将poem做不同的转换并发回client, 这就需要一个协议使得client和server可以正常沟通.

    Twisted includes support for several protocols we could use to solve this problem, including XML-RPC, Perspective Broker, and AMP.

    但是为了是我们的例子足够简单以至于容易理解, 我们使用自己的一个简单的协议,

    <transform-name>.<text of the poem>

    当server接收到从客户端发出的这样的request后, 根据transform-name将text of the poem进行相应的transform, 并发送回client.

    class TransformProtocol(NetstringReceiver):
     
        def stringReceived(self, request):
            if '.' not in request: # bad request
                self.transport.loseConnection()
                return
     
            xform_name, poem = request.split('.', 1)
     
            self.xformRequestReceived(xform_name, poem)
     
        def xformRequestReceived(self, xform_name, poem):
            new_poem = self.factory.transform(xform_name, poem)
     
            if new_poem is not None:
                self.sendString(new_poem)
     
            self.transport.loseConnection()
            
    class TransformFactory(ServerFactory):
     
        protocol = TransformProtocol
     
        def __init__(self, service):
            self.service = service
     
        def transform(self, xform_name, poem):
            thunk = getattr(self, 'xform_%s' % (xform_name,), None)
     
            if thunk is None: # no such transform
                return None
     
            try:
                return thunk(poem)
            except:
                return None # transform failed
     
        def xform_cummingsify(self, poem):
            return self.service.cummingsify(poem)
        
    class TransformService(object):
     
        def cummingsify(self, poem):
            return poem.lower()
    
    def main():
        service = TransformService()
        factory = PoetryFactory(service)
     
        from twisted.internet import reactor
        port = reactor.listenTCP(options.port or 0, factory,
                                 interface=options.iface)
        reactor.run()

    来看看这段代码,

    首先, TransformProtocol继承自NetstringReceiver而非Protocol, NetstringReceiver是一种专门用来处理string的协议, 这儿可以使用和继承Twisted开发框架提供的各种协议来简化代码, 而不用每次从头开发, 这就是使用框架的好处.

    在TransformProtocol中对于poem具体的transform逻辑上, 调用self.factory.transform, 把变数扔给factory, 而保持protocol的高度抽象, transform逻辑变化,添减, 都保持protocol不需要有任何改动.

    其次, 在TransformFactory中, 使用python强大的getattr来避免使用大量的if…else.

    但这儿只提供了cummingsify service, 如果要增加或删除service, TransformFactory和TransformService难免需要修改...

    这段代码已经写的不错...不过缺少些Twisted的感觉...如果加上deferred的callback机制, 应该可以写出更highlevel的代码.

    Part 13: Deferred All The Way Down

    Introduction

    Recall poetry client 5.1 from Part 10.The client used a Deferred to manage a callback chain that included a call to a poetry transformation engine. In client 5.1, the engine was implemented as a synchronous function call implemented in the client itself.

    Client5.1中异步去获取poem, 然后调用callback函数cummingsify做transform, 现在我们在Part12中实现了TransformService, 即poem transform也要用异步的方式让服务器去完成.

    这其实是个比较自然的想法, 由于reactor的特性, 任何callback都必须是unblock的, 但实际上, 很多callback处理是需要花费较长的时间的, 这个时候在callback内也必须异步处理, 来保证callback本身的unblock, 即callback本身也无法直接返回结果, 而只能返回deferred对象.

    如下图, 当碰到这种inner deferred时,

    The outer deferred needs to wait until the inner deferred is fired. Of course, the outer deferred can’t block either, so instead the outer deferred suspends the execution of the callback chain and returns control to the reactor

    image

    And how does the outer deferred know when to resume? Simple — by adding a callback/errback pair to the inner deferred. Thus, when the inner deferred is fired the outer deferred will resume executing its chain. If the inner deferred succeeds (i.e., it calls the callback added by the outer deferred), then the outer deferred calls its N+1 callback with the result. And if the inner deferred fails (calls the errback added by the outer deferred), the outer deferred calls the N+1 errback with the failure.

    image 

    下面这段代码给出了怎么样封装inner deferred来提供异步callback,

    class TransformClientProtocol(NetstringReceiver):
     
        def connectionMade(self):
            self.sendRequest(self.factory.xform_name, self.factory.poem)
     
        def sendRequest(self, xform_name, poem):
            self.sendString(xform_name + '.' + poem)
     
        def stringReceived(self, s):
            self.transport.loseConnection()
            self.poemReceived(s)
     
        def poemReceived(self, poem):
            self.factory.handlePoem(poem)
            
    class TransformClientFactory(ClientFactory):
     
        protocol = TransformClientProtocol
     
        def __init__(self, xform_name, poem):
            self.xform_name = xform_name
            self.poem = poem
            self.deferred = defer.Deferred()
     
        def handlePoem(self, poem):
            d, self.deferred = self.deferred, None
            d.callback(poem)
     
        def clientConnectionLost(self, _, reason):
            if self.deferred is not None:
                d, self.deferred = self.deferred, None
                d.errback(reason)
     
        clientConnectionFailed = clientConnectionLost
        
    class TransformProxy(object):
        """
        I proxy requests to a transformation service.
        """
     
        def __init__(self, host, port):
            self.host = host
            self.port = port
     
        def xform(self, xform_name, poem):
            factory = TransformClientFactory(xform_name, poem)
            from twisted.internet import reactor
            reactor.connectTCP(self.host, self.port, factory)
            return factory.deferred
    
    def cummingsify(poem):
        d = proxy.xform('cummingsify', poem)
     
        def fail(err):
            print >>sys.stderr, 'Cummingsify failed!'
            return poem
     
        return d.addErrback(fail)

    最后这个函数就是封装好的异步callback, 大家可以和之前part10的callback对比一下...

    def cummingsify(poem):
        print 'First callback, cummingsify'
        poem = engine.cummingsify(poem)
        return poem
    
    def cummingsify_failed(err):
        if err.check(GibberishError):
            print 'Second errback, cummingsify_failed, use original poem'
            return err.value.args[0] #return original poem
        return err

    再来看一下part10中的callback顺序图, 此时cummingsify为异步callback, cummingsify_failed被加到inner deferred中, 当这个inner deferred被fired时, outer deferred会根据inner deferred情况去调用, got_poem或poem_failed. 其中具体过程似乎是透明的...或者说我也不清楚

    作者在这儿也没有讲清, 个人认为这儿如果能参照Part10给个完整的代码例子, 会更清晰一些...

    image_thumb[9]

    Part 14: When a Deferred Isn’t

    We’ll make a caching proxy server. When a client connects to the proxy, the proxy will either fetch the poem from the external server or return a cached copy of a previously retrieved poem.
    这儿可见, 如果是直接从cache返回的话可以直接同步处理, 如需要去external server获取的话就需要异步处理.

    这样的有时需要同步, 有时需要异步的情况, 怎么办?

    如下代码中, get_poem可能返回的是poem, 也有可能是deferred对象, 对于调用者怎么处理...

    class ProxyService(object):
     
        poem = None # the cached poem
     
        def __init__(self, host, port):
            self.host = host
            self.port = port
     
        def get_poem(self):
            if self.poem is not None:
                print 'Using cached poem.'
                return self.poem
     
            print 'Fetching poem from server.'
            factory = PoetryClientFactory()
            factory.deferred.addCallback(self.set_poem)
            from twisted.internet import reactor
            reactor.connectTCP(self.host, self.port, factory)
            return factory.deferred
     
        def set_poem(self, poem):
            self.poem = poem
            return poem
    
    class PoetryProxyProtocol(Protocol):
     
        def connectionMade(self):
            d = maybeDeferred(self.factory.service.get_poem)
            d.addCallback(self.transport.write)
            d.addBoth(lambda r: self.transport.loseConnection())
     
    class PoetryProxyFactory(ServerFactory):
     
        protocol = PoetryProxyProtocol
     
        def __init__(self, service):
            self.service = service

    使用maybeDeferred来解决这个问题, 这个函数会把poem也封装成一个already-fired deferred

      • If the function returns a deferred, maybeDeferred returns that same deferred, or
      • If the function returns a Failure, maybeDeferred returns a new deferred that has been fired (via .errback) with that Failure, or
      • If the function returns a regular value, maybeDeferred returns a deferred that has already been fired with that value as the result, or
      • If the function raises an exception, maybeDeferred returns a deferred that has already been fired (via .errback()) with that exception wrapped in a Failure.

    An already-fired deferred may fire the new callback (or errback, depending on the state of the deferred) immediately, i.e., right when you add it.

    或者使用succeed函数, The defer.succeed function is just a handy way to make an already-fired deferred given a result.

    def get_poem(self):
        if self.poem is not None:
            print 'Using cached poem.'
            # return an already-fired deferred
            return succeed(self.poem)
     
        print 'Fetching poem from server.'
        factory = PoetryClientFactory()
        factory.deferred.addCallback(self.set_poem)
        from twisted.internet import reactor
        reactor.connectTCP(self.host, self.port, factory)
        return factory.deferred
  • 相关阅读:
    ContextLoaderListener和Spring MVC中的DispatcherServlet学习 随手记
    Linux目录详解
    9---PIP 管理工具的使用
    8. Python自定义模块humansize
    1. Hyper上的CentOS 6.5 网络配置
    Python基础学习8---list列表的操作
    Python基础学习6---存储器
    Python基础学习3---数据结构
    Python基础学习2---模块
    Python基础学习1---函数
  • 原文地址:https://www.cnblogs.com/fxjwind/p/2177172.html
Copyright © 2011-2022 走看看