zoukankan      html  css  js  c++  java
  • Netty(二、深入理解)

    reactor模式

    在深入了解Netty之前,我们需要先知道reactor(反应器模式),是高性能网络编程必须知道的模式。

    BIO

    我们先了解下原始socket编程:

    //这里可以是个多线程,每个线程对应一个socket,循环处理业务,此处代码就略了,主要讲逻辑
    while
    (true){
           //new Thread()...
    //Server监听指定端口 ServerSocket server = new ServerSocket(8080); //socket阻塞,一直等待着连接到来 Socket socket = server.accept(); //从socket获取输入流 InputStream inputStream = socket.getInputStream(); //建立缓冲区进行读取 byte[] bytes = new byte[1024]; int len; StringBuilder sb = new StringBuilder(); while ((len = inputStream.read(bytes)) != -1) { //指定编码格式 sb.append(new String(bytes, 0, len,"UTF-8")); } System.out.println(sb); inputStream.close(); socket.close(); server.close(); }

      以服务端为例,Socket建立好后不断循环监听是否有套接字连接,获取到连接后,从socket获取输入流。在发送/接收数据时,并不是直接从网络中读取或发送,而是要通过缓冲区,例如:发送数据时,现将数据写入缓冲区,然后再由TCP/IP协议将数据由缓冲区发送目标的缓冲区,目标从缓冲区中读取。

      这种多线程的socket虽然通过一个线程一个socket的方式,提高了服务器的吞吐,但每个线程内部还是阻塞的,当并发量大时,线程的反复创建和销毁会对系统造成巨大的负担。针对这种情况,我们就需要用到reactor模式。

    单线程reactor模式

    reactor模式,基于java NIO之上,抽象出了两个组件:Reactor和Handler。

    Reactor:负责响应IO事件,如新事件的连接、读写,将事件交给Handler处理。

    Handler:负责事件的处理,完成channel的读取,事件逻辑处理,channel的写出。

    Reactor

    package com.wk.test.nettyTest;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    class Reactor implements Runnable {
    
        //选择器
        final Selector selector;
        //服务端通道
        final ServerSocketChannel serverChannel;
    
        //构造函数初始化
        Reactor(int port) throws IOException {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            //绑定连接
            serverChannel.socket().bind(new InetSocketAddress(port));
            //非阻塞
            serverChannel.configureBlocking(false);
            //将服务端的通道绑定到选择器上面,并定义事件为接收连接时间
            //OP_ACCEPT:接收连接就绪事件,服务端监听到客户端,可接收连接 1<<4
            //OP_CONNECT:连接就绪事件,表示客户端与服务端建立连接成功 1<<3
            //OP_READ:读就绪事件,表示通道中有可读数据,可执行读操作 1<<0
            //OP_WRITE:写就绪事件,表示可以向通道写数据 1<<2
            SelectionKey selectionKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            //选择键通过attach方法附加一个对象
            selectionKey.attach(new Acceptor());
    
        }
    
    
    
        @Override
        public void run() {
            //不中断的线程则循环,interrupted方法,判断线程是否中断,并能释放已经中断的线程
            while (!Thread.interrupted()){
                try {
                    //这里每一个request封装一个channel,所有的channel注册在一个选择器上,selector选择器不断轮询查看可读状态
                    selector.select();
                    //获取选择器的选择键集合
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectedKeys.iterator();
                    while (iterator.hasNext()){
                        SelectionKey selectedKey = iterator.next();
                        //attachement方法可以获取attach方法附加的对象,这里就是前面附加进来的Handler对象,也就是事件处理类
                        Runnable r = (Runnable) selectedKey.attachment();
                        if(r!=null){
                            r.run();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        class Acceptor implements Runnable{
    
            @Override
            public void run() {
                try {
                    //获取已连接上的channel通道
                    SocketChannel channel = serverChannel.accept();
                    if(channel!=null){
                        //自定义Handler,事件处理类,将通道绑定到选择器上面
                        new Handler(selector,channel);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Handler

    package com.wk.test.nettyTest;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    public class Handler implements Runnable {
    
        //通道
        final SocketChannel channel;
        //绑定到选择器的选择键
        final SelectionKey selectionKey;
        //定义输入输出缓冲区
        ByteBuffer inputBuffer = ByteBuffer.allocate(102400);
        ByteBuffer outputBuffer = ByteBuffer.allocate(102400);
        static final boolean READING = true, WRITING = false;
        //初始化定义可读就绪
        boolean status = READING;
    
        Handler(Selector selector, SocketChannel c) throws IOException {
            channel = c;
            //非阻塞
            c.configureBlocking(false);
            //这里将通道注册到选择器上,本应后面的int是 1(读),4(写),8(连接),16(可连接)的
            //这种操作貌似是判断JDK的selector有没有立即返回或报错,并不引起任何实质操作。
            //https://github.com/netty/netty/issues/1836 这个讨论问题的地址,外国友人貌似也搞不懂,似乎是个JDK NIO的BUG
            selectionKey = channel.register(selector, 0);
            //选择键将本身也就是Handler附加
            selectionKey.attach(this);
            //定义当前选择键是读就绪状态
            selectionKey.interestOps(SelectionKey.OP_READ);
            //唤醒选择器
            selector.wakeup();
        }
    
        @Override
        public void run() {
            try {
                if (status) {
                    read();
                } else {
                    write();
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        public void read() throws IOException {
            channel.read(inputBuffer);
            //一系列逻辑判定和处理
            status = WRITING;
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        }
    
        public void write() throws IOException {
            channel.write(outputBuffer);
            //判定写操作执行完毕后,关闭selectKey
            selectionKey.cancel();
        }
    }

      客户端每个请求都封装成一个channel通道连接到selector上面,并有一个selectionKey选择键,选择器的附加对象是Handler处理器,将请求分派到handler中。

    单线程的缺点是当Handler阻塞时,会导致其他client的请求也阻塞,这种实际使用不多,一般使用多线程的reactor模式。

    多线程reactor模式

    多线程是将handler放入一个线程池,多线程的进行业务处理

     具体代码就不展示了,也就是在Handler中建立一个线程池来进行读写操作。

    ps:以上就是基于java NIO的reactor模式,虽然逻辑有些复杂且不易理解。但是在理解Netty之前一定要先理解它。

    Netty DEMO

    我们先将demo代码贴上来,通过代码来对Netty进行理解。

    服务端

    NettyServerTest

    package com.wk.test.nettyTest;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class NettyServerTest {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class);
    
        public static void main(String[] args) {
    
            //实例化两个线程组
            //处理服务器与客户端的连接
            EventLoopGroup pGroup = new NioEventLoopGroup(1);
            //进行网络通信(读写)
            EventLoopGroup cGroup = new NioEventLoopGroup(10);
            //配置容器,配置相关信息
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(pGroup,cGroup)                                   //绑定两个线程组
                    .channel(NioServerSocketChannel.class)                  //指定NIO的模式
                    .childHandler(new ChannelInitializer<SocketChannel>() { //配置业务处理类
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //解码器
                            socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            //编码器
                            socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            //自定义事件处理器
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 1024)         //设置TCP缓冲区
                    .childOption(ChannelOption.SO_KEEPALIVE, true); //保持连接
            try {
                //绑定端口启动
                ChannelFuture channelFuture = bootstrap.bind(8090).sync();
                logger.info("服务器启动开启监听端口:{}",8090);
                //等待关闭
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //Netty优雅退出
                pGroup.shutdownGracefully();
                cGroup.shutdownGracefully();
            }
    
        }
    }

    NettyServerHandler

    package com.wk.test.nettyTest;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 服务端事件处理器,基础入站处理器类
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    
        /**
         * 客户端连接时触发
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("Channel active");
        }
    
        /**
         * 客户端发送消息时触发
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("服务端接收的消息:{}", msg.toString());
            ctx.write("服务器返回");
            ctx.flush();
        }
    
        /**
         * 发生异常时触发
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }

    客户端

    NettyClientTest

    package com.wk.test.nettyTest;
    
    import cn.jiguang.common.connection.NettyClientInitializer;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class NettyClientTest {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class);
    
        public static void main(String[] args) {
            //客户端只需要定义一个读写的线程组
            EventLoopGroup group = new NioEventLoopGroup();
            //客户端是bootstrap,其他和服务端配置大同小异
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            try {
                //连接服务器地址
                ChannelFuture future = bootstrap.connect("127.0.0.1",8090).sync();
                logger.info("客户端启动成功");
                //发送信息
                future.channel().writeAndFlush("你好啊").sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //优雅关闭
                group.shutdownGracefully();
            }
        }
    }

    NettyClientHandler

    package com.wk.test.nettyTest;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 客户端处理类,继承入站处理适配器
     */
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("客户端Active .....");
        }
    
        /**
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("客户端接收的消息:{}", msg.toString());
        }
    
        /**
         * 发生异常时触发
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

     EventLoop

    //实例化两个线程组
    //处理服务器与客户端的连接
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    //进行网络通信(读写)
    EventLoopGroup workerGroup = new NioEventLoopGroup(10);

    以服务端为例,设置两个线程组。

    一个线程组负责监听连接的parentChannel,定义为BossLoopGroup

    另一个线程组负责客户端连接读写的childChannel,定义为WorkerLoopGroup

    一个线程封装到一个EventLoop,多个EventLoop就组成了线程组。而每一个channel绑定一个EventLoop,一个EventLoop可以有多个channel。

     Bootstrap

    Bootstrap是Netty提供的一个工厂类,我们可以通过它来完成对Netty服务端或客户端的初始化配置,这样我们就省去了用JDK NIO繁琐的创建channel、设置、启动等步骤,将重心放在事件业务处理上面。

    Bootstrap分为服务端的ServerBootstrap和客户端的Bootstrap

    Bootstrap执行分为8个步骤:

    ServerBootstrap bootstrap = new ServerBootstrap()
                    //1.设置reactor线程
                    .group(bossGroup,workerGroup)
                    //2.设置channel通道的类型,这里是NIO
                    .channel(NioServerSocketChannel.class)
                    //3.设置监听端口
                    .localAddress(new InetSocketAddress(8090))
                    //4.设置通道的选项
                    .option(ChannelOption.SO_BACKLOG, 1024)         //设置TCP缓冲区
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //心跳检测保持连接
                    //5.配发事件处理器流水线
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //解码器
                            socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            //编码器
                            socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            //自定义事件处理器
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
    
            try {
                //6.绑定servr,这里使用了sync方法,直到绑定成功为止
                ChannelFuture channelFuture = bootstrap.bind().sync();
                logger.info("服务器启动开启监听端口:{}",8090);
                //7.等待关闭,直到channel关闭为止
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //8.Netty优雅退出
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }

    这8个步骤和我上面写的DEMO顺序方面可能稍有不同,不过不影响。

    Channel

    核心概念以及流程

    Channel是服务端与客户端的通信通道,每一个request都可以封装成一channel。

    ChannelPipeline是用于存放Handler的容器,里面存放这事件处理器流水线。

    ChannelHandler是处理器,分为入站处理器和出站处理器,以客户端的角度来看,客户端到服务端是出站,服务端到客户端就是入站。

    ChannelContext是通信管道的上下文,当一个入站或出站处理器处理完后,将上下文传给下一个入站或出站处理器。

     ChannelHandler

    //5.配发事件处理器流水线
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //解码器
                            socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            //编码器
                            socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            //自定义事件处理器
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });

    这里就是配发事件处理器流水线,编码器实质上也是个处理器,出站编码,入站解码。

    自定义处理器也要继承出站或入站的事件处理配置类

    public class NettyServerHandler extends ChannelInboundHandlerAdapter 

    ByteBuf

    数据在网络中传输并不是直接传输的,而是要通过缓冲区。写出时,先将数据写到缓冲区,再由TCP协议将数据从缓冲区。读取时也是一样,从缓冲区读取。

     JAVA NIO中的缓冲区是ByteBuff,长度固定且只有一个索引,在读写操作的时候还需要切换读写状态。而Netty的ByteBuf则改良了这些问题。

    在ByteBuf中,提供了三个索引,读索引(readIndex)、写索引(writeIndex)、最大容量(maxCapacity)

    缓冲区的释放

    我们再看看这张图,入站时,当走到tailHandler(最后一个Handler)的时候,会释放掉缓冲区。出站则是在headHandler释放。

  • 相关阅读:
    python:返回函数,闭包
    对象的行为和数组
    类、对象和包
    Java语言中的程序流程控制
    初识Java,Java语言概述
    有限广播地址与直接广播地址
    H3C模拟器HCL注意事项
    HDLC协议
    NETBIOS的作用
    HP DL380G7 RAID配置
  • 原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12658874.html
Copyright © 2011-2022 走看看