zoukankan      html  css  js  c++  java
  • RabbitMq初探——用队列实现RPC

    rabbitmq构造rpc

    前言


     

    RPC(Remote Procedure Call)即远程过程调用。我接触过的实现方式有基于 HTTP 协议的调用和 Thrift 框架;其实借助消息队列 RabbitMQ 也可以实现 RPC。

    原理


     

    我们称调用远程服务者为Client,远程服务提供者为Server。

    Client 先充当生产者,把请求发送到 RabbitMQ 的请求队列;Server 作为消费者取出并处理请求,得到结果数据 result。此时角色互换:Server 作为生产者,把 result 通过 RabbitMQ 的回调队列(由请求的 reply_to 属性指定)传回 Client;Client 作为结果数据的消费者,凭 correlation_id 找到与本次请求对应的 result。

    代码


    rpc_client.php

    <?php
    /**
     * Created by PhpStorm.
     * User: 王大西
     * Date: 2017/10/23
     * Time: 16:36
     */
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    /**
     * Minimal RPC client built on RabbitMQ.
     *
     * A request is published to the 'rpc_queue1' queue with a fresh correlation
     * id and the name of a private callback queue in 'reply_to'. The client then
     * blocks until a message carrying the same correlation id is delivered on
     * that callback queue.
     */
    class RpcClient
    {
        /** Broker connection, opened in the constructor. */
        private $connection = null;
        /** Channel used both for publishing requests and consuming replies. */
        private $channel = null;
        /** Name of the reply queue, as returned by queue_declare(). */
        private $callbackQueue = null;
        /** Body of the matching reply, or null while no reply has arrived. */
        private $response = null;
        /** Correlation id of the request currently in flight. */
        private $corrId = null;

        /**
         * Connects to the broker on 127.0.0.1:5672 and starts consuming from a
         * newly declared callback queue.
         */
        public function __construct()
        {
            $this->connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
            $this->channel = $this->connection->channel();

            // Empty queue name: the actual name is taken from queue_declare()'s
            // return value. Flags are passive, durable, exclusive, auto_delete.
            list($this->callbackQueue, ,) = $this->channel->queue_declare("", false, false, true, false);

            // The fourth argument (no_ack) is true because onResponse() never
            // acknowledges deliveries; with manual acks every reply would stay
            // unacknowledged on the broker.
            $this->channel->basic_consume($this->callbackQueue, '', false, true, false, false, array($this, 'onResponse'));
        }

        /**
         * Consumer callback for the reply queue.
         *
         * Stores the body only when the message answers the request currently
         * in flight; anything else (stale or foreign replies, messages without
         * a correlation id) is ignored.
         *
         * @param object $rep Delivered message exposing has(), get() and ->body.
         */
        public function onResponse($rep)
        {
            // get() throws on a missing property, so test for it first.
            if ($rep->has('correlation_id') && $rep->get('correlation_id') === $this->corrId) {
                $this->response = $rep->body;
            }
        }

        /**
         * Sends $n to the RPC server and blocks until the matching reply arrives.
         *
         * @param mixed $n Value to send; it is cast to string for the message body.
         * @return int The reply body converted with intval().
         */
        public function call($n)
        {
            $this->response = null;
            $this->corrId = uniqid();

            $msg = new AMQPMessage((string) $n, array(
                'correlation_id' => $this->corrId,
                'reply_to' => $this->callbackQueue
            ));

            $this->channel->basic_publish($msg, '', 'rpc_queue1');

            // Compare against null explicitly: a legitimate reply body such as
            // "0" is falsy in PHP and would otherwise keep this loop waiting
            // forever.
            while ($this->response === null) {
                $this->channel->wait();
            }
            return intval($this->response);
        }

    }
    
    // Value to send: the first command-line argument when given, otherwise 30.
    $number = isset($argv[1]) ? $argv[1] : 30;
    $objRpcClient = new RpcClient();
    $response = $objRpcClient->call($number);

    // Use the "\n" escape rather than a line break inside the literal, so the
    // output does not depend on the file's indentation or line endings.
    echo " RPC result $response\n";

     rpc_server.php

    <?php
    /**
     * rpc server
     * Created by PhpStorm.
     * User: 王大西
     * Date: 2017/10/23
     * Time: 16:36
     */
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    // Broker endpoint and credentials used by the server.
    $amqpHost = '127.0.0.1';
    $amqpPort = 5672;
    $amqpUser = 'guest';
    $amqpPassword = 'guest';

    $connection = new AMQPStreamConnection($amqpHost, $amqpPort, $amqpUser, $amqpPassword);
    $channel = $connection->channel();

    // Declare the queue that requests are consumed from; the four flags
    // (passive, durable, exclusive, auto_delete) are all off.
    $requestQueue = 'rpc_queue1';
    $channel->queue_declare($requestQueue, false, false, false, false);
    
    /**
     * Returns the n-th Fibonacci number, with fib(0) = 0 and fib(1) = 1.
     *
     * The value is computed with a simple loop, so the cost grows linearly
     * with $n instead of exponentially as with naive double recursion.
     *
     * @param int $n Index in the sequence; must not be negative.
     * @return int|float The Fibonacci number; results too large for a PHP
     *                   integer come back as float.
     * @throws InvalidArgumentException When $n is negative, since no base case
     *                                  would ever be reached for such input.
     */
    function fib($n){
        $n = (int) $n;
        if ($n < 0) {
            throw new InvalidArgumentException('fib() expects a non-negative integer, got ' . $n);
        }

        // Invariant: before iteration i, $previous = fib(i) and $current = fib(i + 1).
        $previous = 0;
        $current = 1;
        for ($i = 0; $i < $n; $i++) {
            $next = $previous + $current;
            $previous = $current;
            $current = $next;
        }
        return $previous;
    }
    
    echo " [x] Awaiting RPC requests\n";

    /**
     * Handles one RPC request: computes fib() of the number in the message
     * body, publishes the result to the queue named in 'reply_to' (echoing the
     * request's correlation id when present) and acknowledges the request.
     *
     * @param object $req Delivered request message.
     */
    $callback = function($req){
        $requestChannel = $req->delivery_info['channel'];
        $deliveryTag = $req->delivery_info['delivery_tag'];

        // get() throws on a missing property. Without 'reply_to' there is
        // nowhere to send a result, so acknowledge the request to keep it from
        // being redelivered and stop here.
        if (!$req->has('reply_to')) {
            echo " [!] request without reply_to dropped\n";
            $requestChannel->basic_ack($deliveryTag);
            return;
        }

        // intval() turns an empty or non-numeric body into 0. Negative values
        // are clamped to 0 as well, because fib() is not defined for negative
        // indices.
        $n = max(0, intval($req->body));
        echo " [.] fib(", $n, ")\n";

        $properties = array();
        if ($req->has('correlation_id')) {
            $properties['correlation_id'] = $req->get('correlation_id');
        }

        $msg = new AMQPMessage((string) fib($n), $properties);
        $requestChannel->basic_publish($msg, '', $req->get('reply_to'));

        // Acknowledge only after the reply has been published.
        $requestChannel->basic_ack($deliveryTag);
    };
    
    // Quality-of-service settings: a prefetch count of 1, the other two
    // arguments left at null.
    $prefetchSize = null;
    $prefetchCount = 1;
    $globalFlag = null;
    $channel->basic_qos($prefetchSize, $prefetchCount, $globalFlag);

    // Register $callback as the consumer of the request queue; all four
    // boolean flags are off, so deliveries are acknowledged by the callback.
    $channel->basic_consume('rpc_queue1', '', false, false, false, false, $callback);

    // Keep dispatching deliveries for as long as a consumer callback is registered.
    while (count($channel->callbacks) > 0) {
        $channel->wait();
    }

    // Shut down in order: channel first, then the connection.
    $channel->close();
    $connection->close();

    测试


    server

    client

     

  • 相关阅读:
    This counter can increment, decrement or skip ahead by an arbitrary amount
    LUT4/MUXF5/MUXF6 logic : Multiplexer 8:1
    synthesisable VHDL for a fixed ratio frequency divider
    Bucket Brigade FIFO SRL16E ( VHDL )
    srl16e fifo verilog
    DualPort Block RAM with Two Write Ports and Bytewide Write Enable in ReadFirst Mode
    Parametrilayze based on SRL16 shift register FIFO
    stm32 spi sdcard fatfs
    SPI bus master for System09 (2)
    SQLSERVER中的自旋锁
  • 原文地址:https://www.cnblogs.com/hejun695/p/7722902.html
Copyright © 2011-2022 走看看