zoukankan      html  css  js  c++  java
  • rabbitMQ学习笔记(七) RPC 远程过程调用

    关于RPC的介绍请参考百度百科里的关于RPC的介绍:http://baike.baidu.com/view/32726.htm#sub32726

       现在来看看Rabbitmq中RPC吧!RPC的工作示意图如下:


       上图中的C代表客户端,S表示服务器端;Rabbitmq中的RPC流程如下:

    1、首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;

    2、服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致

    3、客户端从回调Queue中得到先前correlation_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。

      对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类;而消息 属性在AMQP的协议中规定有14个;而很多大部分我们没有用到。常用的几个属性有:

    English代码  收藏代码
    1 Message properties
    2 The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
    3 
    4 delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial. 
    5 content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json. 
    6 reply_to: Commonly used to name a callback queue. 
    7 correlation_id: Useful to correlate RPC responses with requests. 

     delivery_mode : 标记消息是持久性消息还是瞬态信息。在前面的“Work Queue”中我们已经提到过;   

      content_type : 用来描述MIME的类型。如把其类型设定为JSON;

      reply_to : 用于命名一个回调Queue;

      correlation_id : 用于与相关联的请求的RPC响应.

    当客户端想要调用服务器的某个方法来完成某项功能时,就可以使用rabbitMQ支持的PRC服务。

    其实RPC服务与普通的收发消息的区别不大, RPC的过程其实就是  

    客户端向服务端定义好的Queue发送消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。

    示例:

     1 package com.zf.rabbitmq07;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.AMQP.BasicProperties;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.ConnectionFactory;
     9 import com.rabbitmq.client.ConsumerCancelledException;
    10 import com.rabbitmq.client.QueueingConsumer;
    11 import com.rabbitmq.client.QueueingConsumer.Delivery;
    12 import com.rabbitmq.client.ShutdownSignalException;
    13 
    14 public class RPCServer {
    15     
    16     public static final String RPC_QUEUE_NAME = "rpc_queue";
    17     
    18     public static String sayHello(String name){
    19         return "hello " + name ;
    20     }
    21     
    22     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    23         
    24         ConnectionFactory connFac = new ConnectionFactory() ;
    25         connFac.setHost("localhost");
    26         
    27         Connection conn = connFac.newConnection() ;
    28         
    29         Channel channel = conn.createChannel() ;
    30         
    31         channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
    32         
    33         QueueingConsumer consumer = new QueueingConsumer(channel);
    34         
    35         channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ;
    36         
    37         while(true){
    38             System.out.println("服务端等待接收消息..");  
    39             Delivery deliver = consumer.nextDelivery() ;
    40             System.out.println("服务端成功收到消息..");
    41             BasicProperties props =  deliver.getProperties() ;
    42             
    43             String message = new String(deliver.getBody() , "UTF-8") ;
    44             
    45             String responseMessage = sayHello(message) ;
    46             
    47             BasicProperties responseProps = new BasicProperties.Builder()
    48             .correlationId(props.getCorrelationId())  
    49             .build() ;
    50             
    51             //将结果返回到客户端Queue
    52             channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ;
    53              
    54             //向客户端确认消息
    55             channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
    56             System.out.println("服务端返回消息完成..");
    57         }
    58         
    59     }
    60 
    61 }
     1 package com.zf.rabbitmq07;
     2 
     3 import java.io.IOException;
     4 import java.util.UUID;
     5 
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.ConnectionFactory;
     9 import com.rabbitmq.client.ConsumerCancelledException;
    10 import com.rabbitmq.client.QueueingConsumer;
    11 import com.rabbitmq.client.AMQP.BasicProperties;
    12 import com.rabbitmq.client.QueueingConsumer.Delivery;
    13 import com.rabbitmq.client.ShutdownSignalException;
    14 
    15 public class RPCClient {
    16 
    17     public static final String RPC_QUEUE_NAME = "rpc_queue";
    18 
    19     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    20 
    21         ConnectionFactory connFac = new ConnectionFactory() ;
    22         connFac.setHost("localhost");
    23         Connection conn = connFac.newConnection() ;
    24         Channel channel = conn.createChannel() ;
    25 
    26         //响应QueueName ,服务端将会把要返回的信息发送到该Queue
    27         String responseQueue = channel.queueDeclare().getQueue() ;
    28 
    29         String correlationId = UUID.randomUUID().toString() ;
    30 
    31         BasicProperties props = new BasicProperties.Builder()
    32         .replyTo(responseQueue)
    33         .correlationId(correlationId)
    34         .build();
    35 
    36         String message = "is_zhoufeng";
    37         channel.basicPublish( "" , RPC_QUEUE_NAME , props ,  message.getBytes("UTF-8"));
    38 
    39         QueueingConsumer consumer = new QueueingConsumer(channel)    ;
    40 
    41         channel.basicConsume( responseQueue , consumer) ;
    42 
    43         while(true){
    44             
    45             Delivery delivery = consumer.nextDelivery() ;
    46             
    47             if(delivery.getProperties().getCorrelationId().equals(correlationId)){
    48                 String result = new String(delivery.getBody()) ;  
    49                 System.out.println(result);
    50             }
    51             
    52         }
    53     }
    54 
    55 }
  • 相关阅读:
    Python自动化运维答疑解惑
    MySQL基础
    Centos下常用MySQL语法
    PDO
    生成静态页面的好处
    页面纯静态
    源码安装LNMP
    Nginx URL重写(rewrite)
    防盗链
    自定义菜单
  • 原文地址:https://www.cnblogs.com/jianliang-Wu/p/5684893.html
Copyright © 2011-2022 走看看