zoukankan      html  css  js  c++  java
  • RabbitDemo —— RPC实现

    RPCServer:

    public class RPCServer {
        private static final String RPC_QUEUE_NAME = "rpc_queue";
        
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = Common.getFactory();
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //创建接收队列
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, true, null);
            //可运行多个服务器进程,可将负载平均分配到多台服务器上
            channel.basicQos(1);
            
            System.out.println("receive start");
            
            Consumer consumer = new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //CorrelationId:每个请求都被设置成唯一的值(这里要处理后返回给客户端进行判断)
                    BasicProperties replyProps = new BasicProperties().builder().correlationId(properties.getCorrelationId()).build();
                    String msg = new String(body);
                    System.out.println("Get "+msg+" md5:");
                    String msgMd5 = getMd5String(msg);
                    //处理完放至回调队列
                    channel.basicPublish("", properties.getReplyTo(), replyProps, msgMd5.getBytes());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        }
        
        // 模拟RPC方法 获取MD5字符串  
        public static String getMd5String(String str) {  
            MessageDigest md5 = null;  
            try {  
                md5 = MessageDigest.getInstance("MD5");  
            } catch (Exception e) {  
                System.out.println(e.toString());  
                e.printStackTrace();  
                return "";  
            }  
            char[] charArray = str.toCharArray();  
            byte[] byteArray = new byte[charArray.length];  
      
            for (int i = 0; i < charArray.length; i++)  
                byteArray[i] = (byte) charArray[i];  
            byte[] md5Bytes = md5.digest(byteArray);  
            StringBuffer hexValue = new StringBuffer();  
            for (int i = 0; i < md5Bytes.length; i++) {  
                int val = ((int) md5Bytes[i]) & 0xff;  
                if (val < 16)  
                    hexValue.append("0");  
                hexValue.append(Integer.toHexString(val));  
            }  
            return hexValue.toString();  
        }  
    }
    View Code

    RPCClient:

    public class RPCClient {
        private static final String RPC_QUEUE_NAME = "rpc_queue";  
        
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = Common.getFactory();
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //创建接收队列
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, true, null);
            /**
             * 注册回调队列,用于接收rpc响应
             */
            String replyQueueName = channel.queueDeclare().getQueue();
            String CorrId = UUID.randomUUID().toString();
            //注册关联Id和响应队列名称 
            //correlationId提供了一个消息头,用于将当前的消息和先前的某些消息或应用程序特定的ID关联起来
            //replyTo指定响应队列
            BasicProperties props = new BasicProperties().builder().correlationId(CorrId).replyTo(replyQueueName).build();
            Consumer consumer = new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    if(properties.getCorrelationId().equals(CorrId)) {
                        String str = new String(body);
                        System.out.println("返回md5消息:"+str);
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            //指定回调消费队列
            channel.basicConsume(replyQueueName, false, consumer);
            
            /**
             * 向队列发送消息
             */
            channel.basicPublish("", RPC_QUEUE_NAME, props, "you are".getBytes());
            
    //        channel.close();
    //        connection.close();
        }
    }
    View Code
  • 相关阅读:
    Win7系统重启网卡批处理
    第一个应用程序HelloWorld
    JS异步流程控制(序列模式、并发模式、有限并发模式)
    bootstrap+MVC3在Moon.Orm中的应用(含有代码下载)
    google guava使用例子/示范
    证明Hadoop工作的正确性和可靠性只需4步图文并茂的过程
    python 图 自身遍历 及弱引用使用
    界面布局决定系统设计的成败
    .NET:栈的生长与消亡.
    IIS 6 & Server.MapPath
  • 原文地址:https://www.cnblogs.com/yifanSJ/p/9022381.html
Copyright © 2011-2022 走看看