zoukankan      html  css  js  c++  java
  • Netty+Proxy实现RPC

    Netty 是一个基于 NIO 的网络编程框架,使用 Netty 可以帮助你快速、简单地开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程。

    直接上代码,项目类结构图

    Maven包文件:

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.15.Final</version>
            </dependency>
    
            <dependency>
                <groupId>org.reflections</groupId>
                <artifactId>reflections</artifactId>
                <version>0.9.10</version>
            </dependency>

     客户端类代码:

    package netty.rcp.client;
    
    import netty.rcp.clientStub.NettyRPCProxy;
    import netty.rcp.server.HelloNetty;
    import netty.rcp.server.HelloRPC;
    
    /**
     * Demo client: obtains a proxy for each sample service interface through
     * {@link NettyRPCProxy#create}, calls one method on it and prints the returned string.
     */
    public class TestNettyRPCClient {
        public static void main(String[] args) {
            // No-argument remote call.
            Object nettyStub = NettyRPCProxy.create(HelloNetty.class);
            String nettyGreeting = ((HelloNetty) nettyStub).hello();
            System.out.println(nettyGreeting);

            // Remote call carrying a single String argument.
            Object rpcStub = NettyRPCProxy.create(HelloRPC.class);
            String rpcGreeting = ((HelloRPC) rpcStub).hello("RPC");
            System.out.println(rpcGreeting);
        }
    }
    package netty.rcp.clientStub;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import netty.rcp.common.ClassInfo;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    
    /**
     * Client-side stub factory. It builds JDK dynamic proxies whose interface method calls are
     * packed into a {@link ClassInfo} and sent to the RPC server over a Netty channel.
     *
     * <p>Each remote call opens its own connection to {@code 127.0.0.1:8090}, writes one request
     * and blocks until the channel has been closed; the value captured by {@link ResultHandler}
     * is then returned to the caller.
     */
    public class NettyRPCProxy {
        private static final String HOST = "127.0.0.1";
        private static final int PORT = 8090;

        /**
         * Creates a proxy that implements {@code target} and forwards its methods to the server.
         *
         * <p>{@code equals}, {@code hashCode} and {@code toString} are answered locally
         * (identity semantics) instead of being sent over the network.
         *
         * @param target the service interface to proxy
         * @param <T>    the interface type
         * @return a proxy instance implementing {@code target}
         * @throws IllegalArgumentException if {@code target} cannot be proxied (thrown by
         *                                  {@link Proxy#newProxyInstance})
         */
        public static <T> T create(final Class<T> target) {
            Object stub = Proxy.newProxyInstance(target.getClassLoader(), new Class<?>[]{target}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    // java.lang.Object methods are also dispatched to the handler; they must not
                    // become remote calls (the server has no matching service method for them).
                    if (method.getDeclaringClass() == Object.class) {
                        return invokeObjectMethod(target, proxy, method, args);
                    }

                    ClassInfo classInfo = new ClassInfo();
                    classInfo.setClassName(target.getName());
                    classInfo.setMethodName(method.getName());
                    classInfo.setObjects(args);
                    classInfo.setTypes(method.getParameterTypes());
                    return callRemote(classInfo);
                }
            });
            return target.cast(stub);
        }

        /** Handles equals/hashCode/toString for a proxy without touching the network. */
        private static Object invokeObjectMethod(Class<?> target, Object proxy, Method method, Object[] args) {
            String name = method.getName();
            if ("equals".equals(name)) {
                return proxy == args[0];
            }
            if ("hashCode".equals(name)) {
                return System.identityHashCode(proxy);
            }
            // Only toString remains among the Object methods routed to an InvocationHandler.
            return "NettyRPCProxy(" + target.getName() + ")@" + Integer.toHexString(System.identityHashCode(proxy));
        }

        /** Sends one request over a fresh connection and returns whatever the server answered. */
        private static Object callRemote(ClassInfo classInfo) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            final ResultHandler resultHandler = new ResultHandler();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast("encoder", new ObjectEncoder());
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast("handler", resultHandler);
                            }
                        });

                ChannelFuture future = b.connect(HOST, PORT).sync();
                future.channel().writeAndFlush(classInfo).sync();
                // Blocks until the channel is closed (ResultHandler closes it after reading the reply).
                future.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
            return resultHandler.getResponse();
        }
    }

      

    package netty.rcp.clientStub;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Client-side inbound handler: stores the first message received from the server (the
     * remote call's result) and closes the channel so the waiting caller can continue.
     */
    public class ResultHandler extends ChannelInboundHandlerAdapter {

        // volatile: assigned in channelRead (called by Netty) and read later through
        // getResponse() by the code that issued the RPC call.
        private volatile Object response;

        /**
         * @return the message received from the server, or {@code null} if none was received
         */
        public Object getResponse() {
            return response;
        }

        /** Records the data returned by the server (the remote call's result) and closes the channel. */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            response = msg;
            ctx.close();
        }

        /**
         * Closes the channel when the pipeline reports an error (for example a reply that cannot
         * be decoded); otherwise the channel would stay open and a caller waiting for it to close
         * would block indefinitely.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    服务端类

    package netty.rcp.serverStub;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    /**
     * RPC server bootstrap: listens on a TCP port and hands every decoded request object to an
     * {@link InvokeHandler}.
     */
    public class NettyRPCServer {
        private final int port;

        /**
         * @param port TCP port to bind
         */
        public NettyRPCServer(int port) {
            this.port = port;
        }

        /**
         * Binds the port and blocks until the server channel is closed, then shuts both event
         * loop groups down. If the calling thread is interrupted while waiting, the interrupt
         * flag is restored before returning.
         */
        public void start(){
            // Event loop group that accepts incoming connections.
            EventLoopGroup bossGroup=new NioEventLoopGroup();
            // Event loop group that handles I/O on accepted connections.
            EventLoopGroup workerGroup=new NioEventLoopGroup();

            try {
                ServerBootstrap serverBootstrap=new ServerBootstrap();
                serverBootstrap.group(bossGroup,workerGroup)
                        // Server-side channel implementation.
                        .channel(NioServerSocketChannel.class)
                        // Size of the queue of connections waiting to be accepted.
                        .option(ChannelOption.SO_BACKLOG,128)
                        // Enable TCP keep-alive on accepted connections.
                        .childOption(ChannelOption.SO_KEEPALIVE,true)
                        // Initializer run for every accepted channel.
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline=socketChannel.pipeline();
                                // Outbound: Java-serialize response objects.
                                pipeline.addLast("encoder", new ObjectEncoder());
                                // Inbound: deserialize request objects.
                                // NOTE(review): Java deserialization of data from untrusted peers is unsafe.
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                // Business handler that executes the requested call.
                                pipeline.addLast(new InvokeHandler());
                            }
                        });
                // bind() is asynchronous; sync() waits until the port is actually bound.
                ChannelFuture future=serverBootstrap.bind(port).sync();
                System.out.println("server is ready");
                // Blocks here until the server channel is closed.
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
            } finally {
                // Release the threads of both groups.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        public static void main(String[] args) {
            new NettyRPCServer(8090).start();
        }
    }
    package netty.rcp.serverStub;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import netty.rcp.common.ClassInfo;
    import org.reflections.Reflections;
    
    import java.lang.reflect.Method;
    import java.sql.SQLOutput;
    import java.util.Set;
    
    /**
     * Server-side handler: looks up the implementation of the requested interface inside the
     * {@code netty.rcp.server} package, invokes the requested method reflectively and writes
     * the result back to the client.
     *
     * <p>When the call cannot be completed, or yields {@code null}, nothing is written and the
     * channel is closed instead, so the peer is not left waiting on an open connection.
     */
    public class InvokeHandler extends ChannelInboundHandlerAdapter {
        private static final String SERVICE_PACKAGE = "netty.rcp.server";

        /**
         * Executes the call described by the incoming {@link ClassInfo}.
         *
         * @param ctx channel context used to reply or close
         * @param msg the decoded request; expected to be a {@link ClassInfo}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Object result;
            try {
                // Find the implementation by interface name, then invoke by method name and parameter types.
                ClassInfo classInfo = (ClassInfo) msg;
                Object service = Class.forName(getImplClassName(classInfo)).getDeclaredConstructor().newInstance();
                Method method = service.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
                result = method.invoke(service, classInfo.getObjects());
            } catch (Exception e) {
                // Bad request, missing implementation or failing service method: report and
                // close, otherwise the connection would stay open without any reply.
                e.printStackTrace();
                ctx.close();
                return;
            }
            if (result == null) {
                // writeAndFlush rejects null messages; closing without a reply leaves the
                // client-side response unset.
                ctx.close();
                return;
            }
            // Send the result back.
            ctx.writeAndFlush(result);
        }

        /** Closes the channel when an earlier pipeline stage fails (e.g. an undecodable request). */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /**
         * Resolves the single implementation class of the requested interface.
         *
         * <p>Only the simple name of the requested class is used; the interface is always
         * loaded from {@code netty.rcp.server}.
         *
         * @return fully qualified name of the implementation class
         * @throws ClassNotFoundException if the interface cannot be loaded, or if zero or more
         *                                than one implementation is found in the package
         */
        private String getImplClassName(ClassInfo classInfo) throws ClassNotFoundException {
            String requested = classInfo.getClassName();
            // lastIndexOf returns -1 for an unqualified name, so "+ 1" also covers that case.
            String simpleName = requested.substring(requested.lastIndexOf('.') + 1);
            // Load the interface.
            Class<?> superClass = Class.forName(SERVICE_PACKAGE + "." + simpleName);
            // Scan the package for all subtypes of the interface.
            Reflections reflections = new Reflections(SERVICE_PACKAGE);
            Set<? extends Class<?>> implClassSet = reflections.getSubTypesOf(superClass);
            if (implClassSet.isEmpty()) {
                throw new ClassNotFoundException("未找到实现类: " + superClass.getName());
            }
            if (implClassSet.size() > 1) {
                throw new ClassNotFoundException("找到多个实现类,未明确使用哪一个: " + superClass.getName());
            }
            return implClassSet.iterator().next().getName();
        }

    }

    公共类

    package netty.rcp.common;
    
    import java.io.Serializable;
    
    /**
     * Serializable description of one remote call: which class and method to invoke, with
     * which parameter types and argument values.
     */
    public class ClassInfo implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Name of the class (interface) being called. */
        private String className;
        /** Name of the method being called. */
        private String methodName;
        /** Parameter types of the method. */
        private Class<?>[] types;
        /** Argument values for the call. */
        private Object[] objects;

        /** @return the class name */
        public String getClassName() {
            return this.className;
        }

        /** @return the method name */
        public String getMethodName() {
            return this.methodName;
        }

        /** @return the parameter types (the stored array itself, not a copy) */
        public Class<?>[] getTypes() {
            return this.types;
        }

        /** @return the argument values (the stored array itself, not a copy) */
        public Object[] getObjects() {
            return this.objects;
        }

        /** @param className the class name */
        public void setClassName(String className) {
            this.className = className;
        }

        /** @param methodName the method name */
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }

        /** @param types the parameter types; the array is stored as given */
        public void setTypes(Class<?>[] types) {
            this.types = types;
        }

        /** @param objects the argument values; the array is stored as given */
        public void setObjects(Object[] objects) {
            this.objects = objects;
        }
    }
    package netty.rcp.server;
    
    /**
     * Sample service interface exposing a single no-argument call.
     */
    public interface HelloNetty {
        /**
         * @return a greeting string
         */
        String hello();
    }
    package netty.rcp.server;
    
    /**
     * Implementation of {@link HelloNetty} that always answers with the same fixed greeting.
     */
    public class HelloNettyImpl implements HelloNetty {
        private static final String GREETING = "hello,netty";

        /**
         * @return the fixed greeting text
         */
        @Override
        public String hello() {
            return GREETING;
        }
    }
    package netty.rcp.server;
    
    /**
     * Sample service interface exposing a single call that takes one String argument.
     */
    public interface HelloRPC {
        /**
         * @param name value passed by the caller
         * @return a greeting string
         */
        String hello(String name);
    }
    package netty.rcp.server;
    
    /**
     * Implementation of {@link HelloRPC} that prefixes the given name with {@code "hello,"}.
     */
    public class HelloRPCImpl implements HelloRPC {
        /**
         * @param name text appended after the prefix; {@code null} is rendered as {@code "null"}
         * @return {@code "hello,"} followed by {@code name}
         */
        @Override
        public String hello(String name) {
            StringBuilder greeting = new StringBuilder("hello,");
            greeting.append(name);
            return greeting.toString();
        }
    }
  • 相关阅读:
    路径
    JSTL-3
    JSTL-2
    JSTL-1
    EL和JSTL的关系
    Mybatis控制台打印SQL语句的两种方式
    centOS7安装JDK
    centOS7下安装GUI图形界面
    centOS7配置IP地址
    Office2016专业增强版永久激活
  • 原文地址:https://www.cnblogs.com/zhuyapeng/p/14449820.html
Copyright © 2011-2022 走看看