zoukankan      html  css  js  c++  java
  • 分布式开发

    基于Socket搭建简易的Rpc交互

    api组件

    • 提供交互接口以及请求的参数对象
    // 交互接口
    public interface IService {
        String getToken(String userId);
        String receiveMsg();
    }
    
    // 交互数据封装
    public class RpcRequestMsg implements Serializable {
        private String className;
        private String methodName;
        private Object[] args;
    
        public RpcRequestMsg() {
        }
    
        public RpcRequestMsg(String className, String methodName, Object[] args) {
            this.className = className;
            this.methodName = methodName;
            this.args = args;
        }
    
        public String getClassName() {
            return className;
        }
    
        public void setClassName(String className) {
            this.className = className;
        }
    
        public String getMethodName() {
            return methodName;
        }
    

    server组件

    ①建立Socket端口监听 ②Socket连接后,Socket读事件获取请求信息 ③解析请求信息并执行对应方法 ④Socket写事件返回相应
    ⑤Spring切入托管:在Bean初始化后置处理时扫描RemoteServer注解对象,封装成BeanMethod存放到容器中(用于步骤3反射调用)
    ⑥Spring切入托管:监听ContextApplication对象的Refreshed事件,当容器初始化完成后,触发①操作

    Socket

    Socket监听逻辑 - RpcProxyServer

    public class RpcProxyServer {
        // 采用线程池处理连接请求
        private final ExecutorService executorService = Executors.newFixedThreadPool(5);
    
        public void publisher(int port){
            // 注册端口监听
            try(ServerSocket serverSocket = new ServerSocket(port)){
                while (true){
                    // 当连接被建立后,获取新线程进行处理
                    Socket socket = serverSocket.accept();
                    executorService.execute(new ProcessorHandler(socket));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    Socket处理逻辑 - ProcessorHandler

    // 操作Socket完成读写处理
    public class ProcessorHandler implements Runnable{
    
        private Socket socket = null;
    
        public ProcessorHandler(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
            try(InputStream is = socket.getInputStream();
                ObjectInputStream ois = new ObjectInputStream(is);
    
                OutputStream os = socket.getOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(os);
            ){
                // 请求信息获取
                RpcRequestMsg rpcRequestMsg = (RpcRequestMsg)ois.readObject();
    
                // 完成请求信息的采集后,委托Mediator执行对应方法并获取返回值
                Object rs = Mediator.getInstance().process(rpcRequestMsg);
    
                // 结果响应
                oos.writeObject(rs);
                oos.flush();
            } catch (IOException | ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    }
    

    MethodInvoke

    public class Mediator {
        // 维护了一份Key-Method对象的映射,Key=全类名.方法名,Method中包含bean和Method对象
        public static Map<String,BeanMethod> route = new ConcurrentHashMap<>();
    
        private volatile static Mediator instance = null;
    
        private Mediator(){};
    
        // 双重检查锁 - 懒汉式单例
        public static Mediator getInstance(){
            if(instance == null){
                synchronized (Mediator.class){
                    if(instance == null){
                        instance = new Mediator();
                    }
                }
            }
            return instance;
        }
    
        // 具体的处理逻辑
        public Object process(RpcRequestMsg rpcRequestMsg){
            // 根据MethodId 获取到 BeanMethod对象
            String key = rpcRequestMsg.getClassName() + "." + rpcRequestMsg.getMethodName();
            BeanMethod beanMethod = route.get(key);
            if(beanMethod == null) 
                return null;
            
            // 反射调用
            Object obj = beanMethod.getBean();
            Method method = beanMethod.getMethod();
            try {
                return method.invoke(obj,rpcRequestMsg.getArgs());
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    
    @Data
    public class BeanMethod {
        private Object bean;
        private Method method;
    
        public BeanMethod(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }
    

    Spring切入

    // 声明自定义注解,用于Spring切入点中获取特定bean对象
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface RemoteServer {
    }
    
    @RemoteServer
    public class ServiceImpl implements IService {
        @Override
        public String getToken(String userId) {
            System.out.println("ServiceImpl - getToken():" + userId);
            return userId;
        }
    
        @Override
        public String receiveMsg() {
            Date date = new Date();
            System.out.println("ServiceImpl - receiveMsg():" + date.toString());
            return date.toString();
        }
    }
    

    Spring初始化后置处理器,扫描RemoteServer注解,并封装成BeanMethod注册到Mediator.route容器

    @Component
    public class RpcMethodRegisterBeanPostProcessor implements BeanPostProcessor {
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if(bean.getClass().isAnnotationPresent(RemoteServer.class)){
                Method[] methods = bean.getClass().getDeclaredMethods();
                for(Method method : methods){
                    String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                    BeanMethod beanMethod = new BeanMethod(bean,method);
                    Mediator.route.put(key,beanMethod);
                }
            }
            return bean;
        }
    }
    

    Spring事件监听器,当容器refreshed完成后,开启RpcProxyServer服务监听

    @Component
    public class SocketServerInitialEvent implements ApplicationListener<ContextRefreshedEvent>{
        @Override
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            RpcProxyServer rpcProxyServer = new RpcProxyServer();
            rpcProxyServer.publisher(8888);
        }
    }
    

    client组件

    api接口 -> 动态代理对象 -> invokeHandler(){socket操作}

    通过@Bean标签在Configuration类中将iService的代理对象注册为Bean

    @Configuration
    public class MyConfiguration {
        @Bean
        public IService iService(){
            return RpcProxyClient.clientProxy(IService.class,"localhost",8888);
        }
    
        public static void main(String[] args) {
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MyConfiguration.class);
            IService service = context.getBean(IService.class);
            System.out.println(service.receiveMsg());
        }
    }
    
    public class RpcProxyClient {
        public static <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port){
            return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(),new Class[]{interfaceCls},new RemoteInvocationHandler(host,port));
        }
    }
    

    // 动态代理,委托执行器

    public class RemoteInvocationHandler implements InvocationHandler {
        private String host = null;
        private int port;
    
        public RemoteInvocationHandler(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            ClientSocketHandler clientSocketHandler = new ClientSocketHandler(host,port);
    
            // 构造请求信息
            RpcRequestMsg rpcRequestMsg = new RpcRequestMsg(proxy.getClass().getInterfaces()[0].getName(),method.getName(),args);
            return clientSocketHandler.handler(rpcRequestMsg);
        }
    }
    

    // 实际的client socket业务逻辑

    public class ClientSocketHandler {
        private String host;
        private int port;
    
        public ClientSocketHandler(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        private Socket newSocket() throws IOException {
            return new Socket(host,port);
        }
    
        public Object handler(RpcRequestMsg rpcRequestMsg) throws IOException {
            Socket socket = newSocket();
            try (OutputStream os = socket.getOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(os);
    
                 InputStream is = socket.getInputStream();
                 ObjectInputStream ois = new ObjectInputStream(is);
            ){
                oos.writeObject(rpcRequestMsg);
                oos.flush();
    
                String msg = (String) ois.readObject();
                System.out.println("客户端 - 读入:" + msg);
    
                while (socket.isInputShutdown() && socket.isOutputShutdown()) {
                    socket.close();
                }
                return msg;
            } catch (IOException | ClassNotFoundException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    

    欢迎疑问、期待评论、感谢指点 -- kiqi,愿同您为友

    -- 星河有灿灿,愿与之辉

  • 相关阅读:
    表达式目录树(Expression)
    六大设计原则【单一职责】【里氏替换】【 迪米特法则】【依赖倒置原则】【接口隔离原则】【开闭原则】
    lambda,linq
    c#中的特性Attribute
    CentOS7部署Nginx
    NetCore项目的部署
    ABP+NetCore+Vue.js实现增删改查
    Abp数据库迁移注意事项
    MVC分页
    AspNet5 Changes to [Activate] in beta-5
  • 原文地址:https://www.cnblogs.com/kiqi/p/14429029.html
Copyright © 2011-2022 走看看