zoukankan      html  css  js  c++  java
  • 官网英文版学习——RabbitMQ学习笔记(八)Remote procedure call (RPC)

    在第四篇学习笔记中,我们学习了如何使用工作队列在多个工作者之间分配耗时的任务。

     

    但是,如果我们需要在远程计算机上运行一个函数并等待结果呢?这是另一回事。这种模式通常称为远程过程调用或RPC。

     

    在本篇学习笔记中,我们将使用RabbitMQ构建一个RPC系统:客户机和可伸缩的RPC服务器。由于我们没有任何值得分发的耗时任务,所以我们将创建一个返回斐波那契数的虚拟RPC服务。

    为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法,该方法发送一个RPC请求并阻塞,直到收到答案:

    FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
    String result = fibonacciRpc.call("4");
    System.out.println( "fib(4) is " + result);

    通常,在RabbitMQ上执行RPC是很容易的。客户端发送请求消息,服务器用响应消息进行响应。为了接收响应,我们需要向请求发送一个“回调”队列地址。我们可以使用默认队列(在Java客户机中是独占的)。让我们试一试:

    callbackQueueName = channel.queueDeclare().getQueue();
    
    BasicProperties props = new BasicProperties
                                .Builder()
                                .replyTo(callbackQueueName)
                                .build();
    
    channel.basicPublish("", "rpc_queue", props, message.getBytes());
    
    // ... then code to read a response message from the callback_queue ...

    消息属性

     

    AMQP 0-9-1协议预先定义了一组包含消息的14个属性。大多数属性很少被使用,除了以下情况:

     

    deliveryMode:将消息标记为持久化(值为2)或瞬变(任何其他值)。您可能还记得第二个教程中的这个属性。

     

    contentType:用于描述编码的mime类型。例如,对于通常使用的JSON编码,最好将这个属性设置为:application/ JSON。

     

    replyTo:通常用于命名回调队列。

    correlationid:有助于将RPC响应与请求关联起来。

    correlationId作用

    我们为每个请求设置一个唯一值correctionid。用于当队列接收到响应后区分是哪个请求的响应。稍后,当我们在回调队列中接收到消息时,我们将查看此属性,并基于此,我们将能够将响应与请求匹配。如果我们看到一个未知的correlationId值,我们可以安全地丢弃消息—它不属于我们的请求。

    我们的RPC将这样工作:

    当客户端启动时,它将创建一个匿名独占回调队列(官方教程创建的是匿名的)。

     

    对于RPC请求,客户端发送一条消息,该消息具有两个属性:replyTo,它被设置为回调队列和correlationId,它被设置为每个请求的唯一值。

     

    请求被发送到rpc_queue队列。

     

    RPC工作程序(即:server)正在等待该队列上的请求。当出现请求时,它会执行该任务并使用replyTo字段中的队列将结果发送回客户机。

    客户端等待回调队列上的数据。当消息出现时,它会检查相关属性。如果它匹配来自请求的值,它将向应用程序返回响应。

    服务端代码:

    package com.rabbitmq.cn;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class RPCServer {
    //	定义一个远程队列名称
    	private static final String RPCQUEUENAME = "RPCqueue"; 
    //	斐波那契数函数
    	private static int fib(int n) {
    	    if (n ==0) return 0;
    	    if (n == 1) return 1;
    	    return fib(n-1) + fib(n-2);
    	  }
    	public static void main(String[] args) throws IOException, TimeoutException {
    		// TODO Auto-generated method stub
    //		创建工厂获取连接
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.10.185");
    		factory.setPort(5672);
    		factory.setPassword("123456");
    		factory.setUsername("admin");
    		Connection connection = null;
    		try{
    //		获得连接
    		connection = factory.newConnection();
    //		创建队列
    		Channel channel = connection.createChannel();
    //		声明一个远程的消息队列
    		channel.queueDeclare(RPCQUEUENAME, false, false, false, null);
    //		为了减轻服务器负担,当多个服务一同工作时,可以设置如下参数
    		channel.basicQos(1);
    		System.out.println(" [x] Awaiting RPC requests");
    //		执行客户端发送的请求任务
    		Consumer consumer = new DefaultConsumer(channel){
    			@Override
    			public void handleDelivery(String consumerTag, Envelope envelope,
    					BasicProperties properties, byte[] body) throws IOException {
    //				设置返回的消息属性
    				AMQP.BasicProperties replyPros = new BasicProperties().builder()
    						.correlationId(properties.getCorrelationId()).build();
    				String response = "";
    				try {
    //				对传递过来的文字版数字解析,解析后调用函数处理,处理以后的结果作为返回值准备返回给客户端
    					String message = new String(body, "utf-8");
    					int n = Integer.parseInt(message);
    					System.out.println(" [.] fib(" + message + ")");
    		            response += fib(n);
    				} catch (Exception e) {
    					// TODO: handle exception
    				}finally{
    //				返回处理后的结果给客户端
    					channel.basicPublish("", properties.getReplyTo(), replyPros, response.getBytes());
    					channel.basicAck(envelope.getDeliveryTag(), false);
    //			    RabbitMq consumer worker thread notifies the RPC server owner thread 
    		            synchronized(this) {
    		            	this.notify();
    				}
    			}
    		}
    	};
    	/*The RPC worker (aka: server) is waiting for requests on that queue. 
    	When a request appears, it does the job and sends a message with the result back to the Client, 
    	using the queue from the replyTo field.*/
    // 		Wait and be prepared to consume the message from RPC client.
    	  channel.basicConsume(RPCQUEUENAME, false, consumer);
         
          while (true) {
          	synchronized(consumer) {
          		try {
          			consumer.wait();
          	    } catch (InterruptedException e) {
          	    	e.printStackTrace();	    	
          	    }
          	}
          }}catch (IOException | TimeoutException e) {
              e.printStackTrace();
          }
          finally {
            if (connection != null)
              try {
                connection.close();
              } catch (IOException _ignore) {}
        	  
          }
    }}

    服务端代码过程显示,通常我们从建立连接、通道和声明队列开始。

     

    我们可能希望运行多个服务器进程。为了将负载均匀地分布到多个服务器上,我们需要在channel.basicQos中设置prefetchCount设置。

     

    我们使用basicconsumption访问队列,在队列中我们以对象(DefaultConsumer)的形式提供回调,该对象将执行该工作并将响应发送回。

    客户端代码:

    package com.rabbitmq.cn;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeoutException;
    
    public class RPCClient {
    
      private Connection connection;
      private Channel channel;
      private String requestQueueName = "RPCqueue";
      private String replyQueueName;
    
      public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.10.185");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        
        connection = factory.newConnection();
        channel = connection.createChannel();
    //  创建临时队列
        replyQueueName = channel.queueDeclare().getQueue();
      }
    
      public String call(String message) throws IOException, InterruptedException {
    //	 通过uuid生成请求段的correctionId
        final String corrId = UUID.randomUUID().toString();
    //	设置correctionId和replyTo属性
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
    //	发送请求到请求队列中
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
    //	定义一个阻塞的消息队列,来挂起线程等待相应
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
    //	消费消息
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            if (properties.getCorrelationId().equals(corrId)) {
              response.offer(new String(body, "UTF-8"));
            }
          }
        });
    
        return response.take();
      }
    
      public void close() throws IOException {
        connection.close();
      }
    
      public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
    //    通过构造函数获取连接,并创建一个临时匿名的队列
          fibonacciRpc = new RPCClient();
    
          System.out.println(" [x] Requesting fib(30)");
          response = fibonacciRpc.call("30");
          System.out.println(" [.] Got '" + response + "'");
        }
        catch  (IOException | TimeoutException | InterruptedException e) {
          e.printStackTrace();
        }
        finally {
          if (fibonacciRpc!= null) {
            try {
              fibonacciRpc.close();
            }
            catch (IOException _ignore) {}
          }
        }
      }
    }
    
    

    客户端代码稍微复杂一些:

     

    我们建立一个连接和通道,并为回复声明一个独占的“回调”队列。

     

    我们订阅“回调”队列,以便接收RPC响应。

     

    我们的调用方法生成实际的RPC请求。

     

    在这里,我们首先生成一个唯一的correlationId号并保存它——我们在DefaultConsumer中实现的handleDelivery将使用这个值来捕获适当的响应。

     

    接下来,我们发布请求消息,有两个属性:replyTo和correlationId。

     

    此时,我们可以坐下来等待合适的答复。

    由于我们的消费者交付处理是在一个单独的线程中进行的,所以在响应到达之前,我们需要一些东西来挂起主线程。使用BlockingQueue是一种可能的解决方案。这里我们创建了ArrayBlockingQueue,它的容量设置为1,因为我们需要等待一个响应。

     

    handleDelivery方法做的是一项非常简单的工作,对于每个消耗的响应消息,它检查correlationId是否是我们要查找的那个。如果是,它将响应放置到BlockingQueue。

     

    与此同时,主线程正在等待响应从BlockingQueue接收。

    最后,我们将响应返回给用户。

    运行后,我们得到结果

  • 相关阅读:
    297. Serialize and Deserialize Binary Tree
    331. Verify Preorder Serialization of a Binary Tree
    332. Reconstruct Itinerary
    329. Longest Increasing Path in a Matrix
    319. Bulb Switcher
    292. Nim Game
    299. Bulls and Cows
    Ice Cream Tower Gym
    B
    C
  • 原文地址:https://www.cnblogs.com/xiaoyao-001/p/9195133.html
Copyright © 2011-2022 走看看