zoukankan      html  css  js  c++  java
  • 基于netty实现rpc框架-spring boot客户端

    上篇讲了RPC服务端的实现。原理就是解析netty通道数据拿到类、方法及入参等信息,然后通过java反射机制调用本地接口返回结果。没有用到很复杂的技术。

    这篇我们将客户端的实现。说白了客户端的任务很简单:一是建立socket长连接。二是封装发送服务端需要的数据包。三是处理返回结果。

    demo地址

    https://gitee.com/syher/grave-netty

    RPC实现

    同样定义注解扫描service接口。

    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE})
    @Documented
    @Import({NettyClientScannerRegistrar.class, NettyClientApplicationContextAware.class})
    public @interface NettyClientScan {
    
        String[] basePackages();
    
        Class<? extends NettyFactoryBean> factoryBean() default NettyFactoryBean.class;
    }
    

      

    该注解用于spring boot启动类上,参数basePackages指定接口所在的包路径。

    @SpringBootApplication
    @NettyClientScan(basePackages = {
            "com.braska.grave.netty.api.service"
    })
    public class GraveNettyClientApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(GraveNettyClientApplication.class, args);
        }
    
    }
    

      

    NettyServerScannerRegistrar类注册bean。

    public class NettyClientScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {
    
        @Override
        public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
            // spring bean注册
            NettyClientInterfaceScanner scanner = new NettyClientInterfaceScanner(registry);
    
            AnnotationAttributes annoAttrs =
                    AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyClientScan.class.getName()));
    
            Class<? extends NettyFactoryBean> nettyFactoryBeanClass = annoAttrs.getClass("factoryBean");
            if (!NettyFactoryBean.class.equals(nettyFactoryBeanClass)) {
                scanner.setNettyFactoryBean(BeanUtils.instantiateClass(nettyFactoryBeanClass));
            }
    
            List<String> basePackages = new ArrayList<String>();
            for (String pkg : annoAttrs.getStringArray("basePackages")) {
                if (StringUtils.hasText(pkg)) {
                    basePackages.add(pkg);
                }
            }
    
            scanner.doScan(StringUtils.toStringArray(basePackages));
        }
    }
    

      

    NettyClientInterfaceScanner类使用jdk动态代理basePackages路径下的接口。

    public class NettyClientInterfaceScanner extends ClassPathBeanDefinitionScanner {
        private NettyFactoryBean nettyFactoryBean = new NettyFactoryBean();
    
    
        @Override
        public Set<BeanDefinitionHolder> doScan(String... basePackages) {
            Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);
    
            if (beanDefinitions.isEmpty()) {
            } else {
                processBeanDefinitions(beanDefinitions);
            }
    
            return beanDefinitions;
        }
    
        private void processBeanDefinitions(
                Set<BeanDefinitionHolder> beanDefinitions) {
    
            GenericBeanDefinition definition;
    
            for (BeanDefinitionHolder holder : beanDefinitions) {
    
                definition = (GenericBeanDefinition) holder.getBeanDefinition();
                // 为对象属性赋值(这一块我也还不太明白)
           definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
                // 这里的nettyFactoryBean是生成Bean实例的工厂,不是Bean本身
                definition.setBeanClass(this.nettyFactoryBean.getClass());
    
                definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
            }
        }
    }
    

      

    NettyFactoryBean 

    public class NettyFactoryBean<T> implements FactoryBean<T> {
        private Class<T> nettyInterface;
    
        public NettyFactoryBean() {}
    
        public NettyFactoryBean(Class<T> nettyInterface) {
            this.nettyInterface = nettyInterface;
        }
    
        @Override
        public T getObject() throws Exception {
            // 通过jdk动态代理创建实例
            return (T) Proxy.newProxyInstance(nettyInterface.getClassLoader(), new Class[]{nettyInterface}, c.getInstance());
        }
    
        @Override
        public Class<?> getObjectType() {
            return this.nettyInterface;
        }
    
        @Override
        public boolean isSingleton() {
            return true;
        }
    }
    

      

    关键来了,NettyInterfaceInvoker类负责数据包封装及发送。

    public class NettyInterfaceInvoker implements InvocationHandler {
    
        private RequestSender sender;
        // 静态内部类做单例模式 
        private static class SINGLETON {
            private static final NettyInterfaceInvoker invoker = new NettyInterfaceInvoker();
    
            private static NettyInterfaceInvoker setSender(RequestSender sender) {
                invoker.sender = sender;
                return invoker;
            }
        }
    
        public static NettyInterfaceInvoker getInstance() {
            return SINGLETON.invoker;
        }
    
        public static NettyInterfaceInvoker setSender(RequestSender sender) {
            return SINGLETON.setSender(sender);
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 数据包封装,包含类名、方法名及参数等信息。
            Request request = new Request();
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameters(args);
            request.setParameterTypes(method.getParameterTypes());
            request.setId(UUID.randomUUID().toString());
            // 数据发送
            Object result = sender.send(request);
            Class<?> returnType = method.getReturnType();
            // 处理返回数据
            Response response = JSON.parseObject(result.toString(), Response.class);
            if (response.getCode() == 1) {
                throw new Exception(response.getError());
            }
            if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {
                return response.getData();
            } else if (Collection.class.isAssignableFrom(returnType)) {
                return JSONArray.parseArray(response.getData().toString(), Object.class);
            } else if (Map.class.isAssignableFrom(returnType)) {
                return JSON.parseObject(response.getData().toString(), Map.class);
            } else {
                Object data = response.getData();
                return JSONObject.parseObject(data.toString(), returnType);
            }
        }
    }
    

      

    接着我们来看看RequestSender怎么处理数据的。

    public interface RequestSender {
        Channel connect(SocketAddress address) throws InterruptedException;
    
        Object send(Request request) throws InterruptedException;
    }
    

      

    RequestSender本身只是一个接口。他的实现类有:

    public class NettyClientApplicationContextAware extends ChannelInitializer<SocketChannel>
            implements RequestSender, ApplicationContextAware, InitializingBean {
        private static final Logger logger = Logger.getLogger(NettyClientApplicationContextAware.class.getName());
    
        private String remoteAddress;
        private Bootstrap bootstrap;
        private EventLoopGroup group;
        private NettyChannelManager manager;
        private NettyClientHandler handler;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.remoteAddress = applicationContext.getEnvironment().getProperty("remoteAddress");
            this.bootstrap = new Bootstrap();
            this.group = new NioEventLoopGroup(1);
            this.bootstrap.group(group).
                    channel(NioSocketChannel.class).
                    option(ChannelOption.TCP_NODELAY, true).
                    option(ChannelOption.SO_KEEPALIVE, true).
                    handler(this);
            this.manager = new NettyChannelManager(this);
            this.handler = new NettyClientHandler(manager, remoteAddress);
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            // socket连接入口。
            this.manager.refresh(Lists.newArrayList(remoteAddress));
        }
    
        @Override
        public Object send(Request request) throws InterruptedException {
            Channel channel = manager.take();
            if (channel != null && channel.isActive()) {
                SynchronousQueue<Object> queue = this.handler.sendRequest(request, channel);
                Object result = queue.take();
                return JSONArray.toJSONString(result);
            } else {
                Response res = new Response();
                res.setCode(1);
                res.setError("未正确连接到服务器.请检查相关配置信息!");
                return JSONArray.toJSONString(res);
            }
        }
    
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 0, 30));
            pipeline.addLast(new JSONEncoder());
            pipeline.addLast(new JSONDecoder());
            // 管道处理器
            pipeline.addLast(this.handler);
        }
    
        @Override
        public Channel connect(SocketAddress address) throws InterruptedException {
            ChannelFuture future = bootstrap.connect(address);
            // 建立长连接,提供失败重连。
            future.addListener(new ConnectionListener(this.manager, this.remoteAddress));
            Channel channel = future.channel();//future.sync().channel();
            return channel;
        }
    
        public void destroy() {
            this.group.shutdownGracefully();
        }
    }
    

      

    NettyClientHandler类处理管道事件。与服务端不通,这个管道处理器是继承ChannelInboundHandlerAdapter类。

    @ChannelHandler.Sharable
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
        private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
    
        private ConcurrentHashMap<String, SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
        private NettyChannelManager manager;
        private String remoteAddress;
    
        public NettyClientHandler(NettyChannelManager manager, String remoteAddress) {
            this.manager = manager;
            this.remoteAddress = remoteAddress;
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
            logger.info("与netty服务器断开连接." + address);
            ctx.channel().close();
            manager.remove(ctx.channel());
            // 掉线重连
            final EventLoop eventLoop = ctx.channel().eventLoop();
            eventLoop.schedule(() -> {
                manager.refresh(Lists.newArrayList(remoteAddress));
            }, 1L, TimeUnit.SECONDS);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 处理服务端返回的数据
            Response response = JSON.parseObject(msg.toString(), Response.class);
            String requestId = response.getRequestId();
            SynchronousQueue<Object> queue = queueMap.get(requestId);
            queue.put(response);
            queueMap.remove(requestId);
        }
    
        public SynchronousQueue<Object> sendRequest(Request request, Channel channel) {
            // 使用阻塞队列处理客户端请求
            SynchronousQueue<Object> queue = new SynchronousQueue<>();
            queueMap.put(request.getId(), queue);
            channel.writeAndFlush(request);
            return queue;
        }
    
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            logger.info("发送心跳消息...");
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.ALL_IDLE) {
                    Request request = new Request();
                    request.setMethodName("heartBeat");
                    ctx.channel().writeAndFlush(request);
                }
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
    

      

    这样,RPC的客户端就写好了,其中主要涉及到的关键内容就是netty实例及管道处理器、jdk动态代理、还有一个阻塞队列。

    结合上篇RPC服务端。一个完整的RPC框架就搭建完了。

    当然,有些地方处理的还是比较粗糙。后续有修改以git代码为准。

  • 相关阅读:
    Spring配置多个数据源
    虚拟机内存结构
    Java中sleep,wait,yield,join的区别
    Java的四种引用方式
    Java 中的泛型详解-Java编程思想
    Java RTTI和反射
    linux 分析java 线程状态
    小容量的byteBuffer 读取大文本
    @Conditional 原理
    替换字符串占位符
  • 原文地址:https://www.cnblogs.com/braska/p/12759062.html
Copyright © 2011-2022 走看看