zoukankan      html  css  js  c++  java
  • Actor模型 异步方法调用,伪代码

    #ifdef _CODEPASTE
    // Unit of data that travels through the message queue. The
    // pseudocode leaves its members unspecified.
    class Message
    {
    };
    // Servant: declares the message queue operations and predicates
    // that the Method Requests (Put/Get) invoke.
    class MQ_Servant
    {
    public:
        // Build the queue from <mq_size>.
        MQ_Servant(size_t mq_size);

        // State predicates, used as guards by the Method Requests.
        bool full_i() const;
        bool empty_i() const;

        // Queue operations proper.
        Message get_i();
        void put_i(const Message &msg);

    private:
        // The concrete queue storage (for instance a circular
        // array or a linked list) is left out of this sketch.
    };
    // Handle to the eventual result of an asynchronous method call.
    // Copies share one underlying <Message_Future_Rep>.
    class Message_Future
    {
    public:
        // Build a future that refers to <Message> <m> right away.
        Message_Future(const Message &m);

        // Copying makes <this> and <f> refer to the same
        // <Message_Future_Rep>, creating it when needed.
        Message_Future(const Message_Future &f);

        // Assignment likewise rebinds <this> to the
        // <Message_Future_Rep> of <f>, creating it when needed.
        void operator= (const Message_Future &f);

        // ... further constructors/destructors are omitted here ...

        // Conversion to <Message>: blocks the caller until the
        // result of the asynchronous invocation can be obtained.
        operator Message ();
    };
    // Forwards messages to one remote consumer: callers <put>
    // messages, which are handed to the message queue proxy.
    class Consumer_Handler
    {
    public:
        // Defined out of line.
        Consumer_Handler(void);

        // Hand <msg> over to the message queue proxy.
        void put(const Message &msg) {
            message_queue_.put(msg);
        }

    private:
        // Thread entry point; <arg> is an opaque pointer that the
        // definition interprets.
        static void *svc_run(void *arg);

        // Proxy to the Active Object holding the queued messages.
        MQ_Proxy message_queue_;
        // Connection to the remote consumer.
        SOCK_Stream connection_;
    };
    // Start the worker: <svc_run> is spawned through the
    // Thread_Manager with this handler as its argument, so that it
    // can pull messages from the queue and send them to the consumer.
    Consumer_Handler::Consumer_Handler(void)
    {
        Thread_Manager::instance()->spawn(svc_run, this);
    }
    // Thread entry point for a Consumer_Handler.
    // <args> is the handler passed to spawn() by the constructor.
    // Loops forever, so the function never returns a value.
    void *
    Consumer_Handler::svc_run(void *args)
    {
        // A void* that was produced from a Consumer_Handler* is
        // converted back with static_cast; reinterpret_cast is not
        // needed for this conversion.
        Consumer_Handler *this_obj =
            static_cast<Consumer_Handler *> (args);
        for (;;) {
            // get() yields a Message_Future; converting it to a
            // Message blocks this thread until a message is
            // available.
            Message msg = this_obj->message_queue_.get();
            // Transmit message to the consumer.
            this_obj->connection_.send(msg);
        }
    }
    Supplier_Handler::route_message(const Message &msg)
    {
        // Locate the appropriate consumer based on the
        // address information in the Message.
        Consumer_Handler *ch =
            routing_table_.find(msg.address());
        // Put the Message into the Consumer Handler’s queue.
        ch->put(msg);
    };
    // Scheduler of the Active Object: holds the Activation_Queue of
    // pending Method Requests and dispatches them from the thread
    // spawned in its constructor.
    class MQ_Scheduler
    {
    public:
        // Create the Activation_Queue from <high_water_mark> and
        // spawn a thread, through the Thread_Manager, that starts
        // in <svc_run> with this scheduler as its argument.
        MQ_Scheduler(size_t high_water_mark)
            : act_queue_(new Activation_Queue
            (high_water_mark))
        {
            // Spawn a separate thread to dispatch
            // method requests.
            Thread_Manager::instance()->spawn(svc_run,
                this);
        }
        // ... Other constructors/destructors, etc.,
        // Insert the Method Request into
        // the Activation_Queue. This method
        // runs in the thread of its client, i.e.,
        // in the Proxy's thread.
        void enqueue(Method_Request *method_request) {
            act_queue_->enqueue(method_request);
        }
        // Dispatch the Method Requests on their Servant
        // in the Scheduler's thread. Never returns.
        virtual void dispatch(void) {
            // Iterate continuously in a
            // separate thread.
            for (;;) {
                Activation_Queue::iterator i;
                // The iterator's <begin> call blocks
                // when the <Activation_Queue> is empty.
                // NOTE(review): <i> keeps advancing after <mr> has
                // been dequeued; whether the iterator stays valid
                // depends on Activation_Queue_Iterator — confirm.
                for (i = act_queue_->begin();
                    i != act_queue_->end();
                    i++) {
                    // Select a Method Request 'mr'
                    // whose guard evaluates to true.
                    Method_Request *mr = *i;
                    if (mr->guard()) {
                        // Remove <mr> from the queue first
                        // in case <call> throws an exception.
                        act_queue_->dequeue(mr);
                        // NOTE(review): if <call> throws, <mr> is
                        // never deleted.
                        mr->call();
                        delete mr;
                    }
                }
            }
        }
    protected:
        // Queue of pending Method_Requests.
        Activation_Queue *act_queue_;
        // Entry point into the new thread: <args> is the scheduler
        // that was passed to spawn() by the constructor.
        static void *svc_run(void *args) {
            MQ_Scheduler *this_obj =
                static_cast<MQ_Scheduler *> (args);
            this_obj->dispatch();
            // Only reached if an overriding <dispatch> returns; a
            // function declared void* must not fall off its end.
            return 0;
        }
    };
    
    // Abstract base for a queued method invocation: a guard that says
    // whether the request may run, and the call that performs it.
    class Method_Request
    {
    public:
        // Requests are destroyed through a Method_Request pointer, so
        // the destructor must be virtual for the derived part to be
        // destroyed correctly.
        virtual ~Method_Request() {}
        // Evaluate the synchronization constraint.
        virtual bool guard(void) const = 0;
        // Implement the method.
        virtual void call(void) = 0;
    };
    // Method Request that carries a <put_i> call together with the
    // Message to insert.
    class Put : public Method_Request
    {
    public:
        // Remember the target servant <rep> and keep a copy of <arg>.
        Put(MQ_Servant *rep,
            Message arg)
            : servant_(rep), arg_(arg) {}

        // Synchronization constraint: <put_i> may only run
        // while the queue is not full.
        virtual bool guard(void) const {
            const bool queue_full = servant_->full_i();
            return !queue_full;
        }

        // Insert the stored message into the servant.
        virtual void call(void) {
            servant_->put_i(arg_);
        }

    private:
        // Servant the request operates on.
        MQ_Servant *servant_;
        // Message to insert.
        Message arg_;
    };
    // Method Request that carries a <get_i> call and the future
    // through which the dequeued Message is delivered.
    class Get : public Method_Request
    {
    public:
        // Remember the target servant <rep> and copy the future <f>.
        Get(MQ_Servant *rep,
            const Message_Future &f)
            : servant_(rep), result_(f) {}

        // Synchronization constraint: <get_i> may only run
        // once the queue is not empty.
        virtual bool guard(void) const {
            const bool queue_empty = servant_->empty_i();
            return !queue_empty;
        }

        // Dequeue a message and assign it to the future result
        // object.
        virtual void call(void) {
            result_ = servant_->get_i();
        }

    private:
        // Servant the request operates on.
        MQ_Servant *servant_;
        // Future that receives the result.
        Message_Future result_;
    };
    class Activation_Queue
    {
    public:
        // Block for an "infinite" amount of time
        // waiting for <enqueue> and <dequeue> methods
        // to complete.
        const int INFINITE = -1;
        // Define a "trait".
        typedef Activation_Queue_Iterator
            iterator;
        // Constructor creates the queue with the
        // specified high water mark that determines
        // its capacity.
        Activation_Queue(size_t high_water_mark);
        // Insert <method_request> into the queue, waiting
        // up to <msec_timeout> amount of time for space
        // to become available in the queue.
        void enqueue(Method_Request *method_request,
            long msec_timeout = INFINITE);
        // Remove <method_request> from the queue, waiting
        // up to <msec_timeout> amount of time for a
        // <method_request> to appear in the queue.
        void dequeue(Method_Request *method_request,
            long msec_timeout = INFINITE);
    private:
        // Synchronization mechanisms, e.g., condition
        // variables and mutexes, and the queue
        // implementation, e.g., an array or a linked
        // list, go here.
        // ...
    };
    // Proxy: the client-facing interface of the Active Object. Each
    // call is turned into a Method Request and handed to the scheduler.
    class MQ_Proxy
    {
    public:
        // Default bound on the message queue size.
        enum { MAX_SIZE = 100 };

        // Create the servant and the scheduler, both from <size>.
        MQ_Proxy(size_t size = MAX_SIZE)
            : servant_(new MQ_Servant(size)),
            scheduler_(new MQ_Scheduler(size)) {}

        // Queue a <Put> request carrying <m> for the active object.
        void put(const Message &m) {
            scheduler_->enqueue(new Put(servant_, m));
        }

        // Queue a <Get> request and return the Message_Future
        // through which its result is delivered.
        Message_Future get(void) {
            Message_Future future;
            scheduler_->enqueue(new Get(servant_, future));
            return future;
        }

        // ... empty() and full() predicate implementations ...

    protected:
        // Servant implementing the Active Object methods.
        MQ_Servant *servant_;
        // Scheduler for the Message Queue.
        MQ_Scheduler *scheduler_;
    };
    #endif
  • 相关阅读:
    剑指offer题解(python版)(更新到第16题)
    Java基础知识详解:值传递
    [LeetCode] 583. Delete Operation for Two Strings
    [LeetCode] 856. Score of Parentheses
    [LeetCode] 1129. Shortest Path with Alternating Colors
    [LeetCode] 1561. Maximum Number of Coins You Can Get
    [LeetCode] 1052. Grumpy Bookstore Owner
    [LeetCode] 991. Broken Calculator
    [LeetCode] 1054. Distant Barcodes
    [LeetCode] 1245. Tree Diameter
  • 原文地址:https://www.cnblogs.com/xuyouzhu/p/15214187.html
Copyright © 2011-2022 走看看