zoukankan      html  css  js  c++  java
  • Netty实现远程调用RPC功能

    添加依赖

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.2.Final</version>
    </dependency>
    
    <dependency>
        <groupId>org.reflections</groupId>
        <artifactId>reflections</artifactId>
        <version>0.9.10</version>
    </dependency>
    

    组织架构

    服务端

    封装类信息

    /**
     * Serializable description of one remote call: the target class name, the
     * method name, the parameter types and the argument values.
     */
    public class ClassInfo implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Class name. */
        private String className;
        /** Method name. */
        private String methodName;
        /** Parameter types. */
        private Class<?>[] types;
        /** Argument list. */
        private Object[] objects;

        // ---------------------------------------------------------------- getters

        /** @return the class name */
        public String getClassName() {
            return this.className;
        }

        /** @return the method name */
        public String getMethodName() {
            return this.methodName;
        }

        /** @return the parameter types (the stored array itself, not a copy) */
        public Class<?>[] getTypes() {
            return this.types;
        }

        /** @return the argument list (the stored array itself, not a copy) */
        public Object[] getObjects() {
            return this.objects;
        }

        // ---------------------------------------------------------------- setters

        /** @param className the class name to store */
        public void setClassName(String className) {
            this.className = className;
        }

        /** @param methodName the method name to store */
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }

        /** @param types the parameter types to store (kept by reference) */
        public void setTypes(Class<?>[] types) {
            this.types = types;
        }

        /** @param objects the argument list to store (kept by reference) */
        public void setObjects(Object[] objects) {
            this.objects = objects;
        }
    }
    

    服务端网络处理服务器

    /**
     * Server-side network endpoint: binds a Netty server to a port, decodes
     * incoming serialized objects and passes them to {@link InvokeHandler}.
     */
    public class NettyRPCServer {
        private final int port;

        /**
         * @param port the TCP port the server binds to
         */
        public NettyRPCServer(int port) {
            this.port = port;
        }

        /**
         * Binds the server and blocks until the server channel is closed.
         * Failures are reported on {@code System.err} instead of being thrown,
         * and both event loop groups are always shut down before returning.
         */
        public void start() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .localAddress(port).childHandler(
                                new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    protected void initChannel(SocketChannel ch) throws Exception {
                                        ChannelPipeline pipeline = ch.pipeline();
                                        // Encoder
                                        pipeline.addLast("encoder", new ObjectEncoder());
                                        // Decoder
                                        // NOTE(review): this deserializes Java objects of up to
                                        // Integer.MAX_VALUE bytes from any peer; only expose the
                                        // port to trusted clients, or lower the limit and
                                        // restrict the resolvable classes.
                                        pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                        // Server-side business handler
                                        pipeline.addLast(new InvokeHandler());
                                    }
                                });
                ChannelFuture future = serverBootstrap.bind(port).sync();
                System.out.println("......server is ready......");
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Keep the interrupt status visible to the caller.
                Thread.currentThread().interrupt();
                System.err.println("server on port " + port + " interrupted: " + e);
            } catch (Exception e) {
                // e.g. the port is already in use; previously swallowed silently.
                System.err.println("server on port " + port + " failed: " + e);
            } finally {
                // Release the event loop threads on every exit path, not only on failure.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        public static void main(String[] args) throws Exception {
            new NettyRPCServer(9999).start();
        }
    }
    

    服务器端业务处理类

    /**
     * Server-side business handler: receives a {@link ClassInfo}, finds the single
     * implementation of the requested interface in the service package, invokes
     * the requested method through reflection and writes the result back.
     *
     * NOTE(review): the class name, method name and arguments come straight from
     * the network peer; only expose this handler to trusted clients.
     */
    public class InvokeHandler extends ChannelInboundHandlerAdapter {
        /** Package that contains the service interfaces and their implementations. */
        private static final String SERVICE_PACKAGE = "com.lyz.server";

        /**
         * Resolves the name of the only implementation class of the requested interface.
         *
         * @param classInfo the request; only its class name is read
         * @return fully qualified name of the implementation class
         * @throws ClassNotFoundException if the interface is not in the service package
         * @throws IllegalStateException if zero or more than one implementation is found
         */
        private String getImplClassName(ClassInfo classInfo) throws Exception {
            String requestedName = classInfo.getClassName();
            // lastIndexOf returns -1 for a name without a package; "+ 1" then keeps
            // the whole name instead of making substring throw.
            String simpleName = requestedName.substring(requestedName.lastIndexOf('.') + 1);
            Class<?> serviceInterface = Class.forName(SERVICE_PACKAGE + "." + simpleName);
            Reflections reflections = new Reflections(SERVICE_PACKAGE);
            // All subtypes of the interface found in the service package
            Set<? extends Class<?>> implementations = reflections.getSubTypesOf(serviceInterface);
            if (implementations.isEmpty()) {
                throw new IllegalStateException("未找到实现类");
            }
            if (implementations.size() > 1) {
                throw new IllegalStateException("找到多个实现类,未明确使用哪一个");
            }
            return implementations.iterator().next().getName();
        }

        /**
         * Reads the request sent by the client and invokes the implementation
         * method through reflection.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ClassInfo classInfo = (ClassInfo) msg;
            System.out.println(classInfo);
            Object service = Class.forName(getImplClassName(classInfo))
                    .getDeclaredConstructor()
                    .newInstance();
            Method method = service.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
            // Invoke the implementation method through reflection
            Object result = method.invoke(service, classInfo.getObjects());
            if (result == null) {
                // A null message cannot be written to the channel (void method or
                // null return); closing lets the client finish with a null response.
                ctx.close();
                return;
            }
            ctx.writeAndFlush(result);
        }

        /**
         * Closes the connection when the call fails, so the client is not left
         * waiting for a response that will never be written.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.err.println("RPC invocation failed: " + cause);
            ctx.close();
        }
    }
    

    服务端接口及实现类

    /**
     * Service interface whose method takes no arguments.
     */
    public interface HelloNetty {
        /** @return a greeting string */
        String hello();
    }
    
    /**
     * Implementation of {@link HelloNetty} that returns a fixed greeting.
     */
    public class HelloNettyImpl implements HelloNetty {
        /** The fixed reply returned by {@link #hello()}. */
        private static final String GREETING = "hello,netty";

        @Override
        public String hello() {
            return GREETING;
        }
    }
    
    /**
     * Service interface whose method takes one argument.
     */
    public interface HelloRPC {
        /**
         * @param name the name passed by the caller
         * @return a greeting string
         */
        String hello(String name);
    }
    
    /**
     * Implementation of {@link HelloRPC} that prefixes the given name with "hello,".
     */
    public class HelloRPCImpl implements HelloRPC {
        @Override
        public String hello(String name) {
            final String greeting = "hello," + name;
            return greeting;
        }
    }
    

    客户端

    代理类

    /**
     * Client-side proxy factory: creates a dynamic proxy for a service interface
     * whose method calls are packed into a {@link ClassInfo}, sent to the server
     * with Netty, and answered with the object the server writes back.
     */
    public class NettyRPCProxy {
        /** Server address used by {@link #create(Class)}. */
        private static final String DEFAULT_HOST = "127.0.0.1";
        /** Server port used by {@link #create(Class)}. */
        private static final int DEFAULT_PORT = 9999;

        /**
         * Creates a proxy for the given interface that talks to the default server.
         *
         * @param target the service interface
         * @return a proxy instance implementing {@code target}
         */
        public static Object create(Class<?> target) {
            return create(target, DEFAULT_HOST, DEFAULT_PORT);
        }

        /**
         * Creates a proxy for the given interface that talks to the given server.
         *
         * @param target the service interface
         * @param host   server host name or address
         * @param port   server port
         * @return a proxy instance implementing {@code target}
         */
        public static Object create(final Class<?> target, final String host, final int port) {
            return Proxy.newProxyInstance(target.getClassLoader(), new Class<?>[]{target}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args)
                        throws Throwable {
                    // equals/hashCode/toString are answered locally: sending them would
                    // cost a network round trip, and equals would try to serialize the proxy.
                    if (method.getDeclaringClass() == Object.class) {
                        return invokeObjectMethod(target, proxy, method, args);
                    }

                    // Pack the call into a ClassInfo
                    ClassInfo classInfo = new ClassInfo();
                    classInfo.setClassName(target.getName());
                    classInfo.setMethodName(method.getName());
                    classInfo.setObjects(args);
                    classInfo.setTypes(method.getParameterTypes());

                    return send(classInfo, host, port);
                }
            });
        }

        /** Handles the java.lang.Object methods that a dynamic proxy routes to its handler. */
        private static Object invokeObjectMethod(Class<?> target, Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
                case "equals":
                    return proxy == args[0];
                case "hashCode":
                    return System.identityHashCode(proxy);
                default:
                    return target.getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
            }
        }

        /** Sends one request over a fresh connection and blocks until the channel is closed. */
        private static Object send(ClassInfo classInfo, String host, int port) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            final ResultHandler resultHandler = new ResultHandler();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // Encoder
                                pipeline.addLast("encoder", new ObjectEncoder());
                                // Decoder: first argument is the maximum object size in bytes,
                                // second argument selects the class resolver
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                // Client-side business handler
                                pipeline.addLast("handler", resultHandler);
                            }
                        });
                ChannelFuture future = b.connect(host, port).sync();
                future.channel().writeAndFlush(classInfo).sync();
                // ResultHandler closes the channel once the response has arrived
                future.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
            return resultHandler.getResponse();
        }
    }
    

    客户端业务处理类

    /**
     * Client-side business handler: keeps the first object read from the server
     * as the call result and closes the connection.
     */
    public class ResultHandler extends ChannelInboundHandlerAdapter {

        private Object response;

        /**
         * @return the object received from the server, or {@code null} if none was received
         */
        public Object getResponse() {
            return response;
        }

        /** Reads the data returned by the server (the result of the remote call). */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            response = msg;
            ctx.close();
        }

        /**
         * Closes the connection on a pipeline error so that a caller waiting for
         * the channel to close is released instead of blocking indefinitely.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.err.println("RPC response handling failed: " + cause);
            ctx.close();
        }
    }
    

    客户端接口

    /**
     * Client-side copy of the service interface whose method takes no arguments.
     */
    public interface HelloNetty {
        /** @return a greeting string */
        String hello();
    }
    
    /**
     * Client-side copy of the service interface whose method takes one argument.
     */
    public interface HelloRPC {
        /**
         * @param name the name passed by the caller
         * @return a greeting string
         */
        String hello(String name);
    }

    测试类 服务调用方

    /**
     * Service caller: makes two remote calls through {@link NettyRPCProxy} and
     * prints each result.
     */
    public class TestNettyRPC {
        public static void main(String[] args) {

            // First remote call
            Object noArgProxy = NettyRPCProxy.create(HelloNetty.class);
            HelloNetty helloNetty = (HelloNetty) noArgProxy;
            String firstReply = helloNetty.hello();
            System.out.println(firstReply);

            // Second remote call
            Object oneArgProxy = NettyRPCProxy.create(HelloRPC.class);
            HelloRPC helloRPC = (HelloRPC) oneArgProxy;
            String secondReply = helloRPC.hello("RPC");
            System.out.println(secondReply);

        }
    }
    

    输出结果

    服务端

    ......server is ready......
    com.lyz.serverStub.ClassInfo@2b894733
    com.lyz.serverStub.ClassInfo@167bfa9

    客户端

    hello,netty
    hello,RPC
    

    下一篇通过netty实现线上聊天功能


  • 相关阅读:
    txt文件按行处理工具类(可以截取小说、分析日志等)【我】
    java正则表达式取出匹配字符串
    svn客户端更改用户名
    Spring集成MyBatis完整示例
    redis的list取出数据方式速度测试
    处理大量数据的性能优化一例【我】
    unity3d结合轮廓显示,实现完整的框选目标(附Demo代码)
    Unity3dPro 和免费版的区别
    unity3d 射线扫描 忽略图层
    Unity3D角色攻击范围判定和攻击判定
  • 原文地址:https://www.cnblogs.com/lyze/p/11803073.html
Copyright © 2011-2022 走看看