zoukankan      html  css  js  c++  java
  • 实现RPC就是这么简单

       RPC(Remote Procedure Call) 在介绍分布是RPC前首先介绍一个下JAVA中简单的RPC实现

           服务器端,通过SocketServer,持续接收客户端的请求,并将客户端的请求分发到指定的处理器出去处理。

           阻塞通线模型,是server对每一个请求都开启一条线程去执行请求,此种方式的缺点是服务器端线程的数量和客户端并发访问请求树呈1:1的正比关系。

         PRC 原理

                请求方 按照一定格式发送请求到服务端

                服务端 按照请求要求进行计算,并将结果按照约定格式进行返回

               请求方从返回数据中解析出具体结果

             此处对此作出了一定优化,伪异步IO通信,将所有用户请求放到线程池中处理。

    /**
     *
     * @author zhangwei_david
     * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $
     */
    public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {
    
        /**服务端口号**/
        private int                port       = 12000;
    
        private ServerSocket       server;
    
        //线程池
        @Autowired
        private Executor           executorService;
    
        public Map<String, Object> handlerMap = new ConcurrentHashMap<>();
    
        private void publishedService() throws Exception {
    
            server = new ServerSocket(port);
    
            // 一直服务
            for (;;) {
                try {
                    // 获取socket
                    final Socket socket = server.accept();
                    executorService.execute(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                // 获取input
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                ObjectOutputStream output = new ObjectOutputStream(socket
                                    .getOutputStream());
                                try {
                                    // 获取引用
                                    String interfaceName = input.readUTF();
                                    //获取 方法名
                                    String methodName = input.readUTF();
                                    //
                                    Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                                    Object[] arguments = (Object[]) input.readObject();
                                    try {
                                        Object service = handlerMap.get(interfaceName);
                                        Method method = service.getClass().getMethod(methodName,
                                            parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        input.close();
                                    }
                                } finally {
                                    socket.close();
                                }
    
                            } catch (Exception e) {
    
                            }
                        }
                    });
                } catch (Exception e) {
    
                }
            }
    
        }
    
        public void init() {
    
        }
    
        /**
         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            //发布服务
            publishedService();
        }
    
        /**
         * @see org.springframework.context.Lifecycle#start()
         */
        @Override
        public void start() {
        }
    
        /**
         * @see org.springframework.context.Lifecycle#stop()
         */
        @Override
        public void stop() {
            if (server != null) {
                try {
                    server.close();
    
                } catch (IOException e) {
    
                }
            }
        }
    
        /**
         * @see org.springframework.context.Lifecycle#isRunning()
         */
        @Override
        public boolean isRunning() {
            return false;
        }
    
        /**
         * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
            System.out.println(serviceBeanMap);
            if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()
                            .getName();
                    handlerMap.put(interfaceName, serviceBean);
                }
            }
        }
    
        /**
         * Setter method for property <tt>executorService</tt>.
         *
         * @param executorService value to be assigned to property executorService
         */
        public void setExecutorService(Executor executorService) {
            this.executorService = executorService;
        }
    
    }
     
    
    /**
     *
     * @author zhangwei_david
     * @version $Id: SRPC.java, v 0.1 2015年8月8日 下午12:51:17 zhangwei_david Exp $
     */
    @Documented
    @Target({ ElementType.TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface SRPC {
    
        public Class<?> interf();
    }
    

      

     至此就实现了服务的自动发现自动注册,当然这个仅针对单机环境下的自动注册。

    /**
     *
     * @author zhangwei_david
     * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $
     */
    public class Client {
        /**
         * 引用服务
         *
         * @param <T> 接口泛型
         * @param interfaceClass 接口类型
         * @param host 服务器主机名
         * @param port 服务器端口
         * @return 远程服务
         * @throws Exception
         */
        @SuppressWarnings("unchecked")
        public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
                                                                                                   throws Exception {
            if (interfaceClass == null || !interfaceClass.isInterface()) {
                throw new IllegalArgumentException("必须指定服务接口");
            }
    
            if (host == null || host.length() == 0) {
                throw new IllegalArgumentException("必须指定服务器的地址和端口号");
            }
    
            return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[] { interfaceClass }, new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] arguments)
                                                                                         throws Throwable {
                        Socket socket = new Socket(host, port);
                        try {
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                            try {
                                output.writeUTF(interfaceClass.getName());
                                output.writeUTF(method.getName());
                                output.writeObject(method.getParameterTypes());
                                output.writeObject(arguments);
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                try {
                                    Object result = input.readObject();
                                    if (result instanceof Throwable) {
                                        throw (Throwable) result;
                                    }
                                    return result;
                                } finally {
                                    input.close();
                                }
                            } finally {
                                output.close();
                            }
                        } finally {
                            socket.close();
                        }
                    }
                });
        }
    }
    

      

        上面在没有使用第三方依赖包实现了简单的RPC,优化增加 request和reponse,定义RPC协议。

    /**
     *
     * @author zhangwei_david
     * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $
     */
    public class SrpcRequest implements Serializable {
    
        /**  */
        private static final long serialVersionUID = 6132853628325824727L;
        // 请求Id
        private String            requestId;
        // 远程调用接口名称
        private String            interfaceName;
        //远程调用方法名称
        private String            methodName;
        // 参数类型
        private Class<?>[]        parameterTypes;
        // 参数值
        private Object[]          parameters;
    
        /**
         * Getter method for property <tt>requestId</tt>.
         *
         * @return property value of requestId
         */
        public String getRequestId() {
            return requestId;
        }
    
        /**
         * Setter method for property <tt>requestId</tt>.
         *
         * @param requestId value to be assigned to property requestId
         */
        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }
    
        /**
         * Getter method for property <tt>interfaceName</tt>.
         *
         * @return property value of interfaceName
         */
        public String getInterfaceName() {
            return interfaceName;
        }
    
        /**
         * Setter method for property <tt>interfaceName</tt>.
         *
         * @param interfaceName value to be assigned to property interfaceName
         */
        public void setInterfaceName(String interfaceName) {
            this.interfaceName = interfaceName;
        }
    
        /**
         * Getter method for property <tt>methodName</tt>.
         *
         * @return property value of methodName
         */
        public String getMethodName() {
            return methodName;
        }
    
        /**
         * Setter method for property <tt>methodName</tt>.
         *
         * @param methodName value to be assigned to property methodName
         */
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
    
        /**
         * Getter method for property <tt>parameterTypes</tt>.
         *
         * @return property value of parameterTypes
         */
        public Class<?>[] getParameterTypes() {
            return parameterTypes;
        }
    
        /**
         * Setter method for property <tt>parameterTypes</tt>.
         *
         * @param parameterTypes value to be assigned to property parameterTypes
         */
        public void setParameterTypes(Class<?>[] parameterTypes) {
            this.parameterTypes = parameterTypes;
        }
    
        /**
         * Getter method for property <tt>parameters</tt>.
         *
         * @return property value of parameters
         */
        public Object[] getParameters() {
            return parameters;
        }
    
        /**
         * Setter method for property <tt>parameters</tt>.
         *
         * @param parameters value to be assigned to property parameters
         */
        public void setParameters(Object[] parameters) {
            this.parameters = parameters;
        }
    
    }
     
    
     
    
    /**
     *
     * @author zhangwei_david
     * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $
     */
    public class SrpcResponse implements Serializable {
        /**  */
        private static final long serialVersionUID = -5934073769679010930L;
        // 请求的Id
        private String            requestId;
        // 异常
        private Throwable         error;
        // 响应
        private Object            result;
    
        /**
         * Getter method for property <tt>requestId</tt>.
         *
         * @return property value of requestId
         */
        public String getRequestId() {
            return requestId;
        }
    
        /**
         * Setter method for property <tt>requestId</tt>.
         *
         * @param requestId value to be assigned to property requestId
         */
        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }
    
        /**
         * Getter method for property <tt>error</tt>.
         *
         * @return property value of error
         */
        public Throwable getError() {
            return error;
        }
    
        /**
         * Setter method for property <tt>error</tt>.
         *
         * @param error value to be assigned to property error
         */
        public void setError(Throwable error) {
            this.error = error;
        }
    
        /**
         * Getter method for property <tt>result</tt>.
         *
         * @return property value of result
         */
        public Object getResult() {
            return result;
        }
    
        /**
         * Setter method for property <tt>result</tt>.
         *
         * @param result value to be assigned to property result
         */
        public void setResult(Object result) {
            this.result = result;
        }
    
    }
     
    
    /**
     *
     * @author zhangwei_david
     * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $
     */
    public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {
    
        /**服务端口号**/
        private int                port       = 12000;
    
        private ServerSocket       server;
    
        //线程池
        @Autowired
        private Executor           executorService;
    
        public Map<String, Object> handlerMap = new ConcurrentHashMap<>();
    
        private void publishedService() throws Exception {
    
            server = new ServerSocket(port);
    
            // 一直服务
            for (;;) {
                try {
                    // 获取socket
                    final Socket socket = server.accept();
                    executorService.execute(new Runnable() {
    
                        @Override
                        public void run() {
                            try {
                                // 获取input
                                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                try {
                                    // 获取RPC请求
                                    SrpcRequest request = (SrpcRequest) input.readObject();
                                    ObjectOutputStream output = new ObjectOutputStream(socket
                                        .getOutputStream());
                                    try {
                                        SrpcResponse response = doHandle(request);
                                        output.writeObject(response);
                                    } finally {
                                        input.close();
                                    }
                                } finally {
                                    socket.close();
                                }
    
                            } catch (Exception e) {
    
                            }
                        }
                    });
                } catch (Exception e) {
    
                }
            }
    
        }
    
        private SrpcResponse doHandle(SrpcRequest request) {
            SrpcResponse response = new SrpcResponse();
            response.setRequestId(request.getRequestId());
            try {
                Object service = handlerMap.get(request.getInterfaceName());
                Method method = service.getClass().getMethod(request.getMethodName(),
                    request.getParameterTypes());
                response.setResult(method.invoke(service, request.getParameters()));
    
            } catch (Exception e) {
                response.setError(e);
            }
            return response;
        }
    
        public void init() {
    
        }
    
        /**
         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            //发布
            publishedService();
        }
    
        /**
         * @see org.springframework.context.Lifecycle#start()
         */
        @Override
        public void start() {
        }
    
        /**
         * @see org.springframework.context.Lifecycle#stop()
         */
        @Override
        public void stop() {
            if (server != null) {
                try {
                    server.close();
    
                } catch (IOException e) {
    
                }
            }
        }
    
        /**
         * @see org.springframework.context.Lifecycle#isRunning()
         */
        @Override
        public boolean isRunning() {
            return false;
        }
    
        /**
         * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
            System.out.println(serviceBeanMap);
            if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()
                            .getName();
                    handlerMap.put(interfaceName, serviceBean);
                }
            }
        }
    
        /**
         * Setter method for property <tt>executorService</tt>.
         *
         * @param executorService value to be assigned to property executorService
         */
        public void setExecutorService(Executor executorService) {
            this.executorService = executorService;
        }
    
    }
     
    
    /**
     *
     * @author zhangwei_david
     * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $
     */
    public class Client {
        /**
         * 引用服务
         *
         * @param <T> 接口泛型
         * @param interfaceClass 接口类型
         * @param host 服务器主机名
         * @param port 服务器端口
         * @return 远程服务
         * @throws Exception
         */
        @SuppressWarnings("unchecked")
        public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
                throws Exception {
            if (interfaceClass == null || !interfaceClass.isInterface()) {
                throw new IllegalArgumentException("必须指定服务接口");
            }
    
            if (host == null || host.length() == 0) {
                throw new IllegalArgumentException("必须指定服务器的地址和端口号");
            }
    
            return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[] { interfaceClass }, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                        throws Throwable {
                    Socket socket = new Socket(host, port);
                    try {
                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                        try {
                            SrpcRequest request = new SrpcRequest();
                            request.setRequestId(UUID.randomUUID().toString());
                            request.setInterfaceName(interfaceClass.getName());
                            request.setMethodName(method.getName());
                            request.setParameterTypes(method.getParameterTypes());
                            request.setParameters(arguments);
                            output.writeObject(request);
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                SrpcResponse response = (SrpcResponse) input.readObject();
                                if (response.getError() != null
                                        && response.getError() instanceof Throwable) {
                                    throw response.getError();
                                }
                                return response.getResult();
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        }
    }
    

      

  • 相关阅读:
    获得树形json串
    淘宝分布式 key/value 存储引擎Tair安装部署过程及Javaclient測试一例
    ARC下dealloc过程及.cxx_destruct的探究
    连类比事-category和extension
    category和关联对象
    静态构造函数c# 静态块java initallize oc
    + (void)initialize vs 静态构造方法
    Servlet中文乱码原因 解决 Get 和 Post 和客户端
    double int 类型的区别
    待解决问题 oc
  • 原文地址:https://www.cnblogs.com/wei-zw/p/8797749.html
Copyright © 2011-2022 走看看