zoukankan      html  css  js  c++  java
  • java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

    说明

    上一篇代码基于 socket 的实现非常简单,但是对于实际生产,一般使用 netty。

    至于 netty 的优点可以参考:

    为什么选择 netty?

    http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty

    在这里插入图片描述

    代码实现

    maven 引入

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>${netty.version}</version>
    </dependency>
    

    引入 netty 对应的 maven 包,此处为 4.1.17.Final。

    服务端代码实现

    netty 的服务端启动代码是比较固定的。

    package com.github.houbb.rpc.server.core;
    
    import com.github.houbb.log.integration.core.Log;
    import com.github.houbb.log.integration.core.LogFactory;
    import com.github.houbb.rpc.server.constant.RpcServerConst;
    import com.github.houbb.rpc.server.handler.RpcServerHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * rpc 服务端
     * @author binbin.hou
     * @since 0.0.1
     */
    public class RpcServer extends Thread {
    
        private static final Log log = LogFactory.getLog(RpcServer.class);
    
        /**
         * 端口号
         */
        private final int port;
    
        public RpcServer() {
            this.port = RpcServerConst.DEFAULT_PORT;
        }
    
        public RpcServer(int port) {
            this.port = port;
        }
    
        @Override
        public void run() {
            // 启动服务端
            log.info("RPC 服务开始启动服务端");
    
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(workerGroup, bossGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<Channel>() {
                            @Override
                            protected void initChannel(Channel ch) throws Exception {
                                ch.pipeline().addLast(new RpcServerHandler());
                            }
                        })
                        // 这个参数影响的是还没有被accept 取出的连接
                        .option(ChannelOption.SO_BACKLOG, 128)
                        // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                // 绑定端口,开始接收进来的链接
                ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
                log.info("RPC 服务端启动完成,监听【" + port + "】端口");
    
                channelFuture.channel().closeFuture().syncUninterruptibly();
                log.info("RPC 服务端关闭完成");
            } catch (Exception e) {
                log.error("RPC 服务异常", e);
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
    }
    

    为了简单,服务端启动端口号固定,RpcServerConst 常量类内容如下:

    public final class RpcServerConst {
    
        private RpcServerConst(){}
    
        /**
         * 默认端口
         * @since 0.0.1
         */
        public static final int DEFAULT_PORT = 9627;
    
    }
    

    RpcServerHandler

    当然,还有一个比较核心的类就是 RpcServerHandler

    public class RpcServerHandler extends SimpleChannelInboundHandler {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // do nothing now
        }
    }
    

    目前是空实现,后续可以添加对应的日志输出及逻辑处理。

    测试

    启动测试的代码非常简单:

    /**
     * 服务启动代码测试
     * @param args 参数
     */
    public static void main(String[] args) {
        new RpcServer().start();
    }
    

    说明

    上面我们实现了服务端的实现,这一节来一起看一下 client 客户端代码实现。

    代码实现

    RpcClient

    /*
     * Copyright (c)  2019. houbinbin Inc.
     * rpc All rights reserved.
     */
    
    package com.github.houbb.rpc.client.core;
    
    import com.github.houbb.log.integration.core.Log;
    import com.github.houbb.log.integration.core.LogFactory;
    import com.github.houbb.rpc.client.handler.RpcClientHandler;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * <p> rpc 客户端 </p>
     *
     * <pre> Created: 2019/10/16 11:21 下午  </pre>
     * <pre> Project: rpc  </pre>
     *
     * @author houbinbin
     * @since 0.0.2
     */
    public class RpcClient extends Thread {
    
        private static final Log log = LogFactory.getLog(RpcClient.class);
    
        /**
         * 监听端口号
         */
        private final int port;
    
        public RpcClient(int port) {
            this.port = port;
        }
    
        public RpcClient() {
            this(9527);
        }
    
        @Override
        public void run() {
            // 启动服务端
            log.info("RPC 服务开始启动客户端");
    
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                Bootstrap bootstrap = new Bootstrap();
                ChannelFuture channelFuture = bootstrap.group(workerGroup)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.SO_KEEPALIVE, true)
                        .handler(new ChannelInitializer<Channel>(){
                            @Override
                            protected void initChannel(Channel ch) throws Exception {
                                ch.pipeline()
                                        .addLast(new LoggingHandler(LogLevel.INFO))
                                        .addLast(new RpcClientHandler());
                            }
                        })
                        .connect("localhost", port)
                        .syncUninterruptibly();
    
                log.info("RPC 服务启动客户端完成,监听端口:" + port);
                channelFuture.channel().closeFuture().syncUninterruptibly();
                log.info("RPC 服务开始客户端已关闭");
            } catch (Exception e) {
                log.error("RPC 客户端遇到异常", e);
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    
    }
    

    .connect("localhost", port) 声明了客户端需要连接的服务端,此处和服务端的端口保持一致。

    RpcClientHandler

    客户端处理类也比较简单,暂时留空。

    /*
     * Copyright (c)  2019. houbinbin Inc.
     * rpc All rights reserved.
     */
    
    package com.github.houbb.rpc.client.handler;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * <p> 客户端处理类 </p>
     *
     * <pre> Created: 2019/10/16 11:30 下午  </pre>
     * <pre> Project: rpc  </pre>
     *
     * @author houbinbin
     * @since 0.0.2
     */
    public class RpcClientHandler extends SimpleChannelInboundHandler {
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // do nothing.
        }
    
    }
    

    启动测试

    服务端

    首先启动服务端。

    客户端

    然后启动客户端连接服务端,实现如下:

    /**
     * 服务启动代码测试
     * @param args 参数
     */
    public static void main(String[] args) {
        new RpcClient().start();
    }
    

    小结

    为了便于大家学习,以上源码已经开源:

    https://github.com/houbb/rpc

    我是老马,期待与你的下次重逢。

  • 相关阅读:
    Linux Home目录硬盘空间缩减
    test
    ORACLE 数据泵 expdp/impdp
    mysql利用mysqlbinlog命令恢复误删除数据
    LogMiner日志挖掘分析管理
    Oracle 审计测试与总结
    redis 5.0.3 讲解、集群搭建
    联想服务器配置 RAID
    Cenots7对lvm逻辑卷分区大小的调整
    kvm 基本运维命令
  • 原文地址:https://www.cnblogs.com/houbbBlogs/p/15383626.html
Copyright © 2011-2022 走看看