zoukankan      html  css  js  c++  java
  • 基于分布式思想下的rpc解决方案(1)

    手写RPC:

    1.客户端代码

    接口:

    /**
     * 
     *类说明:服务员接口
     */
    public interface TechInterface {
        //洗脚服务
        String XJ(String name);
    }
    
    package enjoyedu.rpc;
    
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    
    /**
     * 
     *类说明:rpc框架的客户端代理部分
     */
    public class RpcClientFrame {
    
        /*远程服务的代理对象,参数为客户端要调用的的服务*/
        public static <T> T getRemoteProxyObj(final Class<?> serviceInterface)
                throws Exception {
            // 默认端口8888
            InetSocketAddress serviceAddr = new InetSocketAddress("127.0.0.1",8888);
            // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
            //进行实际的服务调用(动态代理)
            return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},new DynProxy(serviceInterface,serviceAddr));
        }
    
        /*动态代理类,实现了对远程服务的访问*/
        private static class DynProxy implements InvocationHandler {
            //接口
            private final Class<?> serviceInterface;
            //远程调用地址
            private final InetSocketAddress addr;
    
            //构造函数
            public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) {
                this.serviceInterface = serviceInterface;
                this.addr = addr;
            }
    
            /*动态代理类,增强:实现了对远程服务的访问*/
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {
                /* 网络增强部分*/
                Socket socket = null;
                //因为传递的大部分是 方法、参数,所以我们使用Object流对象
                ObjectInputStream objectInputStream = null;
                ObjectOutputStream objectOutputStream = null;
                try {
                    //新建一个Socket
                    socket = new Socket();
                    //连接到远程的地址和端口
                    socket.connect(addr);
                    //往远端 发送数据,按照顺序发送数据:类名、方法名、参数类型、参数值
                    //拿到输出的流
                    objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                    //发送 调用方法的 类名,使用UTF-8避免乱码
                    objectOutputStream.writeUTF(serviceInterface.getName());
                    //发送 方法名
                    objectOutputStream.writeUTF(method.getName());
                    //发送 参数类型,使用Object
                    objectOutputStream.writeObject(method.getParameterTypes());
                    //发送 参数的值,使用UTF-8避免乱码
                    objectOutputStream.writeObject(args);
                    //刷新缓冲区,使得数据立马发送
                    objectOutputStream.flush();
                    //立马拿到远程执行的结果
                    objectInputStream = new ObjectInputStream(socket.getInputStream());
                    //我们要把调用的细节打印出来
                    System.out.println("远程调用成功!" + serviceInterface.getName());
                    //最后要网络的请求返回给返回
                    return objectInputStream.readObject();
                } finally {
    
                    //最后记得关闭
                    socket.close();
                    objectOutputStream.close();
                    objectInputStream.close();
    
                }
            }
        }
    }
    
    package enjoyedu;
    
    
    import enjoyedu.rpc.RpcClientFrame;
    import enjoyedu.service.TechInterface;
    
    /**
     * 
     *类说明:rpc的客户端,调用远端服务
     */
    public class Client {
        public static void main(String[] args) throws Exception {
            //动态代理获取我们的对象
            TechInterface techInterface = RpcClientFrame.getRemoteProxyObj(TechInterface.class);
            //进远程调用我们的对象
            System.out.println(techInterface.XJ("king"));
    
        }
    }

    2.服务端代码:

    /**
     * 
     *类说明:服务接口
     */
    public interface TechInterface {
        //洗脚服务
        String XJ(String name);
    }
    package enjoyedu.service.impl;
    
    
    import enjoyedu.service.TechInterface;
    
    /**
     *
     * 类说明:服务实现类
     */
    public class TechImpl implements TechInterface {
        @Override
        public String XJ(String name) {
    
            return "您好,13号技师为你服务:"+name;
        }
    }
    package enjoyedu.register;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.Method;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.HashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     *
     *类说明:服务注册中心
     */
    public class RegisterCenter {
        //线程池
        private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        //定义注册中心的静态对象
        private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();
    
        private static boolean isRunning = false;
    
        private static int port;
    
        public RegisterCenter(int port) {
    
            this.port = port;
        }
    
        //服务注册中心启动
        public void start() throws IOException {
            //服务器监听
            ServerSocket server = new ServerSocket();
            //监听绑定端口
            server.bind(new InetSocketAddress(port));
            System.out.println("start server");
            try {
                while (true) {
                    // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行,并且同时将socket送入(server.accept()=socket)
                    executor.execute(new ServiceTask(server.accept()));
                }
            } finally {
                server.close();
            }
        }
        //服务的注册:socket通讯+反射
        public void register(Class serviceInterface, Class impl) {
    
            serviceRegistry.put(serviceInterface.getName(), impl);
        }
    
        //服务的获取运行
        private static class ServiceTask implements Runnable {
            //客户端socket
            Socket clent = null;
    
            public ServiceTask(Socket client) {
                this.clent = client;
            }
            //远程请求达到服务端,我们需要执行请求结果,并且把请求结果反馈至客户端,使用Socket通讯
            public void run() {
                //反射
                //同样适用object流
                ObjectInputStream inputStream = null;
                ObjectOutputStream outputStream = null;
                try {
                    //1.客户端发送的object对象拿到,2.在采用反射的机制进行调用,3.最后给返回结果
                    inputStream = new ObjectInputStream(clent.getInputStream());
                    //顺序发送数据:类名、方法名、参数类型、参数值
                    //拿到接口名
                    String  serviceName = inputStream.readUTF();
                    //拿到方法名
                    String  methodName = inputStream.readUTF();
                    //拿到参数类型
                    Class<?>[] paramTypes = ( Class<?>[])inputStream.readObject();
                    //拿到参数值
                    Object[] arguments = (Object[])inputStream.readObject();
                    //要到注册中心根据 接口名,获取实现类
                    Class serviceClass =serviceRegistry.get(serviceName);
                    //使用反射的机制进行调用
                    Method method = serviceClass.getMethod(methodName,paramTypes);
                    //反射调用方法,把结果拿到
                    Object result = method.invoke(serviceClass.newInstance(),arguments);
                    //通过执行socket返回给客户端
                    outputStream = new ObjectOutputStream(clent.getOutputStream());
                    // /把结果返回给客户端
                    outputStream.writeObject(result);
                    //记得关闭
                    outputStream.close();
                    inputStream.close();
                    clent.close();
    
                }catch (Exception e){
                    e.printStackTrace();
                }
    
            }
    
        }
    }
    package enjoyedu;
    
    
    import enjoyedu.register.RegisterCenter;
    import enjoyedu.service.TechInterface;
    import enjoyedu.service.impl.TechImpl;
    
    /**
     * 
     *类说明:rpc的服务端,提供服务
     */
    public class Server {
        public static void main(String[] args) throws  Exception{
            new Thread(new Runnable() {
                public void run() {
                    try {
                        //起一个服务中心
                        RegisterCenter serviceServer = new RegisterCenter(8888);
                        //注册技师对象至注册中心
                        serviceServer.register(TechInterface.class, TechImpl.class);
                        //运行我们的服务
                        serviceServer.start();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
  • 相关阅读:
    MyEclipse中配置Hibernate
    struts2_对Map进行双层迭代
    Hibernate关联关系全集
    CodeIgniter+Smarty配置
    去掉php框架CI默认url中的index.php【整理】
    jquery的show方法是display:block还是display:inline呢?
    Codeigniter中的Error【转】
    去除 inlineblock 空隙终极解决方案
    jquery三级折叠菜单
    css实现页面文字不换行、自动换行、强制换行
  • 原文地址:https://www.cnblogs.com/zqLoveSym/p/12445023.html
Copyright © 2011-2022 走看看