zoukankan      html  css  js  c++  java
  • netty(一)---服务端源码阅读

    NIO Select 知识

    select 示例代码 :

    //创建 channel 并设置为非阻塞 
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);
    serverChannel.socket().bind(new InetSocketAddress(port));
    //打开 selector 
    Selector selector = Selector.open();
    //注册到 selector 
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    //还是要循环遍历,主要的就是通过 select()方法找到 SelectionKey ,利用监听的事件类型处理响应的事情
    //其中需要注意那两个注册方法  
    while(true){
        int n = selector.select();
        if (n == 0) continue;
        Iterator ite = this.selector.selectedKeys().iterator();
        while(ite.hasNext()){
            SelectionKey key = (SelectionKey)ite.next();
            if (key.isAcceptable()){
                SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
                clntChan.configureBlocking(false);
                //将选择器注册到连接到的客户端信道,
                //并指定该信道key值的属性为OP_READ,
                //同时为该信道指定关联的附件
                clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
            }
            if (key.isReadable()){
                handleRead(key);
            }
            if (key.isWritable() && key.isValid()){
                handleWrite(key);
            }
            if (key.isConnectable()){
                System.out.println("isConnectable = true");
            }
          ite.remove();
        }
    }
    
    

    源码阅读

    通过上一篇文章我们知道了,netty 实际是由两个 Reactor 组成,前者维护一个 Acceptor 绑定接口,处理客户端的连接,然后再将读写,解码编码工作交给另外一个 Reactor ,我们先来看一下这样一个过程,明白总体的过程后再了解系统逻辑实现的细节。

    两个类的组成 :

    AbstractEventExecutorGroup

    • MultithreadEventExecuteGroup
      • MultithreadEventLoopGroup
        • NioEventLoopGroup

    AbstractChannel

    • AbstractNioChannel
      • AbstractNioByteChannel
        • NioSocketChannel

    重要类 :

    • NioServerSocketChannel : 服务端连接端口,连接的 channel
    • ChannelPipeline : 存放Handler的链
    • NioEventLoop : SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop 负责网络读取,连接和客户端请求接入的 Reactor线程就是 NioEventLoop

    我们直接看一下 bind 方法,AbstractBootstrap 类

        /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind(String inetHost, int inetPort) {
            return bind(new InetSocketAddress(inetHost, inetPort));
        }
    
    
        /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind(SocketAddress localAddress) {
            validate();
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            return doBind(localAddress);
        }
    
        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            final ChannelPromise promise;
            if (regFuture.isDone()) {
                promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                });
            }
    
            return promise;
        }        
    
    
    
        /**
         *
         * 
         * 
         */
        final ChannelFuture initAndRegister() {
            Channel channel;
            try {
                // NO.1 Acceptor 线程绑定监听端口,等待来自客户端的连接
                channel = createChannel();
            } catch (Throwable t) {
                return VoidChannel.INSTANCE.newFailedFuture(t);
            }
    
            try {
                // No.2 附加属性,添加 Handler 
                init(channel);
            } catch (Throwable t) {
                channel.unsafe().closeForcibly();
                return channel.newFailedFuture(t);
            }
    
            ChannelPromise regFuture = channel.newPromise();
            // No.3 registed 注册连接事件 
            channel.unsafe().register(regFuture);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
    
    

    init 方法

        @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                channel.config().setOptions(options);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
    
            //下面的ChannelPipeline 存放着一个链表,链表内部放的都是 handler ,p 是父类的 ChannelPipeline 
            // child下面的ChannelPipeline 是 child Reactor 的,这里会判断有没有 handler 有就加进去(我们开头例子中的 handle()方法),没有的话
            //这个方法的尾部会加多一个 ServerBootstrapAcceptor 作为的默认的链表节点
            //那 childHandler 的handler 在哪里加入呢?在 ServerBootstrapAcceptor 中,我们后续会介绍
            ChannelPipeline p = channel.pipeline();
            if (handler() != null) {
                p.addLast(handler());
            }
    
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            //注意这里: ServerBootstrapAcceptor 放到了pipleline 中,
            //后续当接受客户端请求的时候会执行pipleline 中的方法,具体的是在
            //NioMessageUnsafe 的read 方法
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                            currentChildAttrs));
                }
            });
        }
    

    register 方法

            //先会判断是不是NioEventLoop 自身发起的操作,如果是,不存在并发问题,直接使用 Channel注册
            //如果是由其他线程发起,则封装成一个 Task 放入到消息队列中异步执行。此处由于是由 ServerBootstrap 所在
            //线程执行的注册操作,所有应该会封装成 Task 投递到NioEventLoop中执行。 
            @Override
            public final void register(final ChannelPromise promise) {
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        promise.setFailure(t);
                    }
                }
            }
    
    
            private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!ensureOpen(promise)) {
                        return;
                    }
                    // No.1 注册逻辑
                    doRegister();
                    registered = true;
                    promise.setSuccess();
                    // No.2 注册成功后,在channelPiple 的 HeaderHandler 和 TailHandler 中流转 (注意 :内部 SelectKey 注册为 0 ,不是 OP_ACCEPT,会在后面修改)
                    pipeline.fireChannelRegistered();
                    // No.3 在channelPiple 的 HeaderHandler 和 TailHandler 中流转 ,HeaderHandler的 read 方法会调用
                    //  SelectKey 修改为 OP_ACCEPT
                    if (isActive()) {
                        pipeline.fireChannelActive();
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    if (!promise.tryFailure(t)) {
                        logger.warn(
                                "Tried to fail the registration promise, but it is complete already. " +
                                        "Swallowing the cause of the registration failure:", t);
                    }
                }
            }
    
    

    initAndRegisted 方法中,我们从方法明就能知道该方法的主要的操作是初始化和注册,是上面的 No.1 2 3 是关于 channel 的操作。 ServerBootstrap 这个类的 createChannel方法

        @Override
        Channel createChannel() {
            // next 方法会获取一个线程来做连接工作 
            EventLoop eventLoop = group().next();
            return channelFactory().newChannel(eventLoop, childGroup);
        }
    
    

    MultithreadEventExecuteGroup 类中的next 方法 ,group() 返回的就是 bossGroup,它的 next 方法用于从线程组中获取可用线程

        @Override
        public EventExecutor next() {
            return children[Math.abs(childIndex.getAndIncrement() % children.length)];
        }
    
    

    NioServerSocketChannel 在 createChannel()方法中通过反射被创建,同时注册了连接的事件

        /**
         * Create a new instance
         */
        public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
            super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
            config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
        }
    

    加入服务端已经做好了与客户端的连接操作,那么下一步应该到了IO操作应该会到达 workGroup 的 Reactor 中进行读写处理(创建一个新的线程进行处理),服务端处理在 NioEventLoop 类中的run 方法进行的,

    NioEventLoop 的run 方法

        @Override
        protected void run() {
            for (;;) {
                oldWakenUp = wakenUp.getAndSet(false);
                try {
                    if (hasTasks()) {
                        selectNow();
                    } else {
                        select();
    
                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    }
    
                    cancelledKeys = 0;
    
                    final long ioStartTime = System.nanoTime();
                    needsToSelectAgain = false;
                    if (selectedKeys != null) {
                        processSelectedKeysOptimized(selectedKeys.flip());
                    } else {
                        processSelectedKeysPlain(selector.selectedKeys());
                    }
                    final long ioTime = System.nanoTime() - ioStartTime;
    
                    final int ioRatio = this.ioRatio;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Unexpected exception in the selector loop.", t);
    
                    // Prevent possible consecutive immediate failures that lead to
                    // excessive CPU consumption.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore.
                    }
                }
            }
        }
    
        // 无论是 processSelectedKeysOptimized 方法 还是  processSelectedKeysPlain 都有经过下面这个方法
        // 可以看到一下就是就是 select 方法监听到的读写操作。 
        private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
            } catch (CancelledKeyException e) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    
    

    假如是 read 事件,那么调用 unsafe.read() 方法,会走到 AbstractNioMessageChannel 的内部类 NioMessageUnsafe 的 read 方法

    public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    
        protected AbstractNioMessageChannel(
                Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
            super(parent, eventLoop, ch, readInterestOp);
        }
    
        @Override
        protected AbstractNioUnsafe newUnsafe() {
            return new NioMessageUnsafe();
        }
    
        private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
            private final List<Object> readBuf = new ArrayList<Object>();
    
            private void removeReadOp() {
                SelectionKey key = selectionKey();
                int interestOps = key.interestOps();
                if ((interestOps & readInterestOp) != 0) {
                    // only remove readInterestOp if needed
                    key.interestOps(interestOps & ~readInterestOp);
                }
            }
    
    
            @Override
            public void read() {
                assert eventLoop().inEventLoop();
                if (!config().isAutoRead()) {
                    removeReadOp();
                }
    
                final ChannelConfig config = config();
                final int maxMessagesPerRead = config.getMaxMessagesPerRead();
                final boolean autoRead = config.isAutoRead();
                final ChannelPipeline pipeline = pipeline();
                boolean closed = false;
                Throwable exception = null;
                // 这里有个循环
                try {
                    for (;;) {
                        // No.1 
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
    
                        if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    exception = t;
                }
    
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    // No.2 调用 pipleline里的方法
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                pipeline.fireChannelReadComplete();
    
                if (exception != null) {
                    if (exception instanceof IOException) {
                        // ServerChannel should not be closed even on IOException because it can often continue
                        // accepting incoming connections. (e.g. too many open files)
                        closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                    }
    
                    pipeline.fireExceptionCaught(exception);
                }
    
                if (closed) {
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            }
        }
    
        ...
        ...
    
    

    最终到了 NioServerSocketChannel 的 doReadMessages 方法 。

        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();
    
            try {
                if (ch != null) {
                    //childGroup reactor 中开启一个线程让执行读写操作,并将 NioSocketChannel 放入传入的数组中
                    buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
    doReadMessages 执行结束后我们会继续继续执行 No.2 处的代码,执行 pipleline 中放入对象的的方法,又上一篇可以知道将会执行
    ServerBootstrapAcceptor的 channnelRead 方法
    
            @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                Channel child = (Channel) msg;
                // No.1 将启动时的childHandler 加入到客户端的SocketChannel 的 ChannelPiple中
                // 回头看一下我们开始的例子,这就是  childHandler 加入的地方 
                child.pipeline().addLast(childHandler);
                // No.2 设置客户端SocketChannel的TCP参数
                for (Entry<ChannelOption<?>, Object> e: childOptions) {
                    try {
                        if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                            logger.warn("Unknown channel option: " + e);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to set a channel option: " + child, t);
                    }
                }
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
                // No.3 
                child.unsafe().register(child.newPromise());
            }
    
    

    参考资料

    • http://www.blogjava.net/DLevin/archive/2015/09/02/427045.html (Reator 模型 )
    • https://www.jianshu.com/p/052035037297
    • https://www.jianshu.com/p/0d497fe5484a
  • 相关阅读:
    关于lockkeyword
    关于多层for循环迭代的效率优化问题
    Android 面试精华题目总结
    Linux基础回想(1)——Linux系统概述
    linux源代码编译安装OpenCV
    校赛热身 Problem C. Sometimes Naive (状压dp)
    校赛热身 Problem C. Sometimes Naive (状压dp)
    校赛热身 Problem B. Matrix Fast Power
    校赛热身 Problem B. Matrix Fast Power
    集合的划分(递推)
  • 原文地址:https://www.cnblogs.com/Benjious/p/11613235.html
Copyright © 2011-2022 走看看