zoukankan      html  css  js  c++  java
  • netty 详解(八)基于 Netty 模拟实现 RPC

     1、RPC 基本介绍

      RPC(Remote Procedure Call) 远程过程调用,是一个计算机通信协议,该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序无需额外地为这个交互作用编程。

      两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样。

      RPC 调用流程

      RPC调用流程说明:

      1)服务消费方(client)以本地调用方式调用服务;

      2)client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体

      3)client stub 将消息进行编码并发送到服务端

      4)server stub 收到消息后进行解码

      5)sever stub 根据解码结果调用本地的服务

      6)本地服务执行并将结果返回给 server stub

      7)server stub 将返回导入结果进行编码并发送至消费方

      8)client  stub 接收消息并进行解码

      9)服务消费方(client)得到结果

      小结:RPC 的目标即使将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

    2、自己实现 dubbo RPC(基于Netty)

      dubbo 底层使用了 Netty 作为网络通信框架,要求用 Netty 实现一个简单的 RPC 框架

      模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据,底层网络通信使用 Nett有 4.x。

      设计说明:

    • 创建一个接口,定义抽象方法,用于消费者和提供者之间的约定
    • 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据
    • 创建一个消费者,该类需要透明地调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据

       共用接口 HelloService

    package com.oy.dubbo.api;
    
    public interface HelloService {
        String hello(String message);
    }
    View Code

      服务提供方提供的接口 HelloServiceImpl

    package com.oy.dubbo.provider;
    
    import com.oy.dubbo.api.HelloService;
    
    public class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String message) {
            System.out.println("服务提供方 HelloServiceImpl#Hello() 被调用");
            // 根据 msg 返回结果
            if (message != null) {
                return "你好,客户端,我已经收到你的消息。你发送的消息是: " + message;
            } else {
                return "你好,客户端,我已经收到你的消息。";
            }
        }
    }
    View Code

      

      NettyServer 和对应的 ChannelHandler

    package com.oy.dubbo.netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class NettyServer {
    
        public static void startServer(String host, int port) {
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup work = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .group(boss, work)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(new NettyServerHandler());
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
                System.out.println("服务端开启服务,端口: " + port);
                channelFuture.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
    
    
        }
    }
    
    
    package com.oy.dubbo.netty;
    
    import com.oy.dubbo.provider.HelloServiceImpl;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 获取客户端发送的消息,并调用服务
            System.out.println("收到消费方信息,msg: " + msg);
            // 客户端在调用服务的 api,需要定义一个协议
            // 每次发消息都必须以某个字符串开头 "HelloService#Hello#你好"
            String s = msg.toString();
            if (s.startsWith("HelloService#Hello")) {
                String result = new HelloServiceImpl().hello(s.substring(s.lastIndexOf("#") + 1));
                ctx.writeAndFlush(result);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("NettyServerHandler exception: " + cause.getMessage());
            ctx.close();
        }
    }

      服务提供方的服务启动类 ServerBootstrap

    package com.oy.dubbo.provider;
    
    import com.oy.dubbo.netty.NettyServer;
    
    public class ServerBootstrap {
        public static void main(String[] args) {
            NettyServer.startServer("127.0.0.1", 10086);
        }
    }
    View Code

      

      客户端程序:

      NettyClient

    package com.oy.dubbo.netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.lang.reflect.Proxy;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class NettyClient {
        // 创建线程池
        private static ExecutorService excutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
        private static NettyClientHandler client;
    
        // 使用代理模式,获取一个代理对象
        public Object getBean(final Class<?> serviceClass, final String provider) {
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                    new Class<?>[]{serviceClass},
                    (proxy, method, args) -> {
                        if (client == null) {
                            System.out.println("client == null");
                            initClient();
                        }
                        // 设置要发给服务器的消息
                        System.out.println("client != null");
                        client.setPara(provider + args[0]);
                        return excutor.submit(client).get();
                    });
        }
    
        // 初始化客户端
        private static void initClient() {
            client = new NettyClientHandler();
    
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(client);
                        }
                    });
    
    
            try {
                ChannelFuture future = bootstrap.connect("127.0.0.1", 10086).sync();
                //future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
    //        future.addListener(new ChannelFutureListener() {
    //            public void operationComplete(ChannelFuture channelFuture) throws Exception {
    //                if (future.isDone()) {
    //                    System.out.println("连接服务器端口 10086 成功");
    //                } else {
    //                    System.out.println("连接服务器端口 10086 失败");
    //                }
    //            }
    //        });
    
    //        future.channel().closeFuture().sync();
        }
    }

      NettyClientHandler

    package com.oy.dubbo.netty;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.util.concurrent.Callable;
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    
        private ChannelHandlerContext context;
        private String result;// 返回的结果
        private String para;// 客户端调用方法时,传入的参数
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            this.context = ctx; // 其他方法中要用到 ChannelHandlerContext
        }
    
        /**
         * 收到服务器返回后调用该方法
         */
        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            result = msg.toString();
            System.out.println("NettyClientHandler#channelRead() 被调用, result=" + result);
            notify(); // 唤醒等待的线程
        }
    
        /**
         * 被代理对象调用,发送数据给服务器,wait,等待被 channelRead 唤醒
         */
        @Override
        public synchronized Object call() throws Exception {
            context.writeAndFlush(para);
            wait(); // 等待 channelRead 方法获取服务器返回结果后唤醒
            return result;
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        public void setPara(String para) {
            this.para = para;
        }
    }

      ClientBootstrap

    package com.oy.dubbo.customer;
    
    import com.oy.dubbo.api.HelloService;
    import com.oy.dubbo.netty.NettyClient;
    
    public class ClientBootstrap {
        // 定义协议头
        public static final String  providerName = "HelloService#Hello#";
    
        public static void main(String[] args) {
            // 创建一个消费者
            NettyClient customer = new NettyClient();
            System.out.println("1");
            // 创建代理对象
            HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
            System.out.println("2");
            // 通过代理对象调用服务器提供者的方法(服务)
            String result = service.hello("你好 dubbo");
            System.out.println("3");
            System.out.println("消费者调用提供者方法,返回的结果:result=" + result);
        

     ---

  • 相关阅读:
    FFT学习笔记
    FWT(Fast Walsh Transformation)快速沃尔什变换学习笔记
    GMS2游戏开发学习历程
    [BZOJ3238][AHOI2013]差异 [后缀数组+单调栈]
    Trie树简单讲解
    自己的题
    小技巧
    编程注意事项
    构造方法
    递归
  • 原文地址:https://www.cnblogs.com/xy-ouyang/p/12829211.html
Copyright © 2011-2022 走看看