在项目中引入RabbitMQ通常会考虑它会带来的好处:解耦应用程序,实现不同编程语言之间的互通,解除对特定通信协议的依赖,解除应用程序在时序上执行的依赖(异步).落实到代码层面就是两种常用应用模式:"发后即忘"(fire-and-forget)和RPC.
fire-and-forget
RPC
这里有两件事情要保证:
- 要为队列创建随机Name
- 即使Name随机还是有可能冲突,还需要保证消息通信的独占性。
看看RabbitMQ是怎么满足这两点的:
- 如果创建的队列不指定queue name,RabbitMQ就会创建一个随机的Name.
- 独占只需要exclusive参数即可
总而言之,需要做的就是Client创建一个temporary,exclusive,anonymou的queue,并把queue name设置在RPC 消息的reply_to字段即可.注意这里RPC Server已经知道要投递到哪个Queue,所以不需要指定Exchange(后面我们会提到在实现层面Queue和Exchange的不同,简单讲 queue会有对应的Erlang进程,而exchang只是执行一些模式匹配的检查并没有进程实体对应).看下图:
Our RPC will work like this:
- When the Client starts up, it creates an anonymous exclusive callback queue.
- For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue and correlationId, which is set to a unique value for every request.
- The request is sent to an rpc_queue queue.
- The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.
- The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application
RPC Client端的代码如下:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.util.UUID; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(),"UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc!= null) { try { fibonacciRpc.close(); } catch (Exception ignore) {} } } } }
RPC SEVER端代码如下:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; //计算斐波切纳数列 private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder() .correlationId(props.getCorrelationId()).build(); try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } }
略有不同
传统的RPC调用Client和Server紧密依赖,客户端连接上服务器,发送一个请求然后阻塞等待服务器响应.这样的做的特点是客户端和服务器端是知道对方的.如果RPC Server崩溃掉,客户端需要重连,如果Server彻底崩掉就要重新找一个提供同样服务的Server,然后客户端重连过去.
用RabbitMQ来实现RPC,依然保持Client Server信息隐藏的特点,Client依赖的不是特定的Server而是特定的消息,在有多个等效Server的情况下,一个Server的状态是否正常不会影响到客户端的状态.
总结一下,使用RabbitMQ是先RPC,客观上还实现了下面的效果:
- 容错 一个Server崩溃不影响 Client
- 解耦了对特定通信协议和接口的依赖,统一走AMQP消息.
- 在多个RPC Server之间的负载均衡由RabbitMQ完成