zoukankan      html  css  js  c++  java
  • 基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇

    前提

    前置文章:

    在前置的《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》一文中已经定义了一个相对简单的RPC私有协议,并且实现了对应的编码和解码模块。这篇文章基于协议篇,完成Server端代码调用的编写。考虑到目前相对主流的IOC容器是Spring,这里选用了spring-boot-starter(非MVC容器,只是单纯管理Bean),依赖JDK1.8+

    思路

    首先RPC私有协议定义了Client端会传过来四个和服务调用息息相关的字符:接口全类名interfaceName、方法名methodName、方法参数签名字符串数组methodArgumentSignatures(可选,这个参数不是必须传入的)以及方法参数数组methodArguments(可选,空方法列表的时候不需要传入参数)。主要流程如下:

    • Server端的所有服务端(实现)类交由IOC容器托管。
    • Client端发起RPC请求。
    • 通过前面提到的最多四个参数,从Server服务实例的IOC容器中匹配出吻合度最高的一个方法java.lang.reflect.Method实例、该方法实例的宿主类以及宿主类对应的Bean实例,如果这一步匹配的目标方法超过1个或者为0个,可以直接返回异常信息。
    • 把前一步得到的Method实例、宿主类Bean实例,结合方法参数数组methodArguments进行反射调用,得到调用结果。
    • Server端把响应结果封装到payload通过私有协议发送回Client端。

    Server端代码实现

    为了暂时方便起见,部分数组入参被重新封装为ArrayList,实际上编写RPC框架的时候应该优先考虑性能问题,像JDK提供的集合类库等等应该尽可能少用(以ArrayList为例,扩容的时候存在底层Object[]拷贝,造成性能损失和额外的内存消耗),极尽可能使用基本类型和数组。

    先定义方法匹配器MethodMatcher相关的类:

    public interface MethodMatcher {
    
        /**
         * 查找一个匹配度最高的方法信息
         *
         * @param input input
         * @return output
         */
        MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input);
    }
    
    // 输入值
    @EqualsAndHashCode
    @Data
    public class MethodMatchInput {
    
        private String interfaceName;
    
        private String methodName;
    
        private List<String> methodArgumentSignatures;
    
        private int methodArgumentArraySize;
    }
    
    // 输出值
    @Data
    public class MethodMatchOutput {
    
        /**
         * 目标方法实例
         */
        private Method targetMethod;
    
        /**
         * 目标实现类 - 这个有可能是被Cglib增强过的类型,是宿主类的子类,如果没有被Cglib增强过,那么它就是宿主类
         */
        private Class<?> targetClass;
    
        /**
         * 宿主类
         */
        private Class<?> targetUserClass;
    
        /**
         * 宿主类Bean实例
         */
        private Object target;
    
        /**
         * 方法参数类型列表
         */
        private List<Class<?>> parameterTypes;
    }
    

    目标方法匹配的逻辑大致如下:

    1. 方法名称和方法实例的宿主类型一定作为匹配条件的一部分。
    2. 如果传入了参数签名列表,优先使用参数签名列表类型进行匹配。
    3. 如果没有传入参数签名列表,那么使用参数的数量进行匹配。
    4. 如果参数签名列表和参数列表都没有传入,那么只能通过方法名称和方法实例的宿主类型匹配。
    5. 考虑到方法匹配解析的过程相对耗时,需要把结果缓存起来。

    分析至此,可以基于反射,编写一个抽象的方法匹配器BaseMethodMatcher,然后把获取宿主类信息的功能委托到子类:

    public class MethodMatchException extends RuntimeException {
    
        public MethodMatchException(String message) {
            super(message);
        }
    
        public MethodMatchException(String message, Throwable cause) {
            super(message, cause);
        }
    
        public MethodMatchException(Throwable cause) {
            super(cause);
        }
    }
    
    @Data
    public class HostClassMethodInfo {
    
        private Class<?> hostClass;
        private Class<?> hostUserClass;
        private Object hostTarget;
    }
    
    @Slf4j
    abstract class BaseMethodMatcher implements MethodMatcher {
    
        private final ConcurrentMap<MethodMatchInput, MethodMatchOutput> cache = Maps.newConcurrentMap();
    
        @Override
        public MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input) {
            return cache.computeIfAbsent(input, in -> {
                try {
                    MethodMatchOutput output = new MethodMatchOutput();
                    Class<?> interfaceClass = Class.forName(in.getInterfaceName());
                    // 获取宿主类信息
                    HostClassMethodInfo info = findHostClassMethodInfo(interfaceClass);
                    List<Method> targetMethods = Lists.newArrayList();
                    ReflectionUtils.doWithMethods(info.getHostUserClass(), targetMethods::add, method -> {
                        String methodName = method.getName();
                        Class<?> declaringClass = method.getDeclaringClass();
                        List<Class<?>> inputParameterTypes = Optional.ofNullable(in.getMethodArgumentSignatures())
                                .map(mas -> {
                                    List<Class<?>> list = Lists.newArrayList();
                                    mas.forEach(ma -> list.add(ClassUtils.resolveClassName(ma, null)));
                                    return list;
                                }).orElse(Lists.newArrayList());
                        output.setParameterTypes(inputParameterTypes);
                        // 如果传入了参数签名列表,优先使用参数签名列表类型进行匹配
                        if (!inputParameterTypes.isEmpty()) {
                            List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
                            return Objects.equals(methodName, in.getMethodName()) &&
                                    Objects.equals(info.getHostUserClass(), declaringClass) &&
                                    Objects.equals(parameterTypes, inputParameterTypes);
                        }
                        // 如果没有传入参数签名列表,那么使用参数的数量进行匹配
                        if (in.getMethodArgumentArraySize() > 0) {
                            List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
                            return Objects.equals(methodName, in.getMethodName()) &&
                                    Objects.equals(info.getHostUserClass(), declaringClass) &&
                                    in.getMethodArgumentArraySize() == parameterTypes.size();
    
                        }
                        // 如果参数签名列表和参数列表都没有传入,那么只能通过方法名称和方法实例的宿主类型匹配
                        return Objects.equals(methodName, in.getMethodName()) &&
                                Objects.equals(info.getHostUserClass(), declaringClass);
    
                    });
                    if (targetMethods.size() != 1) {
                        throw new MethodMatchException(String.format("查找到目标方法数量不等于1,interface:%s,method:%s",
                                in.getInterfaceName(), in.getMethodName()));
                    }
                    Method targetMethod = targetMethods.get(0);
                    output.setTargetClass(info.getHostClass());
                    output.setTargetMethod(targetMethod);
                    output.setTargetUserClass(info.getHostUserClass());
                    output.setTarget(info.getHostTarget());
                    return output;
                } catch (Exception e) {
                    log.error("查找匹配度最高的方法失败,输入参数:{}", JSON.toJSONString(in), e);
                    if (e instanceof MethodMatchException) {
                        throw (MethodMatchException) e;
                    } else {
                        throw new MethodMatchException(e);
                    }
                }
            });
        }
    
        /**
         * 获取宿主类的信息
         *
         * @param interfaceClass interfaceClass
         * @return HostClassMethodInfo
         */
        abstract HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass);
    }
    

    接着,通过接口类型获取宿主类的功能就委托给Spring实现,从IOC容器中获取,定义SpringMethodMatcher

    @Component
    public class SpringMethodMatcher extends BaseMethodMatcher implements BeanFactoryAware {
    
        private DefaultListableBeanFactory beanFactory;
    
        @Override
        public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException {
            this.beanFactory = (DefaultListableBeanFactory) beanFactory;
        }
    
        @Override
        HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass) {
            HostClassMethodInfo info = new HostClassMethodInfo();
            // 从容器中通过接口类型获取对应的实现,实现必须只有一个
            Object bean = beanFactory.getBean(interfaceClass);
            info.setHostTarget(bean);
            info.setHostClass(bean.getClass());
            info.setHostUserClass(ClassUtils.getUserClass(bean.getClass()));
            return info;
        }
    }
    

    至此,目标方法匹配的模块已经编写完毕,接下来需要处理方法参数列表的反序列化。编写协议的时候,笔者把方法参数列表methodArguments存放在Object数组中,传输的时候序列化为byte数组,经过协议解析之后,方法参数列表的实际类型为ByteBuf数组(这是因为Netty中的字节容器就是ByteBuf),那么需要考虑把ByteBuf数组转换为目标方法的参数类型实例。主要步骤如下:

    1. 如果方法参数列表为空,那么什么都不用做,也就是调用了无参数的方法。
    2. 如果方法参数列表不为空同时方法参数类型列表不为空,优先选用方法参数类型列表进行转换。
    3. 如果方法参数列表不为空同时方法参数类型列表为空,则使用Method#getParameterTypes()得到的方法参数列表类型进行转换。

    定义一个方法参数转换器接口MethodArgumentConverter

    public interface MethodArgumentConverter {
    
        ArgumentConvertOutput convert(ArgumentConvertInput input);
    }
    
    @Data
    public class ArgumentConvertInput {
    
        /**
         * 目标方法
         */
        private Method method;
    
        /**
         * 方法参数类型列表
         */
        private List<Class<?>> parameterTypes;
    
        /**
         * 方法参数列表
         */
        private List<Object> arguments;
    }
    
    @Data
    public class ArgumentConvertOutput {
    
    
        private Object[] arguments;
    }
    

    方法参数转换器的默认实现如下:

    @Slf4j
    @Component
    public class DefaultMethodArgumentConverter implements MethodArgumentConverter {
    
        private final Serializer serializer = FastJsonSerializer.X;
    
        @Override
        public ArgumentConvertOutput convert(ArgumentConvertInput input) {
            ArgumentConvertOutput output = new ArgumentConvertOutput();
            try {
                if (null == input.getArguments() || input.getArguments().isEmpty()) {
                    output.setArguments(new Object[0]);
                    return output;
                }
                List<Class<?>> inputParameterTypes = input.getParameterTypes();
                int size = inputParameterTypes.size();
                if (size > 0) {
                    Object[] arguments = new Object[size];
                    for (int i = 0; i < size; i++) {
                        ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
                        int readableBytes = byteBuf.readableBytes();
                        byte[] bytes = new byte[readableBytes];
                        byteBuf.readBytes(bytes);
                        arguments[i] = serializer.decode(bytes, inputParameterTypes.get(i));
                        byteBuf.release();
                    }
                    output.setArguments(arguments);
                    return output;
                }
                Class<?>[] parameterTypes = input.getMethod().getParameterTypes();
                int len = parameterTypes.length;
                Object[] arguments = new Object[len];
                for (int i = 0; i < len; i++) {
                    ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
                    int readableBytes = byteBuf.readableBytes();
                    byte[] bytes = new byte[readableBytes];
                    byteBuf.readBytes(bytes);
                    arguments[i] = serializer.decode(bytes, parameterTypes[i]);
                    byteBuf.release();
                }
                output.setArguments(arguments);
                return output;
            } catch (Exception e) {
                throw new ArgumentConvertException(e);
            }
        }
    }    
    

    所有前置工作都完成了,现在编写一个Server端的入站处理器ServerHandler,暂时不做代码逻辑优化,只做实现,把反射调用的模块直接在此类中编写:

    @Component
    @Slf4j
    public class ServerHandler extends SimpleChannelInboundHandler<RequestMessagePacket> {
    
        @Autowired
        private MethodMatcher methodMatcher;
    
        @Autowired
        private MethodArgumentConverter methodArgumentConverter;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception {
            log.info("服务端接收到:{}", packet);
            MethodMatchInput input = new MethodMatchInput();
            input.setInterfaceName(packet.getInterfaceName());
            input.setMethodArgumentSignatures(Optional.ofNullable(packet.getMethodArgumentSignatures())
                    .map(Lists::newArrayList).orElse(Lists.newArrayList()));
            input.setMethodName(packet.getMethodName());
            Object[] methodArguments = packet.getMethodArguments();
            input.setMethodArgumentArraySize(null != methodArguments ? methodArguments.length : 0);
            MethodMatchOutput output = methodMatcher.selectOneBestMatchMethod(input);
            log.info("查找目标实现方法成功,目标类:{},宿主类:{},宿主方法:{}",
                    output.getTargetClass().getCanonicalName(),
                    output.getTargetUserClass().getCanonicalName(),
                    output.getTargetMethod().getName()
            );
            Method targetMethod = output.getTargetMethod();
            ArgumentConvertInput convertInput = new ArgumentConvertInput();
            convertInput.setArguments(input.getMethodArgumentArraySize() > 0 ? Lists.newArrayList(methodArguments) : Lists.newArrayList());
            convertInput.setMethod(output.getTargetMethod());
            convertInput.setParameterTypes(output.getParameterTypes());
            ArgumentConvertOutput convertOutput = methodArgumentConverter.convert(convertInput);
            ReflectionUtils.makeAccessible(targetMethod);
            // 反射调用
            Object result = targetMethod.invoke(output.getTarget(), convertOutput.getArguments());
            ResponseMessagePacket response = new ResponseMessagePacket();
            response.setMagicNumber(packet.getMagicNumber());
            response.setVersion(packet.getVersion());
            response.setSerialNumber(packet.getSerialNumber());
            response.setAttachments(packet.getAttachments());
            response.setMessageType(MessageType.RESPONSE);
            response.setErrorCode(200L);
            response.setMessage("Success");
            response.setPayload(JSON.toJSONString(result));
            log.info("服务端输出:{}", JSON.toJSONString(response));
            ctx.writeAndFlush(response);
        }
    }
    

    编写一个Server的启动类ServerApplication,在Spring容器启动之后,启动Netty服务:

    @SpringBootApplication(scanBasePackages = "club.throwable.server")
    @Slf4j
    public class ServerApplication implements CommandLineRunner {
    
        @Value("${netty.port:9092}")
        private Integer nettyPort;
    
        @Autowired
        private ServerHandler serverHandler;
    
        public static void main(String[] args) throws Exception {
            SpringApplication.run(ServerApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            int port = nettyPort;
            ServerBootstrap bootstrap = new ServerBootstrap();
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                                ch.pipeline().addLast(new LengthFieldPrepender(4));
                                ch.pipeline().addLast(new RequestMessagePacketDecoder());
                                ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X));
                                ch.pipeline().addLast(serverHandler);
                            }
                        });
                ChannelFuture future = bootstrap.bind(port).sync();
                log.info("启动NettyServer[{}]成功...", port);
                future.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }
    

    最后,编写契约包和契约实现:

    - ch0-custom-rpc-protocol          项目根目录
      - club.throwable
        - utils                        工具类
        - protocol                     协议
        - exception                    异常
        - contract                     契约
          - HelloService               契约接口
        - server                       服务端
          - contract
            - DefaultHelloService      契约接口实现
    
    public interface HelloService {
    
        String sayHello(String name);
    }
    
    // 实现
    @Service
    public class DefaultHelloService implements HelloService {
    
        @Override
        public String sayHello(String name) {
            return String.format("%s say hello!", name);
        }
    }
    

    先启动服务端ServerApplication,再启动上一节提到的TestProtocolClient,输出结果:

    // 服务端日志
    2020-01-15 00:05:57.898  INFO 14420 --- [           main] club.throwable.server.ServerApplication  : 启动NettyServer[9092]成功...
    2020-01-15 00:06:05.980  INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler      : 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 6/139)])
    2020-01-15 00:06:07.448  INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler      : 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
    2020-01-15 00:06:07.521  INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler      : 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":""doge say hello!"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}
    
    // 客户端日志
    00:06:05.891 [main] INFO club.throwable.protocol.TestProtocolClient - 启动NettyClient[9092]成功...
    ...省略...
    00:06:13.197 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到来自服务端的响应消息,消息内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":""doge say hello!"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}
    

    可见RPC调用成功。

    小结

    编写RPCServer端技巧在于处理目标方法和宿主类的查找,在转换方法参数的时候,需要考虑简化处理和提高效率,剩下的就是做好异常处理和模块封装。限于篇幅,后面会先分析Client端的处理,再分析心跳处理、服务端优化、甚至是对接注册中心等等,在NettySpringBoot等优秀框架的加持下编写一个RPC框架其实并不困难,困难的是性能优化和生态圈的支持。

    Demo项目地址:

    (本文完 c-1-d e-a-20200115)

    技术公众号(《Throwable文摘》),不定期推送笔者原创技术文章(绝不抄袭或者转载):

    娱乐公众号(《天天沙雕》),甄选奇趣沙雕图文和视频不定期推送,缓解生活工作压力:

  • 相关阅读:
    uva 10491 Cows and Cars
    uva 10910 Marks Distribution
    uva 11029 Leading and Trailing
    手算整数的平方根
    uva 10375 Choose and divide
    uva 10056 What is the Probability?
    uva 11027 Palindromic Permutation
    uva 10023 Square root
    Ural(Timus) 1081. Binary Lexicographic Sequence
    扩展欧几里得(求解线性方程)
  • 原文地址:https://www.cnblogs.com/throwable/p/12194713.html
Copyright © 2011-2022 走看看