zoukankan      html  css  js  c++  java
  • Netty 实现类似Dubbo的RPC

      之前了解到dubbo 的底层是基于Netty,在学习了Netty 之后简单的模拟一个RPC。

      模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的字符串

     1. HelloService 公共接口

    package netty.rpc.publicinterface;
    
    /**
     * 公共接口
     */
    public interface HelloService {
    
        String hello(String name);
    }

    2. HelloServiceImpl 服务实现类

    package netty.rpc.provider;
    
    import netty.rpc.publicinterface.HelloService;
    
    /**
     * 服务提供者实现类
     */
    public class HelloServiceImpl implements HelloService {
    
        /**
         * 记录调用次数
         */
        private int count;
    
        @Override
        public String hello(String name) {
            count++;
            System.out.println("netty.rpc.server.HelloServiceImpl.hello, name: " + name + "; count: " + count);
            return " this is " + name + "; count: " + count;
        }
    }

    3.Netty 相关类

    NettyServerHandler:

    package netty.rpc.netty;
    
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import netty.rpc.provider.HelloServiceImpl;
    import netty.rpc.publicinterface.HelloService;
    import org.apache.commons.lang.StringUtils;
    
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        private static final String PROTO = "helloservice://";
        private HelloService service = new HelloServiceImpl();
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 接收客户端的消息,并调用服务
            System.out.println("msg: " + msg);
    
            // 以约定的协议开头,就去掉协议之后调用serviceImpl 进行服务调用
            if (msg.toString().startsWith(PROTO)) {
                String param = StringUtils.substring(msg.toString(), PROTO.length());
                String result = service.hello(param);
                // 回显给客户端
                ctx.writeAndFlush(result);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    NettyServer

    package netty.rpc.netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class NettyServer {
    
        public static void start(String hostName, int port) {
            startService(hostName, port);
        }
    
        public static void startService(String hostname, int port) {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(8);
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        // 自己的处理器
                        pipeline.addLast(new NettyServerHandler());
                    }
                });
                ChannelFuture sync = serverBootstrap.bind(hostname, port).sync();
                sync.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            System.out.println("服务器启动成功, hostname: " + hostname + ", port: " + port);
                        }
                    }
                });
                sync.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    NettyClientHandler

    package netty.rpc.netty;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.util.concurrent.Callable;
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    
        private ChannelHandlerContext channelHandlerContext;
    
        private String result;
    
        private String param;
    
        // 该方法第一次被调用(1)
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            channelHandlerContext = ctx;
        }
    
        // 4
        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            result = msg.toString();
            notify();
        }
    
        // 3 - wait - 5
        @Override
        public synchronized Object call() throws Exception {
            channelHandlerContext.writeAndFlush(param);
            wait();
            return result;
        }
    
        // 2
        public void setParam(String param) {
            this.param = param;
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
            cause.printStackTrace();
        }
    }

    NettyClient

    package netty.rpc.netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.lang.reflect.Proxy;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class NettyClient {
    
        // 执行器
        private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
        private NettyClientHandler nettyClientHandler;
    
        // 使用代理模式获取对象(JDK 动态代理)= 执行过程实际是走Netty 调用服务
        public Object getBean(final Class serviceClazz, String proto) {
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClazz}, (proxy, method, param) -> {
                if (nettyClientHandler == null) {
                    init();
                }
    
                // 设置参数为协议 + 参数 (服务器端会根据协议来解析到对应的参数然后来进行服务调用)
                nettyClientHandler.setParam(proto + param[0]);
                return executorService.submit(nettyClientHandler).get();
            });
        }
    
        private void init() {
            nettyClientHandler = new NettyClientHandler();
            EventLoopGroup workerGrop = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGrop).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(nettyClientHandler);
                        }
                    });
    
            try {
                bootstrap.connect("127.0.0.1", 6666).sync();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

    4. CustomerBootstrap 服务消费者

    package netty.rpc.customer;
    
    import netty.rpc.netty.NettyClient;
    import netty.rpc.publicinterface.HelloService;
    
    public class CustomerBootstrap {
    
        private static final String PROTO = "helloservice://";
    
        public static void main(String[] args) {
            // 创建一个消费者
            NettyClient nettyClient = new NettyClient();
            // 创建代理对象
            HelloService bean = (HelloService) nettyClient.getBean(HelloService.class, PROTO);
            for (int i = 0; i < 20; i++) {
                String hello = bean.hello("123456");
                System.out.println(hello);
            }
        }
    }

     5. ServerBootstrap

    package netty.rpc.provider;
    
    import netty.rpc.netty.NettyServer;
    
    /**
     * 启动一个服务的提供者,NettyServer
     */
    public class ServerBootstrap {
    
        public static void main(String[] args) {
            NettyServer.start("127.0.0.1", 6666);
        }
    }

      可以看到上面的核心逻辑是,启动一个NettyServer、一个NettyClient。 NettyServer 用于启动Server服务器接收客户端发送的数据,NettyClient 实际也是一个任务,通过JDK的动态代理,在调用方法的时候,实际是用NettyClient 去向服务器端发送数据。 服务器端会根据接收到的数据进行判断,是否满足约定的协议,如果满足调用ServiceImpl 获取结果,然后输出给客户端。

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    字符串最大最小表示法模板 ( 字典序最大最小 )
    Manacher模板( 线性求最长回文子串 )
    2017南宁网络赛 Problem J Minimum Distance in a Star Graph ( 模拟 )
    字符串截取模板 && POJ 3450、3080 ( 暴力枚举子串 && KMP匹配 )
    HDU 6153 A Secret ( KMP&&DP || 拓展KMP )
    51Nod 1277 字符串中的最大值 ( KMP && DP )
    HDU 4300 Clairewd's message ( 拓展KMP )
    拓展KMP以及模板
    KMP解决字符串最小循环节相关问题
    序列终结者 BZOJ 1251 Splay
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14644267.html
Copyright © 2011-2022 走看看