zoukankan      html  css  js  c++  java
  • RabbitMQ学习总结 第七篇:RCP(远程过程调用协议)

    目录

    RabbitMQ学习总结 第一篇:理论篇
    RabbitMQ学习总结 第二篇:快速入门HelloWorld

    RabbitMQ学习总结 第三篇:工作队列Work Queue

    RabbitMQ学习总结 第四篇:发布/订阅 Publish/Subscribe

    RabbitMQ学习总结 第五篇:路由Routing

    RabbitMQ学习总结 第六篇:Topic类型的exchange

    RabbitMQ学习总结 第七篇:RCP(远程过程调用协议)

    http://www.cnblogs.com/leocook/p/mq_rabbitmq_2.html 这篇博文中我们实现了怎么去使用work queue来把比较耗时的任务分散给多个worker。

    但是,如果我们想在远程的机器上的一个函数并等待它返回结果,我们应该怎么办呢?这就是另外一种模式了,它被称为RPC(Remote procedure call)。

    本篇博文中我们来实现怎么用RabbitMQ来构建一个RPC系统:一个client(客户端)和一个可扩展的RPC server(服务端)。这里我们来模拟一个返回斐波拉契数的RPC服务。

    1、Client端接口

    为了说明一个RPC服务时怎么工作的,我们来创建一个简单的client类。这里来实现一个名字为call的方法来发送RPC请求,并发生阻塞,直到接收到回复:

    FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
    String result = fibonacciRpc.call("4");
    System.out.println( "fib(4) is " + result);

    RPC注意事项:

    虽然RPC是一种常用的模式,但它也有一些缺陷。当无法确定使用本地调用还是使用RPC时,问题就出来了。有的时候不确定程序的运行环境,这样来做会给程序的调试增加了一定的复杂度。使用RPC并不能够让代码变得更简洁,滥用的话只会让代码变得更不方便维护。

    伴随着上边的问题,咱们来看看下边的建议:

    • 确定能很明显的分辨的出哪些调用是本地调用,哪些是远程调用。
    • 完善系统的文档。清楚的标记出,模块间的依赖关系。
    • 处理错误情况。当RPC服务挂了之后,客户端应该怎么去处理呢?

    当有疑问时避免使用RPC。如果可以的话,你可以使用异步管道(不用RPC-阻塞),结果被异步推送到下一个计算环节。

    2、回调队列(Callback queue)

    一般用RabbitMQ来实现RPC是很简单的。客户端发送一个请求消息然后服务器端回应一个响应消息。为了接收服务端的响应消息,我们需要在请求中发送一个callback queue地址。我们也可以使用一个默认的queue(Java客户端独有的)。如下:

    callbackQueueName = channel.queueDeclare().getQueue();
    
    //绑定callback queue
    BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
    channel.basicPublish("", "rpc_queue", props, message.getBytes());
    
    // ... then code to read a response message from the callback_queue ...

    消息属性:

    AMQP协议在发送消息时,预定义了14个属性连同消息一起发送出去。很多属性都是很少用到的,除了下边的这些:

    消息的投递模型(deliveryMode):使消息持久化,和work queue里的设置一样。

    上下文类型(contentType):用来描述媒体类型(mime-type)。例如常用的JSON格式,它的mime-type是application/json。

    我们需要导包:

    import com.rabbitmq.client.AMQP.BasicProperties;

    3、Correlation Id

    在上边的方法中建议我们为每个RPC请求都创建一个call queue,这样效率很低。我们有更好的办法,为每一个client创建一个call queue。

    这样处理的话又出现了一个新的问题,无法确定接收到的响应是对应哪个请求的。这时候就需要correlationId属性,我们为每一个请求都设置一个correlationId属性。当我们从callback queue中接收到一条消息之后,我们将会查看correlationId属性,这样就可以用一个请求来与之匹配了。如果从callback queue接收到了一条消息后,发现其中的correlationId未能找到与之匹配的请求,那么将把这条消息丢掉。

    你可能会问我们为什么要要在callback queue里忽略掉不知道的message,而不是报错呢?这是因为服务器端可能会出现的一种情况,虽然可能性很小,但还是有可能性的,有可能在RPC发送了响应之后,在发送确认完成任务的信息之前服务器重启了。如果这种情况发生了的话,重启了RPC服务之后,它将会再次接收到之前的请求,这样的话client将会重复处理响应,RPC服务应该是等幂的。

    4、总结

    我们的RPC工作原理如下:

    • 当Client启动时,它将会创建一个匿名的callback queue。
    • 对于一次RPC请求,client会发送一条含有两个属性的消息:replyTocorrelationId。Reply是设置的callback queue,correlationId是设置的当前请求的标示符。
    • 请求将会被发送到rpc_queue里。
    • RPC的worker(RPC server)等待queue中的请求。当出现一个请求之后,他将会处理任务,并向replyTo队列中发送消息。
    • 客户端会等待callback queue上的消息。当消息出现时,它将会检查correlationId属性是否能与之前发送请求时的属性一直,若一致的话,client将会处理回复的消息。

    5、最终实现

    斐波拉契任务:

    private static int fib(int n) throws Exception {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }

    这里定义计算斐波拉契数的方法,假设传进去的整数都是正整数。

    RPC服务端的代码实现如下RPCServer.java:

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.AMQP.BasicProperties;
      
    public class RPCServer {
      private static final String RPC_QUEUE_NAME = "rpc_queue";
    
      private static int fib(int n) {
        if (n ==0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
      }
    
      public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
          ConnectionFactory factory = new ConnectionFactory();
          factory.setHost("localhost");
      
          connection = factory.newConnection();
          channel = connection.createChannel();
          
          channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
      
          //一次只接收一条消息
          channel.basicQos(1);
      
          QueueingConsumer consumer = new QueueingConsumer(channel);
    
          //开启消息应答机制
          channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
          System.out.println(" [x] Awaiting RPC requests");
      
          while (true) {
            String response = null;
            
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            
            //拿到correlationId属性
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                                             .Builder()
                                             .correlationId(props.getCorrelationId())
                                             .build();
            
            try {
              String message = new String(delivery.getBody(),"UTF-8");
              int n = Integer.parseInt(message);
      
              System.out.println(" [.] fib(" + message + ")");
              response = "" + fib(n);
            } catch (Exception e){
              System.out.println(" [.] " + e.toString());
              response = "";
            }
            finally {  
              //拿到replyQueue,并绑定为routing key,发送消息
              channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
              //返回消息确认信息
              channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
          }
        }
        catch  (Exception e) {
          e.printStackTrace();
        }
        finally {
          if (connection != null) {
            try {
              connection.close();
            }
            catch (Exception ignore) {}
          }
        }                    
      }
    }

    服务器端代码实现很简单的:

    • 建立连接,信道,声明队列
    • 为了能把任务压力平均的分配到各个worker上,我们在方法channel.basicQos里设置prefetchCount的值。
    • 我们使用basicConsume来接收消息,并等待任务处理,然后发送响应。

    RPC客户端代码实现RPCClient.java

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import java.util.UUID;
    
    public class RPCClient {
      private Connection connection;
      private Channel channel;
      private String requestQueueName = "rpc_queue";
      private String replyQueueName;
      private QueueingConsumer consumer;
    
      public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
    
        //拿到一个匿名(并非真的匿名,拿到了一个随机生成的队列名)的队列,作为replyQueue。
        replyQueueName = channel.queueDeclare().getQueue(); 
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
      }
      
      public String call(String message) throws Exception {     
        String response = null;
        String corrId = UUID.randomUUID().toString();//拿到一个UUID
        
        //封装correlationId和replyQueue属性
        BasicProperties props = new BasicProperties
                                    .Builder()
                                    .correlationId(corrId)
                                    .replyTo(replyQueueName)
                                    .build();
        //推消息,并加上之前封装好的属性
        channel.basicPublish("", requestQueueName, props, message.getBytes());
        
        while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          //检验correlationId是否匹配,确定是不是这次的请求
          if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody(),"UTF-8");
            break;
          }
        }
    
        return response; 
      }
        
      public void close() throws Exception {
        connection.close();
      }
      
      public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
          fibonacciRpc = new RPCClient();
      
          System.out.println(" [x] Requesting fib(30)");   
          response = fibonacciRpc.call("30");
          System.out.println(" [.] Got '" + response + "'");
        }
        catch  (Exception e) {
          e.printStackTrace();
        }
        finally {
          if (fibonacciRpc!= null) {
            try {
              fibonacciRpc.close();
            }
            catch (Exception ignore) {}
          }
        }
      }
    }

    参考链接:http://www.rabbitmq.com/tutorials/tutorial-six-java.html

  • 相关阅读:
    tabhost中setup()和setup(LocalActivityManager activityGroup)
    android自定义TabWidget
    Android使用Fragment来实现TabHost的功能(解决切换Fragment状态不保存)以及各个Fragment之间的通信
    底部菜单栏(三)Fragment+FragmentTabHost实现仿新浪微博底部菜单栏
    TabHost 两种使用方法 直接让一个Activity 继承TabActivity 和 利用findViwById()方法取得TagHost组件
    android的消息处理机制(图+源码分析)——Looper,Handler,Message
    Java高级--Java线程运行栈信息的获取 getStackTrace()
    Java中的守护线程 & 非守护线程(简介)
    Fragment之间的通信
    CString——Left、Right、Find、ReverseFind
  • 原文地址:https://www.cnblogs.com/leocook/p/mq_rabbitmq_6.html
Copyright © 2011-2022 走看看