zoukankan      html  css  js  c++  java
  • ZeroC ICE的远程调用框架

    想搞清楚slice为我们生成了什么样的框架代码,就先搞明白Ice的远程调用框架暗中为我们做了些什么?

    Ice将Ice Object的方法调用分为三个阶段(或步骤),分别是begin,process和end。如下图:

    或者说Ice将一次完整的Ice Object方法调用划分成上面三个阶段。基于上面的划分,又将三个阶段应用到远程调用中。其中将begin和end阶段应用在Object Proxy端,而process阶段应用在Object Servant端。远程调用实质在应用ActiveObject模式,模式有两个角色分别是代理Proxy以及主动对象ActiveObject。之所以叫主动对象,是因为对象的执行不在控制线程上,而是有自己专有的线程去执行它的方法。访问主动对象,只能通过其代理在调用方的控制线程上向主动对象发出调用请求。由于应用了ActiveObject模式,调用方和对象执行不在同一线程(,包括不同主机上的线程)。因此产生了同步和异步的问题。所以同样需要应用Future模式。而这个Future的实现就由AsyncResult来担当。

    现在我们就很清楚slice会为我们生成哪些框架性的代码了。

    slice为我们生成上面三阶段的框架代码,begin,process和end三个阶段。begin和end阶段的代码放在代理端(Proxy),也就是stub存根;而process阶段的代码放在饲服端(Servant),也就是skeleton骨架。

    毫无疑问地,begin阶段的框架代码就是为我们完成如何将请求包装成消息包,并发送出去。end阶段的框架代码就是为我们完成如何将回来的响应拆装出结果。所以每一个接口方法,都会在stub类生成一个_iceI_begin_Method的方法,以及五个简单包装使用它的参数重载版本begin_Method的方法,一个end_Method的方法,最后还有一个调用的简易版本Method,这个简易版本的Method只是简单地在使用end_Method(begin_Method())。你也可以自行组织begin和end阶段的函数来完成一次请求。

    process阶段的框架代码理所当然地会放在skeleton端。为什么叫做skeleton骨架,那看它为我们完成了什么工作。它为我们提供了一个_iceDispatch分派调用的函数,它将请求分派到目标的接口方法入口,并交给接口方法的实现体去执行调用。换句话说,每个接口方法,都会在skeleton类生成一个_iceD_Method的入口成员函数,以及一个空的Method虚成员函数。_iceD_Method为我们完成请求参数的拆装,以及输出参数的准备,然后调用Method虚成员函数,最后将结果包装成响应消息包。在这个skeleton骨架中,我们要做的就是如何去实现虚成员函数Method。我们从生成的函数名字就可以加深印象,_iceD_Method就是由_iceDispatch分派调用请求时调用到的入口,这个入口又为我们完成几个框架步骤的工作,并调用我们的实现的虚函数。

    另外一点要注意,在上面说到的框架并没有涉及网络的通讯操作(发送,接收或侦听)。因为网络服务是由Communicator提供,并不是调用框架的一部分,而框架依赖的环境的一个服务。

    还有一点要注意的是,返回结果和out方向的参数并不是分开特别看待的,返回结果与out方向的参数都是响应消息中的payload,请参数《ZeroC ICE的协议》,encpas部分。返回结果是默认是第一个out方向参数,我们所有定义的out方向参数都向后顺延一个位置。

    每个skeleton骨架类都会为我们自动实现ice_id,ice_ids,ice_isA和ice_ping接口方法。

    下面是以Ice项目包中test/acm为例的代码参考:

    ::Test::TestIntfPrx getTestIntf(const ::Ice::Context& context = ::Ice::noExplicitContext)
        {
            return end_getTestIntf(_iceI_begin_getTestIntf(context, ::IceInternal::dummyCallback, 0, true));
        }
    
        ::Ice::AsyncResultPtr begin_getTestIntf(const ::Ice::Context& context = ::Ice::noExplicitContext)
        {
            return _iceI_begin_getTestIntf(context, ::IceInternal::dummyCallback, 0);
        }
    
        ::Ice::AsyncResultPtr begin_getTestIntf(const ::Ice::CallbackPtr& del, const ::Ice::LocalObjectPtr& cookie = 0)
        {
            return _iceI_begin_getTestIntf(::Ice::noExplicitContext, del, cookie);
        }
    
        ::Ice::AsyncResultPtr begin_getTestIntf(const ::Ice::Context& context, const ::Ice::CallbackPtr& del, const ::Ice::LocalObjectPtr& cookie = 0)
        {
            return _iceI_begin_getTestIntf(context, del, cookie);
        }
    
        ::Ice::AsyncResultPtr begin_getTestIntf(const ::Test::Callback_RemoteObjectAdapter_getTestIntfPtr& del, const ::Ice::LocalObjectPtr& cookie = 0)
        {
            return _iceI_begin_getTestIntf(::Ice::noExplicitContext, del, cookie);
        }
    
        ::Ice::AsyncResultPtr begin_getTestIntf(const ::Ice::Context& context, const ::Test::Callback_RemoteObjectAdapter_getTestIntfPtr& del, const ::Ice::LocalObjectPtr& cookie = 0)
        {
            return _iceI_begin_getTestIntf(context, del, cookie);
        }
    
        ::Test::TestIntfPrx end_getTestIntf(const ::Ice::AsyncResultPtr&);
        
    private:
    
        ::Ice::AsyncResultPtr _iceI_begin_getTestIntf(const ::Ice::Context&, const ::IceInternal::CallbackBasePtr&, const ::Ice::LocalObjectPtr& cookie = 0, bool sync = false);
        
    slice为stub存根生成的函数
    ::Ice::AsyncResultPtr
    IceProxy::Test::RemoteObjectAdapter::_iceI_begin_getTestIntf(const ::Ice::Context& context, const ::IceInternal::CallbackBasePtr& del, const ::Ice::LocalObjectPtr& cookie, bool sync)
    {
        _checkTwowayOnly(iceC_Test_RemoteObjectAdapter_getTestIntf_name, sync);
        ::IceInternal::OutgoingAsyncPtr result = new ::IceInternal::CallbackOutgoing(this, iceC_Test_RemoteObjectAdapter_getTestIntf_name, del, cookie, sync);
        try
        {
            result->prepare(iceC_Test_RemoteObjectAdapter_getTestIntf_name, ::Ice::Normal, context);
            result->writeEmptyParams();
            result->invoke(iceC_Test_RemoteObjectAdapter_getTestIntf_name);
        }
        catch(const ::Ice::Exception& ex)
        {
            result->abort(ex);
        }
        return result;
    }
    
    ::Test::TestIntfPrx
    IceProxy::Test::RemoteObjectAdapter::end_getTestIntf(const ::Ice::AsyncResultPtr& result)
    {
        ::Ice::AsyncResult::check(result, this, iceC_Test_RemoteObjectAdapter_getTestIntf_name);
        ::Test::TestIntfPrx ret;
        if(!result->waitForResponse())
        {
            try
            {
                result->throwUserException();
            }
            catch(const ::Ice::UserException& ex)
            {
                throw ::Ice::UnknownUserException(__FILE__, __LINE__, ex.ice_id());
            }
        }
        ::Ice::InputStream* istr = result->startReadParams();
        istr->read(ret);
        result->endReadParams();
        return ret;
    }
    begin和end阶段的框架实现
    bool
    Test::RemoteObjectAdapter::_iceDispatch(::IceInternal::Incoming& in, const ::Ice::Current& current)
    {
        ::std::pair<const ::std::string*, const ::std::string*> r = ::std::equal_range(iceC_Test_RemoteObjectAdapter_all, iceC_Test_RemoteObjectAdapter_all + 8, current.operation);
        if(r.first == r.second)
        {
            throw ::Ice::OperationNotExistException(__FILE__, __LINE__, current.id, current.facet, current.operation);
        }
    
        switch(r.first - iceC_Test_RemoteObjectAdapter_all)
        {
            case 0:
            {
                return _iceD_activate(in, current);
            }
            case 1:
            {
                return _iceD_deactivate(in, current);
            }
            case 2:
            {
                return _iceD_getTestIntf(in, current);
            }
            case 3:
            {
                return _iceD_hold(in, current);
            }
            case 4:
            {
                return _iceD_ice_id(in, current);
            }
            case 5:
            {
                return _iceD_ice_ids(in, current);
            }
            case 6:
            {
                return _iceD_ice_isA(in, current);
            }
            case 7:
            {
                return _iceD_ice_ping(in, current);
            }
            default:
            {
                assert(false);
                throw ::Ice::OperationNotExistException(__FILE__, __LINE__, current.id, current.facet, current.operation);
            }
        }
    }
    
    bool
    Test::RemoteObjectAdapter::_iceD_getTestIntf(::IceInternal::Incoming& inS, const ::Ice::Current& current)
    {
        _iceCheckMode(::Ice::Normal, current.mode);
        inS.readEmptyParams();
        ::Test::TestIntfPrx ret = this->getTestIntf(current);
        ::Ice::OutputStream* ostr = inS.startWriteParams();
        ostr->write(ret);
        inS.endWriteParams();
        return false;
    }
    skeleton为我们完成的框架工作
    class RemoteObjectAdapter : public virtual ::Ice::Object
    {
    public:
    
       // ...
    
        virtual ::Test::TestIntfPrx getTestIntf(const ::Ice::Current& = ::Ice::noExplicitCurrent) = 0;
        bool _iceD_getTestIntf(::IceInternal::Incoming&, const ::Ice::Current&);
    
        // other generated code by slice
    
        virtual bool _iceDispatch(::IceInternal::Incoming&, const ::Ice::Current&);
    
    protected:
    
        virtual void _iceWriteImpl(::Ice::OutputStream*) const;
        virtual void _iceReadImpl(::Ice::InputStream*);
    };
    slice为skeleton骨架生成的函数

    我们除了使用slice为我们生成的stub类的begin和end函数来进行Ice Object的请求调用外,还可以使用::IceProxy::Ice::object提供的通常版本ice_invoke来完成调用。ice_invoke也是分为begin和end两个阶段,分别对应有函数begin_ice_invoke以及end_ice_invoke。之所以是通用版本,是因为你必须要构建Operation对象,而且接受的参数并不是不逐个逐个地接受的,而是作为两条序列化的流。也就是说,如果使用invoke版本来进行请求调用,你必须手动完成参数的序列包装,和结果的反序列拆装。

    bool
    IceProxy::Ice::Object::ice_invoke(const string& operation,
                                      OperationMode mode,
                                      const vector<Byte>& inEncaps,
                                      vector<Byte>& outEncaps,
                                      const Context& context)
    {
        pair<const Byte*, const Byte*> inPair;
        if(inEncaps.empty())
        {
            inPair.first = inPair.second = 0;
        }
        else
        {
            inPair.first = &inEncaps[0];
            inPair.second = inPair.first + inEncaps.size();
        }
        return ice_invoke(operation, mode, inPair, outEncaps, context);
    }
    
    AsyncResultPtr
    IceProxy::Ice::Object::_iceI_begin_ice_invoke(const string& operation,
                                              OperationMode mode,
                                              const vector<Byte>& inEncaps,
                                              const Context& ctx,
                                              const ::IceInternal::CallbackBasePtr& del,
                                              const ::Ice::LocalObjectPtr& cookie,
                                              bool sync)
    {
        pair<const Byte*, const Byte*> inPair;
        if(inEncaps.empty())
        {
            inPair.first = inPair.second = 0;
        }
        else
        {
            inPair.first = &inEncaps[0];
            inPair.second = inPair.first + inEncaps.size();
        }
        return _iceI_begin_ice_invoke(operation, mode, inPair, ctx, del, cookie);
    }
    
    bool
    IceProxy::Ice::Object::end_ice_invoke(vector<Byte>& outEncaps, const AsyncResultPtr& result)
    {
        AsyncResult::check(result, this, ice_invoke_name);
        bool ok = result->waitForResponse();
        if(_reference->getMode() == Reference::ModeTwoway)
        {
            const Byte* v;
            Int sz;
            result->readParamEncaps(v, sz);
            vector<Byte>(v, v + sz).swap(outEncaps);
        }
        return ok;
    }
    AsyncResultPtr
    IceProxy::Ice::Object::_iceI_begin_ice_invoke(const string& operation,
                                              OperationMode mode,
                                              const pair<const Byte*, const Byte*>& inEncaps,
                                              const Context& ctx,
                                              const ::IceInternal::CallbackBasePtr& del,
                                              const ::Ice::LocalObjectPtr& cookie,
                                              bool sync)
    {
        OutgoingAsyncPtr result = new CallbackOutgoing(this, ice_invoke_name, del, cookie, sync);
        try
        {
            result->prepare(operation, mode, ctx);
            result->writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first));
            result->invoke(operation);
        }
        catch(const Exception& ex)
        {
            result->abort(ex);
        }
        return result;
    }
    
    bool
    IceProxy::Ice::Object::_iceI_end_ice_invoke(pair<const Byte*, const Byte*>& outEncaps, const AsyncResultPtr& result)
    {
        AsyncResult::check(result, this, ice_invoke_name);
        bool ok = result->waitForResponse();
        if(_reference->getMode() == Reference::ModeTwoway)
        {
            Int sz;
            result->readParamEncaps(outEncaps.first, sz);
            outEncaps.second = outEncaps.first + sz;
        }
        return ok;
    }

    这里的inEncaps以及outEncaps的最终版本是pair<const Byte*, const Byte*>类型,其实是一个encaps数组的iter_begin以及iter_end。由于一个encaps只是stream中的一段,没有具体的固定(长度的)类型,所以一个encaps数组也就是一系列一段段的流,它的数组容器就是这一段段的流的总流。

  • 相关阅读:
    Redis
    Redux架构
    Dapper.NET
    JS的异步模式
    Session Redis Nginx
    .NET CORE的TagHelper智能提示
    一个Redis实现的分布式锁
    Redisson使用起来很方便,但是需要redis环境支持eval命令
    The Little Redis Book
    mybatis UpdateByExampleMapper UpdateByExampleSelectiveMapper
  • 原文地址:https://www.cnblogs.com/bbqzsl/p/6559581.html
Copyright © 2011-2022 走看看