  • Integrating Netty with Spring Boot

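    A friend recently asked me about integrating Netty with Spring Boot, so I have summarized the basic integration process here. This article assumes you are already comfortable integrating Netty with Spring and Spring MVC. With that, let's get started.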

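    Server side:

    Overall, the server side is fairly simple. I wrote three core classes:

    • NettyServerListener: the listener that starts the server
    • ServerChannelHandlerAdapter: the channel handler adapter, shared across channels and threads
    • RequestDispatcher: the request dispatcher

    The integration steps are as follows:

    1. Add the following dependencies to pom.xml
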
      <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>5.0.0.Alpha2</version>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-configuration-processor</artifactId>
         <optional>true</optional>
      </dependency>
      
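    2. Make the Spring Boot startup class implement the CommandLineRunner interface and override its run method. My startup class, for example, is CloudApplication.java

      @SpringBootApplication
      public class CloudApplication implements CommandLineRunner {

          // Defined in step 3; injected so that run() can start the Netty server
          @Resource
          private NettyServerListener nettyServerListener;

          public static void main(String[] args) {
              SpringApplication.run(CloudApplication.class, args);
          }

          @Override
          public void run(String... strings) {
              // Blocks until the Netty server channel is closed
              nettyServerListener.start();
          }
      }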
      
    3. Create the class NettyServerListener.java

      // Configuration class that reads values from the yml file
      import com.edu.hart.modules.constant.NettyConfig;
      // Netty connection constants
      import com.edu.hart.modules.constant.NettyConstant;
      // Codec that serializes and deserializes objects
      import com.edu.hart.rpc.util.ObjectCodec;
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.channel.ChannelFuture;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.ChannelOption;
      import io.netty.channel.ChannelPipeline;
      import io.netty.channel.EventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
      import io.netty.handler.codec.LengthFieldPrepender;
      import io.netty.handler.logging.LogLevel;
      import io.netty.handler.logging.LoggingHandler;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.stereotype.Component;
      
      import javax.annotation.PreDestroy;
      import javax.annotation.Resource;
      
      /**
       * Listener that starts the Netty server
       *
       * @author 叶云轩
       */
      @Component
      public class NettyServerListener {
          /**
           * Logger for NettyServerListener
           *
           * @author 叶云轩 create by 2017/10/31 18:05
           */
          private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
          /**
           * Server bootstrap
           */
          ServerBootstrap serverBootstrap = new ServerBootstrap();
          /**
           * Boss group: accepts incoming connections
           */
          EventLoopGroup boss = new NioEventLoopGroup();
          /**
           * Worker group: handles I/O on accepted connections
           */
          EventLoopGroup work = new NioEventLoopGroup();
          /**
           * Channel handler adapter
           */
          @Resource
          private ServerChannelHandlerAdapter channelHandlerAdapter;
          /**
           * Netty server configuration
           */
          @Resource
          private NettyConfig nettyConfig;
        
          /**
           * Shuts down the server
           */
          @PreDestroy
          public void close() {
              LOGGER.info("Shutting down the server....");
              // Graceful shutdown
              boss.shutdownGracefully();
              work.shutdownGracefully();
          }
      
          /**
           * Starts the server and blocks until its channel is closed
           */
          public void start() {
              // Read the server listening port from the configuration file (application.yml)
              int port = nettyConfig.getPort();
              serverBootstrap.group(boss, work)
                      .channel(NioServerSocketChannel.class)
                      .option(ChannelOption.SO_BACKLOG, 100)
                      .handler(new LoggingHandler(LogLevel.INFO));
              try {
                  // Set up the handler pipeline for each accepted connection
                  serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {
                          ChannelPipeline pipeline = ch.pipeline();
                          // NettyConfig only exposes the port, so the frame limit comes from NettyConstant
                          pipeline.addLast(new LengthFieldBasedFrameDecoder(NettyConstant.getMaxFrameLength()
                                  , 0, 2, 0, 2));
                          pipeline.addLast(new LengthFieldPrepender(2));
                          pipeline.addLast(new ObjectCodec());
      
                          pipeline.addLast(channelHandlerAdapter);
                      }
                  });
                  LOGGER.info("Netty server starts listening on port [{}]", port);
                  ChannelFuture f = serverBootstrap.bind(port).sync();
                  f.channel().closeFuture().sync();
              } catch (InterruptedException e) {
                  LOGGER.info("[Exception] Releasing resources");
                  boss.shutdownGracefully();
                  work.shutdownGracefully();
              }
          }
      }
      
    4. Create the class ServerChannelHandlerAdapter.java, the channel handler adapter

      // Class that records the metadata of the method being invoked
      import com.edu.hart.rpc.entity.MethodInvokeMeta;
      import io.netty.channel.ChannelHandler.Sharable;
      import io.netty.channel.ChannelHandlerAdapter;
      import io.netty.channel.ChannelHandlerContext;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.stereotype.Component;
      
      import javax.annotation.Resource;
      
      /**
       * Handler shared across channels (hence @Sharable)
       */
      @Component
      @Sharable
      public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
          /**
           * Logger
           */
          private Logger logger = LoggerFactory.getLogger(ServerChannelHandlerAdapter.class);
          /**
           * Injected request dispatcher
           */
          @Resource
          private RequestDispatcher dispatcher;
      
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
              logger.error("Server channel error", cause);
              ctx.close();
          }
      
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) {
              MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg;
              // Skip logging for toString() calls
              if (!invokeMeta.getMethodName().endsWith("toString")) {
                  logger.info("Client arguments: {}, return type: {}",
                          invokeMeta.getArgs(), invokeMeta.getReturnType());
              }
              dispatcher.dispatcher(ctx, invokeMeta);
          }
      }
      
    5. RequestDispatcher.java

      // Enum of response codes
      import com.edu.hart.modules.communicate.ResponseCodeEnum;
      // Response entity
      import com.edu.hart.modules.communicate.ResponseResult;
      // Connection constants
      import com.edu.hart.modules.constant.NettyConstant;
      // Entity that records method invocation metadata
      import com.edu.hart.rpc.entity.MethodInvokeMeta;
      // Placeholder for a null return value
      import com.edu.hart.rpc.entity.NullWritable;
      // Utility for building response entities
      import com.edu.hart.rpc.util.ResponseResultUtil;
      import io.netty.channel.ChannelFuture;
      import io.netty.channel.ChannelFutureListener;
      import io.netty.channel.ChannelHandlerContext;
      import org.springframework.beans.BeansException;
      import org.springframework.context.ApplicationContext;
      import org.springframework.context.ApplicationContextAware;
      import org.springframework.stereotype.Component;
      
      import java.lang.reflect.Method;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      /**
       * Request dispatcher
       */
      @Component
      public class RequestDispatcher implements ApplicationContextAware {
          private ExecutorService executorService = Executors.newFixedThreadPool(NettyConstant.getMaxThreads());
          private ApplicationContext app;
      
          /**
           * Invokes the requested method on a worker thread and writes the result back
           *
           * @param ctx        channel context used to write the response
           * @param invokeMeta metadata of the method to invoke
           */
          public void dispatcher(final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) {
              executorService.submit(() -> {
                  ChannelFuture f;
                  try {
                      Class<?> interfaceClass = invokeMeta.getInterfaceClass();
                      String name = invokeMeta.getMethodName();
                      Object[] args = invokeMeta.getArgs();
                      Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
                      // Look up the bean that implements the requested interface
                      Object targetObject = app.getBean(interfaceClass);
                      Method method = targetObject.getClass().getMethod(name, parameterTypes);
                      Object obj = method.invoke(targetObject, args);
                      if (obj == null) {
                          f = ctx.writeAndFlush(NullWritable.nullWritable());
                      } else {
                          f = ctx.writeAndFlush(obj);
                      }
                  } catch (Exception e) {
                      ResponseResult error = ResponseResultUtil.error(ResponseCodeEnum.SERVER_ERROR);
                      f = ctx.writeAndFlush(error);
                  }
                  // Close the connection once the response has been written (registered exactly once)
                  f.addListener(ChannelFutureListener.CLOSE);
              });
          }
      
          /**
           * Stores the current Spring application context
           *
           * @param ctx the application context
           * @throws BeansException
           */
          @Override
          public void setApplicationContext(ApplicationContext ctx) throws BeansException {
              this.app = ctx;
          }
      }
      
    6. The netty settings in application.yml

      netty:
        port: 11111
      
    7. NettyConfig.java

      import org.springframework.boot.context.properties.ConfigurationProperties;
      import org.springframework.stereotype.Component;
      
      /**
       * Reads the settings from the yml configuration file
       * Created by 叶云轩 on 2017/10/31 - 18:38
       * Contact tdg_yyx@foxmail.com
       */
      @Component
      @ConfigurationProperties(prefix = "netty")
      public class NettyConfig {
      
          private int port;
      
          public int getPort() {
              return port;
          }
      
          public void setPort(int port) {
              this.port = port;
          }
      }
      
    8. NettyConstant.java

      import org.springframework.stereotype.Component;
      
      /**
       * Netty server constants
       * Created by 叶云轩 on 2017/10/31 - 17:47
       * Contact tdg_yyx@foxmail.com
       */
      @Component
      public class NettyConstant {
      
          /**
           * Maximum number of threads
           */
          private static final int MAX_THREADS = 1024;
          /**
           * Maximum frame (packet) length
           */
          private static final int MAX_FRAME_LENGTH = 65535;
      
          public static int getMaxFrameLength() {
              return MAX_FRAME_LENGTH;
          }
      
          public static int getMaxThreads() {
              return MAX_THREADS;
          }
      }
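
The server pipeline in step 3 frames every message with a 2-byte length header: LengthFieldPrepender(2) writes it on the way out, and LengthFieldBasedFrameDecoder(..., 0, 2, 0, 2) reads and strips it on the way in. As a rough illustration of that wire format, here is a minimal plain-JDK sketch. FrameSketch and its methods are hypothetical names invented for this example, not Netty classes, and the sketch assumes the whole stream is already in memory, whereas Netty's decoder also copes with frames that arrive in pieces.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that mimics a 2-byte length-prefixed frame format. */
final class FrameSketch {
    /** Largest payload an unsigned 16-bit length field can describe. */
    static final int MAX_FRAME_LENGTH = 65535;

    /** Prepends the payload length as an unsigned 16-bit big-endian value. */
    static byte[] encode(byte[] payload) {
        if (payload.length > MAX_FRAME_LENGTH) {
            throw new IllegalArgumentException("payload too large: " + payload.length);
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort((short) payload.length);
        buf.put(payload);
        return buf.array();
    }

    /** Splits a byte stream into payloads; an incomplete trailing frame is left out. */
    static List<byte[]> decode(byte[] stream) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        while (buf.remaining() >= 2) {
            // Peek at the length header without consuming it
            int length = buf.getShort(buf.position()) & 0xFFFF;
            if (buf.remaining() < 2 + length) {
                break;
            }
            // Skip the 2-byte header, then copy out the payload
            buf.position(buf.position() + 2);
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("ping".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("pong".getBytes(StandardCharsets.UTF_8));
        byte[] stream = new byte[first.length + second.length];
        System.arraycopy(first, 0, stream, 0, first.length);
        System.arraycopy(second, 0, stream, first.length, second.length);
        for (byte[] frame : decode(stream)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
    }
}
```

Because the length field is only 2 bytes wide, a serialized object larger than 65535 bytes cannot be framed this way, which matches the MAX_FRAME_LENGTH constant in NettyConstant.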
      

      At this point the Netty server is integrated with Spring Boot. Let's take a look at how it starts up.

      [Figure: startup of the Spring Boot and Netty integration]
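
To make the RequestDispatcher idea from step 5 easier to try in isolation, here is a small sketch that keeps the same shape (look up the target by interface, resolve the method by name and parameter types, invoke it on a thread pool) but swaps the Spring ApplicationContext for a plain map. DispatchSketch, Greeter, GreeterImpl and the pool size of 4 are all assumptions made up for this example.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for RequestDispatcher that uses a map instead of a Spring context. */
final class DispatchSketch {
    /** Example service interface, invented for this sketch. */
    interface Greeter {
        String greet(String name);
    }

    /** Example implementation, invented for this sketch. */
    static final class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    private final Map<Class<?>, Object> beans = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    <T> void register(Class<T> type, T bean) {
        beans.put(type, bean);
    }

    /** Runs the reflective call on a pool thread so the caller's thread is not blocked. */
    Future<Object> dispatch(Class<?> interfaceClass, String methodName,
                            Class<?>[] parameterTypes, Object[] arguments) {
        return pool.submit(() -> {
            Object target = beans.get(interfaceClass);
            if (target == null) {
                throw new IllegalStateException("No bean for " + interfaceClass.getName());
            }
            // Resolve the method on the interface, using name and parameter types
            Method method = interfaceClass.getMethod(methodName, parameterTypes);
            return method.invoke(target, arguments);
        });
    }

    /** Convenience wrapper that waits for the result and converts checked exceptions. */
    Object dispatchAndWait(Class<?> interfaceClass, String methodName,
                           Class<?>[] parameterTypes, Object[] arguments) {
        try {
            return dispatch(interfaceClass, methodName, parameterTypes, arguments).get();
        } catch (Exception e) {
            throw new IllegalStateException("Dispatch failed", e);
        }
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.register(Greeter.class, new GreeterImpl());
        Object result = dispatcher.dispatchAndWait(
                Greeter.class, "greet", new Class<?>[]{String.class}, new Object[]{"netty"});
        System.out.println(result);
        dispatcher.shutdown();
    }
}
```

In the real dispatcher the result is written back with ctx.writeAndFlush instead of being returned, but the lookup and invocation steps are the same.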

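    Client side:

    I find the client a bit more involved than the server. Again, here are the core classes first.

    • NettyClient: the Netty client
    • ClientChannelHandlerAdapter: the client channel handler adapter
    • CustomChannelInitializerClient: a custom channel initializer
    • RPCProxyFactoryBean: the proxy factory for RPC communication

    On the client side, the Spring Boot startup class must extend SpringBootServletInitializer and override its configure(SpringApplicationBuilder) method
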
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.builder.SpringApplicationBuilder;
    import org.springframework.boot.web.support.SpringBootServletInitializer;
    
    @SpringBootApplication
    public class OaApplication extends SpringBootServletInitializer {
    
        public static void main(String[] args) {
            SpringApplication.run(OaApplication.class, args);
        }
    
        @Override
        protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
            return builder.sources(OaApplication.class);
        }
    }
    
    
    1. NettyClient.java

      // Entity that records method invocation metadata
      import com.edu.hart.rpc.entity.MethodInvokeMeta;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.channel.ChannelFuture;
      import io.netty.channel.EventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.nio.NioSocketChannel;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      /**
       * Client that sends remote calls
       * Created by 叶云轩 on 2017/6/16-16:58
       * Contact tdg_yyx@foxmail.com
       */
      public class NettyClient {
      
          private Logger logger = LoggerFactory.getLogger(NettyClient.class);
          private Bootstrap bootstrap;
          private EventLoopGroup worker;
          private int port;
          private String url;
          private static final int MAX_RETRY_TIMES = 10;
      
          public NettyClient(String url, int port) {
              this.url = url;
              this.port = port;
              bootstrap = new Bootstrap();
              worker = new NioEventLoopGroup();
              bootstrap.group(worker);
              bootstrap.channel(NioSocketChannel.class);
          }
      
          public void close() {
              logger.info("Releasing resources");
              worker.shutdownGracefully();
          }
      
          public Object remoteCall(final MethodInvokeMeta cmd, int retry) {
              try {
                  // A fresh initializer per call carries the request and collects the response
                  CustomChannelInitializerClient customChannelInitializer = new CustomChannelInitializerClient(cmd);
                  bootstrap.handler(customChannelInitializer);
                  ChannelFuture sync = bootstrap.connect(url, port).sync();
                  // The server closes the connection after replying, which ends this wait
                  sync.channel().closeFuture().sync();
                  return customChannelInitializer.getResponse();
              } catch (InterruptedException e) {
                  retry++;
                  if (retry > MAX_RETRY_TIMES) {
                      throw new RuntimeException("Remote call failed");
                  } else {
                      try {
                          Thread.sleep(100);
                      } catch (InterruptedException e1) {
                          e1.printStackTrace();
                      }
                      logger.info("Attempt {} failed, retrying....", retry);
                      return remoteCall(cmd, retry);
                  }
              }
          }
      }
      
    2. ClientChannelHandlerAdapter.java

      import com.edu.hart.rpc.entity.MethodInvokeMeta;
      import io.netty.channel.ChannelHandlerAdapter;
      import io.netty.channel.ChannelHandlerContext;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      /**
       * Created by 叶云轩 on 2017/6/16-17:03
       * Contact tdg_yyx@foxmail.com
       */
      public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter {
          private Logger logger = LoggerFactory.getLogger(ClientChannelHandlerAdapter.class);
          private MethodInvokeMeta methodInvokeMeta;
          private CustomChannelInitializerClient channelInitializerClient;
      
          public ClientChannelHandlerAdapter(MethodInvokeMeta methodInvokeMeta, CustomChannelInitializerClient channelInitializerClient) {
              this.methodInvokeMeta = methodInvokeMeta;
              this.channelInitializerClient = channelInitializerClient;
          }
      
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              logger.error("Client exception", cause);
              ctx.close();
          }
      
          @Override
          public void channelActive(ChannelHandlerContext ctx) throws Exception {
              // Skip logging for toString() calls
              if (!methodInvokeMeta.getMethodName().endsWith("toString")) {
                  logger.info("Client sending arguments: {}, return type: {}", methodInvokeMeta.getArgs(), methodInvokeMeta.getReturnType());
              }
              // Send the request as soon as the connection is established
              ctx.writeAndFlush(methodInvokeMeta);
          }
      
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              channelInitializerClient.setResponse(msg);
          }
      }
      
    3. CustomChannelInitializerClient.java

      import com.edu.hart.rpc.entity.MethodInvokeMeta;
      import com.edu.hart.rpc.entity.NullWritable;
      import com.edu.hart.rpc.util.ObjectCodec;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.ChannelPipeline;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
      import io.netty.handler.codec.LengthFieldPrepender;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      /**
       * Created by 叶云轩 on 2017/6/16-15:01
       * Contact tdg_yyx@foxmail.com
       */
      public class CustomChannelInitializerClient extends ChannelInitializer<SocketChannel> {

          private Logger logger = LoggerFactory.getLogger(CustomChannelInitializerClient.class);

          private MethodInvokeMeta methodInvokeMeta;

          private Object response;

          public CustomChannelInitializerClient(MethodInvokeMeta methodInvokeMeta) {
              if (!"toString".equals(methodInvokeMeta.getMethodName())) {
                  logger.info("[CustomChannelInitializerClient] method: {}, arguments: {}, parameter types: {}, return type: {}"
                          , methodInvokeMeta.getMethodName()
                          , methodInvokeMeta.getArgs()
                          , methodInvokeMeta.getParameterTypes()
                          , methodInvokeMeta.getReturnType());
              }
              this.methodInvokeMeta = methodInvokeMeta;
          }

          public Object getResponse() {
              // NullWritable is the server's placeholder for a null return value
              if (response instanceof NullWritable) {
                  return null;
              }
              return response;
          }

          public void setResponse(Object response) {
              this.response = response;
          }

          @Override
          protected void initChannel(SocketChannel ch) {
              ChannelPipeline pipeline = ch.pipeline();
              pipeline.addLast(new LengthFieldPrepender(2));
              pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2));
              pipeline.addLast(new ObjectCodec());
              pipeline.addLast(new ClientChannelHandlerAdapter(methodInvokeMeta, this));
          }
      }

    4. RPCProxyFactoryBean.java

    import com.edu.hart.rpc.entity.MethodInvokeMeta;
    import com.edu.hart.rpc.util.WrapMethodUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.config.AbstractFactoryBean;

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;

    /**
     * Created by 叶云轩 on 2017/6/16-17:16
     * Contact tdg_yyx@foxmail.com
     */
    public class RPCProxyFactoryBean extends AbstractFactoryBean<Object> implements InvocationHandler {
        private Logger logger = LoggerFactory.getLogger(RPCProxyFactoryBean.class);

        private Class<?> interfaceClass;

        private NettyClient nettyClient;

        @Override
        public Class<?> getObjectType() {
            return interfaceClass;
        }

        @Override
        protected Object createInstance() throws Exception {
            logger.info("[Proxy factory] Initializing proxy bean: {}", interfaceClass);
            return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, this);
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            // Wrap the call in metadata and send it to the server
            final MethodInvokeMeta methodInvokeMeta = WrapMethodUtils.readMethod(interfaceClass, method, args);
            if (!methodInvokeMeta.getMethodName().equals("toString")) {
                logger.info("[invoke] interface: {}, method: {}, arguments: {}, parameter types: {}, return type: {}",
                        methodInvokeMeta.getInterfaceClass(), methodInvokeMeta.getMethodName()
                        , methodInvokeMeta.getArgs(), methodInvokeMeta.getParameterTypes(), methodInvokeMeta.getReturnType());
            }
            return nettyClient.remoteCall(methodInvokeMeta, 0);
        }

        public void setInterfaceClass(Class<?> interfaceClass) {
            this.interfaceClass = interfaceClass;
        }

        public void setNettyClient(NettyClient nettyClient) {
            this.nettyClient = nettyClient;
        }
    }
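
RPCProxyFactoryBean relies on a JDK dynamic proxy: the client only has the interface, and every call on the proxy is turned into metadata and handed to the transport. The sketch below shows that mechanism on its own, without Spring or Netty. ProxySketch, LoginService and Transport are hypothetical names for this example, with Transport standing in for NettyClient.remoteCall. Unlike the factory bean above, this sketch answers toString, hashCode and equals locally, which is a design choice of the sketch rather than something the original code does.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical demonstration of an RPC-style JDK dynamic proxy. */
final class ProxySketch {
    /** Example remote interface that has no local implementation. */
    interface LoginService {
        boolean login(String user, String password);
    }

    /** Stand-in for the network call made by the real client. */
    interface Transport {
        Object send(String interfaceName, String methodName, Class<?>[] parameterTypes, Object[] arguments);
    }

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> interfaceClass, Transport transport) {
        InvocationHandler handler = (proxy, method, arguments) -> {
            // Object methods are answered locally so that logging the proxy does not trigger a remote call
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString":
                        return interfaceClass.getName() + "$RpcProxy";
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    case "equals":
                        return proxy == arguments[0];
                    default:
                        throw new UnsupportedOperationException(method.getName());
                }
            }
            // Everything else becomes a remote call described by its metadata
            return transport.send(interfaceClass.getName(), method.getName(),
                    method.getParameterTypes(), arguments);
        };
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, handler);
    }

    public static void main(String[] args) {
        // A fake transport that accepts every login, for demonstration only
        Transport fake = (iface, name, types, arguments) -> Boolean.TRUE;
        LoginService service = createProxy(LoginService.class, fake);
        System.out.println(service.login("user", "secret"));
        System.out.println(service);
    }
}
```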

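    That completes the integration of netty-client with Spring Boot. As on the server, the corresponding dependencies must also be added to netty-client.

    The server and client above share some common classes and utilities, however, which are listed below.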
    
    ###### MethodInvokeMeta.java

    import org.springframework.stereotype.Component;

    import java.io.Serializable;

    /**
     * Records the metadata of a method invocation
     * Created by 叶云轩 on 2017/6/7-15:41
     * Contact tdg_yyx@foxmail.com
     */
    @Component
    public class MethodInvokeMeta implements Serializable {

        private static final long serialVersionUID = 8379109667714148890L;
        // Interface
        private Class<?> interfaceClass;
        // Method name
        private String methodName;
        // Arguments
        private Object[] args;
        // Return type
        private Class<?> returnType;
        // Parameter types
        private Class<?>[] parameterTypes;

        public Object[] getArgs() {
            return args;
        }

        public void setArgs(Object[] args) {
            this.args = args;
        }

        public Class<?> getInterfaceClass() {
            return interfaceClass;
        }

        public void setInterfaceClass(Class<?> interfaceClass) {
            this.interfaceClass = interfaceClass;
        }

        public String getMethodName() {
            return methodName;
        }

        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }

        public Class<?>[] getParameterTypes() {
            return parameterTypes;
        }

        public void setParameterTypes(Class<?>[] parameterTypes) {
            this.parameterTypes = parameterTypes;
        }

        public Class<?> getReturnType() {
            return returnType;
        }

        public void setReturnType(Class<?> returnType) {
            this.returnType = returnType;
        }
    }

    
    ###### NullWritable.java

    import java.io.Serializable;

    /**
     * Placeholder for the case where the server returns null
     * Created by 叶云轩 on 2017/6/16-16:46
     * Contact tdg_yyx@foxmail.com
     */
    public class NullWritable implements Serializable {

        private static final long serialVersionUID = -8191640400484155111L;
        private static final NullWritable instance = new NullWritable();

        private NullWritable() {
        }

        public static NullWritable nullWritable() {
            return instance;
        }
    }

    
    ###### ObjectCodec.java

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageCodec;

    import java.util.List;

    public class ObjectCodec extends MessageToMessageCodec<ByteBuf, Object> {

        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) {
            byte[] data = ObjectSerializerUtils.serilizer(msg);
            ByteBuf buf = Unpooled.buffer();
            buf.writeBytes(data);
            out.add(buf);
        }

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
            byte[] bytes = new byte[msg.readableBytes()];
            msg.readBytes(bytes);
            Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes);
            out.add(deSerilizer);
        }
    }

    
    ###### ObjectSerializerUtils.java

    package com.edu.hart.rpc.util;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.*;

    /**
     * Object serialization utility
     */
    public class ObjectSerializerUtils {

        private static final Logger logger = LoggerFactory.getLogger(ObjectSerializerUtils.class);

        /**
         * Deserializes an object
         *
         * @param data serialized bytes
         * @return the deserialized object, or null if the input is empty or cannot be read
         */
        public static Object deSerilizer(byte[] data) {
            if (data != null && data.length > 0) {
                // try-with-resources closes the stream even when reading fails
                try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
                    return ois.readObject();
                } catch (Exception e) {
                    logger.error("[Exception] {}", e.getMessage(), e);
                }
                return null;
            } else {
                logger.info("[Deserialization] input is empty");
                return null;
            }
        }

        /**
         * Serializes an object
         *
         * @param obj the object to serialize
         * @return the serialized bytes, or null if the input is null or cannot be written
         */
        public static byte[] serilizer(Object obj) {
            if (obj != null) {
                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                     ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                    oos.writeObject(obj);
                    oos.flush();
                    return bos.toByteArray();
                } catch (IOException e) {
                    logger.error("[Exception] {}", e.getMessage(), e);
                }
                return null;
            } else {
                return null;
            }
        }

    }
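
For readers who want to check the serialization step without a running server, here is a small round-trip sketch using the same JDK object streams. SerializationSketch and Request are hypothetical names for this example (Request is a simplified stand-in for MethodInvokeMeta), and, unlike ObjectSerializerUtils, the sketch throws on failure instead of returning null, purely to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical round-trip demo for JDK object serialization. */
final class SerializationSketch {
    /** Simplified stand-in for MethodInvokeMeta, invented for this sketch. */
    static final class Request implements Serializable {
        private static final long serialVersionUID = 1L;
        final String methodName;
        final Object[] args;

        Request(String methodName, Object... args) {
            this.methodName = methodName;
            this.args = args;
        }
    }

    /** Turns a Serializable object into bytes. */
    static byte[] serialize(Object obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Rebuilds the object from bytes produced by serialize. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Request original = new Request("login", "alice", 42);
        Request copy = (Request) deserialize(serialize(original));
        System.out.println(copy.methodName + " with " + copy.args.length + " arguments");
    }
}
```

Every argument and return value that crosses the wire has to implement Serializable, otherwise writeObject fails with a NotSerializableException.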

    The following classes are mainly used on the Client side:

    ###### NettyBeanScanner.java
    
    

    import com.edu.hart.rpc.client.RPCProxyFactoryBean;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
    import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
    import org.springframework.beans.factory.support.BeanDefinitionBuilder;
    import org.springframework.beans.factory.support.DefaultListableBeanFactory;

    import java.util.List;

    /**
     * Dynamically registers the proxy beans in the Spring bean factory
     */
    public class NettyBeanScanner implements BeanFactoryPostProcessor {

        private DefaultListableBeanFactory beanFactory;

        private String basePackage;

        private String clientName;

        public NettyBeanScanner(String basePackage, String clientName) {
            this.basePackage = basePackage;
            this.clientName = clientName;
        }

        /**
         * Registers the beans in the Spring bean factory
         */
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            this.beanFactory = (DefaultListableBeanFactory) beanFactory;
            // Load the interfaces of the remote services
            List<String> resolverClass = PackageClassUtils.resolver(basePackage);
            for (String clazz : resolverClass) {
                // Use the simple class name as the bean name
                String simpleName;
                if (clazz.lastIndexOf('.') != -1) {
                    simpleName = clazz.substring(clazz.lastIndexOf('.') + 1);
                } else {
                    simpleName = clazz;
                }
                BeanDefinitionBuilder gd = BeanDefinitionBuilder.genericBeanDefinition(RPCProxyFactoryBean.class);
                gd.addPropertyValue("interfaceClass", clazz);
                gd.addPropertyReference("nettyClient", clientName);
                this.beanFactory.registerBeanDefinition(simpleName, gd.getRawBeanDefinition());
            }
        }
    }

    
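    ###### PackageClassUtils.java

    **This class deserves a note. It loads the interfaces exposed by the Server: because the RPC interfaces have no implementation classes in the Client, you have to load them into the Spring bean factory yourself. One problem remains, though: the packages currently have to be listed like this**

    ###### application.yml in Spring Boot

    basePackage: com.edu.hart.rpc.service.login;com.edu.hart.rpc.service.employee;com.edu.hart.rpc.service.authorization;

    **With wildcards, nothing gets loaded. I have not solved this problem yet.**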
    
    

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    /**
     * Loads class files
     */
    public class PackageClassUtils {

        private final static Logger LOGGER = LoggerFactory.getLogger(PackageClassUtils.class);

        /**
         * Resolves the package parameter
         *
         * @param basePackage package names, separated by ";"
         * @return list of class name strings
         */
        public static List<String> resolver(String basePackage) {
            // Multiple package names are separated by ";"
            String[] splitFHs = basePackage.split(";");
            List<String> classStrs = new ArrayList<>();
            // s: com.yyx.util.*
            for (String s : splitFHs) {
                LOGGER.info("[Loading package] {}", s);
                // Does the path contain ".*"? e.g. com.yyx.util.*
                boolean contains = s.contains(".*");
                if (contains) {
                    // Cut off the asterisk: com.yyx.util
                    String filePathStr = s.substring(0, s.lastIndexOf(".*"));
                    // Build the path: com/yyx/util
                    String filePath = filePathStr.replaceAll("\\.", "/");
                    // Resolve the directory: xxx/classes/com/yyx/util
                    File file = new File(PackageClassUtils.class.getResource("/").getPath() + "/" + filePath);
                    // Collect the files under the directory
                    getAllFile(filePathStr, file, classStrs);
                } else {
                    String filePath = s.replaceAll("\\.", "/");
                    File file = new File(PackageClassUtils.class.getResource("/").getPath() + "/" + filePath);
                    classStrs = getClassReferenceList(classStrs, file, s);
                }
            }
            return classStrs;
        }

        /**
         * Adds fully qualified class names to the list
         *
         * @param classStrs the list to add to
         * @param file      the package directory
         * @param s         the package name
         * @return the list of class names
         */
        private static List<String> getClassReferenceList(List<String> classStrs, File file, String s) {
            File[] listFiles = file.listFiles();
            if (listFiles != null && listFiles.length != 0) {
                for (File file2 : listFiles) {
                    if (file2.isFile()) {
                        String name = file2.getName();
                        // Drop the file extension and prepend the package name
                        String fileName = s + "." + name.substring(0, name.lastIndexOf('.'));
                        LOGGER.info("[Loaded] class file: {}", fileName);
                        classStrs.add(fileName);
                    }
                }
            }
            return classStrs;
        }

        /**
         * Collects all files under a directory
         *
         * @param s         the package name
         * @param file      the directory or file to inspect
         * @param classStrs the list to add to
         */
        private static void getAllFile(String s, File file, List<String> classStrs) {
            if (file.isDirectory()) {
                File[] files = file.listFiles();
                if (files != null)
                    for (File file1 : files) {
                        getAllFile(s, file1, classStrs);
                    }
            } else {
                String path = file.getPath();
                String cleanPath = path.replaceAll("/", ".");
                String fileName = cleanPath.substring(cleanPath.indexOf(s), cleanPath.length());
                LOGGER.info("[Loaded] class file: {}", fileName);
                classStrs.add(fileName);
            }
        }

    }
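
One thing that may contribute to the wildcard problem: getAllFile builds the name by replacing "/" with "." in the file path, which keeps the ".class" suffix (for example com.yyx.util.Foo.class) and assumes "/" is the path separator. The sketch below shows one possible way to walk a package directory recursively and produce plain class names. ClassNameScanSketch is a hypothetical name, skipping files whose names contain "$" (nested classes) is an assumption of this sketch, and like the original it only works for classes on disk, not inside a jar.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical recursive scanner that maps .class files to fully qualified class names. */
final class ClassNameScanSketch {
    /**
     * @param root        the classes directory, e.g. xxx/classes
     * @param basePackage the package to scan, without a trailing ".*"
     */
    static List<String> scan(File root, String basePackage) {
        List<String> names = new ArrayList<>();
        File dir = new File(root, basePackage.replace('.', File.separatorChar));
        collect(dir, basePackage, names);
        return names;
    }

    private static void collect(File dir, String packageName, List<String> names) {
        File[] children = dir.listFiles();
        if (children == null) {
            return; // not a directory, or not readable
        }
        for (File child : children) {
            String fileName = child.getName();
            if (child.isDirectory()) {
                // Sub-packages extend the package name by the directory name
                collect(child, packageName + "." + fileName, names);
            } else if (fileName.endsWith(".class") && !fileName.contains("$")) {
                // Strip the ".class" suffix so the name can be passed to a class loader
                String simpleName = fileName.substring(0, fileName.length() - ".class".length());
                names.add(packageName + "." + simpleName);
            }
        }
    }

    public static void main(String[] args) {
        // Usage: java ClassNameScanSketch <classes-directory> <base-package>
        if (args.length == 2) {
            for (String name : scan(new File(args[0]), args[1])) {
                System.out.println(name);
            }
        }
    }
}
```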

    Copy
    
    ###### RemoteMethodInvokeUtil.java
    
    

    import com.edu.hart.rpc.entity.MethodInvokeMeta;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    /**
     * Message handling class
     * Created by 叶云轩 on 2017/6/7-15:49
     * Contact tdg_yyx@foxmail.com
     */
    public class RemoteMethodInvokeUtil implements ApplicationContextAware {

       private ApplicationContext applicationContext;
    
       public Object processMethod(MethodInvokeMeta methodInvokeMeta) throws InvocationTargetException, IllegalAccessException {
           Class<?> interfaceClass = methodInvokeMeta.getInterfaceClass();
           Object bean = applicationContext.getBean(interfaceClass);
           // Look the method up by name only; overloaded methods are not distinguished here
           Method method = null;
           for (Method declaredMethod : interfaceClass.getDeclaredMethods()) {
               if (methodInvokeMeta.getMethodName().equals(declaredMethod.getName())) {
                   method = declaredMethod;
                   break;
               }
           }
           if (method == null) {
               // Fail with a clear message instead of a NullPointerException
               throw new IllegalArgumentException("No method named " + methodInvokeMeta.getMethodName()
                       + " on " + interfaceClass.getName());
           }
           return method.invoke(bean, methodInvokeMeta.getArgs());
       }
    
       @Override
       public void setApplicationContext(ApplicationContext app) throws BeansException {
           applicationContext = app;
       }
    

    }
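
    Matching by name alone cannot tell overloaded methods apart. As a minimal sketch, assuming the parameter types captured on the client travel with the request, `Class.getMethod(name, parameterTypes)` can select the exact overload. `ReflectiveDispatchSketch`, `Greeter`, `GreeterImpl` and `invoke` are hypothetical names used only for illustration; `GreeterImpl` stands in for a bean obtained from the Spring context.

    ```java
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    public class ReflectiveDispatchSketch {

        /** Hypothetical service interface with an overloaded method. */
        public interface Greeter {
            String greet(String name);

            String greet(String name, Integer times);
        }

        /** Hypothetical implementation standing in for a Spring bean. */
        public static class GreeterImpl implements Greeter {
            @Override
            public String greet(String name) {
                return "hello " + name;
            }

            @Override
            public String greet(String name, Integer times) {
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < times; i++) {
                    sb.append("hello ").append(name).append(';');
                }
                return sb.toString();
            }
        }

        /** Resolves the method by name and parameter types, then invokes it on the bean. */
        public static Object invoke(Object bean, Class<?> interfaceClass, String methodName,
                                    Class<?>[] parameterTypes, Object[] args)
                throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
            Method method = interfaceClass.getMethod(methodName, parameterTypes);
            return method.invoke(bean, args);
        }

        public static void main(String[] args) throws Exception {
            Greeter bean = new GreeterImpl();
            // One-argument overload
            System.out.println(invoke(bean, Greeter.class, "greet",
                    new Class<?>[]{String.class}, new Object[]{"netty"}));
            // Two-argument overload
            System.out.println(invoke(bean, Greeter.class, "greet",
                    new Class<?>[]{String.class, Integer.class}, new Object[]{"netty", 2}));
        }
    }
    ```

    With this approach a missing method surfaces as a `NoSuchMethodException` rather than a `NullPointerException` at the call site.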

    
    ###### WrapMethodUtils.java
    
    

    import com.edu.hart.rpc.entity.MethodInvokeMeta;

    import java.lang.reflect.Method;

    public class WrapMethodUtils {
        /**
         * Read the metadata of a method
         *
         * @param interfaceClass the interface that declares the method
         * @param method         the method being called
         * @param args           the call arguments
         * @return the method invocation metadata
         */
        public static MethodInvokeMeta readMethod(Class<?> interfaceClass, Method method, Object[] args) {
            MethodInvokeMeta mim = new MethodInvokeMeta();
            mim.setInterfaceClass(interfaceClass);
            mim.setArgs(args);
            mim.setMethodName(method.getName());
            mim.setReturnType(method.getReturnType());
            Class<?>[] parameterTypes = method.getParameterTypes();
            mim.setParameterTypes(parameterTypes);
            return mim;
        }
    }

    
    ------
    
    I also use the following classes when communicating with the front end:
    
    ###### ResponseCodeEnum.java
    
    

    import java.io.Serializable;

    /**
     * Response code enum
     * Created by 叶云轩 on 2017/6/13-11:53
     * Contact tdg_yyx@foxmail.com
     */
    public enum ResponseCodeEnum implements Serializable {

        // region authentication code
        REQUEST_SUCCESS(10000, "Request succeeded"),
        SERVER_ERROR(99999, "Internal server error"),;

        //region Accessors exposed to callers, no need to change
        /**
         * Response code
         */
        private final Integer code;
        /**
         * Response message
         */
        private final String msg;

        ResponseCodeEnum(Integer code, String msg) {
            this.code = code;
            this.msg = msg;
        }

        public Integer getCode() {
            return code;
        }

        public String getMsg() {
            return msg;
        }

        //endregion
    }

    
    ###### ResponseResult.java
    
    

    import java.io.Serializable;

    /**
     * Wrapper entity for returned data
     * Created by 叶云轩 on 2017/6/13-11:38
     * Contact tdg_yyx@foxmail.com
     *
     * @param <T> type of the response data
     */
    public class ResponseResult<T> implements Serializable {

        private static final long serialVersionUID = -3411174924856108156L;
        /**
         * Server response code
         */
        private Integer code;
        /**
         * Server response message
         */
        private String msg;
        /**
         * Server response data
         */
        private T data;

        public ResponseResult() {

        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;

            ResponseResult<?> that = (ResponseResult<?>) o;

            return (code != null ? code.equals(that.code) : that.code == null) && (msg != null ? msg.equals(that.msg) : that.msg == null) && (data != null ? data.equals(that.data) : that.data == null);
        }

        public Integer getCode() {
            return code;
        }

        public void setCode(Integer code) {
            this.code = code;
        }

        public T getData() {
            return data;
        }

        public void setData(T data) {
            this.data = data;
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }

        @Override
        public int hashCode() {
            int result = code != null ? code.hashCode() : 0;
            result = 31 * result
                    + (msg != null ? msg.hashCode() : 0);
            result = 31 * result + (data != null ? data.hashCode() : 0);
            return result;
        }

        @Override
        public String toString() {
            return "ResponseResult{"
                    + "code="
                    + code
                    + ", msg='"
                    + msg
                    + '\''
                    + ", data="
                    + data
                    + '}';
        }
    }

    
    ###### ResponseResultUtil.java
    
    

    import com.edu.hart.modules.communicate.ResponseCodeEnum;
    import com.edu.hart.modules.communicate.ResponseResult;

    /**
     * Utility class for building response results
     * Created by 叶云轩 on 2017/5/29-10:37
     * Contact tdg_yyx@foxmail.com
     */
    public class ResponseResultUtil {

        /**
         * Data structure returned when a request fails
         *
         * @param responseCodeEnum enum carrying the response information
         * @return the result
         */
        public static <T> ResponseResult<T> error(ResponseCodeEnum responseCodeEnum) {
            ResponseResult<T> responseResult = new ResponseResult<>();
            responseResult.setMsg(responseCodeEnum.getMsg());
            responseResult.setCode(responseCodeEnum.getCode());
            responseResult.setData(null);
            return responseResult;
        }

        /**
         * Data structure returned when there is no result data
         *
         * @return the result
         */
        public static <T> ResponseResult<T> success() {
            return success(null);
        }

        /**
         * Data structure returned on success
         *
         * @param o the data object to return
         * @return the result
         */
        public static <T> ResponseResult<T> success(T o) {
            ResponseResult<T> responseResult = new ResponseResult<>();
            responseResult.setMsg(ResponseCodeEnum.REQUEST_SUCCESS.getMsg());
            responseResult.setCode(ResponseCodeEnum.REQUEST_SUCCESS.getCode());
            responseResult.setData(o);
            return responseResult;
        }

        /**
         * Check whether a request succeeded
         *
         * @param responseResult the request result
         * @return true if the response code is REQUEST_SUCCESS
         */
        public static boolean judgementSuccess(ResponseResult<?> responseResult) {
            // Compare from the constant side so a null code yields false instead of an exception
            return ResponseCodeEnum.REQUEST_SUCCESS.getCode().equals(responseResult.getCode());
        }
    }
    
    Now let's test the remote communication:
    
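    1. The Client calls one of the Server's interfaces. In the hart-oa project, RPCEmployeeService has no implementation class, yet the console prints the method call and its arguments.
    
    ![The Client calls one of the Server's interfaces](http://images2017.cnblogs.com/blog/1161505/201801/1161505-20180129131759375-896850881.png)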
    
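    2. The Server hits a breakpoint on the remote call. CloudApplication is the Server side, and we can see it receive a request from hart-oa with the same arguments. After CloudApplication processes the request, the result is returned to the Client (hart-oa).
    
    ![The Server hits a breakpoint on the remote call](http://images2017.cnblogs.com/blog/1161505/201801/1161505-20180129131759531-1793044895.png)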
    
    3. The response is returned to the Client. We (hart-oa) receive the response from CloudApplication, and the result is the ResponseResult we wrapped.
    
    ![The response is returned to the Client](http://images2017.cnblogs.com/blog/1161505/201801/1161505-20180129131804968-429317475.png)
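
    A client can call an interface that has no implementation class when the interface is backed by a proxy. The client code is not shown in this part of the article, so the following is only a sketch under the assumption of a JDK dynamic proxy. `RpcProxySketch`, `EmployeeService`, `CallMeta` and the in-memory `transport` function are hypothetical stand-ins for the real interface, `MethodInvokeMeta` and the Netty channel.

    ```java
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;
    import java.util.Arrays;
    import java.util.function.Function;

    public class RpcProxySketch {

        /** Hypothetical remote interface; the client has no implementation of it. */
        public interface EmployeeService {
            String findName(Integer id);
        }

        /** Hypothetical, simplified stand-in for MethodInvokeMeta. */
        public static class CallMeta {
            public final Class<?> interfaceClass;
            public final String methodName;
            public final Class<?>[] parameterTypes;
            public final Object[] args;

            public CallMeta(Class<?> interfaceClass, String methodName,
                            Class<?>[] parameterTypes, Object[] args) {
                this.interfaceClass = interfaceClass;
                this.methodName = methodName;
                this.parameterTypes = parameterTypes;
                this.args = args;
            }
        }

        /** Creates a proxy that turns each call into a CallMeta and hands it to the transport. */
        @SuppressWarnings("unchecked")
        public static <T> T create(Class<T> interfaceClass, Function<CallMeta, Object> transport) {
            InvocationHandler handler = (proxy, method, args) -> {
                CallMeta meta = new CallMeta(interfaceClass, method.getName(),
                        method.getParameterTypes(), args);
                // Log the call and its arguments, as the client console does in step 1
                System.out.println("invoke " + meta.methodName + " args=" + Arrays.toString(meta.args));
                return transport.apply(meta);
            };
            return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                    new Class<?>[]{interfaceClass}, handler);
        }

        public static void main(String[] args) {
            // A real client would write the metadata to a Netty channel and wait for the reply;
            // here the "server" is a local function.
            EmployeeService service = create(EmployeeService.class, meta -> "employee-" + meta.args[0]);
            System.out.println(service.findName(7));
        }
    }
    ```

    Passing the transport as a function keeps the proxy logic testable without a running server.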
    
    ------
    
    That completes the integration test. If you want to know how to integrate with Spring, feel free to leave a comment or email me at tdg_yyx@foxmail.com.
    
    If you are learning Netty and find the code above hard to follow, you can also leave a comment or email me at tdg_yyx@foxmail.com.

    Author: 叶云轩

    Source: https://www.cnblogs.com/tdg-yyx/p/8376842.html

    This site is licensed under CC BY 4.0. When reposting, please credit the author and the source prominently in the article.

  • Original article: https://www.cnblogs.com/javalinux/p/14247213.html