zoukankan      html  css  js  c++  java
  • 网络I/O模型--07Netty基础

          Netty 是由 JBOSS 提供的一个 Java 开源框架。 Netty 提供异步的、事件驱动的网络应用程序框架和工具 ,用以快速开发高性能 、 高可靠性的网络服务器和客户端程序。
          Netty 框架是对 Java BIO 、 Java NIO 框架的再次封装。 Netty 框架是一个面向上层业务实现进行封装的“业务层”框架。而Java Socket 框架、 Java NIO 框架、 Java AIO 框架更偏向于对下层技术实现的封装,是面向“技术层” 的框架

     

    1.工作原理

          1) Boss 使用的线程池:Boss 线程池实际上就是 Java NIO 框架中的 Selector 工作角色,针对一个本地 IP 的端口,Boss 线程池中有一条线程工作,工作内容也相对简单,就是发现新的连接: Ne时支持同时监听多个端口,所以 Boss 线程池的大小按照需要监昕的服务器端口数量进行设置。
           2) Work 线程池中的线程:(如果封装的是 Java NIO,那么具体的线程实现类就是 NIOEventLoop )都固定负责指派给它的网络连接的事件监听,并根据状态调用不同的Channe!Handler 事件方法。而最后一个参数 SelectorProvider 说明了这个 EventLoop 所使用的 多路复用I/O模型的具体实现由操作系统决定。
    option 方法可以设置这个 ServerChannel 相应的各种属性(在代码中使用的是NIOServerSocketChannel) ; childOption 方法用于设置这个 ServerChannel 收到客户端事件后,所生成的新的 Channel 的各种属性(代码中生成的是 NIOSocketChannel ) 。


    2. Netty 线程中几制
           在 Netty 中,原来Java NIO中的Selector 的工作就交给 Boss 线程完成,而且建议使用线程池技术 。 Boss 线程负责发现连接到服务器的新的 Channel ( SocketServerChannel的 ACCEPT 事件),并且将这个 Channel 经过检查后注册到 Work 连接池的某个 EventLoop 线程中 。 而当 Work 线程发现操作系统有一个它感兴趣的I/O事件时(例如 SocketChannel 的 READ 事件 ) ,则调用相应的 ChannelHandler事件 。当某 个 Channel 失效后(例如显示调用 ctx.close() ) ,这个 Channel 将从绑定的EventLoop 中被剔除 。


           在 Netty 中,如果我们使用的是 Java NIO 框架实现的对多路复用I/O模型的支持,那么进行这个循环的是 NIOEventLoop 类 (可参见该类中的 processSelectedKeysPlain 方法和processSelectedKey 方法) 。

     

          一个 Work 线程池的线程将按照底层封装 Java NIO 框架中 Selector 的事件状态, 决定执行ChannelHandler 中的哪一个事件方法( Netty 中包括了 channelRegistered 、 channelUnregistered 、channelActive 、 channellnactive 等事件方法)。执行完成后, Work 线程将一直轮询直到操作系统回复下一个它所管理的 Channel 发生了新的 I/O 事件。

     

    3.Netty 的几个概念: Channel 、 Buffer、 ChannelPipeline 、 ChanneHandler 、 ChannelHandlerContext等

    1) ByteBuf
           Netty 重写了 Java NIO 框架中的缓存结构,井将这个结构应用在更上层的封装ByteBuf(其实现有:EmptyByteBuf、 ReadOnlyByteBuf、 UnpooledDirectByteBuf、 PooledByteBuf)中 。

     

    2) Channel
           Netty中的 Channel 专门代表网络通信,这个和 Java NIO 框架中的 Channel 不一样,它是专门代表网络通信,所以它是由客户端地址+服务器地址+网络操作状态构成的
           在 Netty中,不止封装了多路复用I/O 模型,还封装了 Java BIO 支持的同步网络I/O通信模型。将它们在表现上都抽象成 Channel了。而我们知道在 Java BIO 支持的同步网络I/O模型 中,原来是不存在 Channel 这个概念的 。

     

    3) ChannelPipeline 与 ChannelHandler
           Netty 中的每一个 Channel ,都有一个独立的 ChannelPipeline , 它是双向的,数据可以通过这个它流入到服务器,也可以通过它从服务器流出。
           在 ChannelPipeline 中, 有若干个过滤器,我们称之为“ ChannelHandler ” (也可以称为过滤器) 。 同“流入”和“流出”的概念相对应:用于处理/过滤“流入数据”的 ChannelHandler,被称为“ ChannellnboundHandler ”: 用于处理/过滤“流出数据”的 ChannelHandler,被称为“ ChannelOutboundHandler”,

     

    (1) 责任链和适配器的应用
            数据在 ChannelPipeline 中由 一个一个的 Handler 进行处理,井形成一个新的数据状态 。这是典型的“责任链”模式。
           虽然数据管道中的 Handler 是按照顺序执行的, 但不代表某一个 Handler 会处理任何一种由“上一个 Handler”发送过来的数据。某些 Handler 会检查传来的数据是否符合要求,如果不符合自己的处理要求,则不进行处理 。

     

    (2) ChannellnboundHandler 类举例
           HttpRequestDecoder、ByteArrayDecoder、DelimiterBasedFrameDecoder、ProtobuIDecoder 和 ProtobufVarint32FrameDecoder等等

     

    (3) ChannelOutboundHandler 类举例
           HttpResponseEncoder、ByteArrayEncoder、ProtobutEncoder 、 ProtobufVarint32LengthFieldPrepender 、 MarshallingEncoder 、JZlibEncoder 等等

     

    4. 信息格式
           对数据信息格式的封装: Protobuf 数据协议、JBoss Marshalling 数据协议、HTTP Request/HTTP Response 协议

     

    5. 解决半包问题和粘包问题
    1)MSS: MSS 属性是 TCP 连接双方在三次握手时所确认的每一个 TCP 报文段中数据宇段的最大长度。
    2)半包是指,接收方应用程序在接收信息时,没有接收到 一个完整的信息格式块;
    3)粘包是指,接收方应用程序在接收信息时,除了接收到发送方应用程序发送的某一个完整数据信息描述,还接收到了 一下发送方应用程序发送的下一个数据信息的一部分 。
    4)半包和粘包是针对应用程序来说的,这个问题只 会发生在 TCP 协议进行连续发送数据时 ( TCP 长连接 )
    5)半包/粘包是一个应用层问题,在应用程序层面上、在业务层面上,我们自行定义的“数据块”,但在 TCP 层面上并不被协议认可。
    6)常见的有两种方式 :

          一是消息定长,即保证每一个完整的信息描述的长度都是一定的,这样无论 TCP/IP 协议如何进行分片,数据接收方都可以按照固定长度进行消息的还原 ;
          二是在完整的一块数据结束后增加协商一致的分隔符 (例如增加一个回车符,再例如我们之前示例中一直使用的“over”关键字。
    7)在Netty中可以应用FixedLengthFrameDecoder 、 DelimiterBasedFrameDecoder 、 LineBasedFrameDecoder 解决半包/粘包问题

     

    6.实例

    1.TestTCPNettyServer1

    package testNetty;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.concurrent.ThreadFactory;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.bytes.ByteArrayDecoder;
    import io.netty.handler.codec.bytes.ByteArrayEncoder;
    import io.netty.util.concurrent.DefaultThreadFactory;
    
    public class TestTCPNettyServer1 {
        public static void main(String[] args) throws Exception {
            // 这就是主要的服务启动器
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // =======================下面设置线程池
            // Boss 线程池
            EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
            // Work 线程池:这样的申明方式,主要是为了向读者说明 Netty 的线程组是怎样工作的
            ThreadFactory threadFactory = new DefaultThreadFactory("work thread pool");
            // CPU 个数
            int processorsNumber = Runtime.getRuntime().availableProcessors();
            EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber, threadFactory,
                    SelectorProvider.provider());
            // 指定 Netty 的 Boss 线程和 Work 线程
            serverBootstrap.group(bossLoopGroup, workLoogGroup);
            // 如果是以下的申明方式,则说明 Boss 线程和 Work 线程共享一个线程池
            // serverBootstrap . group(workLoogGroup);
            // ========================下面我们设置服务的通道类型
            // 只能是实现了 ServerChannel 接口的 “ 服务器”通道类
            serverBootstrap.channel(NioServerSocketChannel.class);
            // 当然也可以这样创建( SelectorProvider 是不是感觉很熟悉〉
            /*
             * serverBootstrap.channelFactory(new ChannelFactory<NioServerSocketChannel>() {
             * 
             * @Override public NioServerSocketChannel newChannel() { return new
             * NioServerSocketChannel(SelectorProvider.provider());
             * 
             * } });
             */
    
            // ========================设置处理器
            // 这里设置了一组简单的 ByteArrayDecoder 和 ByteArrayEncoder
            // Netty 的特色就在这一连串“通道水管 ” 中的“处理器 ”
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ByteArrayEncoder());
                    ch.pipeline().addLast(new TcpServerHandler());
                    ch.pipeline().addLast(new ByteArrayDecoder());
                }
            });
            // === =====================设置 Netty 服务器绑起的 IP 和端口
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 8888));
            // 还可以监控多个端口
            // serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 8080));
    
        }
    }

     

    2.TcpServerHandler

    package testNetty;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.AttributeKey;
    
    @Sharable
    public class TcpServerHandler extends ChannelInboundHandlerAdapter {
    
        private final static Logger LOGGER = LoggerFactory.getLogger(TcpServerHandler.class);
    
        // 每一个 Channel ,都有独立的 handler 、 ChannelHandlerContext 、 ChannelPipeline 、 Attribute
        // 所以不需要担心多个 Channel 中的这些对象相互影响
        // 这里我们使用 Content 这个 Key ,记录这个 handler 中已经接收到的客户端信息
        private static AttributeKey<StringBuffer> content = AttributeKey.valueOf("content");
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            TcpServerHandler.LOGGER.info("super.channelRegistered(ctx)");
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            TcpServerHandler.LOGGER.info("super.channelUnregistered(ctx)");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            TcpServerHandler.LOGGER.info("super.channelActive(ctx) = " + ctx.toString());
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            TcpServerHandler.LOGGER.info("super.channelinactive(ctx)");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            TcpServerHandler.LOGGER.info("channelRead(ChannelHandlerContext ctx, Object msg ) ");
            // 我们使用 IDE 工具模拟长连接中的数据缓慢提交
            // 由 read 方法负责接收数据,但只是进行数据累加,不进行任何处理
            ByteBuf byteBuf = (ByteBuf) msg;
            try {
                StringBuffer contextBuffer = new StringBuffer();
                while (byteBuf.isReadable()) {
                    contextBuffer.append((char) byteBuf.readByte());
                }
                // 加入临时区域
                StringBuffer content = ctx.attr(TcpServerHandler.content).get();
                if (content == null) {
                    content = new StringBuffer();
                    ctx.attr(TcpServerHandler.content).set(content);
                }
                content.append(contextBuffer);
            } catch (Exception e) {
                throw e;
            } finally {
                byteBuf.release();
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            TcpServerHandler.LOGGER.info("super.channelReadComplete(ChannelHandlerContextctx)");
            // 由 readComplete 方法负责检查数据是否接收完了
            StringBuffer content = ctx.attr(TcpServerHandler.content).get();
            // 如果条件成立,则说明还没有接收到完整的客户端信息
            if (content.indexOf("ver") == -1) {
                return;
            }
            // 当接收到信息后,首先要做的是清空原来的历史信息
            ctx.attr(TcpServerHandler.content).set(new StringBuffer());
            // 准备向客户端发送响应
            ByteBuf byteBuf = ctx.alloc().buffer(1024);
            byteBuf.writeBytes("回发响应信息!".getBytes());
            ctx.writeAndFlush(byteBuf);
            // 正常终止这个通道上下文,就可以关闭通道了
            // 如果不关闭,这个通道的会话将一直存在,
            // 只要网络是稳定的,服务器就可以随时通过这个会话向客户端发送信息
            // 关闭通道意味着 TCP 将正常新开,其中所有的
            // handler 、 ChannelHandlerContext 、 ChannelPipeline 、 Attribute 等信息
            // 都将被注销
            ctx.close();
        }
    }

     

    3.TestTCPNettyClient

    package testNetty;
    
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.net.URLEncoder;
    import java.util.concurrent.CountDownLatch;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class TestTCPNettyClient implements Runnable {
        private final static Logger LOGGER = LoggerFactory.getLogger(TestTCPNettyClient.class);
        private CountDownLatch countDownLatch;
    
        private Integer clientindex;
    
        public TestTCPNettyClient(CountDownLatch countDownLatch, Integer clientindex) {
            this.countDownLatch = countDownLatch;
            this.clientindex = clientindex;
        }
    
        @Override
        public void run() {
            Socket socket = null;
            OutputStream clientRequest = null;
            InputStream clientResponse = null;
            try {
                socket = new Socket("localhost", 8888);
                clientRequest = socket.getOutputStream();
                clientResponse = socket.getInputStream();
                this.countDownLatch.await();
                clientRequest.write(URLEncoder.encode("第" + this.clientindex + "个客户端请求 11 。", "UTF-8").getBytes());
                clientRequest.flush();
                clientRequest.write(URLEncoder.encode("第" + this.clientindex + "个客户端请求 22 。 over。", "UTF-8").getBytes());
                TestTCPNettyClient.LOGGER.info("第" + this.clientindex + "个客户端请求发送完成 , 等待服务器返回 ");
                int maxLen = 1024;
                byte[] contextBytes = new byte[maxLen];
                int realLen;
                String message = "";
                while ((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {
                    message += new String(contextBytes, 0, realLen);
                }
                TestTCPNettyClient.LOGGER.info("接收到来自服务器的信息:" + message);
            } catch (Exception e) {
                TestTCPNettyClient.LOGGER.error(e.getMessage(), e);
            } finally {
                // 试图关闭连接
                try {
                    clientRequest.close();
                    clientResponse.close();
                    socket.close();
                } catch (Exception e) {
                    TestTCPNettyClient.LOGGER.error(e.getMessage(), e);
                }
            }
        }
    }

     

    7.使用 Netty 的 HTTP 编码/解码处理器设计的一个简单的 Web 服务器

     

    package testNetty;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.concurrent.ThreadFactory;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpContent;
    import io.netty.handler.codec.http.HttpHeaders;
    import io.netty.handler.codec.http.HttpMethod;
    import io.netty.handler.codec.http.HttpRequest;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseEncoder;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.util.AttributeKey;
    import io.netty.util.concurrent.DefaultThreadFactory;
    import java.nio.charset.Charset;
    
    public class TestHTTPNettyServer1 {
        public static void main(String[] args) throws Exception {
            // 这就是主要的服务启动器
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // ===========下面设置线程池
            EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
            ThreadFactory threadFactory = new DefaultThreadFactory("work thread pool");
            int processorsNumber = Runtime.getRuntime().availableProcessors();
            EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber * 2, threadFactory,
                    SelectorProvider.provider());
            serverBootstrap.group(bossLoopGroup, workLoogGroup);
            // =======下面设置服务的通道类型(代码已经详细讲解过,就不再赘述了)
            serverBootstrap.channel(NioServerSocketChannel.class);
            // =======设置处理器
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // 我们在 socket channel pipeline 中加入 HTTP 的编码/解码器
                    ch.pipeline().addLast(new HttpResponseEncoder());
                    ch.pipeline().addLast(new HttpRequestDecoder());
                    ch.pipeline().addLast(new HTTPServerHandler());
                }
            });
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 8888));
        }
    }
    
    @Sharable
    class HTTPServerHandler extends ChannelInboundHandlerAdapter {
        private final static Logger LOGGER = LoggerFactory.getLogger(ChannelInboundHandlerAdapter.class);
        // 由于一次 HttpContent 可能没有传输完全部的请求信息,所以这里要做一个连续的记录
        // 然后在 channelReadComplete 方法中(执行了这个方法说明这次所有的 HTTP 内 容都传输完了〉 进行处理
        private static AttributeKey<StringBuffer> CONNTENT = AttributeKey.valueOf("content");
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            /**
             * 在测试中,首先取出客户端传来的参数 、 URL 信息,并且返回一个确认信息要使用 HTTP 服务,我们首先要了解 Netty 中 HTTP 的格式, 所
             * 以通过 HttpRequestDecoder channel handler 解码后的 msg 可能有两种类型 : HttpRquest :
             * 里面包含了请求 head、请求的 URL 等信息 HttpContent : 请求的主体内容
             */
            if (msg instanceof HttpRequest) {
                HttpRequest request = (HttpRequest) msg;
                HttpMethod method = request.getMethod();
                String methodName = method.name();
                String url = request.getUri();
                HTTPServerHandler.LOGGER.info("methodName = " + methodName + "&& url =" + url);
            }
            // 如果条件成立,则在这个代码段实现 HTTP 请求内容的累加
            if (msg instanceof HttpContent) {
                StringBuffer content = ctx.attr(HTTPServerHandler.CONNTENT).get();
                if (content == null) {
                    content = new StringBuffer();
                    ctx.attr(HTTPServerHandler.CONNTENT).set(content);
                }
                HttpContent httpContent = (HttpContent) msg;
                ByteBuf contentBuf = httpContent.content();
                String preContent = contentBuf.toString(Charset.forName("UTF-8"));
                content.append(preContent);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            HTTPServerHandler.LOGGER.info("super.channelReadComplete (ChannelHandlerContext ctx)");
            // 一旦本次 HTTP 请求传输完成,就可以进行业务处理了,并且返回响应
            StringBuffer content = ctx.attr(HTTPServerHandler.CONNTENT).get();
            HTTPServerHandler.LOGGER.info(" HTTP 客户端传来的信息为 :" + content);
            // 开始返回信息了
            String returnValue = "return response";
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            HttpHeaders httpHeaders = response.headers();
            // 这些就是 HTTP response 的 head 信息 , 参见 HTTP 规范。另外还可以设置自己的head 属性
            httpHeaders.add("param", "value");
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
            // 一定要设置长度,否则 HTTP 客户端会一室等待(因为返回的信息长度客户端不知道〉
            response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, returnValue.length());
            ByteBuf responseContent = response.content();
            responseContent.writeBytes(returnValue.getBytes("UTF-8"));
            // 开始返回
            ctx.writeAndFlush(response);
        }
    }

     

     

     

     

     

  • 相关阅读:
    MySQL5.7安装详细教程
    Java之GUI编程
    Java基础
    生成JavaDoc文档
    SpringtMVC运行流程:@RequestMapping 方法中的 Map、HttpServletRequest等参数信息是如何封装和传递的(源码理解)
    SpringCache @Cacheable 在同一个类中调用方法,导致缓存不生效的问题及解决办法
    Spring源码学习:第1步--在Spring源码中添加最简单的Demo代码
    Spring源码学习:第2步--使用SLF4j+Log4j日志框架替换掉其自身的commons-logging日志框架
    Spring源码学习:第0步--环境准备
    JasperReport报表
  • 原文地址:https://www.cnblogs.com/gispathfinder/p/9032997.html
Copyright © 2011-2022 走看看