zoukankan      html  css  js  c++  java
  • RPC

    RPC

    1. 当客户端启动,它创建一个匿名的并且是exclusive的回调queue。

    2. 在一次RPC请求中,客户端发送的消息有两个属性:replyTo,放置的是回调queue的信息。correlationId,放置的是每个请求唯一的值。

    3. 请求被发送到一个rpc_queue中。

    4. RPC服务端在queue的另一端等待请求。当请求到来时,它处理任务并将消息的结果发送回客户端,使用replyTo中设置的queue。

    5. 客户端在回调queue中等待响应的数据,当消息出现时,它先检查correlationId属性。如果匹配的话就将结果返回到应用中。

    Callback queue

    使用RabbitMQ来进行RPC是非常简单的。客户端发送一个请求到服务端,服务端接收后返回响应的消息。为了接收到响应的消息,我们需要在请求中发送一个callback 的queue地址。我们可以使用默认的queue(在Java的client中它是exclusive的)。

     1 callbackQueueName = channel.queueDeclare().getQueue();
     2 
     3 BasicProperties props = new BasicProperties
     4                             .Builder()
     5                             .replyTo(callbackQueueName)
     6                             .build();
     7 
     8 channel.basicPublish("", "rpc_queue", props, message.getBytes());
     9 
    10 // ... then code to read a response message from the callback_queue ...

    Message properties

    AMQP协议预定义了消息的14种属性。大部分的都很少使用,除了以下这些:

    • deliveryMode:标记一条消息是持久化的(使用值2)还是非持久化的(使用其它值)。在第二节中有过介绍。

    • contentType:用来描述mime类型的编码。例如使用JSON的话就这样设置属性:application/json

    • replyTo:一般用来命名一个回调queue。

    • correlationId:用来关联RPC的请求和响应。

    在之前的方法中我们建议为每个RPC请求创建一个回调queue。这显得有点影响性能,幸运的是有一种更好的方式——每个客户端只创建一个回调queue。 但这产生了一个新问题,无法将相应的Response和Request对应起来。这个时候就需要用到correlationId属性。对于每个请求它都将有一个唯一的值。 当我们在回调queue中接收到消息之后,检查该属性,看是否与Request匹配。如果是一个未知的correlationId值,那么我们可以安全的忽略这条消息, 因为它不属于我们的请求。

    你也许会问,为什么我们应该忽略回调queue中未知的消息而不是抛出异常?这是因为服务端可能会出现竞争条件。尽管不太常见,但是也有可能RPC server在发送响应后挂了, 并且也没有接收到客户端发送的ack。如果发生了这种情况,RPC server在重启后将会重新处理这个请求。这就是为什么在客户端我们需要优雅的处理重复的响应, RPC应该是幂等的。

     1 package com.rabbitmq.www.publish_subscribe.rpc;
     2 
     3 import com.rabbitmq.client.ConnectionFactory;
     4 import com.rabbitmq.client.Connection;
     5 import com.rabbitmq.client.Channel;
     6 import com.rabbitmq.client.DefaultConsumer;
     7 import com.rabbitmq.client.AMQP;
     8 import com.rabbitmq.client.Envelope;
     9 
    10 import java.io.IOException;
    11 import java.util.UUID;
    12 import java.util.concurrent.ArrayBlockingQueue;
    13 import java.util.concurrent.BlockingQueue;
    14 import java.util.concurrent.TimeoutException;
    15 
    16 public class RPCClient {
    17 
    18   private Connection connection;
    19   private Channel channel;
    20   private String requestQueueName = "rpc_queue";
    21   private String replyQueueName;
    22   
    23   private final static String HOST_ADDR = "172.18.112.102";
    24 
    25   public RPCClient() throws IOException, TimeoutException {
    26     ConnectionFactory factory = new ConnectionFactory();
    27     factory.setHost(HOST_ADDR);
    28 
    29     connection = factory.newConnection();
    30     channel = connection.createChannel();
    31 
    32     replyQueueName = channel.queueDeclare().getQueue();
    33   }
    34 
    35   public String call(String message) throws IOException, InterruptedException {
    36     String corrId = UUID.randomUUID().toString();
    37 
    38     AMQP.BasicProperties props = new AMQP.BasicProperties
    39             .Builder()
    40             .correlationId(corrId)
    41             .replyTo(replyQueueName)
    42             .build();
    43     //调用服务端请求(在制定的queue上发布消息)
    44     channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
    45 
    46     final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
    47 
    48     channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
    49       @Override
    50       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    51         if (properties.getCorrelationId().equals(corrId)) {
    52           response.offer(new String(body, "UTF-8"));
    53         }
    54       }
    55     });
    56 
    57     return response.take();
    58   }
    59 
    60   public void close() throws IOException {
    61     connection.close();
    62   }
    63 
    64   public static void main(String[] argv) {
    65     RPCClient fibonacciRpc = null;
    66     String response = null;
    67     try {
    68       fibonacciRpc = new RPCClient();
    69 
    70       System.out.println(" [x] Requesting fib(30)");
    71       response = fibonacciRpc.call("30");
    72       System.out.println(" [.] Got '" + response + "'");
    73     }
    74     catch  (IOException | TimeoutException | InterruptedException e) {
    75       e.printStackTrace();
    76     }
    77     finally {
    78       if (fibonacciRpc!= null) {
    79         try {
    80           fibonacciRpc.close();
    81         }
    82         catch (IOException _ignore) {}
    83       }
    84     }
    85   }
    86 }
    package com.rabbitmq.www.publish_subscribe.rpc;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RPCServer {
    
      private static final String RPC_QUEUE_NAME = "rpc_queue";
      
      private final static String HOST_ADDR = "172.18.112.102";
    
      private static int fib(int n) {
        if (n ==0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
      }
    
      public static void main(String[] argv) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDR);
    
        Connection connection = null;
        try {
          connection      = factory.newConnection();
          Channel channel = connection.createChannel();
    
          channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    
          channel.basicQos(1);
    
          System.out.println(" [x] Awaiting RPC requests");
    
          Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                      .Builder()
                      .correlationId(properties.getCorrelationId())
                      .build();
    
              String response = "";
    
              try {
                String message = new String(body,"UTF-8");
                int n = Integer.parseInt(message);
    
                System.out.println(" [.] fib(" + message + ")");
                response += fib(n);
              }
              catch (RuntimeException e){
                System.out.println(" [.] " + e.toString());
              }
              finally {
                channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
    
                channel.basicAck(envelope.getDeliveryTag(), false);
              }
            }
          };
    
          channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    
          //loop to prevent reaching finally block
          while(true) {
            try {
              Thread.sleep(100);
            } catch (InterruptedException _ignore) {}
          }
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
        finally {
          if (connection != null)
            try {
              connection.close();
            } catch (IOException _ignore) {}
        }
      }
    }

    server端的代码非常直观:

    • 首先创建一个连接、channel和声明一个queue。

    • 我们也许想要运行不止一个服务端进程。为了在多个server间做到负载均衡,通过channel.basicQos设置prefetchCount

    • 我们使用basicConsume来进入queue。然后使用无限循环来等待请求的消息,处理之后再返回响应。

    客户端代码有一点点的复杂:

    • 我们创建连接和channel,以及声明一个exclusive的回调queue用来接收响应的消息。

    • 订阅回调queue,这样就可以接收到RPC服务端响应的消息。

    • call方法发出一个RPC请求。

    • 我们首先生成一个唯一的correlationId数字并且保存它——在while循环中使用它来匹配相应的response。

    • 下一步,发送请求的消息,使用两个属性:replyTocorrelationId

    • 之后就是等待响应的消息返回。

    • 在while循环中做了一些简单的工作,检查响应的消息的correlationId是否与Request相匹配。如果是的话,则保存响应。

    • 最终向用户返回响应。

  • 相关阅读:
    Windows Forms中通过自定义组件实现统一的数据验证(二)
    The WindowsClient.NET Community Site Launches
    二十六岁,仍在路上
    Visual Studio 2008 Express版本下载
    Page Controller及其在ASP.NET中的实现
    iBATIS In Action:使用映射语句(二)
    在VS2005中创建项目模板来提高开发效率
    2007年,听见春天的脚步
    iBATIS In Action:使用映射语句(一)
    iBATIS In Action:序言和目录
  • 原文地址:https://www.cnblogs.com/woms/p/7040873.html
Copyright © 2011-2022 走看看