zoukankan      html  css  js  c++  java
  • Netty+Proxy实现RPC

    Netty 是一个基于 NIO 的网络编程框架,使用 Netty 可以帮助你快速、简单地开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程。

    直接上代码,项目类结构图:

    Maven包文件:

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.15.Final</version>
            </dependency>
    
            <dependency>
                <groupId>org.reflections</groupId>
                <artifactId>reflections</artifactId>
                <version>0.9.10</version>
            </dependency>

     客户端类代码:

    package netty.rcp.client;
    
    import netty.rcp.clientStub.NettyRPCProxy;
    import netty.rcp.server.HelloNetty;
    import netty.rcp.server.HelloRPC;
    
    /**
     * Demo client: obtains a proxy for each of the two service interfaces via
     * {@code NettyRPCProxy.create} and prints what each call returns.
     */
    public class TestNettyRPCClient {
        public static void main(String[] args) {
            // No-argument call through the HelloNetty proxy.
            Object nettyStub = NettyRPCProxy.create(HelloNetty.class);
            String nettyGreeting = ((HelloNetty) nettyStub).hello();
            System.out.println(nettyGreeting);

            // One-argument call through the HelloRPC proxy.
            Object rpcStub = NettyRPCProxy.create(HelloRPC.class);
            String rpcGreeting = ((HelloRPC) rpcStub).hello("RPC");
            System.out.println(rpcGreeting);
        }
    }
    package netty.rcp.clientStub;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import netty.rcp.common.ClassInfo;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    
    /**
     * Client-side stub factory. Builds a JDK dynamic proxy whose interface calls
     * are packed into a {@link ClassInfo}, sent to the server at 127.0.0.1:8090
     * over a fresh Netty connection, and answered with whatever object the
     * {@link ResultHandler} received before the channel closed.
     */
    public class NettyRPCProxy {
        /**
         * Creates a proxy for the given interface.
         *
         * @param target the interface to proxy; its fully qualified name is sent
         *               to the server as the class name
         * @return a proxy implementing {@code target}; callers cast it to the
         *         interface type
         */
        public static Object create(Class<?> target) {
            // Every interface call on the proxy becomes one remote invocation.
            return Proxy.newProxyInstance(target.getClassLoader(), new Class<?>[]{target}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    // toString/hashCode/equals are routed to this handler as well; answer
                    // them locally instead of opening a connection (equals would also try
                    // to serialize the proxy itself as an argument).
                    if (method.getDeclaringClass() == Object.class) {
                        return invokeObjectMethod(target, proxy, method, args);
                    }

                    // Describe the call: interface name, method name, argument values and types.
                    ClassInfo classInfo = new ClassInfo();
                    classInfo.setClassName(target.getName());
                    classInfo.setMethodName(method.getName());
                    classInfo.setObjects(args);
                    classInfo.setTypes(method.getParameterTypes());

                    // A single connection is opened per call, so one event loop is enough.
                    EventLoopGroup group = new NioEventLoopGroup(1);
                    ResultHandler resultHandler = new ResultHandler();
                    try {
                        Bootstrap b = new Bootstrap();
                        b.group(group)
                                .channel(NioSocketChannel.class)
                                .handler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                                        ChannelPipeline pipeline = socketChannel.pipeline();
                                        pipeline.addLast("encoder", new ObjectEncoder());
                                        pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                        pipeline.addLast("handler", resultHandler);
                                    }
                                });

                        ChannelFuture future = b.connect("127.0.0.1", 8090).sync();
                        future.channel().writeAndFlush(classInfo).sync();
                        // Block until the channel is closed; ResultHandler closes it
                        // once the reply has been read.
                        future.channel().closeFuture().sync();
                    } finally {
                        group.shutdownGracefully();
                    }
                    return resultHandler.getResponse();
                }
            });
        }

        /** Answers the java.lang.Object methods of a proxy without a remote call. */
        private static Object invokeObjectMethod(Class<?> target, Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
                case "equals":
                    return proxy == args[0];
                case "hashCode":
                    return System.identityHashCode(proxy);
                case "toString":
                    return target.getName() + "@rpc-proxy";
                default:
                    throw new UnsupportedOperationException(method.toString());
            }
        }
    }

      

    package netty.rcp.clientStub;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Client-side inbound handler: keeps the object received from the server
     * (the remote call's result) and then closes the channel.
     */
    public class ResultHandler extends ChannelInboundHandlerAdapter {

        // Set from the channel's handler callback and read later through
        // getResponse(); volatile so the reading thread sees the stored value.
        private volatile Object response;

        /**
         * @return the object received from the server, or {@code null} if
         *         nothing has been received
         */
        public Object getResponse() {
            return response;
        }

        /** Stores the data returned by the server and closes the channel. */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            response = msg;
            ctx.close();
        }

        /**
         * Reports a pipeline error and closes the channel. Without this the
         * channel would stay open after a failure and anything waiting for it
         * to close would never return.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    服务端类

    package netty.rcp.serverStub;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    /**
     * Netty server for the RPC demo: listens on the given port, decodes incoming
     * serialized objects and hands each one to a new {@link InvokeHandler}.
     */
    public class NettyRPCServer {
        private final int port;

        /**
         * @param port TCP port to bind when {@link #start()} is called
         */
        public NettyRPCServer(int port) {
            this.port = port;
        }

        /**
         * Binds the port and blocks until the server channel is closed. Both
         * event loop groups are shut down before this method returns.
         */
        public void start() {
            // Event loop group that accepts incoming connections.
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            // Event loop group that handles I/O on accepted connections.
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        // NIO-based server channel implementation.
                        .channel(NioServerSocketChannel.class)
                        // Backlog of connections waiting to be accepted.
                        .option(ChannelOption.SO_BACKLOG, 128)
                        // Enable SO_KEEPALIVE on accepted connections.
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        // Pipeline set-up for every accepted connection.
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // Outbound: serialize response objects.
                                pipeline.addLast("encoder", new ObjectEncoder());
                                // Inbound: deserialize request objects.
                                // NOTE(review): Java deserialization with an Integer.MAX_VALUE size
                                // limit; only safe when every client is trusted.
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                // Business handler that performs the requested invocation.
                                pipeline.addLast(new InvokeHandler());
                            }
                        });
                // Bind and wait until the bind has completed.
                ChannelFuture future = serverBootstrap.bind(port).sync();
                System.out.println("server is ready");
                // Block here until the server channel is closed.
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Release the threads of both groups.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        public static void main(String[] args) {
            new NettyRPCServer(8090).start();
        }
    }
    package netty.rcp.serverStub;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import netty.rcp.common.ClassInfo;
    import org.reflections.Reflections;
    
    import java.lang.reflect.Method;
    import java.sql.SQLOutput;
    import java.util.Set;
    
    /**
     * Server-side handler: finds the implementation of the interface named in an
     * incoming {@link ClassInfo}, invokes the requested method reflectively and
     * writes the result back to the client.
     *
     * NOTE(review): the class and method to run are taken from the received
     * message, so this handler should only be reachable by trusted clients.
     */
    public class InvokeHandler extends ChannelInboundHandlerAdapter {
        /** Package that is searched for service interfaces and their implementations. */
        private static final String SERVICE_PACKAGE = "netty.rcp.server";

        /**
         * Performs the invocation described by {@code msg} and sends back the result.
         *
         * @param ctx channel context used to reply
         * @param msg the decoded request; must be a {@link ClassInfo}
         * @throws Exception if the implementation cannot be resolved, instantiated
         *                   or invoked; {@link #exceptionCaught} then closes the channel
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Locate the implementation from the interface name, then call the
            // method selected by name and parameter types.
            ClassInfo classInfo = (ClassInfo) msg;
            Object service = Class.forName(getImplClassName(classInfo)).getDeclaredConstructor().newInstance();
            Method method = service.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
            Object result = method.invoke(service, classInfo.getObjects());
            if (result == null) {
                // NOTE(review): Netty does not accept a null message (e.g. from a void
                // method) — confirm. Closing the channel ends the exchange instead of
                // leaving the connection open with no reply.
                ctx.close();
                return;
            }
            // Send the invocation result back.
            ctx.writeAndFlush(result);
        }

        /**
         * Reports the failure and closes the channel, so a failed request does not
         * leave the connection open without any reply.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /**
         * Resolves the single implementation class of the requested interface
         * inside {@link #SERVICE_PACKAGE}.
         *
         * @param classInfo request carrying the interface's class name
         * @return fully qualified name of the implementation class
         * @throws ClassNotFoundException if the interface cannot be loaded, or if
         *                                zero or more than one implementation is found
         */
        private String getImplClassName(ClassInfo classInfo) throws ClassNotFoundException {
            String className = classInfo.getClassName();
            // Only the simple name is used, so the lookup always stays inside
            // SERVICE_PACKAGE; a name without any '.' is taken as-is.
            String simpleName = className.substring(className.lastIndexOf('.') + 1);
            // Load the interface.
            Class<?> superClass = Class.forName(SERVICE_PACKAGE + "." + simpleName);
            // Scan the package and collect the subtypes of the interface.
            Reflections reflections = new Reflections(SERVICE_PACKAGE);
            Set<? extends Class<?>> implClasses = reflections.getSubTypesOf(superClass);
            if (implClasses.isEmpty()) {
                throw new ClassNotFoundException("未找到实现类: " + superClass.getName());
            }
            if (implClasses.size() > 1) {
                throw new ClassNotFoundException("找到多个实现类,未明确使用哪一个: " + superClass.getName());
            }
            return implClasses.iterator().next().getName();
        }

    }

    公共类

    package netty.rcp.common;
    
    import java.io.Serializable;
    
    /**
     * Serializable description of one method call: the class name, the method
     * name, the parameter types and the argument values.
     */
    public class ClassInfo implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Class name. */
        private String className;
        /** Method name. */
        private String methodName;
        /** Parameter types. */
        private Class<?>[] types;
        /** Argument list. */
        private Object[] objects;

        // ---- class name ----

        public void setClassName(String className) {
            this.className = className;
        }

        public String getClassName() {
            return this.className;
        }

        // ---- method name ----

        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }

        public String getMethodName() {
            return this.methodName;
        }

        // ---- parameter types ----

        public void setTypes(Class<?>[] types) {
            this.types = types;
        }

        public Class<?>[] getTypes() {
            return this.types;
        }

        // ---- argument values ----

        public void setObjects(Object[] objects) {
            this.objects = objects;
        }

        public Object[] getObjects() {
            return this.objects;
        }
    }
    package netty.rcp.server;
    
    /** Service interface exposing a single no-argument call that returns a string. */
    public interface HelloNetty {
        /** @return the greeting produced by the implementation */
        String hello();
    }
    package netty.rcp.server;
    
    /** {@link HelloNetty} implementation that always answers with the same fixed text. */
    public class HelloNettyImpl implements HelloNetty {
        private static final String GREETING = "hello,netty";

        /** @return the fixed greeting text */
        @Override
        public String hello() {
            return GREETING;
        }
    }
    package netty.rcp.server;
    
    /** Service interface exposing a single call that takes a name and returns a string. */
    public interface HelloRPC {
        /**
         * @param name value supplied by the caller
         * @return the greeting produced by the implementation
         */
        String hello(String name);
    }
    package netty.rcp.server;
    
    /** {@link HelloRPC} implementation that prefixes the given name with "hello,". */
    public class HelloRPCImpl implements HelloRPC {
        private static final String PREFIX = "hello,";

        /**
         * @param name text appended after the prefix; {@code null} is rendered as "null"
         * @return the prefix followed by {@code name}
         */
        @Override
        public String hello(String name) {
            String suffix = String.valueOf(name);
            return PREFIX.concat(suffix);
        }
    }
  • 相关阅读:
    AcWing 1135. 新年好 图论 枚举
    uva 10196 将军 模拟
    LeetCode 120. 三角形最小路径和 dp
    LeetCode 350. 两个数组的交集 II 哈希
    LeetCode 174. 地下城游戏 dp
    LeetCode 面试题 16.11.. 跳水板 模拟
    LeetCode 112. 路径总和 递归 树的遍历
    AcWing 1129. 热浪 spfa
    Thymeleaf Javascript 取值
    Thymeleaf Javascript 取值
  • 原文地址:https://www.cnblogs.com/zhuyapeng/p/14449820.html
Copyright © 2011-2022 走看看