zoukankan      html  css  js  c++  java
  • Netty框架

    学习Netty框架,三连问:

      什么是Netty框架?

      为什么要用Netty框架?

      怎么用Netty框架?

    什么是Netty框架?

      Netty 是一个广受欢迎的异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

      Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。

      Netty 是一个基于 NIO 的网络编程框架,使用 Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程。

      作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

    为什么要用Netty框架?

    因为Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了JDK 原生 NIO 程序的问题。

      JDK 原生 NIO 程序的问题:

        JDK 原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:

          1)NIO 的类库和 API 繁杂,使用麻烦:你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。

          2)需要具备其他的额外技能做铺垫:例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。

          3)可靠性能力补齐,开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。

          4)JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。

      Netty的主要特点有:

        1)设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。

        2)使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。

        3)高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。

        4)安全:完整的 SSL/TLS 和 StartTLS 支持。

        5)社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。

    Netty 常见的使用场景如下:

      1)互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。

      2)游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。

    非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。

      3)大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。

    有兴趣的读者可以了解一下目前有哪些开源项目使用了 Netty的Related Projects

    怎么用?(简单入门)

      可参考学习   netty 官方API: http://netty.io/4.1/api/index.html

     配置 pom.xml

    1         <dependency>
    2             <groupId>io.netty</groupId>
    3             <artifactId>netty-all</artifactId>
    4             <version>4.1.31.Final</version>
    5         </dependency>

     配置 ServerConnection.java 

    package com.example.demo.net;
    
    import java.net.InetSocketAddress;
    
    import org.apache.log4j.Logger;
    
    import com.example.demo.handler.ServerMsgHandler;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class ServerConnection {
        
        Logger logger = Logger.getLogger(ServerConnection.class);
        private final int port ;
        private EventLoopGroup bossGroup ;
        private EventLoopGroup workerGroup ;
        
        public ServerConnection(int port) {
            this.port = port ;
        }
        
        /***
         * NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器,
         * Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。 在这个例子中我们实现了一个服务端的应用,
         * 因此会有2个NioEventLoopGroup会被使用。 第一个经常被叫做‘boss’,用来接收进来的连接。
         * 第二个经常被叫做‘worker’,用来处理已经被接收的连接, 一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。
         * 如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现,
         * 并且可以通过构造函数来配置他们的关系。
         */
        public void run() {
            System.out.println("启动服务端Netty连接");
            bossGroup = new NioEventLoopGroup() ;
            workerGroup = new NioEventLoopGroup() ;
            /**
             * ServerBootstrap 是一个服务端启动NIO服务的辅助启动类 , 可以在这个服务中直接使用Channel
             */
            ServerBootstrap bootstrap = new ServerBootstrap() ;
            /**
             * 这一步是必须的,如果没有设置group将会报java.lang.IllegalStateException: group not set异常
             */
            bootstrap = bootstrap.group(bossGroup, workerGroup) ;
            /***
             * ServerSocketChannel以NIO的selector为基础进行实现的,用来接收新的连接
             * 这里告诉Channel如何获取新的连接.
             */
            bootstrap = bootstrap.channel(NioServerSocketChannel.class) ;
            /***
             * 绑定端口
             */
            bootstrap = bootstrap.localAddress(new InetSocketAddress(port)) ;
            /***
             * 你可以设置这里指定的通道实现的配置参数。 我们正在写一个TCP/IP的服务端,
             * 因此我们被允许设置socket的参数选项比如tcpNoDelay和keepAlive。
             * 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。
             */
            bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128) ;
            /***
             * option()是提供给NioServerSocketChannel用来接收进来的连接。
             * childOption()是提供给由父管道ServerChannel接收到的连接,
             * 在这个例子中也是NioServerSocketChannel。
             */
            bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true) ;
            /***
             * 这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。 ChannelInitializer是一个特殊的处理类,
             * 目的是帮助使用者配置一个新的Channel。
             * 也许你想通过增加一些处理类比如NettyServerHandler来配置一个新的Channel
             * 或者其对应的ChannelPipeline来实现你的网络程序。 当你的程序变的复杂时,可能你会增加更多的处理类到pipline上,
             * 然后提取这些匿名类到最顶层的类上。
             */
            bootstrap = bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline() ;
                        pipeline.addLast("decoder", new StringDecoder()) ;
                        pipeline.addLast("encoder", new StringEncoder()) ;
                        pipeline.addLast("handler", new ServerMsgHandler()) ;
                    }
                }) ;
            bootstrap.bind().addListener(new ChannelFutureListener() {            
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if ( future.isSuccess() ) {
                        System.out.println("服务端开始监听") ;
    //                    logger.info("服务端开始监听") ;
                    }else {
                        logger.error("服务端无法使用监听端口",future.cause()) ;
                    }
                }
            }) ;
        }
        
        public void shutdown() {
            logger.info("关闭 Server 端口");
            bossGroup.shutdownGracefully() ;
            workerGroup.shutdownGracefully() ;
        }
    
    }
    View Code

    配置 ServerMsgHandler.java 

    package com.example.demo.handler;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class ServerMsgHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 这里我们覆盖了chanelRead()事件处理方法。 每当从客户端收到新的数据时, 这个方法会在收到消息时被调用,
         * 这个例子中,收到的消息的类型是ByteBuf
         * 
         * @param ctx
         *            通道处理的上下文信息
         * @param msg
         *            接收的消息
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
            System.out.println("服务端接收的消息:"+msg.toString()) ;
            //向客户端发送消息
            String str = msg.toString() ;
            if ( "高性能NIO框架——Netty".equals(str) ) {
                ctx.writeAndFlush( "客户端 , 你好!") ;
            }
    //        ctx.writeAndFlush(msg.toString()+"你好!") ;
        }
        
        /***
         * 这个方法会在发生异常时触发
         * 
         * @param ctx
         * @param cause
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            /**
             * exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO
             * 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来 并且把关联的 channel
             * 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不 同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
             */
            // 出现异常就关闭
            cause.printStackTrace() ;
            ctx.close() ;
        }
        
    }
    View Code

    配置 ClientConnection.java

    package com.example.demo.net;
    
    import java.net.InetSocketAddress;
    
    import org.apache.log4j.Logger;
    
    import com.example.demo.handler.ClientMsgHandler;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class ClientConnection {
    
        Logger logger = Logger.getLogger(ClientConnection.class);
        private final int port ;
        private EventLoopGroup bossGroup ;
        private EventLoopGroup workerGroup ;
        
        private Channel channel ;
        
        public ClientConnection(int port) {
            this.port = port ;
        }    
        
        public Channel getChannel() {
            return this.channel ;
        }
        
        /***
         * NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器,
         * Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。 在这个例子中我们实现了一个服务端的应用,
         * 因此会有2个NioEventLoopGroup会被使用。
         * 第一个经常被叫做‘boss’,用来接收进来的连接。
         * 第二个经常被叫做‘worker’,用来处理已经被接收的连接, 一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。
         * 如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现,
         * 并且可以通过构造函数来配置他们的关系。
         * @throws InterruptedException 
         */
        public void run() {
            System.out.println("启动客户端Netty连接");
            bossGroup = new NioEventLoopGroup() ;
            workerGroup = new NioEventLoopGroup() ;
            /**
             * Bootstrap 是客户端一个启动NIO服务的辅助启动类 , 可以在这个服务中直接使用Channel
             */
            Bootstrap bootstrap = new Bootstrap() ;
    //        ServerBootstrap bootstrap = new ServerBootstrap() ;
            /**
             * 这一步是必须的,如果没有设置group将会报java.lang.IllegalStateException: group not set异常
             */
    //        bootstrap.group(bossGroup, workerGroup)
            bootstrap.group(bossGroup)
            /***
             * ServerSocketChannel以NIO的selector为基础进行实现的,用来接收新的连接
             * 这里告诉Channel如何获取新的连接.
             */
    //            .channel(NioServerSocketChannel.class)
                .channel(NioSocketChannel.class)
                /***
                 * 绑定端口,等价于bootstrap.bind("127.0.0.1", port) ,若下面用了,就要把这个注释掉,不然会报错
                 */
    //            .localAddress(new InetSocketAddress(port))
                .remoteAddress("127.0.0.1", port)
                /***
                 * 你可以设置这里指定的通道实现的配置参数。 我们正在写一个TCP/IP的服务端,
                 * 因此我们被允许设置socket的参数选项比如tcpNoDelay和keepAlive。
                 * 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。
                 */
    //            .option(ChannelOption.SO_BACKLOG, 128)
                /***
                 * option()是提供给NioServerSocketChannel用来接收进来的连接。
                 * childOption()是提供给由父管道ServerChannel接收到的连接,
                 * 在这个例子中也是NioServerSocketChannel。
                 */
    //            .childOption(ChannelOption.SO_KEEPALIVE, true)
                /***
                 * 这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。 ChannelInitializer是一个特殊的处理类,
                 * 目的是帮助使用者配置一个新的Channel。
                 * 也许你想通过增加一些处理类比如NettyServerHandler来配置一个新的Channel
                 * 或者其对应的ChannelPipeline来实现你的网络程序。 当你的程序变的复杂时,可能你会增加更多的处理类到pipline上,
                 * 然后提取这些匿名类到最顶层的类上。
                 */
    //            .childHandler(new ChannelInitializer<SocketChannel>() {
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline() ;
                        pipeline.addLast("decoder", new StringDecoder()) ;
                        pipeline.addLast("encoder", new StringEncoder()) ;
    //                    pipeline.addLast("handler", new ClientMsgHandler()) ;
                        pipeline.addLast(new ClientMsgHandler()) ;
                    }
                }) ;
    /*        bootstrap.bind().addListener(new ChannelFutureListener() {            
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if ( future.isSuccess() ) {
                        System.out.println("客户端开始监听") ;
                        logger.info("客户端开始监听") ;
                    }else {
                        logger.error("客户端无法使用监听端口",future.cause()) ;
                    }
                }
            }) ;    */
            
    /*        //绑定端口,开始接收进来的连接
            ChannelFuture cFuture;
            try {
    //            cFuture = bootstrap.connect(host, port).sync();            
    //            cFuture = bootstrap.bind("127.0.0.1", port).sync();
                cFuture = bootstrap.bind().sync() ;
                //在这里拿到这个channel,是为了 等下 测试消息发送 用的
                channel = cFuture.channel();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        */
                        
            ChannelFuture cf;
            try {
                cf = bootstrap.connect().sync();
                channel = cf.channel();
                channel.writeAndFlush("ClientConnection客户端已成功启动!");
                
                cf.addListener(new ChannelFutureListener() {            
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if ( future.isSuccess() ) {
                            System.out.println("客户端开始监听") ;
    //                        logger.info("客户端开始监听") ;
                        }else {
    //                        logger.error("客户端无法使用监听端口",future.cause()) ;
                        }
                    }
                }) ;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
        
        public void shutdown() {
            System.out.println("关闭 Client 端口");
            logger.info("关闭 Client 端口");
            bossGroup.shutdownGracefully() ;
            workerGroup.shutdownGracefully() ;
        }
        
    }
    View Code

    配置 ClientMsgHandler.java

    package com.example.demo.handler;
    
    import org.apache.log4j.Logger;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    public class ClientMsgHandler extends ChannelInboundHandlerAdapter {
    
        Logger logger = Logger.getLogger(ClientMsgHandler.class) ;
        
        /**
         * 这里我们覆盖了chanelRead()事件处理方法。 每当从客户端收到新的数据时, 这个方法会在收到消息时被调用,
         * 这个例子中,收到的消息的类型是ByteBuf
         * 
         * @param ctx
         *            通道处理的上下文信息
         * @param msg
         *            接收的消息
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
    //        super.channelRead(ctx, msg);
            System.out.println("客户端接收的消息:"+msg.toString()) ;
            //向服务端发送消息
            ctx.writeAndFlush("服务端 , 你好!") ;
        }
        
        /***
         * 这个方法会在发生异常时触发
         * 
         * @param ctx
         * @param cause
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            /**
             * exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO
             * 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来 并且把关联的 channel
             * 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不 同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
             */
            // 出现异常就关闭
            cause.printStackTrace() ;
            ctx.close() ;
        }
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //        super.channelActive(ctx);
    //        logger.info("client channel active");
            System.out.println("client channel active");
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channel inactive");
            ctx.close() ;
        }
        
    }
    View Code

    编写测试类 DemoTest.java 

     1 package com.example.demo.netty;
     2 
     3 import org.junit.runner.RunWith;
     4 import org.springframework.boot.test.context.SpringBootTest;
     5 import org.springframework.test.context.ActiveProfiles;
     6 import org.springframework.test.context.junit4.SpringRunner;
     7 
     8 import com.example.demo.DemoApplicationTests;
     9 import com.example.demo.net.ClientConnection;
    10 import com.example.demo.net.ServerConnection;
    11 
    12 import io.netty.channel.Channel;
    13 
    14 @RunWith(SpringRunner.class)
    15 @SpringBootTest(classes = DemoApplicationTests.class)
    16 @ActiveProfiles("test")
    17 public class DemoTest {
    18 
    19     public static void main(String[] args) {
    20         int port = 2222 ;
    21         Thread serverThread = new Thread( new Runnable() {            
    22             @Override
    23             public void run() {
    24                 new ServerConnection(port).run() ;
    25             }
    26         } ) ;
    27         serverThread.start() ;
    28         ClientConnection clientConnection = new ClientConnection(port) ;
    29 //        Thread clientThread = new Thread( new Runnable() {            
    30 //            @Override
    31 //            public void run() {
    32 //                new ClientConnection(port).run() ;
    33 //            }
    34 //        } ) ;
    35 //        clientThread.start() ;
    36         
    37         clientConnection.run();
    38         Channel channel = clientConnection.getChannel() ;
    39         channel.writeAndFlush("高性能NIO框架——Netty");
    40         
    41 //        new Thread( ()->{
    42 //            new ServerConnection(port) ;
    43 //        } ).start() ;        
    44 //        new Thread( ()->{
    45 //            new ClientConnection(port) ;
    46 //        } ).start() ;
    47         
    48     }
    49 }
    View Code

    服务端与客户端的区别:

      1. 在客户端只创建了一个NioEventLoopGroup实例,因为客户端并不需要使用I/O多路复用模型,需要有一个Reactor来接受请求。只需要单纯的读写数据即可

      2. 在客户端只需要创建一个Bootstrap对象,它是客户端辅助启动类,功能类似于ServerBootstrap。

    共同学习,共同进步,若有补充,欢迎指出,谢谢!

  • 相关阅读:
    ssh登录 The authenticity of host 192.168.0.xxx can't be established. 的问题
    linux系统之间互传文件
    Ubuntu16.04上Docker的安装及基本用法
    Ubuntu git 与 gitlab 关联
    Ubuntu E: Sub-process /usr/bin/dpkg returned an error code (1)
    ubuntu16.04搭建jdk1.8运行环境
    Ubuntu18.04 安装Tomcat 8.5
    VMware Ubuntu安装详细过程
    Ubuntu 14.04远程登录服务器--ssh的安装和配置
    Java中文编程开发,让Java编写更改复杂
  • 原文地址:https://www.cnblogs.com/dengguangxue/p/11330950.html
Copyright © 2011-2022 走看看