zoukankan      html  css  js  c++  java
  • RPC与hadoop

    rlgdj的这样的话,真正的实现类在Server端,客户端调用方法的时候,只能得到得到从Server端的返回值。看来接口中的抽象方法必须要有返回值啊。ps。右下角的Client端的main()中rpc.refer()方法,返回一个继承了Proxy实现了HelloService的一个代理类。再调用里面的invoke方法。具体可以去看代理模式。

    附上源码:

     1 package test.PRC;
     2 
     3 public class Client {
     4     public static void main(String[] args) throws Exception {    
     5         HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);    
     6         for (int i = 0; i < Integer.MAX_VALUE; i ++) {    
     7             String hello = service.hello("World" + i);    
     8             System.out.println(hello);    
     9             Thread.sleep(1000);    
    10         }    
    11     }    
    12 
    13 }

    服务端:

     1 package test.PRC;
     2 
     3 public class Server {
     4 
     5 
     6     public static void main(String []args) throws Exception {  
     7             HelloService service = new HelloServiceImpl();  
     8             RpcFramework.export(service, 1234);   
     9     }  
    10 
    11 
    12 }

    rpc框架:

      1 package test.PRC;
      2 
      3 import java.io.ObjectInputStream;
      4 import java.io.ObjectOutputStream;
      5 import java.lang.reflect.InvocationHandler;
      6 import java.lang.reflect.Method;
      7 import java.lang.reflect.Proxy;
      8 import java.net.ServerSocket;
      9 import java.net.Socket;
     10 
     11 public class RpcFramework {
     12     /** 
     13     * 暴露服务 
     14     *  
     15     * @param service 服务实现 
     16     * @param port 服务端口 
     17     * @throws Exception 
     18     */  
     19    public static void export(final Object service, int port) throws Exception {  
     20        if (service == null)  
     21             throw new IllegalArgumentException("service instance == null");  
     22         if (port <= 0 || port > 65535)  
     23             throw new IllegalArgumentException("Invalid port " + port);  
     24         System.out.println("Export service " + service.getClass().getName() + " on port " + port);  
     25         ServerSocket server = new ServerSocket(port);  
     26         for(;;) {  
     27             try {  
     28                 final Socket socket = server.accept();//服务器端一旦收到消息,就创建一个线程进行处理  
     29                 new Thread(new Runnable() {  
     30                     public void run() {  
     31                         try {  
     32                             try {  
     33                                 ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
     34                                 try {  
     35                                     String methodName = input.readUTF();//service是服务器端提供服务的对象,但是,要通过获取到的调用方法的名称,参数类型,以及参数来选择对象的方法,并调用。获得方法的名称  
     36                                     Class<?>[] parameterTypes = (Class<?>[])input.readObject();//获得参数的类型  
     37                                     Object[] arguments = (Object[])input.readObject();//获得参数  
     38                                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
     39                                     try {  
     40                                         Method method = service.getClass().getMethod(methodName, parameterTypes);//通过反射机制获得方法  
     41                                         Object result = method.invoke(service, arguments);//通过反射机制获得类的方法,并调用这个方法  
     42                                         output.writeObject(result);//将结果发送  
     43                                     } catch (Throwable t) {  
     44                                         output.writeObject(t);  
     45                                     } finally {  
     46                                         output.close();  
     47                                     }  
     48                                 } finally {  
     49                                     input.close();  
     50                                 }  
     51                             } finally {  
     52                                 socket.close();  
     53                             }  
     54                         } catch (Exception e) {  
     55                             e.printStackTrace();  
     56                         }  
     57                     }  
     58                 }).start();  
     59             } catch (Exception e) {  
     60                 e.printStackTrace();  
     61             }  
     62         }  
     63     }  
     64   
     65     /** 
     66      * 引用服务 
     67      *  
     68      * @param <T> 接口泛型 
     69      * @param interfaceClass 接口类型 
     70      * @param host 服务器主机名 
     71      * @param port 服务器端口 
     72      * @return 远程服务 
     73      * @throws Exception 
     74      *///原理是通过代理,获得服务器端接口的一个“代理”的对象。对这个对象的所有操作都会调用invoke函数,在invoke函数中,是将被调用的函数名,参数列表和参数发送到服务器,并接收服务器处理的结果  
     75     @SuppressWarnings("unchecked")  
     76     public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {  
     77         if (interfaceClass == null)  
     78             throw new IllegalArgumentException("Interface class == null");  
     79         if (! interfaceClass.isInterface())  
     80             throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");  
     81         if (host == null || host.length() == 0)  
     82             throw new IllegalArgumentException("Host == null!");  
     83         if (port <= 0 || port > 65535)  
     84             throw new IllegalArgumentException("Invalid port " + port);  
     85         System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);  
     86         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {  
     87             public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {  
     88                 Socket socket = new Socket(host, port);  
     89                 try {  
     90                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
     91                     try {  
     92                         output.writeUTF(method.getName());  
     93                         output.writeObject(method.getParameterTypes());  
     94                         output.writeObject(arguments);  
     95                         ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
     96                         try {  
     97                             Object result = input.readObject();  
     98                             if (result instanceof Throwable) {  
     99                                 throw (Throwable) result;  
    100                             }  
    101                             return result;  
    102                         } finally {  
    103                             input.close();  
    104                         }  
    105                     } finally {  
    106                         output.close();  
    107                     }  
    108                 } finally {  
    109                     socket.close();  
    110                  }  
    111              }  
    112          });  
    113      }  
    114 
    115 
    116 }

    接口:

    package test.PRC;
    
    public interface HelloService {
    
        String hello(String name);  
    }

    实现类:

     1 package test.PRC;
     2 
     3 public class HelloServiceImpl implements HelloService{
     4 
     5     public String hello(String name) {  
     6         return "Hello " + name;  
     7     }  
     8 
     9 
    10 }

    具体情况具体部署。

  • 相关阅读:
    输入任意十进制数字,转换为任意进制表示
    Integer 原码解读
    Java 中位移运算符 >>,>>>,<<
    解读源码中的问题
    HashMap 源码解读
    js:插入节点appendChild insertBefore使用方法
    冒泡排序实例
    nodejs学习笔记(2)--Express下安装模版引擎ejs
    nodejs学习笔记(1)--express安装问题:express不是内部也或者外部的命令解决方案
    Jquery取得iframe中元素的几种方法
  • 原文地址:https://www.cnblogs.com/slhzxm16/p/6115352.html
Copyright © 2011-2022 走看看