zoukankan      html  css  js  c++  java
  • 八、Netty实现简单RPC调用

    1、RPC基本介绍

    • RPC(Remote Procedure Call,远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
    • 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图)
      在这里插入图片描述
    • 常见的 RPC 框架有: 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift, Spring 旗下的 Spring Cloud。

    RPC调用流程:

    在这里插入图片描述

    在RPC 中, Client 叫服务消费者,Server 叫服务提供者

    PRC调用流程说明

    1. 服务消费方(client)以本地调用方式调用服务
    2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
    3. client stub 将消息进行编码并发送到服务端
    4. server stub 收到消息后进行解码
    5. server stub 根据解码结果调用本地的服务
    6. 本地服务执行并将结果返回给 server stub
    7. server stub 将返回导入结果进行编码并发送至消费方
    8. client stub 接收到消息并进行解码
    9. 服务消费方(client)得到结果

    小结:RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

    2、代码示例

    需求说明

    1. dubbo底层使用了Netty 作为网络通讯框架,要求用Netty实现一个简单的RPC框
    2. 模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供
      者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty 4.1.20

    设计说明

    1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。2)创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
    2. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用Netty 请求
      提供者返回数据

    在这里插入图片描述

    服务接口:

    public interface HelloService {
        String hello(String message);
    }
    

    服务端服务接口实现类:

    public class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String message) {
            System.out.println("收到客户端消息=" + message);
            //根据 message 返回不同的结果
            if(message != null) {
                return "你好客户端,我已经收到你的消息【" + message + "】";
            } else {
                return "你好客户端,我已经收到你的消息。";
            }
        }
    }
    

    服务端初始化Netty:

    public class NettyServer {
    
        public static  void startServer(String hostName, int port) {
            startServer0(hostName, port);
        }
    
        private static void startServer0(String hostname, int port) {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(new NettyServerHandler()); //业务处理器
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(hostname,port).sync();
                System.out.println("服务提供方开始运行");
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                 bossGroup.shutdownGracefully();
                 workerGroup.shutdownGracefully();
            }
        }
    }
    

    服务端Handler:

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //获取客户端发送的消息,并调用服务
            System.out.println("msg=" + msg);
            //客户端在调用服务器的api 时,我们需要定义一个协议
            //比如要求,每次发消息时,都必须以某个字符串开头 "HelloService#hello#你好"
            if (msg.toString().startsWith("HelloService#hello#")) {
                String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
                ctx.writeAndFlush(result);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    Netty客户端

    public class NettyClient {
        //创建一个线程池
        private static ExecutorService executor= Executors.newFixedThreadPool(5);
    
        private static NettyClientHandler client;
    
        //编写方法,使用代理模式,获取一个代理对象
        public Object getBean(final Class<?> serviceClass, final String providerName) {
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass},
                    (proxy, method, args) -> {
                        System.out.println("代理被调用");
                        if (client == null)
                            initClient();
                        //设置要发给服务器端的信息
                        client.setPara(providerName + args[0]);
                        return executor.submit(client).get();
                    });
        }
    
    
        //初始化客户端
        private static void initClient() {
            client = new NettyClientHandler();
            //创建EventLoopGroup
            NioEventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        });
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    客户端Handler

    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    
        private ChannelHandlerContext context; //上下文
        private String result; //返回的结果
        private String para; //客户端调用方法时,传入的参数
    
        //与服务端创建连接后调用
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("通道连接成功");
            context = ctx; //因为我们在其他方法会使用到 ctx
        }
    
        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            result = msg.toString();
            notify(); //唤醒等待的线程
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        //被代理对象的调用,真正发送数据给服务器,发送完后就阻塞,等待被唤醒(channelRead)
        @Override
        public synchronized Object call() throws Exception {
            System.out.println("线程被调用-----");
            context.writeAndFlush(para);
            //进行wait
            wait(); //等待 channelRead 获取到服务器的结果后,进行唤醒。
            return result; //服务方返回的结果
        }
    
        public void setPara(String para){
            this.para = para;
        }
    }
    

    服务端启动:

    public class ServerBootstrap {
        public static void main(String[] args) {
            NettyServer.startServer("127.0.0.1", 7000);
        }
    }
    

    客户端启动并调用远程方法:

    public class ClientBootStrap {
    
        //这里定义协议头
        public static final String providerName = "HelloService#hello#";
    
        public static void main(String[] args) throws InterruptedException {
            //创建一个消费者
            NettyClient customer = new NettyClient();
            //创建代理对象
            HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
    
            //通过代理对象调用服务提供者的方法
            String res = service.hello("你好 Dubbo");
            System.out.println("调用的结果,res = " + res);
            Thread.sleep(2000);
        }
    }
    
  • 相关阅读:
    记录一些css奇淫技巧
    git的一些常用基础命令
    # 实现二维表格行头和列头固定的解决方案
    拿来-util工具函数
    mpvue开发小程序项目遇到的问题
    mac设置终端命令行别名alias(git、npm)
    简单配置nginx反向代理,实现跨域请求
    以前的阅读计划搁浅了,这里是分割线
    源码阅读心得11-13
    源码阅读心得1-10
  • 原文地址:https://www.cnblogs.com/idcode/p/14551394.html
Copyright © 2011-2022 走看看