zoukankan      html  css  js  c++  java
  • 基于netty框架的轻量级RPC实现(附源码)

    前言

      Rpc( Remote procedure call):是一种请求 - 响应协议。RPC由客户端启动,客户端向已知的远程服务器发送请求消息,以使用提供的参数执行指定的过程。远程服务器向客户端发送响应,应用程序继续其进程。当服务器正在处理该调用时,客户端被阻塞(它等待服务器在恢复执行之前完成处理),除非客户端向服务器发送异步请求,例如XMLHttpRequest。在各种实现中存在许多变化和细微之处,导致各种不同(不兼容)的RPC协议。

      技术选型:

    1. Protostuff:它基于 Protobuf 序列化框架,面向 POJO,无需编写 .proto 文件。
    2. Netty:基于NIO的网络编程框架,封装了NIO细节,使用更加方便
    3. SpringBoot:Spring 的组件的集合,内部封装服务器,实现了自动加载

    1.封装请求的pojo和响应的pojo

        

    public class RpcRequest {
        public RpcRequest() {
        }
    
        private Long id;
        /**
         * rpc name
         */
        private String className;
        /**
         * 方法名
         */
        private String methodName;
        /**
         * 参数
         */
        private HashMap<Class<?>, Object> arguments;
    
         //get and set ...

      

    public class RpcResponse {
        public RpcResponse() {
        }
    
        private Long id;
        private Integer code;
        private Object result;
        private String failMsg;
        // get and set ...

    2.server端对request进行解码,对response进行编码。反之client端对request进行编码,对response进行解码,因此需要编写两个编码和解码器,在不同端,对不同pojo进行编码解码

      编码类只对属于某个 genericClass的类进行编码,SerializationUtil为使用Protobuffer工具封装的一个工具类

    @ChannelHandler.Sharable
    public class RpcEncode extends MessageToByteEncoder {
        //client 端为 request, server 端为 response
        private Class<?> genericClass;
    
        public RpcEncode(Class<?> clazz) {
            this.genericClass = clazz;
        }
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext,
                              Object o, ByteBuf byteBuf) throws Exception {
            if (genericClass.isInstance(o)) {
                byte[] data = SerializationUtil.serialize(o);
                byteBuf.writeInt(data.length);
                byteBuf.writeBytes(data);
            }
        }
    }

      同样的,解码 

    public class RpcDecode extends ByteToMessageDecoder {
        private Class<?> genericClass;
    
        public RpcDecode(Class<?> clazz) {
            this.genericClass = clazz;
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx,
                              ByteBuf in, List<Object> out) throws Exception {
            int dataLength = in.readInt();
            //一个整数4个字节
            if (dataLength < 4) {
                return;
            }
            in.markReaderIndex();
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
                return;
            }
            byte[] data = new byte[dataLength];
            in.readBytes(data);
            Object obj = SerializationUtil.deserialize(data, genericClass);
            out.add(obj);
        }

    3. server端将数据解码后,开始使用handler处理client的请求,handler里包含一个map,里面value是使用@RpcService后的bean,key是注解的value,通过RpcRequest的className,从map的key进行匹配,找到bean之后,通过反射执行  methodName对应的方法 和arguments的参数

      @RpcService用于标识在发布的服务类上,value为client 请求的classname,该注解继承了@Component注解,将会被spring注册为bean

        

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface RpcService {
        String value();
    
        String description() default "";
    }

      通过@RpcService 标识的类,将被注册为bean实例,这里将在 LrpcHandlerAutoConfiguration类中,将这些标识了该注解的bean实例找出来,传入handler中执行client的请求方法

      

    @Configurable
    @Component
    public class LrpcHandlerAutoConfiguration implements ApplicationContextAware {
        private ApplicationContext context;
        @Value("${lrpc.server}")
        public String port;
    
        @Bean
        public RpcHandler rpcHandler() {
            Map<String, Object> rpcBeans = context.getBeansWithAnnotation(RpcService.class);
            Set<String> beanNameSet = rpcBeans.keySet();
            for (String beanName : beanNameSet) {
                Object obj = rpcBeans.get(beanName);
                //rpcService注解会 把value的值传递给component
                RpcService annotation = obj.getClass().getDeclaredAnnotation(RpcService.class);
                //默认bean name
                if (StringUtils.isBlank(annotation.value()) || annotation.value().equals(beanName)) {
                    continue;
                }
                rpcBeans.put(annotation.value(), rpcBeans.get(beanName));
                //去掉重复
                rpcBeans.remove(beanName);
            }
            return new RpcHandler(rpcBeans);
        }
    //..........................

    RpcHandler的构造函数,注入了一份rpcBeans的引用,当client的RpcRequest请求时,将从该rpcBeans中获取对应的bean

    @ChannelHandler.Sharable
    public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {
        private static final Logger logger = Logger.getLogger(RpcHandler.class.getName());
        private Map<String, Object> rpcBeans;
    
        public RpcHandler(Map<String, Object> rpcBeans) {
            this.rpcBeans = rpcBeans;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
            RpcResponse rpcResponse = handle(msg);
            ctx.channel().writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
        }
    
        private RpcResponse handle(RpcRequest msg) throws InvocationTargetException {
            RpcResponse rpcResponse = new RpcResponse();
            Object obj = rpcBeans.get(msg.getClassName());
            //TODO 暂时这样吧
            if (Objects.isNull(obj)) {
                System.out.println("未找到service");
                rpcResponse.setResult(null);
                rpcResponse.setCode(404);
                logger.warning("请求的service未找到,msg:" + msg.toString());
                return rpcResponse;
            }
            rpcResponse.setId(msg.getId());
            //解析请求,执行相应的rpc方法
            Class<?> clazz = obj.getClass();
            String methodName = msg.getMethodName();
            HashMap<Class<?>, Object> arguments = msg.getArguments();
            FastClass fastClass = FastClass.create(clazz);
            FastMethod method = fastClass.getMethod(methodName,
                    arguments.keySet().toArray(new Class[arguments.size()]));
            Object result = method.invoke(obj, arguments.values().toArray());
            rpcResponse.setResult(result);
            rpcResponse.setCode(200);
            return rpcResponse;
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
            logger.warning(cause.toString());
        }
    }

    4. 启动 LrpcServer,LrpcChannelInit 在 LrpcHandlerAutoConfiguration中进行初始化,同时注入 lrpc.server 环境变量给port参数

    @Component
    public class LrpcServerImpl implements LrpcServer, ApplicationListener<ApplicationReadyEvent> {
        @Autowired
        LrpcChannelInit lrpcChannelInit;
    
        @Override
        public void connect() {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .channel(NioServerSocketChannel.class)
                        .childHandler(lrpcChannelInit)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                ChannelFuture future = bootstrap.bind(new InetSocketAddress(lrpcChannelInit.getPort())).sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        @Override
        public void onApplicationEvent(ApplicationReadyEvent event) {
            connect();
        }
    }

    5. 客户端handler类 LrpClientHandler,里面持一把对象锁,因为netty返回数据总是异步的,这里将异步转成同步,利用 Object的wait()和notify()方法实现,LrpClientHandler这里是多例的,不存在竞争状态,因此是线程安全的

      

    public class LrpClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
        private final Object lock = new Object();
        private volatile RpcResponse rpcResponse = null;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
            rpcResponse = msg;
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        public Object getLock() {
            return lock;
        }
    
        public RpcResponse getRpcResponse() {
            return rpcResponse;
        }
    
        public void setRpcResponse(RpcResponse rpcResponse) {
            this.rpcResponse = rpcResponse;
        }
    
    }

    6. LrpcClientChannelInit 中持有一个LrpcClientHandler的引用,在初始化该类时同时初始化LrpcClientHandler

    public class LrpcClientChannelInit extends ChannelInitializer {
        private LrpClientHandler lrpClientHandler;
    
        public LrpcClientChannelInit() {
            lrpClientHandler = new LrpClientHandler();
        }
    
        @Override
        protected void initChannel(Channel ch) {
            //请求加密
            ch.pipeline().addLast(new RpcEncode(RpcRequest.class))
                    .addLast(new RpcDecode(RpcResponse.class))
                    .addLast(new LoggingHandler(LogLevel.INFO))
                    .addLast(lrpClientHandler);
        }
        public synchronized void initHandler(LrpClientHandler lrpClientHandler){
            this.lrpClientHandler = lrpClientHandler;
        }
        public LrpClientHandler getLrpClientHandler() {
            return lrpClientHandler;
        }
    
        public void setLrpClientHandler(LrpClientHandler lrpClientHandler) {
            this.lrpClientHandler = lrpClientHandler;
        }
    }

    7. 持有执行远程方法的host和port,execute(RpcRequest r)中连接,传递参数

    public class LrpcExecutorImpl implements LrpcExecutor {
        private String host;
        private Integer port;
    
        public LrpcExecutorImpl(String host, Integer port) {
            this.host = host;
            this.port = port;
        }
    
    
        @Override
        public RpcResponse execute(RpcRequest rpcRequest) {
            LrpcClientChannelInit lrpcClientChannelInit = new LrpcClientChannelInit();
            Bootstrap b = new Bootstrap();
            EventLoopGroup group = null;
            ChannelFuture future = null;
            try {
                group = new NioEventLoopGroup();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(lrpcClientChannelInit)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                        .option(ChannelOption.SO_KEEPALIVE, true);
                future = b.connect(new InetSocketAddress(host, port)).sync();
                //TODO 连接好了直接发送消息,同步则阻塞等待通知
                future.channel().writeAndFlush(rpcRequest).sync();
                Object lock = lrpcClientChannelInit.getLrpClientHandler().getLock();
                synchronized (lock) {
                    lock.wait();
                }
                RpcResponse rpcResponse = lrpcClientChannelInit.getLrpClientHandler().getRpcResponse();
                if (null != rpcResponse) {
                    future.channel().closeFuture().sync();
                }
                return rpcResponse;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lrpcClientChannelInit.getLrpClientHandler().setRpcResponse(null);
                if (null != group) {
                    group.shutdownGracefully();
                }
            }
            return null;
        }
    //get and set ...
    }

    8.使用实例

         Server端发布服务

    @RpcService("HelloService")
    public class HelloServiceImpl implements HelloService {
        @Override
        public String say(String msg) {
            return "Hello Word!" + msg;
        }
    }

        在application.properties中,注入环境变量:

    #
    
    lrpc.server=8888

        Client 配置服务地址和LrpcExecutor

       

    lrpc.hello.host=xxxxxxxxxxxxxxxxxxx
    lrpc.hello.port=8888
    lrpc.hello.desc=hello rpc调用

      配置调用服务执行器 LrpcExecutor,保存在spring 容器bean里,可通过依赖注入进行调用

    @Configuration
    @Component
    public class RpcConfiguration {
    
        @Bean("rpc.hello")
        @ConfigurationProperties(prefix = "lrpc.hello")
        public RpcServerProperties rpcClientCallProperties() {
            return new RpcServerProperties();
        }
    
        @Bean("helloRpcExecutor")
        LrpcExecutor lrpcExecutor(@Qualifier(value = "rpc.hello") RpcServerProperties rpcServerProperties) {
            return invoke(rpcServerProperties);
        }
    
        private LrpcExecutor invoke(RpcServerProperties config) {
            return new LrpcExecutorImpl(config.getHost(), config.getPort());
        }
    }

      调用服务,methodName为ClassName对应类下的方法名: 

    @Autowired
        @Qualifier(value = "helloRpcExecutor")
        private LrpcExecutor helloRpcExecutor;
    
        @GetMapping("/say")
        public String invoke(String msg) {
            RpcRequest rpcRequest = new RpcRequest();
            rpcRequest.setClassName("HelloService");
            rpcRequest.setMethodName("say");
            rpcRequest.setId(111L);
            HashMap<Class<?>, Object> arguments = new HashMap<>(8);
            arguments.put(String.class, "good");
            rpcRequest.setArguments(arguments);
            RpcResponse execute = helloRpcExecutor.execute(rpcRequest);
            System.out.println(execute.toString());
            return execute.toString();
        }

    最后,以上为个人练手demo,未来有时间会把未完善的地方慢慢完善,最终目标是做成像dobble那样的pj,如有好的意见或疑惑欢迎各位大佬指点(morty630@foxmail.com),附上 github完整代码:https://github.com/To-echo/lrpc-all    (你的点赞是我的动力)

    相关技术文档  

    objenesis反射工具:http://objenesis.org/details.html

    Protobuf 协议:https://developers.google.com/protocol-buffers/

    Protobuffer序列化工具:https://github.com/protostuff/protostuff

    RPC介绍:https://en.wikipedia.org/wiki/Remote_procedure_call

    Netty官网:https://netty.io/

  • 相关阅读:
    【收藏】如何理解二维数组
    【algo&ds】9.拓扑排序、AOV&AOE、关键路径问题
    【algo&ds】8.最小生成树
    【algo&ds】7.最短路径问题
    【algo&ds】6.图及其存储结构、遍历
    【c&c++】变量初始化
    【algo&ds】【pat】5.并查集及其应用
    【algo&ds】4.B树、字典树、红黑树、跳表
    【algo&ds】4.树和二叉树、完全二叉树、满二叉树、二叉查找树、平衡二叉树、堆、哈夫曼树、散列表
    【algo&ds】3.栈和队列
  • 原文地址:https://www.cnblogs.com/coding400/p/9882789.html
Copyright © 2011-2022 走看看