zoukankan      html  css  js  c++  java
  • ICE异步程序设计-----AMI/AMD

    1 简介

    AMI


    异步方法调用(AMI) 这个术语描述的是客户端的异步编程模型支持。
    如果你使用AMI 发出远地调用,在Ice run time 等待答复的同时,发出调用的线程不会阻塞。相反,发出调用的线程可以继续进行各种活动,当答复最终到达时, Ice run time 会通知应用。通知是通过回调发给应用提供的编程语言对象的

    AMD

    一个服务器在同一时刻所能支持的同步请求数受到Ice run time 的服务器线程池的尺寸限制(参见15.3 节)。如果所有线程都在忙于分派长时间运行的操作,那么就没有线程可用于处理新的请求,客户就会经验到不可接受的无响应状态。异步方法分派(AMD) 是AMI 的服务器端等价物,能够解决这个可伸缩性问题
    在使用AMD 时,服务器可以接收一个请求,然后挂起其处理,以尽快释放分派线程。当处理恢复、结果已得出时,服务器要使用Ice runtime 提供的回调对象,显式地发送响应。

    使用AMD时,客户端如果需要等待返回值,那就一直等待,否则就继续往下执行。
    为什么使用AMD?
    用实际的术语说, AMD 操作通常会把请求数据(也就是,回调对象和操作参数)放入队列 ,供应用的某个线程(或线程池)随后处理用。这样,服务器就使分派线程的使用率降到了最低限度,能够高效地支持数千并发客户。
    另外, AMD 还可用于需要在完成了客户的请求之后继续进行处理的操作。为了使客户的延迟降到最低限度,操作在返回结果后,仍留在分派线程中,继续用分派线程执行其他工作。
    AMD解决的主要是提高服务端的负荷能力。


    区别:

    (1)AMI使得客户端可以异步,AMD使得服务端可以异步。

    (2)使用AMI服务端代码不需要修改  ,使用AMD客户端代码不需要修改。

    (3)对同步模型的支持

    AMI 的语言映射仍然允许应用使用同步调用模型:如果为某个操作指定AMI 元数据,用于同步调用的代理方法会完整地得以保留;同时,还会生成一个额外的代理方法,用以支持异步调用。但是, AMD 操作的语言映射不允许我们的实现同时使用两种分派模型。如果你指定了AMD 元数据,用于同步分派的方法就会被用于异步分派的方法取代。


    2 使用元数据修改代码生成

    程序员如果想要使用异步模型(AMI、AMD,或两者都使用),需要给Slice 定义批注上元数据(4.17 节)。程序员可以在两个层面上指定这种元数据:接口或类的层面,或单个操作的层面。如果你是在为一个接口或类进行指定,那么为它的所有操作生成的代码都将会有异步支持。而如果只有某些操作需要异步支持,那么你可以只为这些操作指定元数据,从而使生成的代码的数据降到最低限度。

    考虑下面的Slice 定义:
    ["ami"] interface I {
    bool isValid();
    float computeRate();
    };
    interface J {
    ["amd"] void startProcess();
    ["ami", "amd"] int endProcess();
    };
    在这个例子中,为接口I 的所有代理方法生成的代码都将具有同步和异步调用支持。在接口J 中, startProcess 操作使用的是异步分派,而endProcess 操作则支持异步的调用和分派。

    3 使用AMI

    3.1  代码映射

    C++ 代码生成器为每个AMI 操作生成以下代码:

    (1)一个抽象的回调类, Ice run time 用它来通知应用,操作已完成。类名是按这样的模式取的   AMI_class_op。

    这个类提供两个方法:

    void ice_response(<params>);

    void ice_exception(const Ice::Exception &);

    (2)一个额外的代理方法
    其名字是操作在映射后的名字,加上_async。这个方法的返回类型是void,第一个参数是一个智能指针,指向上面描述的回调类的一个实例。其他的参数由操作的各个in 参数组成,次序是声明时的次序。

    例如,假定我们定义了下面这个操作:
    interface I {
    ["ami"] int foo(short s, out long l);
    };
    下面是为操作foo 生成的回调类:
    class AMI_I_foo : public ... {
    public:
    virtual void ice_response(Ice::Int, Ice::Long) = 0;
    virtual void ice_exception(const Ice::Exception &) = 0;
    };
    typedef IceUtil::Handle<AMI_I_foo> AMI_I_fooPtr;
    下面是为操作foo 的异步调用生成的 代理方法:
    void foo_async(const AMI_I_fooPtr &, Ice::Short);


    3.2 一个使用AMI的例子

    为了演示Ice 中的AMI 的用法,让我们定义一个简单的计算引擎的
    Slice 接口
    sequence<float> Row;
    sequence<Row> Grid;
    exception RangeError {};
    interface Model {
    ["ami"] Grid interpolate(Grid data, float factor)
    throws RangeError;
    };


    C++ 客户
    我们首先必须定义我们的回调实现类,它派生自生成的
    AMI_Model_interpolate 类:
    class AMI_Model_interpolateI : public AMI_Model_interpolate {
    public:
    virtual void ice_response(const Grid & result)
    {
    cout << "received the grid" << endl;
    // ... postprocessing ...
    }
    virtual void ice_exception(const Ice::Exception & ex)
    {
    try {
    ex.ice_throw();
    } catch (const RangeError & e) {
    cerr << "interpolate failed: range error" << endl;
    } catch (const Ice::LocalException & e) {
    cerr << "interpolate failed: " << e << endl;
    }
    }
    };

    调用interpolate 的代码同样直截了当:
    ModelPrx model =  ModelPrx::checkedCast(base);
    AMI_Model_interpolatePtr cb = new AMI_Model_interpolateI;
    Grid grid;
    initializeGrid(grid);
    model->interpolate_async(cb, grid, 0.5);
    在获取了Model 对象的代理之后,客户实例化一个回调对象,初始化一个栅格,然后调用interpolate 的异步版本。当Ice run time 接收到对这个请求的响应时,会调用客户提供的回调对象。

    3.3 并发问题
    在Ice 中,异步调用由客户线程池提供支持 ( 第15 章),池中的线程主要负责处理答复消息。理解下列与异步调用相关的并发问题很重要:
    • 一个回调对象不能同时用于多个调用。需要聚合来自多个答复的信息的应用可以创建一个单独的对象,让回调对象对它进行委托。
    • 对回调对象的调用来自Ice run time 的客户线程池中的线程,因此,如果在答复到达的同时,应用可能要与回调对象进行交互,就有可能需要进行同步。
    • 客户线程池中的线程的数目决定了,同时可以为多少异步调用发出回调。客户线程池的缺省尺寸是一,意味着对回调对象的调用是序列化的。如果线程池的尺寸增大了,而同一回调对象被用于多个调用,应用就可能需要进行同步。

    4 使用AMD

    异步分派方法的型构与AMI 方法的类似:返回类型是void,参数由一个回调对象、以及操作的in 参数组成。在AMI 中,回调对象是由应用提供的,而在AMD 中,回调对象是由Ice run time 提供的,同时它还提供了一些方法,用于返回操作的结果,或报告异常。我们的实现不需要在分派方法返回之前调用回调对象;回调对象可以在任何时候,由任何线程调用,但只能被调用一次。


    4.1 代码映射

    C++ 代码生成器为每个AMD 操作生成以下代码:
    (1)一个抽象的回调类.

    实现用它来通知Ice run time,操作已完成。

    类名是按这样的模式取的:AMD_class_op。

    类的方法:

    void ice_response(<params>);

    void ice_exception(const Ice::Exception &);服务器可以用这个版本的ice_exception 报告用户异常或本地异常。

    void ice_exception(const std::exception &);服务器可以用这个版本的ice_exception 报告标准的异常。
    void ice_exception()             服务器可以用这个版本的ice_exception 报告未知异常。

    (2)分派方法.

    其名字有后缀_async。

    异步分派方法,它的第一个参数就是由ICE实现的回调类AMD_class_op ,在这个方法里,我们要两种方案:

    1. 直接做具体工作,完成后在末尾调用回调类的ice_response方法告知客户端已完成。这种方案就和之前普通版的服务端一样,是同步执行的。
    2. 把 回调类和请求所需要的参数放入一个指定的位置,再由其它线程取出执行和通知客户端。这种方案就是异步分派方法,具体实现时还可以有多种方式,如使用命令模 式把参数和具体操作直接封装成一个对象放入队列,然后由另一线程(或线程池)取出执行。

    这个方法的返回类型是void,第一个参数是一个智能指针,指向上面描述的回调类的一个实例。其他的参数由操作的各个in 参数组成,次序是声明时的次序。

    例如,假定我们定义了下面这个操作:
    interface I {
    ["amd"] int foo(short s, out long l);
    };
    下面是为操作foo 生成的回调类:

    class AMD_I_foo : public ... {
    public:
    void ice_response(Ice::Int, Ice::Long);
    void ice_exception(const Ice::Exception &);
    void ice_exception(const std::exception &);
    void ice_exception();
    };
    下面是为操作foo 的异步调用生成的分派方法:
    void foo_async(const AMD_I_fooPtr &, Ice::Short);

    4.2 例子

    sequence<float> Row;
    sequence<Row> Grid;
    exception RangeError {};
    interface Model {
    ["ami", "amd"] Grid interpolate(Grid data, float factor)
    throws RangeError;
    };

    服务端的servant(servant就是服务端实际工作的代码)
    我们的servant 类派生自Model,并且提供了interpolate_async
    方法的定义:
    class ModelI : virtual public Model,
    virtual public IceUtil::Mutex {
    public:
    virtual void interpolate_async(const AMD_Model_interpolatePtr &,const Grid &,Ice::Float,const Ice::Current &);
    private:
    std::list<JobPtr> _jobs;
    };
    interpolate_async 的实现使用了同步来在一个Job 中安全地记录回调对象,并把它放进一个队列中:

    void ModelI::interpolate_async(const AMD_Model_interpolatePtr & cb,const Grid & data,Ice::Float factor,const Ice::Current & current)
    {
    IceUtil::Mutex::Lock sync(*this);
    JobPtr job = new Job(cb, data, factor);
    _jobs.push_back(job);
    }
    在把信息放进队列之后,该操作把控制返回给Ice run time,使分派线程能够去处理另外的请求。一个应用线程从队列中移除下一个Job,并调用
    execute 来进行插值。下面是Job 的定义:
    class Job : public IceUtil::Shared {
    public:
    Job(const AMD_Model_interpolatePtr &,const Grid &,Ice::Float);
    void execute();

    private:
    bool interpolateGrid();

    AMD_Model_interpolatePtr _cb;
    Grid _grid;
    Ice::Float _factor;
    };
    typedef IceUtil::Handle<Job> JobPtr;
    execute 的实现使用interpolateGrid ( 没有给出) 来完成计算工
    作:
    Job::Job(const AMD_Model_interpolatePtr & cb,const Grid & grid,Ice::Float factor) :_cb(cb), _grid(grid), _factor(factor)
    {
    }
    void Job::execute()
    {
    if(!interpolateGrid()) {
    _cb->ice_exception(RangeError());
    return;
    }
    _cb->ice_response(_grid);

    }
    如果interpolateGrid 返回false, ice_exception 就会被调用,表明发生了范围错误。在调用了ice_exception 之后,使用return 语句是必要的,因为ice_exception 并没有抛出异常;它仅仅是整编了参数,并把它发送给客户。如果插值成功, ice_response 会被调用,把修改后的栅格返回给客户。


    一个完整的例子

    (1)Slice定义(hello.ice)

    interface Hello

    {

       ["ami", "amd"] idempotent void sayHello(int delay)

            throws RequestCanceledException;

       void shutdown();

    };

    (2)服务端声明文件代码:(helloI.h)

    #include <Hello.h>

    #include <WorkQueue.h>

    class HelloI : virtual publicDemo::Hello

    {

    public:

       HelloI(const WorkQueuePtr&);

       //分派函数

       virtual void sayHello_async(const Demo::AMD_Hello_sayHelloPtr&, int,const   Ice::Current&);

       virtual void shutdown(const Ice::Current&);

    private:

       WorkQueuePtr _workQueue;

    };

    (3)服务端实现文件代码(helloI.cpp)

    HelloI::HelloI(constWorkQueuePtr& workQueue) :

       _workQueue(workQueue)

    {

    }

    void

    HelloI::sayHello_async(

    constDemo::AMD_Hello_sayHelloPtr& cb,

     int delay, const Ice::Current&)

    {

       if(delay == 0)

       {

            cout << "Hello World!"<< endl;

            cb->ice_response();

       }

       else

       {

            _workQueue->add(cb, delay);

       }

    }


    void

    HelloI::shutdown(constIce::Current& curr)

    {

        cout << "Shutting down..."<< endl;

        _workQueue->destroy();

       curr.adapter->getCommunicator()->shutdown();

    }


    int

    AsyncServer::run(int argc, char*argv[])

    {

       if(argc > 1)

       {

            cerr << appName() <<": too many arguments" << endl;

            return EXIT_FAILURE;

       }

       callbackOnInterrupt();

       Ice::ObjectAdapterPtr adapter =communicator()->createObjectAdapter("Hello");

       _workQueue = new WorkQueue();

        Demo::HelloPtr hello = newHelloI(_workQueue); //定义服务端servrant

       adapter->add(hello, communicator()->stringToIdentity(“hello”));//serverant加入适配器

        // 启动工作队列

       _workQueue->start();

       adapter->activate(); //

       communicator()->waitForShutdown();

       _workQueue->getThreadControl().join();//等待工作队列停止

       return EXIT_SUCCESS;

    }

    class WorkQueue : publicIceUtil::Thread

    {

    public:

       WorkQueue();

       virtual void run();

       //将回调对象加入队列

       void add(const Demo::AMD_Hello_sayHelloPtr&, int);

       void destroy();

    private:

       //存储回调对象以及调用参数的的结构体

       struct CallbackEntry

       {

            Demo::AMD_Hello_sayHelloPtr cb;

            int delay;

       };

       IceUtil::Monitor<IceUtil::Mutex> _monitor;

       std::list<CallbackEntry> _callbacks;

       bool _done;

    };

    typedefIceUtil::Handle<WorkQueue> WorkQueuePtr;


    VoidWorkQueue::run()

    {

       IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);

       while(!_done)

       {

            if(_callbacks.size() == 0)

           {

                _monitor.wait();

           }

            if(_callbacks.size() != 0)

           {

                //从队列中取下一个回调对象

      CallbackEntry entry = _callbacks.front();

      //等待delay

      _monitor.timedWait(IceUtil::Time::milliSeconds(entry.delay));

                if(!_done)

               {

          _callbacks.pop_front();

                    //…处理业务逻辑 Do Something here

                   entry.cb->ice_response();

               }

           }

       }


    void WorkQueue::add(const Demo::AMD_Hello_sayHelloPtr&cb,int delay)

    {

       IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);

       if(!_done)

       {

           CallbackEntry entry;

           entry.cb =cb;

           entry.delay = delay;

            if(_callbacks.size() == 0)

           {

                _monitor.notify();

           }

            _callbacks.push_back(entry);

       }

       else

       {

      //发送异常通知

     cb->ice_exception(Demo::RequestCanceledException());

       }

    }



    5 总结 

    同步的远地调用是对本地的方法调用的自然扩展,它利用了程序员的面向对象编程经验,使初学分布式应用开发的程序员的学习曲线平缓下来。但是,同步调用的阻塞本质使得有些应用任务的实现变得更为困难,甚至不可能,因此, Ice 提供了一个直截了当的接口,你可以用这个接口来访问Ice 的异步设施。如果使用异步方法调用,发出调用的线程可以调用一个操作,然后马上就重获控制,不用阻塞起来等待操作完成。当Ice run time 收到结果时,它会通过回调通知应用。与此类似,异步方法分派允许servant 在任何时候发送操作的结果,而不一定要在操作实现中发送。通过把费时的请求放在队列中,后面再进行处理, servant 可以改善可伸缩性,并节省线程资源。


    注:本文内容主要来源于Ice官方文档

  • 相关阅读:
    管程|| 同步与互斥的实现策略
    sqlserver 用户定义表类型
    C# Post Get 方式发送请求
    linux centos Supervisor守护.netcore进程安装配置
    linux cenos开放端口
    SqlServer DbHelper
    C# 执行js
    sql 多行、一行 互转
    sql 删除完全表中完全重复的数据保留一条
    sql 行列互转
  • 原文地址:https://www.cnblogs.com/catkins/p/5270462.html
Copyright © 2011-2022 走看看