zoukankan      html  css  js  c++  java
  • RabbitMq初探——用队列实现RPC

    rabbitmq构造rpc

    前言


     

    RPC(Remote Procedure Call),即远程过程调用。我接触过的实现方式有基于 HTTP 协议的调用和 Thrift 框架;其实用消息队列 RabbitMQ 也可以实现 RPC。

    原理


     

    我们称调用远程服务者为Client,远程服务提供者为Server。

    Client 先充当生产者,将请求发送到 RabbitMQ 队列中;Server 作为消费者处理 Client 的请求,产生结果数据 result。此时 Server 转为生产者,将 result 通过 RabbitMQ 的回调队列(由请求消息的 reply_to 属性指定)传回 Client;Client 作为结果数据的消费者,根据 correlation_id 匹配到自己的请求,得到 result。

    代码


    rpc_client.php

    <?php
    /**
     * Created by PhpStorm.
     * User: 王大西
     * Date: 2017/10/23
     * Time: 16:36
     */
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    /**
     * Blocking RPC client that sends a request through RabbitMQ and waits for the reply.
     *
     * Each instance opens its own connection and channel, declares a broker-named
     * exclusive queue for replies, and registers onResponse() as that queue's consumer.
     * Requests are published to the 'rpc_queue1' queue via the default exchange.
     */
    class RpcClient
    {
        private $connection = null;
        private $channel = null;
        // Name of the broker-generated queue on which replies arrive.
        private $callbackQueue = null;
        // Body of the reply matching $corrId; null while no reply has arrived.
        private $response = null;
        // Correlation id of the request currently in flight.
        private $corrId = null;

        /**
         * Connects to the local broker and starts consuming from a private reply queue.
         */
        public function __construct()
        {
            $this->connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
            $this->channel = $this->connection->channel();

            // Empty queue name: the broker picks a unique name, returned as the first element.
            // Flags are (passive, durable, exclusive, auto_delete); only "exclusive" is set.
            list($this->callbackQueue, ,) = $this->channel->queue_declare("", false, false, true, false);

            // Fourth argument is no_ack. Replies are never acknowledged explicitly by this
            // class, so consume in auto-ack mode instead of leaving every reply unacked.
            $this->channel->basic_consume($this->callbackQueue, '', false, true, false, false, array($this, 'onResponse'));
        }

        /**
         * Consumer callback for the reply queue.
         *
         * Stores the message body as the response only when its correlation id matches
         * the request currently in flight; any other message is ignored.
         *
         * @param AMQPMessage $rep Reply message delivered by the broker.
         */
        public function onResponse($rep)
        {
            // Strict comparison: a reply must never match while no request is pending
            // ($corrId === null), which loose == would allow for an empty id.
            if ($rep->get('correlation_id') === $this->corrId) {
                $this->response = $rep->body;
            }
        }

        /**
         * Sends $n to the RPC server and blocks until the matching reply arrives.
         *
         * @param mixed $n Request payload; it is cast to string before publishing.
         * @return int The reply body converted with intval().
         */
        public function call($n)
        {
            $this->response = null;
            $this->corrId = uniqid();

            $msg = new AMQPMessage((string) $n, array(
                'correlation_id' => $this->corrId,
                'reply_to' => $this->callbackQueue
            ));

            $this->channel->basic_publish($msg, '', 'rpc_queue1');

            // Compare against null rather than testing truthiness: a legitimate reply
            // body of "0" is falsy in PHP and would otherwise keep this loop waiting forever.
            while ($this->response === null) {
                $this->channel->wait();
            }
            return intval($this->response);
        }

    }
    
    // Value to send: the first command-line argument, or 30 when none is given.
    $number = isset($argv[1]) ? $argv[1] : 30;
    $objRpcClient = new RpcClient();
    // Blocks until the server's reply for this request has been received.
    $response = $objRpcClient->call($number);

    // Terminate the line with an escaped newline; the literal previously spanned two
    // source lines and therefore also printed the indentation of the following line.
    echo " RPC result $response\n";

     rpc_server.php

    <?php
    /**
     * rpc server
     * Created by PhpStorm.
     * User: 王大西
     * Date: 2017/10/23
     * Time: 16:36
     */
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    // Connect to the broker on localhost with the default guest account and open a channel.
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Make sure the request queue exists before consuming from it.
    // Flags are (passive, durable, exclusive, auto_delete), all false here.
    $channel->queue_declare('rpc_queue1', false, false, false, false);
    
    /**
     * Returns the n-th Fibonacci number, with fib(0) = 0 and fib(1) = 1.
     *
     * Computed iteratively in O(n) additions. The previous doubly-recursive form took
     * exponential time and never reached a base case for negative or fractional input.
     *
     * @param int $n Zero-based index; other scalar input is cast to int first.
     * @return int|float The Fibonacci number (PHP promotes to float on integer overflow).
     * @throws \InvalidArgumentException When $n is negative.
     */
    function fib($n){
        $n = (int) $n;
        if ($n < 0) {
            throw new \InvalidArgumentException('fib() expects a non-negative integer, got ' . $n);
        }

        // Invariant at the top of each iteration: $prev = fib($i), $curr = fib($i + 1).
        $prev = 0;
        $curr = 1;
        for ($i = 0; $i < $n; $i++) {
            $next = $prev + $curr;
            $prev = $curr;
            $curr = $next;
        }
        return $prev;
    }
    
    echo " [x] Awaiting RPC requests\n";

    /**
     * Handles one RPC request.
     *
     * Reads the integer argument from the message body, computes fib() for it, publishes
     * the result to the queue named in the request's reply_to property (carrying over the
     * request's correlation_id), and finally acknowledges the request.
     *
     * @param AMQPMessage $req Request message delivered from 'rpc_queue1'.
     */
    $callback = function($req){
        $n = intval($req->body);
        $requestChannel = $req->delivery_info['channel'];
        $deliveryTag = $req->delivery_info['delivery_tag'];

        // A negative index has no Fibonacci value here and must not reach fib().
        // Reject without requeue so the bad request is not redelivered in a loop.
        if ($n < 0) {
            echo " [!] rejected fib(", $n, ")\n";
            $requestChannel->basic_reject($deliveryTag, false);
            return;
        }

        echo " [.] fib(", $n, ")\n";

        $msg = new AMQPMessage((string) fib($n), array('correlation_id' => $req->get("correlation_id")) );
        // Default exchange ('') routes by queue name, i.e. straight to the caller's reply queue.
        $requestChannel->basic_publish($msg, '', $req->get('reply_to'));

        // Acknowledge only after the reply has been published.
        $requestChannel->basic_ack($deliveryTag);
    };

    // Prefetch count of 1: at most one unacknowledged request is delivered to this worker.
    $channel->basic_qos(null, 1, null);
    // Fourth argument (no_ack) is false, so the callback acknowledges each request itself.
    $channel->basic_consume('rpc_queue1', '', false, false, false, false, $callback);

    // Keep processing deliveries for as long as a consumer callback is registered.
    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();

    测试


    server

    client

     

  • 相关阅读:
    PLINQ 简介
    windows phone 网易云阅读hubtile效果实现
    windows phone 生产含logo的二维码
    windows phone 生产二维码和解码本地二维码图片
    element loading源码
    element input-number源码
    element Image组件
    element form源码
    element dropdown源码
    element Divider源码
  • 原文地址:https://www.cnblogs.com/hejun695/p/7722902.html
Copyright © 2011-2022 走看看