RPC
-
当客户端启动,它创建一个匿名的并且是exclusive的回调queue。
-
在一次RPC请求中,客户端发送的消息有两个属性:
replyTo
,放置的是回调queue的信息。correlationId
,放置的是每个请求唯一的值。 -
请求被发送到一个rpc_queue中。
-
RPC服务端在queue的另一端等待请求。当请求到来时,它处理任务并将消息的结果发送回客户端,使用
replyTo
中设置的queue。 -
客户端在回调queue中等待响应的数据,当消息出现时,它先检查
correlationId
属性。如果匹配的话就将结果返回到应用中。
Callback queue
使用RabbitMQ来进行RPC是非常简单的。客户端发送一个请求到服务端,服务端接收后返回响应的消息。为了接收到响应的消息,我们需要在请求中发送一个callback 的queue地址。我们可以使用默认的queue(在Java的client中它是exclusive的)。
1 callbackQueueName = channel.queueDeclare().getQueue(); 2 3 BasicProperties props = new BasicProperties 4 .Builder() 5 .replyTo(callbackQueueName) 6 .build(); 7 8 channel.basicPublish("", "rpc_queue", props, message.getBytes()); 9 10 // ... then code to read a response message from the callback_queue ...
AMQP协议预定义了消息的14种属性。大部分的都很少使用,除了以下这些:
-
deliveryMode
:标记一条消息是持久化的(使用值2)还是非持久化的(使用其它值)。在第二节中有过介绍。 -
contentType
:用来描述mime类型的编码。例如使用JSON的话就这样设置属性:application/json
。 -
replyTo
:一般用来命名一个回调queue。 -
correlationId
:用来关联RPC的请求和响应。
在之前的方法中我们建议为每个RPC请求创建一个回调queue。这显得有点影响性能,幸运的是有一种更好的方式——每个客户端只创建一个回调queue。 但这产生了一个新问题,无法将相应的Response和Request对应起来。这个时候就需要用到correlationId
属性。对于每个请求它都将有一个唯一的值。 当我们在回调queue中接收到消息之后,检查该属性,看是否与Request匹配。如果是一个未知的correlationId
值,那么我们可以安全的忽略这条消息, 因为它不属于我们的请求。
你也许会问,为什么我们应该忽略回调queue中未知的消息而不是抛出异常?这是因为服务端可能会出现竞争条件。尽管不太常见,但是也有可能RPC server在发送响应后挂了, 并且也没有接收到客户端发送的ack。如果发生了这种情况,RPC server在重启后将会重新处理这个请求。这就是为什么在客户端我们需要优雅的处理重复的响应, RPC应该是幂等的。
1 package com.rabbitmq.www.publish_subscribe.rpc; 2 3 import com.rabbitmq.client.ConnectionFactory; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.AMQP; 8 import com.rabbitmq.client.Envelope; 9 10 import java.io.IOException; 11 import java.util.UUID; 12 import java.util.concurrent.ArrayBlockingQueue; 13 import java.util.concurrent.BlockingQueue; 14 import java.util.concurrent.TimeoutException; 15 16 public class RPCClient { 17 18 private Connection connection; 19 private Channel channel; 20 private String requestQueueName = "rpc_queue"; 21 private String replyQueueName; 22 23 private final static String HOST_ADDR = "172.18.112.102"; 24 25 public RPCClient() throws IOException, TimeoutException { 26 ConnectionFactory factory = new ConnectionFactory(); 27 factory.setHost(HOST_ADDR); 28 29 connection = factory.newConnection(); 30 channel = connection.createChannel(); 31 32 replyQueueName = channel.queueDeclare().getQueue(); 33 } 34 35 public String call(String message) throws IOException, InterruptedException { 36 String corrId = UUID.randomUUID().toString(); 37 38 AMQP.BasicProperties props = new AMQP.BasicProperties 39 .Builder() 40 .correlationId(corrId) 41 .replyTo(replyQueueName) 42 .build(); 43 //调用服务端请求(在制定的queue上发布消息) 44 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); 45 46 final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); 47 48 channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { 49 @Override 50 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 51 if (properties.getCorrelationId().equals(corrId)) { 52 response.offer(new String(body, "UTF-8")); 53 } 54 } 55 }); 56 57 return response.take(); 58 } 59 60 public void close() throws IOException { 61 connection.close(); 62 } 63 64 public static void main(String[] argv) { 65 RPCClient fibonacciRpc = null; 66 String response = null; 67 try { 68 fibonacciRpc = new RPCClient(); 69 70 System.out.println(" [x] Requesting fib(30)"); 71 response = fibonacciRpc.call("30"); 72 System.out.println(" [.] Got '" + response + "'"); 73 } 74 catch (IOException | TimeoutException | InterruptedException e) { 75 e.printStackTrace(); 76 } 77 finally { 78 if (fibonacciRpc!= null) { 79 try { 80 fibonacciRpc.close(); 81 } 82 catch (IOException _ignore) {} 83 } 84 } 85 } 86 }
package com.rabbitmq.www.publish_subscribe.rpc; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private final static String HOST_ADDR = "172.18.112.102"; private static int fib(int n) { if (n ==0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); } public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_ADDR); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body,"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e){ System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); //loop to prevent reaching finally block while(true) { try { Thread.sleep(100); } catch (InterruptedException _ignore) {} } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) {} } } }
server端的代码非常直观:
-
首先创建一个连接、channel和声明一个queue。
-
我们也许想要运行不止一个服务端进程。为了在多个server间做到负载均衡,通过channel.basicQos设置
prefetchCount
。 -
我们使用
basicConsume
来进入queue。然后使用无限循环来等待请求的消息,处理之后再返回响应。
客户端代码有一点点的复杂:
-
我们创建连接和channel,以及声明一个exclusive的回调queue用来接收响应的消息。
-
订阅回调queue,这样就可以接收到RPC服务端响应的消息。
-
call方法发出一个RPC请求。
-
我们首先生成一个唯一的
correlationId
数字并且保存它——在while循环中使用它来匹配相应的response。 -
下一步,发送请求的消息,使用两个属性:
replyTo
和correlationId
。 -
之后就是等待响应的消息返回。
-
在while循环中做了一些简单的工作,检查响应的消息的
correlationId
是否与Request相匹配。如果是的话,则保存响应。 -
最终向用户返回响应。