zoukankan      html  css  js  c++  java
  • RabbitMQ(五)

      之前将的创建方法和调用方法都是在本地服务上的,而实际情况肯定是在不同服务器上的,这种模式通常被称为远程过程调用或者RPC。

      远程方法调用的注意事项:

         RPC在软件开发中非常常见,也经常被批评。当一个程序员对代码不熟悉的时候,跟踪RPC的性能问题是出在本地还是远程服务器就非常麻烦,对于RPC的使用,有几点需要特别说明:

        • 使用远程调用时的本地函数最好独立出来
        • 保证代码组件之间的依赖关系清晰明了,并用日志记录不同的执行过程和时间
        • 发生客户端运行缓慢或者假死时,先确认RPC服务器是否还活着!
        • 尽量使用异步队列来处理RPC请求,尽量不要用同步阻塞的方式运行RPC请求

      Callback Queue

        一般做rpc在RabbitMQ是比较容易的,一个客户端发送一个请求信息和一个响应信息的服务器回复,为了得到一个响应,我们需要发送一个回调队列地址请求。如下

               

        AMQP协议一共预定义了14个属性,但是大多数属性很少使用,下面几个可能用的比较多

          deliveryMode:有2个值,一个是持久,另一个表示短暂(第二篇说过)

          contentType:内容类型:用来描述编码的MIME类型。例如,经常使用JSON编码是将此属性设置为一个很好的做法:application/json。

          replyTo:经常使用的是回调队列的名字

          correlationid:RPC响应请求的相关应用

      Correlation Id

        在队列上接收到一个响应,但它并不清楚响应属于哪一个,当我们使用CorrelationId属性的时候,我们就可以将它设置为每个请求的唯一值,稍后当我们在回调队列中接收消息的时候,我们会看到这个属性,如果我们看到一个

        未知的CorrelationId,我们就以安全地忽略信息-它不属于我们的请求。为什么我们应该忽略未知的消息在回调队列中,而不是失败的错误?这是由于服务器端的一个竞争条件的可能性。

        比如还未发送了一个确认信息给请求,但是此时RPC服务器挂了。如果这种情况发生,将再次重启RPC服务器处理请求。这就是为什么在客户端必须处理重复的反应。

               

      我们的rpc工作方式如下:

        1:当客户端启动时,它创建一个匿名的独占回调队列。

        2:对于rpc请求,客户端发送2个属性,一个是replyTo设置回调队列,另一是correlationId为每个队列设置唯一值

        3:请求被发送到一个rpc_queue队列中

        4:rpc服务器是等待队列的请求,当收到一个请求的时候,他就把消息返回的结果返回给客户端,使请求结束。

        5:客户端等待回调队列上的数据,当消息出现的时候,他检查correlationId,如果它和从请求返回的值匹配,就进行响应。

      个人简单总结:其实客户端和服务端都身兼两职,发送方和接收方。

          客户端作为发送方时:创建一个发送请求的队列和随机的回调队列,并将回调队列的名称,correlationId等信息放在创建队列中。

               接收方时:监听对应的回调队列,并判断服务端返回的correlationId是否和发送的一样,是则获取回调队列中的消息,否则忽略。

          服务端作为接收方时:监听客户端请求的队列,获取发送来的主体信息和回调队列的名称、correlationId等信息。

               发送方时:将相应的信息,correlationId等信息发送到回调队列中。

      现在,我们创建一个简单的斐波那契的RPC例子:

      一:服务端代码:

    package com.mq.rpc;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
    
    public class RPCServer {
        private static final String RPC_QUEUE_NAME = "rpc_queue";
        
        private static int fib(int n){
            if(n == 0){
                return 0;
            }
            if(n == 1){
                return 1;
            }
            return fib(n-1) + fib(n-2);
        }
        
        public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false,consumer);
            
            System.out.println("RPCServer Awating RPC request");
            
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                System.out.println("RPCServer get correlationId is :" + props.getCorrelationId());
                
                BasicProperties replyProps = new AMQP.BasicProperties().builder()
                                                .correlationId(props.getCorrelationId()).build();
                System.out.println("RPCServer callback correlationId is :" + replyProps.getCorrelationId());
                
                String message = new String(delivery.getBody(),"UTF-8");
                int num = Integer.parseInt(message);
                System.out.println("RPCServer fib(" + num +")");
                
                String response = "" + fib(num);
                //服务端相应客户端的请求后返回的消息、回调队列的名称,RPC相应请求的相关应用correlationId
                channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes());
                
                //消息完成确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                
            }
        }
    }

      服务端代码比较简单

      1:建立连接,通道,队列

      2:我们可能运行多个服务器进程,为了分散负载服务器压力,我们设置channel.basicQos(1);

      3:我们用basicconsume访问队列。然后进入循环,在其中我们等待请求消息并处理消息然后发送响应。

      二:客户端代码:

    package com.mq.rpc;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
    
    public class RPCClient {
        private Connection connection;
        private Channel channel;
        private String requestQueueName = "rpc_queue";
        private String replyQueueName;
        private QueueingConsumer consumer;
        
        public RPCClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
    
            replyQueueName = channel.queueDeclare().getQueue();
            consumer = new QueueingConsumer(channel);
            
            channel.basicConsume(replyQueueName, true, consumer);
        }
        
        public String call(String message) throws UnsupportedEncodingException, IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
            String response;
            //随机生成correlationId
            String corrID = UUID.randomUUID().toString();
            
            //设置回调队列的名称,correlationId等信息
            AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                     .correlationId(corrID).replyTo(replyQueueName).build();
            
            //客户端发送请求给服务端
            channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
    
            //等待服务器的相应
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                if (delivery.getProperties().getCorrelationId().equals(corrID)) {
                    response = new String(delivery.getBody(), "UTF-8");
                    break;
                }
            }
            return response;
        }
        
        public void close() throws Exception {
            connection.close();
        }
        
        public static void main(String[] args) throws Exception {
             RPCClient rpcClient = null;
             String response;
             try{
                 rpcClient = new RPCClient();
                 System.out.println("RPCClient Requesting fib(20)");
                 
                 response = rpcClient.call("20");
                 System.out.println("RPCClient call :" + response);
                 
             }catch(Exception e){
                 e.printStackTrace();
             }finally{
                 if(rpcClient != null){
                     rpcClient.close();
                 }
             }
        }
    }

      客户端代码解读

      1:建立一个连接和通道,并声明了一个唯一的“回调”队列的答复

      2:我们订阅回调队列,这样就可以得到RPC的响应

      3:定义一个call方法用于发送当前的回调请求

      4:生成一个唯一的correlationid,然后通过while循环来捕获合适的回应

      5:我们请求信息,发送2个属性,replyTo 和correlationId

      6:然后就是等待直到有合适的回应到达

      7:while循环是做一个非常简单的工作,对于每一个响应消息,它检查是否有correlationid然后进行匹配。然后是就进行响应。

      8:最后把响应返回到客户端。

      三:先运行服务端进行监听队列,然后运行客户端,运行结果如下:

      

      

         

    作者:哀&RT
    出处:博客园哀&RT的技术博客--http://www.cnblogs.com/Tony-Anne/
    您的支持是对博主最大的鼓励,感谢您的认真阅读。
    本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    黄聪:WordPress wp_head()优化:去除不必要的元素标签(转)
    黄聪:IE6下用控制图片最大显示尺寸
    黄聪:wordpress wp_head()函数 浏览器顶部 空白28px 解决办法(转)
    黄聪:在Photoshop中创建多种样式的网格背景图案(转)
    黄聪:如何WP中获取文章分类名称、分类ID、归档分类链接
    黄聪:Wordpress如何不显示(只显示)置顶文章
    黄聪:淘宝用户在宝贝详情页想看到什么
    黄聪:Windows7立体声混音设置方法(stereo mix)(转)
    黄聪:wordpress博客用Slimbox2实现lightbox效果(免插件)(转)
    黄聪:tor 解决 连接中继目录failed 没有可用的链路
  • 原文地址:https://www.cnblogs.com/Tony-Anne/p/6485162.html
Copyright © 2011-2022 走看看