zoukankan      html  css  js  c++  java
  • 自实现RPC调用

    服务提供者

    服务接口:

    public interface HelloService {
        public String sayHello(String name);
    }

    服务实现类:

    public class HelloServiceImpl implements HelloService {
        @Override
        public String sayHello(String name) {
            return "hello:" + name;
        }
    }

    服务注册:

    public class ProviderReflect {
        private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingDeque<Runnable>(1000), new MyThreadFactory("providerReflact"));
    
        /**
         * 服务发布
         *
         * @param service
         * @param port
         */
        public static void provider(final Object service, int port) {
            ServerSocket serverSocket = null;
    
            try {
                serverSocket = new ServerSocket(port);
                while (true) {
                    Socket socket = serverSocket.accept();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            ObjectOutputStream oos = null;
                            ObjectInputStream ois = null;
                            try {
                                ois = new ObjectInputStream(socket.getInputStream());
    
                                // 请求调用的方法名
                                String methodName = ois.readUTF();
                                // 请求的参数
                                Object[] args = (Object[]) ois.readObject();
    
                                oos = new ObjectOutputStream(socket.getOutputStream());
                                Object result = MethodUtils.invokeExactMethod(service, methodName, args);
                                System.out.println(result);
                                // 写结果
                                oos.writeObject(result);
                            } catch (IOException e) {
                                e.printStackTrace();
                            } catch (InvocationTargetException e) {
                                e.printStackTrace();
                            } catch (NoSuchMethodException e) {
                                e.printStackTrace();
                            } catch (IllegalAccessException e) {
                                e.printStackTrace();
                            } catch (ClassNotFoundException e) {
                                e.printStackTrace();
                            } finally {
                                if (oos != null) {
                                    try {
                                        oos.close();
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
                                if (ois != null) {
                                    try {
                                        ois.close();
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
                            }
                        }
                    });
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (serverSocket != null) {
                    try {
                        serverSocket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
            }
        }
    
        static class MyThreadFactory implements ThreadFactory {
            /**
             * 线程池中的线程名称前缀
             */
            private String namePrefix;
    
            /**
             * 原子的整型
             */
            private AtomicInteger atomicInteger = new AtomicInteger(1);
    
            public MyThreadFactory(String whatFeaturOfGroup) {
                this.namePrefix = "From MyThreadFactory:" + whatFeaturOfGroup + "-worker-";
            }
    
            @Override
            public Thread newThread(Runnable r) {
                String name = namePrefix + atomicInteger.getAndIncrement();
                Thread thread = new Thread(r, name);
                System.out.println(thread.getName());
                return thread;
            }
        }
    
        /**
         * 发布服务
         *
         * @param args
         */
        public static void main(String[] args) {
            HelloService helloService = new HelloServiceImpl();
            ProviderReflect.provider(helloService, 8082);
        }
    }

    pom.xml:

           <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.4</version>
            </dependency>

    服务消费者

    消费接口:

    public interface HelloService {
        public String sayHello(String name);
    }

    接口的代理:

    public class ConsumerProxy {
        public static <T> T consume(final Class<?> serviceClass, final String host, final int port) {
            return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    Socket socket = null;
                    ObjectOutputStream output = null;
                    ObjectInputStream input = null;
                    try {
                        socket = new Socket(host, port);
                        output = new ObjectOutputStream(socket.getOutputStream());
    //                    output.writeUTF(serviceClass.getName());
                        output.writeUTF(method.getName());
    //                    output.writeObject(method.getParameterTypes());
                        output.writeObject(args);
                        input = new ObjectInputStream(socket.getInputStream());
                        return input.readObject();
                    } finally {
                        if (socket != null) {
                            socket.close();
                        }
                        if (output != null) {
                            output.close();
                        }
                        if (input != null) {
                            input.close();
                        }
                    }
                }
            });
        }
    
        public static void main(String[] args) {
            HelloService helloService = ConsumerProxy.consume(HelloService.class, "127.0.0.1", 8082);
            String response = helloService.sayHello("yangyongjie");
            System.out.println(response);
        }
    }

    输出:

    hello:yangyongjie

  • 相关阅读:
    BZOJ3732: Network(Kruskal重构树)
    AtCoder Beginner Contest 103
    2018.7.21NOIP模拟赛?解题报告
    PE刷题记
    杜教筛入门
    浅谈积性函数的线性筛法
    BZOJ4916: 神犇和蒟蒻(杜教筛)
    BZOJ2818: Gcd(莫比乌斯反演)
    LD1-B(最短路径-SPFA)
    UVa 10837 A Research Problem 欧拉函数
  • 原文地址:https://www.cnblogs.com/yangyongjie/p/11081116.html
Copyright © 2011-2022 走看看