zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记5-简单的RPC调用

    利用空的queue名字("")让rabbitMQ生成一个唯一的队列名称,同时指定队列是:临时的(auto-delete)、私有的(exclusive)。

    在发送的RPC调用消息里设置消息的属性(com.rabbitmq.client.AMQP.BasicProperties)的reply_to字段来传递上面那个随机的队列名称。

    基本流程如下:

    代码如下:

    服务端RpcServer.java

     1 package com.yzl.test4;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.log4j.Logger;
     6 
     7 import com.rabbitmq.client.AMQP.BasicProperties;
     8 import com.rabbitmq.client.Channel;
     9 import com.rabbitmq.client.Connection;
    10 import com.rabbitmq.client.ConnectionFactory;
    11 import com.rabbitmq.client.DefaultConsumer;
    12 import com.rabbitmq.client.Envelope;
    13 
    14 /**
    15  * 基于rabbitMQ的RPC服务的服务提供者
    16  * 消息消费者角色,被动接收消息,然后回复消息
    17  * @author: yzl
    18  * @date: 2016-10-23
    19  */
    20 public class RpcServer {
    21     //交换器名称
    22     private static final String EXCHANGE_NAME = "rpcDirectExchange";
    23     //client调用的消息存储queue
    24     private static final String QUEUE_NAME = "rpcQueue";
    25     //服务提供方的路由key,实际使用场景会是应用的appid或者appName
    26     private static final String SERVER_ROUTING_KEY = "rpc_server1";
    27     private static final Logger logger = Logger.getLogger(RpcServer.class);
    28     
    29     /**
    30      * @param args
    31      * @throws  
    32      * @throws IOException 
    33      */
    34     public static void main(String[] args) throws Exception {
    35         ConnectionFactory factory = new ConnectionFactory();
    36         factory.setHost("127.0.0.1");
    37         factory.setPort(5672);
    38         factory.setVirtualHost("/");
    39         factory.setUsername("guest");
    40         factory.setPassword("guest");
    41         
    42         Connection connection = factory.newConnection();
    43         final Channel channel = connection.createChannel();
    44         
    45         //定义RPC的direct的交换器
    46         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    47         //声明接收client调动请求的队列
    48         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    49         //绑定队列
    50         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, SERVER_ROUTING_KEY);
    51         
    52         logger.info("服务启动完成,等待接收请求.....");
    53         
    54         //接收client端的请求
    55         channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
    56             @Override
    57             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    58                     throws IOException {
    59                 //回复ack响应
    60                 channel.basicAck(envelope.getDeliveryTag(), false);
    61                 
    62                 logger.info("收到client调用请求-----------");
    63                 String msg = new String(body);
    64                 logger.info("收到的消息如下:" + msg);
    65                 String resp = null;
    66                 
    67                 //模拟解析需要调用的方法
    68                 if(msg.indexOf("add") != -1){
    69                     String parameter = msg.substring(msg.indexOf("(")+1, msg.indexOf(")"));
    70                     String[] args = parameter.split(",");
    71                     resp = String.valueOf(add(Integer.valueOf(args[0]), Integer.valueOf(args[1])));
    72                 }else{
    73                     resp = "method is not found!!";
    74                 }
    75                 logger.info("需要回调的client的queueName:" + properties.getReplyTo());
    76                 try {
    77                     Integer time = new java.util.Random().nextInt(5000);
    78                     logger.info("休眠" + time + "毫秒");
    79                     //随即休眠,模拟服务调用方法正常业务方法耗时
    80                     Thread.sleep(time);
    81                 } catch (InterruptedException e) {
    82                     // TODO Auto-generated catch block
    83                     e.printStackTrace();
    84                 }
    85                 //发送给默认的direct交换器,路由键为发送方的queueName,默认交换机会把消息转发给此路由键名称的queue
    86                 channel.basicPublish("", properties.getReplyTo(), properties, (msg + ", resp is:" + resp).getBytes());
    87                 logger.info("回复clent完成");
    88             }
    89         });
    90     }
    91 
    92     //模拟的server中的方法
    93     private static int add(int num1, int num2){
    94         logger.info("call add method, para num1 is :" + num1 + ",num2 is :" + num2);
    95         return num1 + num2;
    96     }
    97 }

    客户端

      1 package com.yzl.test4;
      2 
      3 import java.io.IOException;
      4 import java.util.UUID;
      5 import java.util.concurrent.CountDownLatch;
      6 import java.util.concurrent.ExecutorService;
      7 import java.util.concurrent.Executors;
      8 
      9 import org.apache.log4j.Logger;
     10 
     11 import com.rabbitmq.client.AMQP.BasicProperties;
     12 import com.rabbitmq.client.AMQP.Queue;
     13 import com.rabbitmq.client.Channel;
     14 import com.rabbitmq.client.Connection;
     15 import com.rabbitmq.client.ConnectionFactory;
     16 import com.rabbitmq.client.DefaultConsumer;
     17 import com.rabbitmq.client.Envelope;
     18 import com.rabbitmq.client.QueueingConsumer;
     19 import com.rabbitmq.client.QueueingConsumer.Delivery;
     20 
     21 /**
     22  * 基于rabbitMQ的RPC服务的服务调用者
     23  * 消费生产者角色,发送调用请求,然后读取回复
     24  * 发送的交换器 和 接收回应的交换器是不同的
     25  * @author: yzl
     26  * @date: 2016-10-23
     27  */
     28 public class RpcClient {
     29     //交换器名称
     30     private static final String EXCHANGE_NAME = "rpcDirectExchange";
     31     //服务提供方的路由key
     32     private static final String SERVER_ROUTING_KEY = "rpc_server1";
     33     
     34     private static final Logger logger = Logger.getLogger(RpcClient.class);
     35     /**
     36      * @param args
     37      */
     38     public static void main(String[] args) throws Exception {
     39         ConnectionFactory factory = new ConnectionFactory();
     40         factory.setHost("127.0.0.1");
     41         factory.setPort(5672);
     42         factory.setVirtualHost("/");
     43         factory.setUsername("guest");
     44         factory.setPassword("guest");
     45         
     46         Connection connection = factory.newConnection();
     47         final Channel channel = connection.createChannel();
     48 
     49         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
     50         
     51         int threadCount = 1;
     52         
     53         //使用CountDownLatch控制10个线程一起运行
     54         final CountDownLatch cdl = new CountDownLatch(threadCount);
     55         
     56         //生成10个线程同时访问服务
     57         ExecutorService pool = Executors.newFixedThreadPool(threadCount);
     58         
     59         for(int i=0; i<threadCount; i++){
     60             final int index = i;
     61             pool.submit(new Runnable() {
     62                 @Override
     63                 public void run() {
     64                     try {
     65                         cdl.await();
     66                         
     67                         //Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
     68                         //默认direct exchange                      匿名的、               私有、自动删除
     69                         Queue.DeclareOk queue = channel.queueDeclare("", false, true, true, null);
     70                         //获取rabbit帮我们生成的随即的队列名称,来接收服务端返回的数据
     71                         String queueName = queue.getQueue();
     72                         
     73                         final String messageId = UUID.randomUUID().toString();
     74                         //定义replyTo属性 和 消息ID
     75                         BasicProperties props = new BasicProperties.Builder().replyTo(queueName).correlationId(messageId).build();
     76                         
     77                         logger.info("发送rpc调用请求,消息index:" + index);
     78                         
     79                         //发送RPC方法调用
     80                         channel.basicPublish(EXCHANGE_NAME, SERVER_ROUTING_KEY, props, ("add(" + index + "," + (index+1) + ")").getBytes());
     81                         
     82                         logger.info("等待服务器响应");
     83                         
     84                         //定义队列式的消费者处理器,之前是用的DefaultConsumer
     85                         QueueingConsumer consumer = new QueueingConsumer(channel);
     86                         
     87                         //把消费者处理器和队列对照起来
     88                         channel.basicConsume(queueName, consumer);
     89                         
     90                         //这里会堵塞,直到取到值或者超时
     91                         logger.info("尝试从consumer里取返回的值");
     92                         Delivery delivery = consumer.nextDelivery();
     93                         //Delivery delivery = consumer.nextDelivery(5000);
     94                         logger.info("成功取到消息,开始处理");
     95                         if(delivery.getProperties().getCorrelationId().equals(messageId)){
     96                             logger.info("收到服务器回复-----------");
     97                             String msg = new String(delivery.getBody());
     98                             logger.info("回复消息id:" + delivery.getProperties().getCorrelationId() + "内容:" + msg);
     99                         }
    100                         
    101                         /*
    102                         //该取消息的方式是异步的,不会堵塞,会导致后面的logger.info先于handleDelivery执行
    103                         channel.basicConsume(queueName, new DefaultConsumer(channel){
    104                             @Override
    105                             public void handleDelivery(String consumerTag, Envelope envelope,
    106                                     BasicProperties properties, byte[] body) throws IOException {
    107                                 if(properties.getCorrelationId().equals(messageId)){
    108                                     logger.info("收到服务器回复-----------");
    109                                     String msg = new String(body);
    110                                     logger.info("回复消息id:" + properties.getCorrelationId() + "内容:" + msg);
    111                                 }
    112                             }
    113                         });
    114                         */
    115                         
    116                         logger.info("消息处理完成");
    117                     } catch (Exception e) {
    118                         e.printStackTrace();
    119                     }
    120                 }
    121             });
    122         }
    123         for(int i=0; i<threadCount; i++){
    124             cdl.countDown();
    125         }
    126     }
    127 }

    先运行server后运行client,输入如下:

    1 2016-10-23 21:22:38,961 [com.yzl.test4.RpcServer]-[INFO] 服务启动完成,等待接收请求.....
    2 2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] 收到client调用请求-----------
    3 2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] 收到的消息如下:add(0,1)
    4 2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] call add method, para num1 is :0,num2 is :1
    5 2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] 需要回调的client的queueName:amq.gen-iek4oYqWiqQ-HU7-i2g6mA
    6 2016-10-23 21:22:42,414 [com.yzl.test4.RpcServer]-[INFO] 休眠3999毫秒
    7 2016-10-23 21:22:46,415 [com.yzl.test4.RpcServer]-[INFO] 回复clent完成
    1 2016-10-23 21:22:42,398 [com.yzl.test4.RpcClient]-[INFO] 发送rpc调用请求,消息index:0
    2 2016-10-23 21:22:42,414 [com.yzl.test4.RpcClient]-[INFO] 等待服务器响应
    3 2016-10-23 21:22:42,414 [com.yzl.test4.RpcClient]-[INFO] 尝试从consumer里取返回的值
    4 2016-10-23 21:22:46,415 [com.yzl.test4.RpcClient]-[INFO] 成功取到消息,开始处理
    5 2016-10-23 21:22:46,415 [com.yzl.test4.RpcClient]-[INFO] 收到服务器回复-----------
    6 2016-10-23 21:22:46,415 [com.yzl.test4.RpcClient]-[INFO] 回复消息id:1e4b7fb3-1728-41e2-8cac-b13fe88d5a20内容:add(0,1), resp is:1
    7 2016-10-23 21:22:46,415 [com.yzl.test4.RpcClient]-[INFO] 消息处理完成
  • 相关阅读:
    献给正在奋斗的人
    Delphi TRzTreeView 或者TRzCheckTree或者TTreeView离开焦点还显示灰色的选择状态
    笑话(三)
    王永庆建立企业奖励机制
    DbGridEh表格Tile居中,但是内容左对齐的做法
    DelPhi LockWindowUpdate的函数的用法
    Delphi DbgridEh实现鼠标拖动选中列,并使复选框选中
    国足输球,总结原因
    TPath
    TMemoryStream、String与OleVariant互转
  • 原文地址:https://www.cnblogs.com/yangzhilong/p/5991066.html
Copyright © 2011-2022 走看看