zoukankan      html  css  js  c++  java
  • Netty 4源码解析:请求处理

    Netty 4源码解析:请求处理

    通过之前《Netty 4源码解析:服务端启动》的分析,我们知道在最前端“扛压力”的是NioEventLoop.run()方法。我们指定创建出的NioServerSocketChannel就是注册到了NioEventLoop中的Selector上。所以我们继续顺藤摸瓜,看看服务端启动完成后,Netty是如何处理每个请求的。


    1.MainReactor

    1.1 事件轮询

    之前我们曾分析过到NioEventLoop.run()方法,但因为之前只关注启动流程所以“浅尝辄止”了,这次我们就以它为起点开始深入分析。NioEventLoop于Selector绑定,它是真正轮询Selector的地方。至于有哪一个或哪些Channel的事件绑定到Selector了,则是注册阶段决定的。对于MainReactor来说,只有一个NioEventLoop负责处理一个ServerSocketChannel的事件。

    // NioEventLoop
        @Override
        protected void run() {
            for (;;) {
                boolean oldWakenUp = wakenUp.getAndSet(false);
                try {
                    if (hasTasks()) {
                        selectNow();
                    } else {
                        select(oldWakenUp);
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    }
    
                    if (ioRatio == 100) {
                        processSelectedKeys();
                        runAllTasks();
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
                }
            }
        }
    
        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized(selectedKeys.flip());
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    
        private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
            if (selectedKeys.isEmpty()) {
                return;
            }
    
            Iterator<SelectionKey> i = selectedKeys.iterator();
            for (;;) {
                final SelectionKey k = i.next();
                final Object a = k.attachment();
                i.remove();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                }
    
                if (!i.hasNext()) {
                    break;
                }
            }
        }

    1.2 事件触发

    不知道大家是否还记得,NioServerSocketChannel注册时有一个小细节,就是它将自己作为了attachment。所以上面处理SelectionKey时就能通过attachement取到注册时的Channel。为什么一定要拿到当时的Channel呢?继续往下看。

        @Override
        protected void doRegister() throws Exception {
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    // ...
                }
            }
        }

    原来跟之前介绍过的注册和绑定一样,最终都是通过Channel的unsafe工具类来完成的。

    // NioEventLoop
    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final NioUnsafe unsafe = ch.unsafe();
            try {
                int readyOps = k.readyOps();
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }

    1.3 感兴趣的事件

    还有一个小细节,就是doRegister()注册时实际传给register()的事件是0,也就是对任何事件都不感兴趣,这又是怎么回事呢?原来具体对什么事件感兴趣是在子类的构造方法里传入的。如果是isAutoRead(),那么一旦Channel连接成功就会自动触发一次读操作。真正注册感兴趣事件的地方就是第一次读操作的时候。

    // NioServerSocketChannel
        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
        }
    
    // AbstractNioChannel
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                //...
            }
        }
    
    // DefaultChannelPipeline
        @Override
        public ChannelPipeline fireChannelActive() {
            head.fireChannelActive();
    
            if (channel.config().isAutoRead()) {
                channel.read();
            }
            return this;
        }
    
    // AbstractNioChannel
        @Override
        protected void doBeginRead() throws Exception {
            final SelectionKey selectionKey = this.selectionKey;
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }

    1.4 事件处理

    紧接着刚才的processSelectedKey()说,既然OP_ACCEPT都已经注册上了,当接收到新用户连接时就会触发unsafe.read()方法。read()会不断调用doReadMessages(),将产生的readBuf逐一发送给Pipeline.fireChannelRead()去处理。

        private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
            private final List<Object> readBuf = new ArrayList<Object>();
    
            @Override
            public void read() {
                final ChannelConfig config = config();
                final int maxMessagesPerRead = config.getMaxMessagesPerRead();
                final ChannelPipeline pipeline = pipeline();
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        for (;;) {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
                            if (readBuf.size() >= maxMessagesPerRead) {
                                break;
                            }
                        }
                    } catch (Throwable t) {
                        exception = t;
                    }
                    setReadPending(false);
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
    
                    readBuf.clear();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        pipeline.fireExceptionCaught(exception);
                    }
                }
            }
        }

    来看看NioServerSocketChannel对doReadMessages()的覆写吧,原来接收并创建Channel的工作就是在这完成的。JDK Channel被保存到Netty包装后的NioSocketChannel中,然后传给Pipeline处理。

    // NioServerSocketChannel
        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
            return 0;
        }

    2.主Pipeline

    2.1 “主从”的桥梁

    又到了“温故时间”,还记得初始化Channel时Netty是怎么做的吗?我们创建了一个叫做ServerBootstrapAcceptor的Handler,它持有的childGroup和childHandler就是SubReactor的NioEventLoopGroup和Handler。

    // ServerBootstrap
        @Override
        void init(Channel channel) throws Exception {
            // ...
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            // ...
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }

    紧接着前面unsafe.read()方法中的fireChannelRead(),会触发ServerBootstrapAcceptor的channelRead()。在这里,msg就是新创建的SocketChannel,将我们的定义的Handler都加入到子Pipeline中。所以说,ServerBootstrapAcceptor就是主从Reactor间的桥梁,它不断将从主Reactor接收到的Channel绑定到从Reactor的一个EventLoop上

        private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                try {
                    childGroup.register(child);
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
        }

    2.2 注册读事件

    因为Netty 4中已经完全统一了EventLoopGroup的代码,已经不区分主从Reactor的逻辑了。所以实际上,这里的注册过程我们已经分析过了。子EventLoopGroup会选择出一个EventLoop负责轮询绑定上的Channel的事件,而Channel感兴趣的事件前面也提到了,就是Channel构造方法中传入的

    // NioSocketChannel
        public NioSocketChannel(Channel parent, SocketChannel socket) {
            super(parent, socket);
            config = new NioSocketChannelConfig(this, socket.socket());
        }
    // AbstractNioByteChannel
        protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
            super(parent, ch, SelectionKey.OP_READ);
        }

    2.3 创建Handler

    使用Netty时我们通常会在ChannelInitializer中初始化Handler,但Netty是什么时候调用它的呢?答案就在Channel注册到子EventLoop之后。之前看到的fireChannelRegistered()会触发ChannelInitializer。所以说:每个客户端Channel建立成功后会创建Handler,并且后续请求处理都由这一组Handler完成

    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
        /**
         * This method will be called once the {@link Channel} was registered.
         */
        protected abstract void initChannel(C ch) throws Exception;
    
        @Override
        public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            ChannelPipeline pipeline = ctx.pipeline();
            try {
                initChannel((C) ctx.channel());
                pipeline.remove(this);
                ctx.fireChannelRegistered();
            } catch (Throwable t) {
                logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
            }
        }
    }

    3.SubReactor

    通过前面的分析能够看到,EventLoop轮询到的事件最终会交给unsafe.read()去处理。NioSocketChannel与NioServerSocketChannel的一个重要区别就是:NioSocketChannel继承AbstractNioByteChannel,而后者继承AbstractNioMessageChannel,两者的unsafe工具类实现是不同的。

    public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    
            @Override
            public void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final int maxMessagesPerRead = config.getMaxMessagesPerRead();
                RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
                if (allocHandle == null) {
                    this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
                }
    
                ByteBuf byteBuf = null;
                int messages = 0;
                boolean close = false;
                try {
                    int totalReadAmount = 0;
                    boolean readPendingReset = false;
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        int writable = byteBuf.writableBytes();
                        int localReadAmount = doReadBytes(byteBuf);
                        if (localReadAmount <= 0) {
                            // not was read release the buffer
                            byteBuf.release();
                            close = localReadAmount < 0;
                            break;
                        }
    
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
    
                        if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                            // Avoid overflow.
                            totalReadAmount = Integer.MAX_VALUE;
                            break;
                        }
    
                        totalReadAmount += localReadAmount;
    
                        if (localReadAmount < writable) {
                            // Read less than what the buffer can hold,
                            // which might mean we drained the recv buffer completely.
                            break;
                        }
                    } while (++ messages < maxMessagesPerRead);
    
                    pipeline.fireChannelReadComplete();
                    allocHandle.record(totalReadAmount);
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close);
                }
            }
        }
    }

    在AbstractNioMessageChannel中接收到的是SocketChannel,所以并没有发生真正的读操作。而AbstractNioByteChannel是真正地从SocketChannel中读,所以这也是申请缓冲区的地方。每次发生读事件时,都会分配一块ByteBuf,然后尝试从Channel中读出数据写到ByteBuf中。之后触发fireChannelRead(),由Pipeline中的Handler继续处理,最终Tail处理器负责释放掉ByteBuf。

        static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                try {
                    logger.debug(
                            "Discarded inbound message {} that reached at the tail of the pipeline. " +
                                    "Please check your pipeline configuration.", msg);
                } finally {
                    ReferenceCountUtil.release(msg);
                }
            }
        }

    4.总结

    这次代码分析一个很直接的目的就是想了解Netty的线程和对象模型,至此已经差不多摸清了。首先,ServerSocketChannel会由一个EventLoop负责轮询接收事件,得到的SocketChannel是交给子Reactor中的一个EventLoop负责轮询读事件。也就是说多个客户端可能会对应一个EventLoop线程。每个SocketChannel注册完毕就会创建Handler,所以说每个客户端都对应自己的Handler实例,并且一直使用到连接断开。

  • 相关阅读:
    CF Round #427 (Div. 2) C. Star sky [dp]
    顺时针打印矩阵
    堆 栈-相关知识【转】
    二叉树的镜像
    树的子结构
    合并两个排序的链表
    数值的整数次方
    位运算:二进制中1的个数
    斐波那契数列及其变形
    重建二叉树
  • 原文地址:https://www.cnblogs.com/xiaomaohai/p/6157601.html
Copyright © 2011-2022 走看看