zoukankan      html  css  js  c++  java
  • RabbitMQ4--发后即忘和RPC

      在项目中引入RabbitMQ通常会考虑它会带来的好处:解耦应用程序,实现不同编程语言之间的互通,解除对特定通信协议的依赖,解除应用程序在时序上执行的依赖(异步).落实到代码层面就是两种常用应用模式:"发后即忘"(fire-and-forget)和RPC.

    fire-and-forget 

      RabbitMQ 解决的是应用程序之间互联(connect)和规模(scale)的问题,消息发送和接收是隔离,发送方不知道消息最终由谁接收,接收方也不必关心消息是谁步发出的;发送和接收是隔离的,消息本质上就是异步的.这种隔离也就解耦了应用程序之间的依赖.RabbitMQ的角色就是应用程序中间的路由器.对于消息的发布方来讲这是一种"发后即忘"(fire-and_forget)的发布方式.

    RPC

      RPC需要双向通信,或者说RPC Server需要明确知道要把消息发送给谁.我们可以在payload的数据部分附加"发给谁" 这种EndPoint信息. RabbitMQ提供的解决方案:在每一个AMQP的消息头上有一个reply_to字段.这样消息的producer就可以指定Queue name,RPC Server接受到消息检查reply_to字段,创建一个消息包含Response并把queue name作为routing key,订阅了这个队列的Client就拿到了消息.

      这里有两件事情要保证:

    1. 要为队列创建随机Name
    2. 即使Name随机还是有可能冲突,还需要保证消息通信的独占性。

      看看RabbitMQ是怎么满足这两点的:

    1. 如果创建的队列不指定queue name,RabbitMQ就会创建一个随机的Name.
    2.  独占只需要exclusive参数即可

      总而言之,需要做的就是Client创建一个temporary,exclusive,anonymou的queue,并把queue name设置在RPC 消息的reply_to字段即可.注意这里RPC Server已经知道要投递到哪个Queue,所以不需要指定Exchange(后面我们会提到在实现层面Queue和Exchange的不同,简单讲 queue会有对应的Erlang进程,而exchang只是执行一些模式匹配的检查并没有进程实体对应).看下图:

      Our RPC will work like this:

    • When the Client starts up, it creates an anonymous exclusive callback queue.
    • For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue and correlationId, which is set to a unique value for every request.
    • The request is sent to an rpc_queue queue.
    • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.
    • The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application

     RPC Client端的代码如下:

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import java.util.UUID;
    
    public class RPCClient {
      private Connection connection;
      private Channel channel;
      private String requestQueueName = "rpc_queue";
      private String replyQueueName;
      private QueueingConsumer consumer;
    
      public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
    
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
      }
    
      public String call(String message) throws Exception {
        String response = null;
        String corrId = UUID.randomUUID().toString();
        BasicProperties props = new BasicProperties
                                    .Builder()
                                    .correlationId(corrId)
                                    .replyTo(replyQueueName)
                                    .build();
    
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody(),"UTF-8");
            break;
          }
        }
        return response;
      }
    
      public void close() throws Exception {
        connection.close();
      }
    
      public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
          fibonacciRpc = new RPCClient();
    
          System.out.println(" [x] Requesting fib(30)");
          response = fibonacciRpc.call("30");
                System.out.println(" [.] Got '" + response + "'");
            }
        catch  (Exception e) {
          e.printStackTrace();
        }
        finally {
          if (fibonacciRpc!= null) {
            try {
              fibonacciRpc.close();
            }
            catch (Exception ignore) {}
          }
        }
      }
    }
    View Code

    RPC SEVER端代码如下:

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class RPCServer {
        private static final String RPC_QUEUE_NAME = "rpc_queue";
        //计算斐波切纳数列
        private static int fib(int n) {
            if (n == 0)
                return 0;
            if (n == 1)
                return 1;
            return fib(n - 1) + fib(n - 2);
        }
    
        public static void main(String[] argv) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                connection = factory.newConnection();
                channel = connection.createChannel();
                channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
                channel.basicQos(1);
    
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
                System.out.println(" [x] Awaiting RPC requests");
    
                while (true) {
                    String response = null;
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    BasicProperties props = delivery.getProperties();
                    BasicProperties replyProps = new BasicProperties.Builder()
                            .correlationId(props.getCorrelationId()).build();
    
                    try {
                        String message = new String(delivery.getBody(), "UTF-8");
                        int n = Integer.parseInt(message);
                        System.out.println(" [.] fib(" + message + ")");
                        response = "" + fib(n);
                    } catch (Exception e) {
                        System.out.println(" [.] " + e.toString());
                        response = "";
                    } finally {
                        channel.basicPublish("", props.getReplyTo(), replyProps,
                                response.getBytes("UTF-8"));
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
                                false);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception ignore) {
                    }
                }
            }
        }
    }
    View Code

    略有不同

      传统的RPC调用Client和Server紧密依赖,客户端连接上服务器,发送一个请求然后阻塞等待服务器响应.这样的做的特点是客户端和服务器端是知道对方的.如果RPC Server崩溃掉,客户端需要重连,如果Server彻底崩掉就要重新找一个提供同样服务的Server,然后客户端重连过去.

      用RabbitMQ来实现RPC,依然保持Client Server信息隐藏的特点,Client依赖的不是特定的Server而是特定的消息,在有多个等效Server的情况下,一个Server的状态是否正常不会影响到客户端的状态.

      总结一下,使用RabbitMQ是先RPC,客观上还实现了下面的效果:

    1. 容错 一个Server崩溃不影响 Client
    2. 解耦了对特定通信协议和接口的依赖,统一走AMQP消息.
    3. 在多个RPC Server之间的负载均衡由RabbitMQ完成
  • 相关阅读:
    Android Studio 使用起来很卡,是如何解决的?
    前端嵌入视频直播和聊天支持.m3u8格式
    koa & cors原理
    Sequelize基本操作
    vue内置组件有哪些?
    使用element-ui的时候控制台报TypeError: Cannot read property 'disabled' of null错
    js监听页面元素是否变化
    软件版本: Alpha、Beta、RC、Release版
    前端脚手架BigFish
    mini-css-extract-plugin搭配optimize-css-assets-webpack-plugin
  • 原文地址:https://www.cnblogs.com/wxgblogs/p/5613604.html
Copyright © 2011-2022 走看看