zoukankan      html  css  js  c++  java
  • Netty简介及服务器客户端简单开发流程

    什么是Netty

    • Netty是一个基于Java NIO的编写客服端服务器的框架,是一个异步事件框架。
    • 官网https://netty.io/

    为什么选择Netty

    由于JAVA NIO编写服务器的过程过于复杂且不易掌控,所以我们选择Netty框架进行开发。

    • 具有很高的的性能。
    • 且比NIO更容易编码和维护。
    • 实践者众多,Elastic Search,dubbo,Akka,grpc等等
    • 社区很成熟。

    Netty的常用组件

    • EventLoopGroup 相当于Reactor线程组,常用NioEventLoopGroup
    • Channel 连接到网络套接字或能够进行I/O操作(如读、写、连接和绑定)的组件的连接。常用NioServerSocketChannel,NioSocketChannel,SocketChannel
    • Bootstrap/ServerBootstrap 辅助类
    • ChannelPipeline 处理或截取通道的入站事件和出站操作的通道处理程序列表
    • ChannelHandler 处理I/O事件或拦截I/O操作,并将其转发到其ChannelPipeline中的下一个处理程序。常用ChannelInboundHandlerAdapter和SimpleChannelInboundHandler,编码器,解码器。
    • ChannelFuture 异步操作使用。

    Netty实现一个服务器

    服务器代码:

    /**
     * @author monkjavaer
     * @date 2019/7/18 14:56
     */
    public class NettyServer {
        private static Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
        public static int PORT = 8080;
        public static void connect(){
            //配置两个服务端的NIO线程组,一个用于接收客服端的链接,另一个用于进行SocketChannel的网络读写。
            //NioEventLoopGroup是一个处理I/O操作的多线程事件循环
            //"boss":接收一个传入连接
            EventLoopGroup boss = new NioEventLoopGroup();
            //"worker" : 当boss接收连接并把接收的连接注册给worker,work就开始处理
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                //ServerBootstrap是一个帮助类,可以设置服务器
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss,worker)
                        //NioServerSocketChannel用于实例化新通道来接收传入的连接
                        .channel(NioServerSocketChannel.class)
                        //配置日志
                        .handler(new LoggingHandler(LogLevel.INFO))
                        //ChannelInitializer用于配置新通道
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //通过ChannelPipeline添加处理类ChannelHandler
                                //通常有很多处理类,可以将这个内部类new ChannelInitializer提为一个独立的类
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new NettyServerHandler());
                            }
                        })
                        //ChannelOption和ChannelConfig可以设置各种参数
                        .option(ChannelOption.SO_BACKLOG, 128)
                        //option()用于接受传入连接的NioServerSocketChannel,childOption()用于父ServerChannel接受的通道
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                // Bind and start to accept incoming connections.
                //异步地绑定服务器;调用 sync()方法阻塞等待直到绑定完成
                ChannelFuture f = bootstrap.bind(PORT).sync();
                // Wait until the server socket is closed.
                // In this example, this does not happen, but you can do that to gracefully
                // shut down your server.
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            NettyServer.connect();
        }
    }
    
    • 首先创建两个NioEventLoopGroup线程组,一个用于接收客服端的链接,另一个用于进行SocketChannel的网络读写。
    • 创建一个服务器帮助类ServerBootstrap,将boss,worker两个线程组加入
    • 通过ServerBootstrap流式设置服务器
    • 设置通道为NioServerSocketChannel
    • 通过ChannelInitializer设置自定义处理器NettyServerHandler,并将他加入ChannelPipeline,这里用内部类简易实现,真实线上环境我们应该提取为相应的类。
    • 通过option和childOption设置TCP相关参数。
    • 异步地绑定服务器;调用 sync()方法阻塞等待直到绑定完成
    • 最后关闭相关资源

    服务器处理类:

    客户端的处理类和服务器类似。

    /**
     * ChannelHandler.Sharable 标注一个channel handler可以被多个channel安全地共享
     * ChannelInboundHandlerAdapter实现了ChannelInboundHandler
     * 回调事件处理类
     *
     * @author monkjavaer
     * @date 2019/7/18 15:36
     */
    @ChannelHandler.Sharable
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        private static Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);
    
        /**
         * 新的连接被建立时调用
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            LOGGER.info("client {} connected.", ctx.channel().remoteAddress());
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello client!", CharsetUtil.UTF_8));
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            //获取缓冲区可读字节数
            int readableBytes = byteBuf.readableBytes();
            byte[] bytes = new byte[readableBytes];
            byteBuf.readBytes(bytes);
            LOGGER.info("readableBytes is{},server received message:{}", readableBytes, new String(bytes, StandardCharsets.UTF_8));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
    //        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
    //                .addListener(ChannelFutureListener.CLOSE);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error("server exceptionCaught,{}",cause.getMessage());
            ctx.close();
        }
    }
    
    • 自定义处理类继承自ChannelInboundHandlerAdapter
    • 重写我们需要的方法,channelActive(新的连接被建立时调用),channelRead(读取数据),channelReadComplete(读取最后的一条信息),exceptionCaught(发生异常时调用)

    Netty实现一个客户端

    客户端

    /**
     * @author monkjavaer
     * @date 2019/7/18 17:17
     */
    public class NettyClient {
        private static Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);
        public static String IP = "127.0.0.1";
        public static int PORT = 8080;
    
        public static void main(String[] args) {
            EventLoopGroup client = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(client)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY,true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new NettyClientHandler());
                            }
                        });
    
                ChannelFuture f = bootstrap.connect(IP,PORT).sync();
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                client.shutdownGracefully();
            }
        }
    }
    
    • 客户端这里只创建一个线程组即可
    • 帮助类这里使用Bootstrap
    • 设置通道为NioSocketChannel
    • 其他类容和服务器雷士
    • 和服务器建立连接

    客户端处理类

    /**
     * @author monkjavaer
     * @date 2019/7/18 17:26
     */
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
        private static Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);
    
        /**
         * 新的连接被建立时调用
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            LOGGER.info("server {} connected.", ctx.channel().remoteAddress());
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello server!", CharsetUtil.UTF_8));
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            //获取缓冲区可读字节数
            int readableBytes = byteBuf.readableBytes();
            byte[] bytes = new byte[readableBytes];
            byteBuf.readBytes(bytes);
            LOGGER.info("readableBytes is{},client received message:{}", readableBytes, new String(bytes, StandardCharsets.UTF_8));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error("server exceptionCaught,{}",cause.getMessage());
            ctx.close();
        }
    }
    
  • 相关阅读:
    年入50万的众生相
    【史上最全面经】银行类
    Dubbo背景和简介
    剑指Offer66题的总结、目录
    如何写一份更好的简历
    Linux命令 file
    Linux命令 umask
    Linux perm
    Linux 命令 which whereis locate find
    Linux命令 diff cmp patch
  • 原文地址:https://www.cnblogs.com/monkjavaer/p/11210369.html
Copyright © 2011-2022 走看看