zoukankan      html  css  js  c++  java
  • 基于Netty和SpringBoot实现一个轻量级RPC框架-Client篇

    前提

    前置文章:

    前一篇文章相对简略地介绍了RPC服务端的编写,而这篇博文最要介绍客户端(Client)的实现。RPC调用一般是面向契约编程的,而Client的核心功能就是:把契约接口方法的调用抽象为使用NettyRPC服务端通过私有协议发送一个请求。这里最底层的实现依赖于动态代理,因此动态代理是动态实现接口的最简单方式(如果字节码研究得比较深入,可以通过字节码编程实现接口)。需要的依赖如下:

    • JDK1.8+
    • Netty:4.1.44.Final
    • SpringBoot:2.2.2.RELEASE

    动态代理的简单使用

    一般可以通过JDK动态代理或者Cglib的字节码增强来实现此功能,为了简单起见,不引入额外的依赖,这里选用JDK动态代理。这里重新搬出前面提到的契约接口HelloService

    public interface HelloService {
    
        String sayHello(String name);
    }
    

    接下来需要通过动态代理为此接口添加一个实现:

    public class TestDynamicProxy {
    
        public static void main(String[] args) throws Exception {
            Class<HelloService> interfaceKlass = HelloService.class;
            InvocationHandler handler = new HelloServiceImpl(interfaceKlass);
            HelloService helloService = (HelloService)
                    Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, handler);
            System.out.println(helloService.sayHello("throwable"));
        }
    
        @RequiredArgsConstructor
        private static class HelloServiceImpl implements InvocationHandler {
    
            private final Class<?> interfaceKlass;
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 这里应该根据方法的返回值类型去决定返回结果
                return String.format("[%s#%s]方法被调用,参数列表:%s", interfaceKlass.getName(), method.getName(),
                        JSON.toJSONString(args));
            }
        }
    }
    // 控制台输出结果
    [club.throwable.contract.HelloService#sayHello]方法被调用,参数列表:["throwable"]
    

    这里可以确认两点:

    1. InvocationHandler实现后会对被代理接口生成一个动态实现类。
    2. 动态实现类(接口)方法被调用的时候,实际上是调用InvocationHandler对应实例的invoke()方法,传入的参数就是当前方法调用的元数据。

    Client端代码实现

    Client端需要通过动态代理为契约接口生成一个动态实现类,然后提取契约接口调用方法时候所能提供的元数据,通过这些元数据和Netty客户端的支持(例如NettyChannel)基于私有RPC协议组装请求信息并且发送请求。这里先定义一个请求参数提取器接口RequestArgumentExtractor

    @Data
    public class RequestArgumentExtractInput {
    
        private Class<?> interfaceKlass;
    
        private Method method;
    }
    
    @Data
    public class RequestArgumentExtractOutput {
    
        private String interfaceName;
    
        private String methodName;
    
        private List<String> methodArgumentSignatures;
    }
    
    // 接口
    public interface RequestArgumentExtractor {
    
        RequestArgumentExtractOutput extract(RequestArgumentExtractInput input);
    }
    

    简单实现一下,解析结果添加到缓存中,实现类DefaultRequestArgumentExtractor代码如下:

    public class DefaultRequestArgumentExtractor implements RequestArgumentExtractor {
    
        private final ConcurrentMap<CacheKey, RequestArgumentExtractOutput> cache = Maps.newConcurrentMap();
    
        @Override
    
        public RequestArgumentExtractOutput extract(RequestArgumentExtractInput input) {
            Class<?> interfaceKlass = input.getInterfaceKlass();
            Method method = input.getMethod();
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            return cache.computeIfAbsent(new CacheKey(interfaceKlass.getName(), methodName,
                    Lists.newArrayList(parameterTypes)), x -> {
                RequestArgumentExtractOutput output = new RequestArgumentExtractOutput();
                output.setInterfaceName(interfaceKlass.getName());
                List<String> methodArgumentSignatures = Lists.newArrayList();
                for (Class<?> klass : parameterTypes) {
                    methodArgumentSignatures.add(klass.getName());
                }
                output.setMethodArgumentSignatures(methodArgumentSignatures);
                output.setMethodName(methodName);
                return output;
            });
        }
    
        @RequiredArgsConstructor
        private static class CacheKey {
    
            private final String interfaceName;
            private final String methodName;
            private final List<Class<?>> parameterTypes;
    
            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (o == null || getClass() != o.getClass()) return false;
                CacheKey cacheKey = (CacheKey) o;
                return Objects.equals(interfaceName, cacheKey.interfaceName) &&
                        Objects.equals(methodName, cacheKey.methodName) &&
                        Objects.equals(parameterTypes, cacheKey.parameterTypes);
            }
    
            @Override
            public int hashCode() {
                return Objects.hash(interfaceName, methodName, parameterTypes);
            }
        }
    }
    

    在不考虑重连、断连等情况下,新增一个类ClientChannelHolder用于保存Netty客户端的Channel实例:

    public class ClientChannelHolder {
    
        public static final AtomicReference<Channel> CHANNEL_REFERENCE = new AtomicReference<>();
    }
    

    接着新增一个契约动态代理工厂(工具类)ContractProxyFactory,用于为契约接口生成代理类实例:

    public class ContractProxyFactory {
    
        private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
        private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();
    
        @SuppressWarnings("unchecked")
        public static <T> T ofProxy(Class<T> interfaceKlass) {
            // 缓存契约接口的代理类实例
            return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
                    Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
                        RequestArgumentExtractInput input = new RequestArgumentExtractInput();
                        input.setInterfaceKlass(interfaceKlass);
                        input.setMethod(method);
                        RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
                        // 封装请求参数
                        RequestMessagePacket packet = new RequestMessagePacket();
                        packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
                        packet.setVersion(ProtocolConstant.VERSION);
                        packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
                        packet.setMessageType(MessageType.REQUEST);
                        packet.setInterfaceName(output.getInterfaceName());
                        packet.setMethodName(output.getMethodName());
                        packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
                        packet.setMethodArguments(args);
                        Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
                        // 发起请求
                        channel.writeAndFlush(packet);
                        // 这里方法返回值需要进行同步处理,相对复杂,后面专门开一篇文章讲解,暂时统一返回字符串
                        // 如果契约接口的返回值类型不是字符串,这里方法返回后会抛出异常
                        return String.format("[%s#%s]调用成功,发送了[%s]到NettyServer[%s]", output.getInterfaceName(),
                                output.getMethodName(), JSON.toJSONString(packet), channel.remoteAddress());
                    }));
        }
    }
    

    最后编写客户端ClientApplication的代码:

    @Slf4j
    public class ClientApplication {
    
        public static void main(String[] args) throws Exception {
            int port = 9092;
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            try {
                bootstrap.group(workerGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
                bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                        ch.pipeline().addLast(new LengthFieldPrepender(4));
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
                        ch.pipeline().addLast(new ResponseMessagePacketDecoder());
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
                                Object targetPayload = packet.getPayload();
                                if (targetPayload instanceof ByteBuf) {
                                    ByteBuf byteBuf = (ByteBuf) targetPayload;
                                    int readableByteLength = byteBuf.readableBytes();
                                    byte[] bytes = new byte[readableByteLength];
                                    byteBuf.readBytes(bytes);
                                    targetPayload = FastJsonSerializer.X.decode(bytes, String.class);
                                    byteBuf.release();
                                }
                                packet.setPayload(targetPayload);
                                log.info("接收到来自服务端的响应消息,消息内容:{}", JSON.toJSONString(packet));
                            }
                        });
                    }
                });
                ChannelFuture future = bootstrap.connect("localhost", port).sync();
                // 保存Channel实例,暂时不考虑断连重连
                ClientChannelHolder.CHANNEL_REFERENCE.set(future.channel());
                // 构造契约接口代理类实例
                HelloService helloService = ContractProxyFactory.ofProxy(HelloService.class);
                String result = helloService.sayHello("throwable");
                log.info(result);
                future.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }
    

    先启动《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》一文中的ServerApplication,再启动ClientApplication,控制台输出如下:

    // 服务端日志
    2020-01-16 22:34:51 [main] INFO  c.throwable.server.ServerApplication - 启动NettyServer[9092]成功...
    2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO  club.throwable.server.ServerHandler - 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
    2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO  club.throwable.server.ServerHandler - 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
    2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO  club.throwable.server.ServerHandler - 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":""throwable say hello!"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
    
    // 客户端日志
    2020-01-16 22:36:35 [main] INFO  c.throwable.client.ClientApplication - [club.throwable.contract.HelloService#sayHello]调用成功,发送了[{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":["throwable"],"methodName":"sayHello","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}]到NettyServer[localhost/127.0.0.1:9092]
    2020-01-16 22:36:35 [nioEventLoopGroup-2-1] INFO  c.throwable.client.ClientApplication - 接收到来自服务端的响应消息,消息内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":""throwable say hello!"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
    

    小结

    Client端主要负责契约接口调用转换为发送RPC协议请求这一步,核心技术就是动态代理,在不进行模块封装优化的前提下实现是相对简单的。这里其实Client端还有一个比较大的技术难题没有解决,上面例子中客户端日志输出如果眼尖的伙伴会发现,Client端发送RPC请求的线程(main线程)和Client端接收ServerRPC响应处理的线程(nioEventLoopGroup-2-1线程)并不相同,这一点是Netty处理网络请求之所以能够如此高效的根源(简单来说就是请求和响应是异步的,两个流程本来是互不感知的)。但是更多情况下,我们希望外部请求是同步的,希望发送RPC请求的线程得到响应结果再返回(这里请求和响应有可能依然是异步流程)。下一篇文章会详细分析一下如果对请求-响应做同步化处理。

    Demo项目地址:

    (c-2-d e-a-20200116)

    技术公众号(《Throwable文摘》),不定期推送笔者原创技术文章(绝不抄袭或者转载):

    娱乐公众号(《天天沙雕》),甄选奇趣沙雕图文和视频不定期推送,缓解生活工作压力:

  • 相关阅读:
    非root用户在linux下安装多个版本的CUDA和cuDNN(cuda 8、cuda 10.1 等)
    python相关总结
    可视化滤波器
    Ubuntu 和windows程序区别
    远程服务器运行代码相关
    Ubuntu
    jmeter学习(1)基础支持+安装部署
    python中eval方法的使用
    mysql学习(4)python操作数据库
    mysql学习(3)10045错误,连接不上数据库
  • 原文地址:https://www.cnblogs.com/throwable/p/12203684.html
Copyright © 2011-2022 走看看