zoukankan      html  css  js  c++  java
  • 柯南君:看大数据时代下的IT架构(9)消息队列之RabbitMQ--案例(RPC起航)

    二、Remote procedure call (RPC)(using the Java client)

    三、Client interface(客户端接口)
    为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞,直到收到RPC运算的结果。代码如下:
    在CODE上查看代码片派生到我的代码片
    1. fibonacci_rpc = FibonacciRpcClient()    
    2. result = fibonacci_rpc.call(4)    
    3. print "fib(4) is %r" % (result,)    
    四、 总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client发送请求的Message然后server返回响应结果。为了收到响应client在publish message时需要提供一个”callback“(回调)的queue地址。code如下:
    1. result = channel.queue_declare(exclusive=True)    
    2. callback_queue = result.method.queue    
    3.     
    4. channel.basic_publish(exchange='',    
    5.                       routing_key='rpc_queue',    
    6.                       properties=pika.BasicProperties(    
    7.                             reply_to = callback_queue,    
    8.                             ),    
    9.                       body=request)    
    10.     
    Message properties

    AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:

    • delivery_mode: 持久化一个Message(通过设定值为2)。其他任意值都是非持久化。
    • content_type: 描述mime-type 的encoding。比如设置为JSON编码:设置该property为application/json。
    • reply_to: 一般用来指明用于回调的queue(Commonly used to name a callback queue)。
    • correlation_id: 在请求中关联处理RPC响应(correlate RPC responses with requests)。
    四、Correlation Id  在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。

    这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。

    五、(总结)
    工作流程:
    • 当客户端启动时,它创建了匿名的exclusive callback queue.
    • 客户端的RPC请求时将同时设置两个properties: reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.
    • 请求将被发送到an rpc_queue queue.
    • RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue回复一个message给client。
    • client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应。
    六、
    Putting it all together
    在CODE上查看代码片派生到我的代码片
    1. private static int fib(int n) throws Exception {  
    2.     if (n == 0) return 0;  
    3.     if (n == 1) return 1;  
    4.     return fib(n-1) + fib(n-2);  
    5. }  
     RPCServer.java :
    
    
    private static final String RPC_QUEUE_NAME = "rpc_queue";  
      
    ConnectionFactory factory = new ConnectionFactory();  
    factory.setHost("localhost");  
      
    Connection connection = factory.newConnection();  
    Channel channel = connection.createChannel();  
      
    channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  
      
    channel.basicQos(1);  
      
    QueueingConsumer consumer = new QueueingConsumer(channel);  
    channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  
      
    System.out.println(" [x] Awaiting RPC requests");  
      
    while (true) {  
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
      
        BasicProperties props = delivery.getProperties();  
        BasicProperties replyProps = new BasicProperties  
                                         .Builder()  
                                         .correlationId(props.getCorrelationId())  
                                         .build();  
      
        String message = new String(delivery.getBody());  
        int n = Integer.parseInt(message);  
      
        System.out.println(" [.] fib(" + message + ")");  
        String response = "" + fib(n);  
      
        channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());  
      
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
    } 

    服务器代码相当简单:
    • 像往常一样,我们首先建立连接、通道和声明队列。
    • 我们可能想要运行多个服务器进程。为了分散负载同样在多个服务器,我们需要设置在channel.basicQos prefetchCount设置。
    • 我们使用basicConsume访问队列。然后我们进入while循环,我们等待请求消息,并发送响应工作。

     RPCClient.java:

        private Connection connection;  
        private Channel channel;  
        private String requestQueueName = "rpc_queue";  
        private String replyQueueName;  
        private QueueingConsumer consumer;  
          
        public RPCClient() throws Exception {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            connection = factory.newConnection();  
            channel = connection.createChannel();  
          
            replyQueueName = channel.queueDeclare().getQueue();   
            consumer = new QueueingConsumer(channel);  
            channel.basicConsume(replyQueueName, true, consumer);  
        }  
          
        public String call(String message) throws Exception {       
            String response = null;  
            String corrId = java.util.UUID.randomUUID().toString();  
          
            BasicProperties props = new BasicProperties  
                                        .Builder()  
                                        .correlationId(corrId)  
                                        .replyTo(replyQueueName)  
                                        .build();  
          
            channel.basicPublish("", requestQueueName, props, message.getBytes());  
          
            while (true) {  
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
                if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
                    response = new String(delivery.getBody());  
                    break;  
                }  
            }  
          
            return response;   
        }  
          
        public void close() throws Exception {  
            connection.close();  
        }  

    客户端代码部分涉及到:
    • 我们建立了一个"connecttion"(连接) 和 "channel"(通道)并且为replies(回复)声明一个独一无二的"callback"(回调);
    • 我们订阅了"callback"(回调)队列,这样我们就可以收到RPC的回应了;
    • 我们调用的方法是实际的RPC;
    • 接下来我们publish(发布)请求消息,有两个属性,分别是:replyTo 和 correlationId;
    • 在这点,我们可以坐下来,直到适当的响应到达;
    • while循环做了一件非常简单的工作,它会检查每一个消息响应,如果当前的最后,我们将响应给用户;
    客户端请求:
     
        RPCClient fibonacciRpc = new RPCClient();  
          
        System.out.println(" [x] Requesting fib(30)");     
        String response = fibonacciRpc.call("30");  
        System.out.println(" [.] Got '" + response + "'");  
          
        fibonacciRpc.close();  

    现在是时候,该看看我们的整体完整的示例源代码了:RPCClent.java(包括基本的异常处理)和RPCServer.java,像往常一样编译和设置路径(可以参考前面的教程)
     
    RPCClient.java:
        import com.rabbitmq.client.ConnectionFactory;  
        import com.rabbitmq.client.Connection;  
        import com.rabbitmq.client.Channel;  
        import com.rabbitmq.client.QueueingConsumer;  
        import com.rabbitmq.client.AMQP.BasicProperties;  
        import java.util.UUID;  
              
        public class RPCClient {  
              
          private Connection connection;  
          private Channel channel;  
          private String requestQueueName = "rpc_queue";  
          private String replyQueueName;  
          private QueueingConsumer consumer;  
              
          public RPCClient() throws Exception {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            connection = factory.newConnection();  
            channel = connection.createChannel();  
          
            replyQueueName = channel.queueDeclare().getQueue();   
            consumer = new QueueingConsumer(channel);  
            channel.basicConsume(replyQueueName, true, consumer);  
          }  
            
          public String call(String message) throws Exception {       
            String response = null;  
            String corrId = UUID.randomUUID().toString();  
              
            BasicProperties props = new BasicProperties  
                                        .Builder()  
                                        .correlationId(corrId)  
                                        .replyTo(replyQueueName)  
                                        .build();  
              
            channel.basicPublish("", requestQueueName, props, message.getBytes());  
              
            while (true) {  
              QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
              if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
                response = new String(delivery.getBody(),"UTF-8");  
                break;  
              }  
            }  
          
            return response;   
          }  
              
          public void close() throws Exception {  
            connection.close();  
          }  
            
          public static void main(String[] argv) {  
            RPCClient fibonacciRpc = null;  
            String response = null;  
            try {  
              fibonacciRpc = new RPCClient();  
            
              System.out.println(" [x] Requesting fib(30)");     
              response = fibonacciRpc.call("30");  
              System.out.println(" [.] Got '" + response + "'");  
            }  
            catch  (Exception e) {  
              e.printStackTrace();  
            }  
            finally {  
              if (fibonacciRpc!= null) {  
                try {  
                  fibonacciRpc.close();  
                }  
                catch (Exception ignore) {}  
              }  
            }  
          }  
        }<strong>  
        </strong>  

    RPCServer.java:
     
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.AMQP.BasicProperties;
      
    public class RPCServer {
      
      private static final String RPC_QUEUE_NAME = "rpc_queue";
      
      private static int fib(int n) {
        if (n ==0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
      }
        
      public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
          ConnectionFactory factory = new ConnectionFactory();
          factory.setHost("localhost");
      
          connection = factory.newConnection();
          channel = connection.createChannel();
          
          channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
      
          channel.basicQos(1);
      
          QueueingConsumer consumer = new QueueingConsumer(channel);
          channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
      
          System.out.println(" [x] Awaiting RPC requests");
      
          while (true) {
            String response = null;
            
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                                             .Builder()
                                             .correlationId(props.getCorrelationId())
                                             .build();
            
            try {
              String message = new String(delivery.getBody(),"UTF-8");
              int n = Integer.parseInt(message);
      
              System.out.println(" [.] fib(" + message + ")");
              response = "" + fib(n);
            }
            catch (Exception e){
              System.out.println(" [.] " + e.toString());
              response = "";
            }
            finally {  
              channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
      
              channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
          }
        }
        catch  (Exception e) {
          e.printStackTrace();
        }
        finally {
          if (connection != null) {
            try {
              connection.close();
            }
            catch (Exception ignore) {}
          }
        }                    
      }
    }


     
    $ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java
    我们的RPC service现在准备好了,我们开始启动server:
    $ java -cp $CP RPCServer
     [x] Awaiting RPC requests
    发布一个fibonacci 数字,运行在client(客户端):
    $ java -cp $CP RPCClient
     [x] Requesting fib(30)
    本节提供的设计并不是唯一的RPC服务实现,但它还是有一定的优点的:
    • 如果RPC server(服务器)太慢了,你仅仅需要运行另一个,就可以扩展;尝试在新的控制台,运行第二个吧;
    • 在客户端,RPC需要发送和接收的消息只有一个,不需要像queueDeclare 同步调用,因为RPC客户端为了一个RPC请求,只需要一个网络往返;
    我们的代码依然很简单,不试图去解决更加繁杂的问题,但是非常重要,像以下这样:
    • 如果没有服务运行,客户端将怎么去做?
    • 客户端应该有RPC超时么?
    • 如果服务器出现故障,爆出一个异常,应该发给客户端么?
    • 防止传入错误的消息(如范围检查、类型)前处理
     
     
     
     
     
     
     
     
     
     
     
     
  • 相关阅读:
    [Python] Read and Parse Files in Python
    [React] Write Compound Components
    [Python] Reuse Code in Multiple Projects with Python Modules
    [Parcel] Bundle a React App with Parcel
    [Javascript] Specify this using .call() or .apply()
    [Javascript] this in Function Calls
    [Python] Create a Log for your Python application
    [Python] Create Unique Unordered Collections in Python with Set
    [Python] Manipulate Data with Dictionaries in Python
    SVN:常用命令
  • 原文地址:https://www.cnblogs.com/timssd/p/4658914.html
Copyright © 2011-2022 走看看