zoukankan      html  css  js  c++  java
  • (转) RabbitMQ学习之远程过程调用(RPC)(java)

    http://blog.csdn.net/zhu_tianwei/article/details/40887885

    在一般使用RabbitMQ做RPC很容易。客户端发送一个请求消息然后服务器回复一个响应消息。为了收到一个响应,我们需要发送一个'回调'的请求的队列地址。我们可以使用默认队列(在Java客户端除外)。

    AMQP协议给消息定义了14个属性。大部分的属性很少使用,除了下面几个:
      deliveryMode: 将消息标记为持久(值为2)或瞬态(任何其他值)。你可能记得在第二个教程中使用了这个属性。
      contentType:用来设置mime类型。例如经常使用的JSON格式数据,就需要将此属性设置为:application/json。
      replyTo: 通常用来命名一个回调队列.
      correlationId: 用来关联RPC请求的响应.

    RPC工作流程:

    1)、客户端启动时,创建了一个匿名的回调队列。
    2)、在一个RPC请求中,客户端发送一个消息,它有两个属性:1.REPLYTO,用来设置回调队列名;2.correlationId,对于每个请求都被设置成唯一的值。
    3)、请求被发送到rpc_queue队列.
    4)、RPC工作者(又名:服务器)等待接收该队列的请求。当收到一个请求,它就会处理并把结果发送给客户端,使用的队列是replyTo字段指定的。
    5)、客户端等待接收回调队列中的数据。当接到一个消息,它会检查它的correlationId属性。如果它和设置的相匹配,就会把响应返回给应用程序。

    1、RPC服务器的RPCServer.java,接收消息调用rpc并返回结果

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.rpc;  
    2.   
    3. import java.security.MessageDigest;  
    4.   
    5. import com.rabbitmq.client.AMQP;  
    6. import com.rabbitmq.client.AMQP.BasicProperties;  
    7. import com.rabbitmq.client.Channel;  
    8. import com.rabbitmq.client.Connection;  
    9. import com.rabbitmq.client.ConnectionFactory;  
    10. import com.rabbitmq.client.QueueingConsumer;  
    11. //RPC调用服务端  
    12. public class RPCServer {  
    13.     private static final String RPC_QUEUE_NAME = "rpc_queue";  
    14.     public static void main(String[] args) throws Exception {  
    15.         //• 先建立连接、通道,并声明队列  
    16.         ConnectionFactory factory = new ConnectionFactory();  
    17.         factory.setHost("192.168.36.217");  
    18.         factory.setUsername("admin");  
    19.         factory.setPassword("admin");  
    20.         factory.setPort(AMQP.PROTOCOL.PORT);  
    21.         Connection connection = factory.newConnection();  
    22.         Channel channel = connection.createChannel();  
    23.         channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  
    24.         //•可以运行多个服务器进程。通过channel.basicQos设置prefetchCount属性可将负载平均分配到多台服务器上。  
    25.         channel.basicQos(1);  
    26.         QueueingConsumer consumer = new QueueingConsumer(channel);  
    27.         //打开应答机制autoAck=false  
    28.         channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  
    29.         System.out.println(" [x] Awaiting RPC requests");  
    30.         while (true) {  
    31.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    32.             BasicProperties props = delivery.getProperties();  
    33.             BasicProperties replyProps = new BasicProperties.Builder()  
    34.                     .correlationId(props.getCorrelationId()).build();  
    35.             String message = new String(delivery.getBody());  
    36.             System.out.println(" [.] getMd5String(" + message + ")");  
    37.             String response = getMd5String(message);  
    38.             //返回处理结果队列  
    39.             channel.basicPublish("", props.getReplyTo(), replyProps,  
    40.                     response.getBytes());  
    41.             //发送应答   
    42.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
    43.         }  
    44.     }  
    45.     // 模拟RPC方法 获取MD5字符串  
    46.     public static String getMd5String(String str) {  
    47.         MessageDigest md5 = null;  
    48.         try {  
    49.             md5 = MessageDigest.getInstance("MD5");  
    50.         } catch (Exception e) {  
    51.             System.out.println(e.toString());  
    52.             e.printStackTrace();  
    53.             return "";  
    54.         }  
    55.         char[] charArray = str.toCharArray();  
    56.         byte[] byteArray = new byte[charArray.length];  
    57.   
    58.         for (int i = 0; i < charArray.length; i++)  
    59.             byteArray[i] = (byte) charArray[i];  
    60.         byte[] md5Bytes = md5.digest(byteArray);  
    61.         StringBuffer hexValue = new StringBuffer();  
    62.         for (int i = 0; i < md5Bytes.length; i++) {  
    63.             int val = ((int) md5Bytes[i]) & 0xff;  
    64.             if (val < 16)  
    65.                 hexValue.append("0");  
    66.             hexValue.append(Integer.toHexString(val));  
    67.         }  
    68.         return hexValue.toString();  
    69.     }  
    70. }  


    2.客户端RPCClient.java,发送rpc调用消息,接收结果

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.rpc;  
    2.   
    3. import com.rabbitmq.client.AMQP;  
    4. import com.rabbitmq.client.Channel;  
    5. import com.rabbitmq.client.Connection;  
    6. import com.rabbitmq.client.ConnectionFactory;  
    7. import com.rabbitmq.client.QueueingConsumer;  
    8. import com.rabbitmq.client.AMQP.BasicProperties;  
    9.   
    10. //RPC调用客户端  
    11. public class RPCClient {  
    12.     private Connection connection;  
    13.     private Channel channel;  
    14.     private String requestQueueName = "rpc_queue";  
    15.     private String replyQueueName;  
    16.     private QueueingConsumer consumer;  
    17.   
    18.     public RPCClient() throws Exception {  
    19.         //• 先建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列  
    20.         ConnectionFactory factory = new ConnectionFactory();  
    21.         factory.setHost("192.168.36.217");  
    22.         factory.setUsername("admin");  
    23.         factory.setPassword("admin");  
    24.         factory.setPort(AMQP.PROTOCOL.PORT);  
    25.         connection = factory.newConnection();  
    26.         channel = connection.createChannel();  
    27.         //• 注册'回调'队列,这样就可以收到RPC响应  
    28.         replyQueueName = channel.queueDeclare().getQueue();  
    29.         consumer = new QueueingConsumer(channel);  
    30.         channel.basicConsume(replyQueueName, true, consumer);  
    31.     }  
    32.   
    33.     //发送RPC请求  
    34.     public String call(String message) throws Exception {  
    35.         String response = null;  
    36.         String corrId = java.util.UUID.randomUUID().toString();  
    37.         //发送请求消息,消息使用了两个属性:replyto和correlationId  
    38.         BasicProperties props = new BasicProperties.Builder()  
    39.                 .correlationId(corrId).replyTo(replyQueueName).build();  
    40.         channel.basicPublish("", requestQueueName, props, message.getBytes());  
    41.         //等待接收结果  
    42.         while (true) {  
    43.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    44.             //检查它的correlationId是否是我们所要找的那个  
    45.             if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
    46.                 response = new String(delivery.getBody());  
    47.                 break;  
    48.             }  
    49.         }  
    50.         return response;  
    51.     }  
    52.     public void close() throws Exception {  
    53.         connection.close();  
    54.     }  
    55. }  

    3、运行client主函数RPCMain.java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.rpc;  
    2.   
    3. public class RPCMain {  
    4.   
    5.     public static void main(String[] args) throws Exception {  
    6.         RPCClient rpcClient = new RPCClient();  
    7.         System.out.println(" [x] Requesting getMd5String(abc)");     
    8.         String response = rpcClient.call("abc");  
    9.         System.out.println(" [.] Got '" + response + "'");  
    10.         rpcClient.close();  
    11.     }  
    12. }  


    先运行服务端,再运行RPCMain,发送消息调用RPC。

    这里介绍的是该设计不是实现RPC服务的唯一可能,但它有一些重要的优点:
    1)如果RPC服务器速度太慢,你可以通过运行多个RPC服务器。尝试在一个新的控制台上运行第二RPCServer。
    2)RPC客户端只发送和接收一个消息。不需要queueDeclare那样要求同步调用。因此,RPC客户端只需要在一个网络上发送和接收为一个单一的RPC请求。

  • 相关阅读:
    黑色边影,
    拉伸的代码,
    一定是selection的原因啊,要不然呢,
    status bar的差别,
    黄色,
    域名错了,
    node=day4
    PS切片
    移动端插件IScroll.js
    移动web资源概论
  • 原文地址:https://www.cnblogs.com/telwanggs/p/7124709.html
Copyright © 2011-2022 走看看