zoukankan      html  css  js  c++  java
  • 【Netty】Netty实现简单RPC

    Netty实现RPC

    RPC(Remote Procedure Call)远程过程调用,一种计算机通信协议

    • 即:一台计算机的程序调用另一台计算机的子程序,并且不需要对这个交互,进行额外的编程;

    RPC机制

    RPC调用者要调用远程API,首先调用RPCProxy代理,再通过RCPInvoker调用者,打开RCPConnector连接;

    这里的RPCChannel就是Netty的通信方式(SocketChannel);

    RPC的协议,我们进行自定义,可以通过字符串来定义一个协议头,代表,要调用的目标方法服务;

    常用RPC框架

    阿里Dubbo、Apache的thrift、Spring的Spring Cloud、google的gRPC

    实现流程

    1. 创建接口,定义抽象方法,用于约定消费者和提供者;
    2. 创建提供者,需要监听消费者请求,并按照约定返回数据;
    3. 创建消费者,该类需要透明的调用自己不存在的方法,使用Netty来发送请求;

    公用接口

    RPC的两端,都要有此接口;

    客户端目的就是要调用服务端的实现;所以客户端不实现此接口;

    server端实现此接口;

    public interface HelloService {
        String hello(String msg);
    }
    

    RPC Server

    1. 在server端,实现接口,以供远程调用
    // 多次调用,同一个实例,会创建多个实例,而非同一个!
    public class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String msg) {
            if (null != msg) {
                return "RPC returns message :" + msg;
            } else {
                return "message is null";
            }
        }
    }
    
    1. Server端通过Netty实现
    public class NettyServer {
        //netty server的启动和初始化
        public static void startServer(String hostName, int port) {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
                ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
                System.out.println("-------- Netty server starts --------");
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    1. Server端自定义Handler

      主要是读取客户端的消息;

      通过消息,获取规定好的协议头,来判断,具体调用哪个方法;

      这里的协议头为:HelloService#hello#,后面跟上方法参数;

      拿到协议头后,截取出方法参数,调用对应方法,拿到返回结果;

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        // read:读取客户端消息,并调用服务
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("client msg : " + msg);
            /**
             * 规定RPC调用协议:
             * 要想调用某个服务,在这里自定义协议头:必须以字符串"HelloService#hello#"开头(接口#方法)
             */
            if (msg.toString().startsWith("HelloService#hello#")) {
                String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
                ctx.writeAndFlush(result);
            }
        }
    
        // exception
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
    1. 最后是Server端的启动类:启动Netty Server
    public class RPCServerBootstrap {
        public static void main(String[] args) {
            NettyServer.startServer("localhost", 8899);
        }
    }
    

    RPC Client

    • 客户端是通过一个线程调用RPC的;也就是call方法,发送RPC请求给Server;

    • 请求发送之后,消息并不是立刻返回的,手动将线程阻塞;

    • Server端将RPC调用结果,返回给Client,是首先进入Handler的channelRead方法中;

      在此方法中,拿到msg数据,并执行notify,唤醒等待的线程;

    • 阻塞的线程,被唤醒,并拿到数据,返回RPC调用结果;

    执行流程:

    1. call线程发送RPC请求,以及参数,发送之后,阻塞等待被唤醒;
    2. Server端根据参数,确定要调用的方法,以及参数,执行方法,返回结果
    3. client的Handler的channelRead方法,读取到返回的结果;
    4. 唤醒call线程,拿到RPC结果,返回;
    
    1. 客户端这边,我们先完成自定义的ClientHandler

      注意:call方法与channelRead必须一起同步,否则无法唤醒线程;

    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
        private final Lock lock = new ReentrantLock();
        private final Condition condition = lock.newCondition();
        // ctx上下文,需要在call方法中使用ctx
        private ChannelHandlerContext context;
        // RPC返回结果
        private String result;
        // client调用RPC传入参数:即方法参数
        private String para;
        void setPara(String para) {
            this.para = para;
        }
        /**
         * call方法发送请求数据,并阻塞,等待被唤醒;
         * 发送RPC请求---> wait等待唤醒--->  channelRead notify唤醒 --> 获取RPC结果
         */
        @Override
        public Object call() throws Exception {
            lock.lock();
            try {
                System.out.println("------ NettyClientHandler.call is called ---------");
                // 发送RPC请求参数
                context.writeAndFlush(para);
                // 等待结果
                condition.await();
                // 唤醒之后,返回结果
                System.out.println("----- NettyClientHandler.call is notified ------");
                return result;
            } finally {
                lock.unlock();
            }
        }
        // 与server创建连接时,调用 channelActive
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("--------- channelActive is called ---------");
            context = ctx;
        }
        // 收到server数据后,必须与call方法同步!
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            lock.lock();
            try {
                // RPC返回结果
                result = msg.toString();
                // 唤醒等待的线程 call方法
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
    1. Netty客户端的初始化代码

      这里通过提交线程池的方法启动RPC请求的发送;

      通过代理的方式,调用并返回结果;

    public class NettyClient {
        // 创建一个线程池 线程数 = CPU处理器数量
        private static int nThread = Runtime.getRuntime().availableProcessors();
        private static ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(nThread, nThread, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        private static NettyClientHandler clientHandler;
        /**
         * 使用代理模式
         * 反射获取一个代理对象
         */
        public Object getBean(final Class<?> serivceClass, final String providerName) {
    
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                    new Class<?>[]{serivceClass}, (proxy, method, args) -> {
                        System.out.println("------------ Proxy ------------");
                        if (clientHandler == null) {
                            // 初始化
                            initClient();
                        }
                        //设置要发给服务器端的信息
                        //providerName 协议头 args[0] 就是客户端调用api hello(???), 参数
                        clientHandler.setPara(providerName + args[0]);
    					// 提交线程到线程池,返回结果
                        return threadPoolExecutor.submit(clientHandler).get();
                    });
        }
        // 初始化客户端
        public static void initClient() {
            // 必须提前实例化!
            clientHandler = new NettyClientHandler();
            try {
                NioEventLoopGroup group = new NioEventLoopGroup();
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(clientHandler);
                            }
                        });
                bootstrap.connect("localhost", 8899).sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    1. 最后是客户端启动类
    public class RPCClientBootstrap {
        // 相当于协议头,代表要调用哪个方法
        public static final String PROVIDER_NAME = "HelloService#hello#";
    
        public static void main(String[] args) {
            // 创建client
            NettyClient client = new NettyClient();
            // 获取proxy代理
            HelloService helloServiceProxy = (HelloService) client.getBean(HelloService.class, PROVIDER_NAME);
            // 代理对象调用PRC获取结果
            String res = helloServiceProxy.hello("RPC Request");
            System.out.println("RPC result: " + res);
        }
    }
    
  • 相关阅读:
    了解linux web的监听工具
    Ubuntu 16.04 安装docker-ce,docker-compose
    php 连接mysql 主机 localhost,显示 No such file or directory
    vagrant ssh try
    解决 WordPress“正在执行例行维护,请一分钟后回来”
    wordpress Warning: Parameter 2 to qtranxf_postsFilter() expected to be a reference
    ubuntu phpize 安裝
    ubuntu 16.04 pecl 不能安裝 mcrypt
    wp api jwt 403 (Forbidden) -- JWT is not configurated properly, please contact the admin
    docker gitlab backup
  • 原文地址:https://www.cnblogs.com/mussessein/p/12627453.html
Copyright © 2011-2022 走看看