zoukankan      html  css  js  c++  java
  • 十、RPC(远程过程调用)

    相关概念

    RPC,是Remote Procedure Call的简称,即远程过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。RPC的主要功用是让构建分布式计算更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。

    通俗点来说,假设有两台服务器A和B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数或者方法,由于不在同一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

    一般在RabbitMQ中进行RPC是很简单。客户端发送请求消息,服务端回复响应的消息。为了接收响应的消息,我们需要在请求消息中发送一个回调队列,直接用默认队列即可,如下面代码:

    String replyQueueName = channel.queueDeclare().getQueue();
    AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
    channel.basicPublish("", QUEUENAME, props, message.getBytes("UTF-8"));
    

    代码中的BasicProperties类,包含了14个属性,这里用到了两个属性:

    • replyTo:用来设置一个回调队列。

      如果像上面代码为每个RPC请求创建一个回调队列会很低效,可以为每个客户端创建一个单一的回调队列。

      对于回调队列,在接收到一条回复消息后,并不知道这条消息应该与哪个请求匹配,这时就用到correlationId属性了,只要为每个请求设置一个correlationId,在回调队列接收到回复时,匹配correlationId属性从而匹配到相应请求。

    • correlationId:用来关联请求(request)和其调用RPC之后的回复(response)匹配。

    RPC工作流程:

    1. 客户端启动时,创建了一个匿名的回调队列。
    2. 在一个RPC请求中,客户端发送一个消息,它有两个属性:
      1. replyTo:用来设置回调队列;
      2. correlationId,对于每个请求都被设置唯一的关联ID。
    3. 请求被发送到rpc_queue队列.
    4. RPC服务器等待接收该队列的请求。当收到一个请求,它就会处理并把结果发送给客户端,使用的队列是replyTo字段指定队列。
    5. 客户端等待接收回调队列中的数据。当接到一个消息,它会检查它的correlationId属性。如果它和设置的相匹配,就会把响应返回给应用程序。

    客户端代码:

    public class RPCClient {
        private Connection connection;
        private Channel channel;
        private String requestQueueName = "rpc_queue";
        public static void main(String[] argv) {
            RPCClient rpcClient = null;
            String response = null;
            try {
                rpcClient = new RPCClient();
                for (int i = 0; i < 32; i++) {
                    String i_str = Integer.toString(i);
                    System.out.println("请求数(" + i_str + ")");
                    response = rpcClient.call(i_str);
                    System.out.println("接收返回数 '" + response + "'");
                }
            } catch (IOException | TimeoutException | InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (rpcClient != null) {
                    try {
                        rpcClient.close();
                    } catch (IOException _ignore) {
                    }
                }
            }
        }
        public RPCClient() throws IOException, TimeoutException {
            connection = ConnectionUtils.getConnection();
            channel = connection.createChannel();
        }
        public String call(String message) throws IOException, InterruptedException {
            //生成correlationId
            final String corrId = UUID.randomUUID().toString();
            //创建回调队列
            String replyQueueName = channel.queueDeclare().getQueue();
            //设置BasicProperties
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)      //设置correlationId
                    .replyTo(replyQueueName)    //设置回调队列
                    .build();
            //向服务端发送消息,并设置BasicProperties参数
            channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
            //使用阻塞队列保存返回的数据,因为服务器返回数据是在一个单独的线程中进行,
            //客户端在获取到数据之前要暂停当前主线程,创建大小为1的ArrayBlockingQueue来保存一条数据
            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
    
            String ctag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //服务器返回的数据后,匹配correlationId后,保存数据
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });
            //从阻塞队列中获取数据
            String result = response.take();
            //获取到数据后,取消订阅
            channel.basicCancel(ctag);
            return result;
        }
        public void close() throws IOException {
            connection.close();
        }
    }
    

    服务端代码:

    public class RPCServer {
        private static final String RPC_QUEUE_NAME = "rpc_queue";
        public static void main(String[] argv) {
            Connection connection = null;
            try {
                connection = ConnectionUtils.getConnection();
                final Channel channel = connection.createChannel();
                //创建队列
                channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
                //清空队列
                channel.queuePurge(RPC_QUEUE_NAME);
                //消费端接收的消息数
                channel.basicQos(1);
                System.out.println("等待RPC请求...");
                //打开消息确认机制
                boolean autoAck = false;
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //获取correlationId,设置BasicProperties,以便回调给客户端匹配
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                .Builder()
                                .correlationId(properties.getCorrelationId())
                                .build();
    
                        String response = "";
                        try {
                            String message = new String(body, "UTF-8");
                            int n = Integer.parseInt(message);
                            System.out.println(" 接收到(" + message + ")");
                            response += fib(n);
                            System.out.println("处理后 = " + response);
                        } catch (RuntimeException e) {
                            System.out.println(" 异常: " + e.toString());
                        } finally {
                            //返回处理后结果给客户端
                            channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                            //发送消费端确认
                            channel.basicAck(envelope.getDeliveryTag(), false);
                            //唤醒等待线程继续等待下一个消息接收
                            synchronized (this) {
                                this.notify();
                            }
                        }
                    }
                };
                channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
                // 等待接收来自RPC客户端的消息
                while (true) {
                    synchronized (consumer) {
                        try {
                            consumer.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException _ignore) {
                    }
                }
            }
        }
        //斐波那契函数
        private static int fib(int n) {
            if (n == 0) {
                return 0;
            }
            if (n == 1) {
                return 1;
            }
            return fib(n - 1) + fib(n - 2);
        }
    }
    

    先运行服务端代码等待客户端消息,在运行客户端代码,会在发送消息给服务端处理后再返回消息。

    上面代码设计只是简单的实现,具有的一些优势:

    • 如果RPC服务端太慢,可以运行多个服务端扩展
    • 在客户端,RPC只需要发送和接收一条消息。不需像queueDeclare的同步调用。对于单个RPC请求,RPC客户端只需要一次网络往返。

    还需要解决的更复杂的问题:

    • 如果没有运行服务器,客户端如何反应
    • 客户端是否应该为RPC设置超时
    • 服务端故障引发异常,是否将其转发给客户端
    • 在处理之前防止无效的传入消息(例如边界检查等)
  • 相关阅读:
    View载入具体解释
    七、备忘录模式Memento(行为型模式)
    排序算法之直接插入排序
    IOS
    Matlab得到二值图像中最大连通区域
    MVC模式利用xib文件定制collectionCell
    五大算法思想—贪心算法
    jQuery鼠标悬停显示提示信息窗体
    J2EE基础总结(5)——EJB
    iOS 打开扬声器以及插入耳机的操作
  • 原文地址:https://www.cnblogs.com/zenghi-home/p/10065317.html
Copyright © 2011-2022 走看看