zoukankan      html  css  js  c++  java
  • RabbitMQ入门(6)——远程过程调用(RPC)

    RabbitMQ入门(2)——工作队列中,我们学习了如何使用工作队列处理在多个工作者之间分配耗时任务。如果我们需要运行远程主机上的某个方法并等待结果怎么办呢?这种模式就是常说的远程过程调用(Remote Procedure Call),简称RPC。

    RPC
    尽管RPC在计算机中是一种常见的模式,却经常饱受诟病。当程序员不知道方法的调用是本地的还是速度慢的RPC时,可能导致系统不可控、代码难以调试。因此,滥用RPC可能导致代码无法维护。

    使用RPC的建议:

    • 明确方法的调用时本地的还是远程的。
    • 提供文档记录,让组件之间的依赖关系清楚明了。
    • 记录错误情况,当RPC服务器长时间宕机时,客户端如何处理

    当有疑问是,要避免使用RPC,如果可以,尽量使用异步管道结果被异步推到下一个阶段,而不是像RPC那样阻塞。

    这一篇我们将使用RabbitMQ建立一个PRC系统:一个客户端和一个可伸缩的PRC服务器。我们将创建一个返回Fibonacci数的虚拟服务。

    客户端接口

    为了说明RPC如何使用,首先创建一个客户端的类,它暴露一个名为call的方法发送RPC请求,直到收到应答。

    FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
    String result = fibonacciRpc.call("4");
    System.out.println( "fib(4) is " + result);
    

    回调队列(Callback queue)

    一般来说,在RabbitMQ上执行RPC是比较简单的。客户端发送请求消息,服务器响应消息。为了接收响应,我们需要发送一个带有“回调”队列地址的请求。可以使用默认的队列,它在java客户端具有唯一性。

    callbackQueueName = channel.queueDeclare().getQueue();
    
    BasicProperties props = new BasicProperties
                                .Builder()
                                .replyTo(callbackQueueName)
                                .build();
    
    channel.basicPublish("", "rpc_queue", props, message.getBytes());
    

    消息属性(Message properties)

    AMQP 0-9-1协议中预先定义了一组14个带有消息的属性。以下几个是比较常用的:

    • deliveryMode :将消息标记为持久的(值为2)或瞬态(任何其他值)。
    • contentType : 用于描述编码的mime类型。例如,对于经常使用的JSON编码,将该属性设置为:application/JSON是一种很好的做法。
    • replyTo : 通常用来命名一个回调队列
    • correlationId : 有助于将RPC响应与请求关联起来

    关联ID(Correlation Id)

    上面的方法中为每一个RPC请求创建一个回调队列的方式效率很低。幸好我们可以为每个客户端创建一个回调队列。然而这又引出了一个新的问题:回调队列中收到的响应我们并不知道是哪个请求发出的。这就是correlationId 属性的作用,我们将为每个请求设置一个唯一的correlationId 。当我们从回调队列收到一条消息时,首先查看correlationId 属性,这样我们就能将请求和响应对应起来了。如果发现一个未知的correlationId,说明它与我们的请求无关,就可以安全地丢弃这条消息了。

    也许你会问为什么忽略回调队列的未知消息而不是处理错误?这可能是服务端的竞态条件导致的。RPC服务器可能在发送响应之后,在发送请求的确认消息之前宕机。这就是为什么我们在客户端必须优雅地处理重复的响应,而RPC在理想情况下应该是幂等的。

    总结

    RPC工作流程

    • 当客户端启动时,它将创建一个匿名的唯一回调队列
    • 对于RPC请求,客户端会发送一个包含两个属性的消息,replyTo属性设置回调队列,correlationId为每一个请求设置唯一的值
    • 请求被发送到rpc_queue队列
    • RPC服务器正在等待该队列上的请求,当请求到达时,服务器执行任务并使用replyTo字段的队列将结果返回给客户端。
    • 客户端等待回调队列。当消息到达时,首先检查correlationId属性值,如果它和请求的值匹配,它将返回对应用程序的响应

    代码清单

    RPCServer:

    package com.xxyh.rabbitmq;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RPCServer {
    
        private static final String RPC_QUEUE_NAME = "rpc_queue";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            Connection connection = null;
            try {
                connection = factory.newConnection();
                Channel channel = connection.createChannel();
    
                channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    
                channel.basicQos(1);
    
                System.out.println("等待RPC请求......");
    
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                .Builder()
                                .correlationId(properties.getCorrelationId())
                                .build();
    
                        String response = "";
    
                        try {
                            String message = new String(body,"UTF-8");
                            int n = Integer.parseInt(message);
    
                            System.out.println(" 计算fib(" + message + ")");
                            response += fib(n);
                        }
                        catch (RuntimeException e){
                            System.out.println(" [.] " + e.toString());
                        }
                        finally {
                            channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
    
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                };
                channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    
                while (true) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        private static int fib(int n) {
            if (n == 0) {
                return 0;
            }
            if (n == 1) {
                return 1;
            }
            return fib(n - 1) + fib(n - 2);
        }
    }
    

    RPCClient:

    package com.xxyh.rabbitmq;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeoutException;
    
    public class RPCClient {
        private static final String RPC_QUEUE_NAME = "rpc_queue";
    
        private Connection connection;
        private Channel channel;
        private String replyQueueName;
    
        public RPCClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            connection = factory.newConnection();
            channel = connection.createChannel();
    
            replyQueueName = channel.queueDeclare().getQueue();
        }
    
        public String call(String message) throws IOException, InterruptedException {
            String corrId = UUID.randomUUID().toString();
    
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
    
            channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("utf-8"));
    
            final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
    
            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        response.offer(new String(body, "utf-8"));
                    }
                }
            });
            return response.take();
        }
    
        public void close() throws IOException {
            connection.close();
        }
    
        public static void main(String[] args) {
            RPCClient fibonacciRpc = null;
            String response = null;
    
            try {
                fibonacciRpc = new RPCClient();
    
                System.out.println(Thread.currentThread().getName() + " 请求 fib(30)");
                response = fibonacciRpc.call("30");
                System.out.println(Thread.currentThread().getName() + " 获得结果: " + response);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (fibonacciRpc != null) {
                    try {
                        fibonacciRpc.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
  • 相关阅读:
    H5新增——html概述
    H5新增———html5概述
    ASP.NET Web API Demo OwinSelfHost 自宿主 Swagger Swashbuckle 在线文档
    如何写个死循环,既不独占线程,又不阻塞UI线程?
    C# 扩展TaskScheduler实现独立线程池,支持多任务批量处理,互不干扰,无缝兼容Task
    C# async await 异步执行方法封装 替代 BackgroundWorker
    巨坑!
    C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据
    一个简单的例子看明白 async await Task
    一个非常轻量级的 Web API Demo
  • 原文地址:https://www.cnblogs.com/xiaoxiaoyihan/p/7355363.html
Copyright © 2011-2022 走看看