  • From the Dubbo side: watching Netty handle data packets to uncover how network transport works

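      Nowadays, building a network application is remarkably convenient: pull in a framework, set a few parameters, write the business code, and you are done.

      Business code matters, of course, but do you know:

        How your data gets to you? Over the network, obviously.

        How it travels over the network? Through fiber, via TCP/IP.

      None of that will stump our readers. But those questions are not today's focus. What we care about is how data carried over TCP/IP reaches our application server and is handed to exactly the right piece of business code.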

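      We are not concerned with TCP's three-way handshake or four-way teardown either. We only need to establish one point: TCP is a byte-stream protocol, so data flows continuously from client to server with no built-in message boundaries. How, then, does the application layer know what those bytes mean? That is the job of the application protocol on top, such as HTTP, SMTP, or FTP.

      Setting the rest aside, when we build an application with Netty, Netty sits exactly where the application-layer protocol is implemented (the codecs plugged into it define that protocol). So by following how it recognizes incoming data, we can get a glimpse of how an application-layer protocol works.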

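      The broad outline is simple: the client serializes data with some serialization protocol and sends it over the network; the server deserializes it with the matching protocol and hands the result to the business code.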
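
As a rough illustration of that round trip, here is a minimal sketch using only the JDK. `SimpleRequest` and its two fields are hypothetical names invented for this example; Dubbo's real serialization (Hessian2 and others) is considerably more involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical request type, for illustration only (not a Dubbo class).
final class SimpleRequest {
    final String method;
    final int argument;

    SimpleRequest(String method, int argument) {
        this.method = method;
        this.argument = argument;
    }

    // "Client side": turn the object into bytes that can go on the wire.
    static byte[] serialize(SimpleRequest request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(request.method);   // 2-byte length + modified UTF-8 bytes
            out.writeInt(request.argument); // 4 bytes, big-endian
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // "Server side": read the fields back in the same order they were written.
    static SimpleRequest deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            String method = in.readUTF();
            int argument = in.readInt();
            return new SimpleRequest(method, argument);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both sides must agree on field order and encoding; that agreement is the "protocol" in its smallest form.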

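      So it looks like nothing more than a serialization and deserialization problem. If that were all, there would be nothing left to think about today.

      What we need to consider is whether the data a client sends arrives at the server all at once. If it did, things would be trivial: just read the data. But if the payload is very large, can TCP/IP deliver it in one go? It cannot. TCP has an MSS (maximum segment size) limit, and anything larger must be split across several segments. Conversely, several small messages can arrive merged into a single read. (This is the so-called "sticky packet / split packet" problem.)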
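
One common way to recover message boundaries from a byte stream is a length prefix. The sketch below is an assumption-laden illustration, not Netty or Dubbo code: `LengthPrefixedFramer` is a hypothetical helper that uses a 4-byte big-endian length header and buffers incomplete data between reads.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Netty or Dubbo.
final class LengthPrefixedFramer {
    private static final int HEADER_BYTES = 4;
    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    // Wrap a payload as [4-byte big-endian length][payload].
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(HEADER_BYTES + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Feed whatever bytes just arrived; returns every payload that is now complete.
    List<byte[]> feed(byte[] chunk) {
        byte[] merged = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, merged, 0, pending.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(merged);
        while (buf.remaining() >= HEADER_BYTES) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // body incomplete ("split packet"): wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(payload); // loop again: more frames may be "stuck" behind this one
        }
        // Keep the unread tail for the next call.
        pending = new byte[buf.remaining()];
        buf.get(pending);
        return frames;
    }
}
```

Feeding the same byte stream in one piece or in arbitrary fragments yields the same sequence of payloads, which is the property a real decoder has to guarantee.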

    Let's look at how Netty handles this data.

      

    How does Dubbo use Netty to deal with split data?
    First, here is how Dubbo sets up Netty (mainly adding an encoder, a decoder, and a handler):

        // org.apache.dubbo.remoting.transport.netty4.NettyServer
        @Override
        protected void doOpen() throws Throwable {
            bootstrap = new ServerBootstrap();
    
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));
    
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
            channels = nettyServerHandler.getChannels();
    
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // FIXME: should we use getTimeout()?
                            int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                    .addLast("decoder", adapter.getDecoder())
                                    .addLast("encoder", adapter.getEncoder())
                                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                    .addLast("handler", nettyServerHandler);
                        }
                    });
            // bind
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
    
        }
        

      Using Netty really is this simple: you define your protocol (the codec) and your handler, and leave the complicated low-level work to the framework.
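
To make the "decoder, then handler" ordering concrete, here is a deliberately tiny model of a handler chain in plain Java. `MiniPipeline`, `InboundHandler`, and `Context` are hypothetical names that only loosely mirror Netty's `ChannelPipeline`, `ChannelInboundHandler`, and `ChannelHandlerContext`; the real classes also handle outbound events, threading, and lifecycle callbacks.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of an inbound handler chain; not Netty's API.
final class MiniPipeline {
    interface InboundHandler {
        // Receives a message and decides what, if anything, to pass on.
        void channelRead(Context ctx, Object msg);
    }

    // Remembers a handler's position so it can forward to the next one.
    final class Context {
        private final int index;

        Context(int index) {
            this.index = index;
        }

        void fireChannelRead(Object msg) {
            int next = index + 1;
            if (next < handlers.size()) {
                handlers.get(next).channelRead(new Context(next), msg);
            }
        }
    }

    private final List<InboundHandler> handlers = new ArrayList<>();

    MiniPipeline addLast(InboundHandler handler) {
        handlers.add(handler);
        return this;
    }

    // Entry point: start before the first handler, like Netty's head context.
    void fireChannelRead(Object msg) {
        new Context(-1).fireChannelRead(msg);
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        MiniPipeline pipeline = new MiniPipeline()
                // "decoder": raw bytes -> String, then pass it along
                .addLast((ctx, msg) ->
                        ctx.fireChannelRead(new String((byte[]) msg, StandardCharsets.UTF_8)))
                // "handler": business logic at the end of the chain
                .addLast((ctx, msg) -> received.add("handled:" + msg));
        pipeline.fireChannelRead("ping".getBytes(StandardCharsets.UTF_8));
        return received;
    }
}
```

The business handler never sees raw bytes; it only sees what the decoder chose to forward, which is why the decoder is added to the pipeline first.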

    First, how does Netty listen for incoming network data? (It binds a port through NIO.)

        // io.netty.channel.socket.nio.NioServerSocketChannel
        // Bind the server socket through the underlying NIO channel
        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            if (PlatformDependent.javaVersion() >= 7) {
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }
        
        @Override
        protected ServerSocketChannel javaChannel() {
            return (ServerSocketChannel) super.javaChannel();
        }

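      So writing an NIO server/client yourself may not be that hard, but there are so many edge cases and failure modes to handle that you may well not get them all right.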
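
For comparison, this is roughly what the bare-JDK version looks like. It is a minimal sketch under generous assumptions: `MiniNioServer` is a hypothetical class, it serves a single in-process client on a loopback port, and it ignores most of the error cases a real server has to deal with.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class; it is not part of Netty or Dubbo.
final class MiniNioServer {

    // Binds an ephemeral loopback port, accepts one in-process client,
    // and returns the text that client sent.
    static String acceptAndReadOnce(String clientMessage) {
        byte[] expected = clientMessage.getBytes(StandardCharsets.UTF_8);
        ByteBuffer received = ByteBuffer.allocate(expected.length);
        SocketChannel accepted = null;
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            try (SocketChannel client = SocketChannel.open(
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), port))) {
                ByteBuffer outgoing = ByteBuffer.wrap(expected);
                while (outgoing.hasRemaining()) {
                    client.write(outgoing);
                }

                // A tiny "event loop": wait for ready keys, then dispatch by type.
                long deadline = System.nanoTime() + 5_000_000_000L;
                while (received.hasRemaining() && System.nanoTime() < deadline) {
                    selector.select(200);
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isAcceptable()) {
                            SocketChannel ch = server.accept();
                            if (ch != null) {
                                accepted = ch;
                                ch.configureBlocking(false);
                                ch.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()
                                && ((SocketChannel) key.channel()).read(received) < 0) {
                            key.cancel(); // peer closed before sending everything
                        }
                    }
                }
            }
            return new String(received.array(), 0, received.position(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (accepted != null) {
                try {
                    accepted.close();
                } catch (IOException ignored) {
                    // best-effort cleanup in a demo
                }
            }
        }
    }
}
```

Even this toy has to think about partial reads, timeouts, and cleanup, and it still leaves out write backpressure, half-closed connections, and multi-threading, which is the work Netty takes off your hands.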

    Netty's threading model follows the reactor pattern and is driven by an event loop:

        // io.netty.channel.nio.NioEventLoop
        // The event loop polls for ready events
        @Override
        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            // 'wakenUp.compareAndSet(false, true)' is always evaluated
                            // before calling 'selector.wakeup()' to reduce the wake-up
                            // overhead. (Selector.wakeup() is an expensive operation.)
                            //
                            // However, there is a race condition in this approach.
                            // The race condition is triggered when 'wakenUp' is set to
                            // true too early.
                            //
                            // 'wakenUp' is set to true too early if:
                            // 1) Selector is waken up between 'wakenUp.set(false)' and
                            //    'selector.select(...)'. (BAD)
                            // 2) Selector is waken up between 'selector.select(...)' and
                            //    'if (wakenUp.get()) { ... }'. (OK)
                            //
                            // In the first case, 'wakenUp' is set to true and the
                            // following 'selector.select(...)' will wake up immediately.
                            // Until 'wakenUp' is set to false again in the next round,
                            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                            // any attempt to wake up the Selector will fail, too, causing
                            // the following 'selector.select(...)' call to block
                            // unnecessarily.
                            //
                            // To fix this problem, we wake up the selector again if wakenUp
                            // is true immediately after selector.select(...).
                            // It is inefficient in that it wakes up the selector for both
                            // the first case (BAD - wake-up required) and the second case
                            // (OK - no wake-up required).
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            // Process the ready I/O events
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    
        // Process the ready I/O events
        private void processSelectedKeys() {
            if (selectedKeys != null) {
                // Use Netty's optimized selectedKeys array
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    
        private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.keys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    // ...
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    
        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                // Read: the channel's Unsafe implementation reads in a loop (for a server channel, "read" means accept)
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    
        // io.netty.channel.nio.AbstractNioMessageChannel
        // The actual read; for a server channel each "message" is a newly accepted connection
        private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
            private final List<Object> readBuf = new ArrayList<Object>();
    
            @Override
            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        // Read in a loop, collecting messages into readBuf
                        do {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
                            // Track how many messages have been read so far
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        // Fire each message through the pipeline in turn
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    
        // io.netty.channel.socket.nio.NioServerSocketChannel
        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel());
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
    
        // io.netty.channel.DefaultChannelPipeline
        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    
        // io.netty.channel.AbstractChannelHandlerContext
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            // If already on the handler's event loop thread, invoke directly; otherwise submit to its executor
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    // Invoke the inbound handler's channelRead
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    
        final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, false, true);
                unsafe = pipeline.channel().unsafe();
                setAddComplete();
            }
    
            @Override
            public ChannelHandler handler() {
                return this;
            }
    
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                // NOOP
            }
    
            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                // NOOP
            }
    
            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                    throws Exception {
                unsafe.bind(localAddress, promise);
            }
    
            @Override
            public void connect(
                    ChannelHandlerContext ctx,
                    SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) throws Exception {
                unsafe.connect(remoteAddress, localAddress, promise);
            }
    
            @Override
            public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
                unsafe.disconnect(promise);
            }
    
            @Override
            public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
                unsafe.close(promise);
            }
    
            @Override
            public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
                unsafe.deregister(promise);
            }
    
            @Override
            public void read(ChannelHandlerContext ctx) {
                unsafe.beginRead();
            }
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                unsafe.write(msg, promise);
            }
    
            @Override
            public void flush(ChannelHandlerContext ctx) throws Exception {
                unsafe.flush();
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ctx.fireExceptionCaught(cause);
            }
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                invokeHandlerAddedIfNeeded();
                ctx.fireChannelRegistered();
            }
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelUnregistered();
    
                // Remove all handlers sequentially if channel is closed and unregistered.
                if (!channel.isOpen()) {
                    destroy();
                }
            }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelActive();
    
                readIfIsAutoRead();
            }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelInactive();
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ctx.fireChannelRead(msg);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelReadComplete();
    
                readIfIsAutoRead();
            }
    
            private void readIfIsAutoRead() {
                if (channel.config().isAutoRead()) {
                    channel.read();
                }
            }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                ctx.fireUserEventTriggered(evt);
            }
    
            @Override
            public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelWritabilityChanged();
            }
        }
    
    
    
        // DefaultChannelPipeline
        // io.netty.channel.AbstractChannelHandlerContext
        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }
        
        private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
        
        
            // io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
            @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                setChannelOptions(child, childOptions, logger);
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
        // DefaultChannelPipeline 
        @Override
        public final ChannelPipeline addLast(ChannelHandler... handlers) {
            return addLast(null, handlers);
        }
    
        @Override
        public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
            if (handlers == null) {
                throw new NullPointerException("handlers");
            }
    
            for (ChannelHandler h: handlers) {
                if (h == null) {
                    break;
                }
                addLast(executor, null, h);
            }
    
            return this;
        }
        
        @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                checkMultiplicity(handler);
    
                newCtx = newContext(group, filterName(name, handler), handler);
    
                // Append to the tail of the pipeline
                addLast0(newCtx);
    
                // If the registered is false it means that the channel was not registered on an eventloop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    newCtx.setAddPending();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerAdded0(newCtx);
                        }
                    });
                    return this;
                }
            }
            callHandlerAdded0(newCtx);
            return this;
        }
    
        private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
    
    
        // NioEventLoopGroup
        // io.netty.channel.MultithreadEventLoopGroup
        @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
    
        private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
        }
        
        // io.netty.channel.SingleThreadEventLoop
        @Override
        public ChannelFuture register(Channel channel) {
            return register(new DefaultChannelPromise(channel, this));
        }
        
        @Override
        public ChannelFuture register(final ChannelPromise promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            // Once registered here, the channel's events are handled on this (worker) event loop's thread
            promise.channel().unsafe().register(this, promise);
            return promise;
        }
        
            // io.netty.channel.AbstractChannel $ AbstractUnsafe
            @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                if (eventLoop == null) {
                    throw new NullPointerException("eventLoop");
                }
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
    
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    
    
        // io.netty.util.concurrent.SingleThreadEventExecutor
        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                // Wake up the selector so the loop notices the newly queued task
                wakeup(inEventLoop);
            }
        }
    
        private void startThread() {
            if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    // Start this event loop's own thread (once only); the accept phase ends here
                    doStartThread();
                }
            }
        }
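
The `PowerOfTwoEventExecutorChooser` shown above decides which worker event loop a new channel is registered with. The sketch below isolates that one trick; `RoundRobinChooser` is a hypothetical stand-in that hands out `String` labels instead of `EventExecutor` instances, and the `firstIndex` parameter exists only so the wrap-around case can be demonstrated.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Netty's chooser; labels replace real executors.
final class RoundRobinChooser {
    private final AtomicInteger idx;
    private final String[] executors;

    RoundRobinChooser(int firstIndex, String... executors) {
        int n = executors.length;
        if (n == 0 || (n & (n - 1)) != 0) {
            throw new IllegalArgumentException("length must be a power of two: " + n);
        }
        this.idx = new AtomicInteger(firstIndex);
        this.executors = executors.clone();
    }

    String next() {
        // In Java, '-' binds tighter than '&', so Netty's
        // 'idx.getAndIncrement() & executors.length - 1' means the same as this.
        // Masking with (length - 1) keeps the low bits, so the result stays in
        // [0, length) even after the counter overflows to a negative value.
        return executors[idx.getAndIncrement() & (executors.length - 1)];
    }
}
```

With four labels the sequence is `w0, w1, w2, w3, w0, ...`, and a counter starting at `Integer.MAX_VALUE` yields `w3` and then `w0` after overflow. As far as I know, Netty only selects this chooser when the number of event loops is a power of two and otherwise falls back to a modulo-based one.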

    Starting the new thread that runs the event loop
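
Before reading the Netty source, it may help to see the lazy-start idea on its own: the loop thread is created by the first `execute(...)` call from outside, guarded by a compare-and-set so it happens only once. `LazyLoopExecutor` is a hypothetical simplification; it has no selector, no shutdown handling, and no wake-up logic.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplification of a single-threaded, lazily started executor.
final class LazyLoopExecutor {
    private static final int NOT_STARTED = 0;
    private static final int STARTED = 1;

    private final AtomicInteger state = new AtomicInteger(NOT_STARTED);
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // no-op after the first successful start
        }
        tasks.add(task);
    }

    private void startThread() {
        // Only the caller that wins the CAS creates the thread.
        if (state.get() == NOT_STARTED && state.compareAndSet(NOT_STARTED, STARTED)) {
            threadsStarted.incrementAndGet();
            Thread t = new Thread(this::runLoop, "lazy-loop");
            t.setDaemon(true);
            t.start();
        }
    }

    private void runLoop() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                tasks.take().run(); // every task runs on this one thread, in order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String demo() {
        LazyLoopExecutor loop = new LazyLoopExecutor();
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            loop.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names + " threads=" + loop.threadsStarted.get();
    }
}
```

Three tasks submitted from the calling thread all run on the single `lazy-loop` thread, and only one thread is ever started. In Netty the same role is played by `startThread()` and `doStartThread()`, with the long-running loop handed to an `Executor`.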

        // Start the thread that runs this event loop
        // The loop body is handed to an Executor, which runs it on a new thread
        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
    
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                    "before run() implementation terminates.");
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    logger.warn(
                                            "An event executor terminated with " +
                                                    "non-empty task queue (" + taskQueue.size() + ')');
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }
    
    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    
        @Override
        protected void wakeup(boolean inEventLoop) {
            if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
                selector.wakeup();
            }
        }
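The lazy-start pattern in execute() and startThread() above can be reproduced in a few lines of plain Java. This is only a sketch, not Netty's implementation: LazySingleThreadExecutor and its fields are hypothetical names, and shutdown handling is left out entirely. It shows just the compare-and-set guard that creates the worker thread once, on the first submitted task.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical single-thread executor whose worker is started lazily via CAS. */
final class LazySingleThreadExecutor implements Executor {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile Thread worker;

    /** True when the caller is already running on the worker thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == worker;
    }

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        // Only the first outside caller wins the CAS and creates the worker.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::runLoop, "sketch-event-loop");
            t.setDaemon(true); // do not keep the JVM alive; a real loop has a shutdown path
            worker = t;
            t.start();
        }
        tasks.add(task);
    }

    /** The "event loop": take tasks one by one and run them on this thread. */
    private void runLoop() {
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LazySingleThreadExecutor executor = new LazySingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            executor.execute(() -> {
                System.out.println("ran on " + Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await(); // the worker is a daemon thread, so wait before main exits
    }
}
```

Every task ends up on the same thread no matter who submits it, which is why handlers bound to one event loop do not need locks around their own state.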
        
        
    The actual parsing of the data is triggered when fireChannelRead is called.
    
        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
        // Invoke channelRead() on each handler in the inbound chain, one after another
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    
            // HeadContext
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ctx.fireChannelRead(msg);
            }
        
        // AbstractChannelHandlerContext
        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }
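The hand-over from one inbound handler to the next can be modelled without Netty. The sketch below is a simplification under stated assumptions: MiniPipeline, Context and InboundHandler are hypothetical names, there is no executor check and no exception handling, and the chain is a plain singly linked list. It shows only that a message travels further when a handler calls fireChannelRead on its own context.

```java
/** Hypothetical miniature of an inbound handler chain. */
final class MiniPipeline {
    interface InboundHandler {
        void channelRead(Context ctx, Object msg);
    }

    /** Wraps one handler and knows which context comes next. */
    static final class Context {
        private final InboundHandler handler;
        private Context next;

        Context(InboundHandler handler) {
            this.handler = handler;
        }

        /** Passes the message to the next handler, if there is one. */
        void fireChannelRead(Object msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    // The head simply forwards, like HeadContext.channelRead in the excerpt above.
    private final Context head = new Context((ctx, msg) -> ctx.fireChannelRead(msg));
    private Context tail = head;

    MiniPipeline addLast(InboundHandler handler) {
        Context ctx = new Context(handler);
        tail.next = ctx;
        tail = ctx;
        return this;
    }

    /** Entry point: start propagation at the head of the chain. */
    void fireChannelRead(Object msg) {
        head.handler.channelRead(head, msg);
    }

    public static void main(String[] args) {
        new MiniPipeline()
                .addLast((ctx, msg) -> ctx.fireChannelRead(msg.toString().toUpperCase()))
                .addLast((ctx, msg) -> System.out.println("business handler got: " + msg))
                .fireChannelRead("ping");
    }
}
```

A handler that does not call ctx.fireChannelRead stops the message there, which is how a decoder can hold back incomplete data until a whole frame is available.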
        
        // io.netty.handler.codec.ByteToMessageDecoder 
        // This is the class that handles the parsing of our data
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof ByteBuf) {
                CodecOutputList out = CodecOutputList.newInstance();
                try {
                    ByteBuf data = (ByteBuf) msg;
                    first = cumulation == null;
                    // When a packet arrives over several reads, merge it with the bytes accumulated so far
                    if (first) {
                        cumulation = data;
                    } else {
                        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                    }
                    // Call the decode method
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Throwable t) {
                    throw new DecoderException(t);
                } finally {
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++ numReads >= discardAfterReads) {
                        // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                        // See https://github.com/netty/netty/issues/4275
                        numReads = 0;
                        discardSomeReadBytes();
                    }
    
                    int size = out.size();
                    decodeWasNull = !out.insertSinceRecycled();
                    // If any messages were decoded, pass them on to the next inbound handler
                    fireChannelRead(ctx, out, size);
                    out.recycle();
                }
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        /**
         * Called once data should be decoded from the given {@link ByteBuf}. This method will call
         * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
         *
         * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
         * @param in            the {@link ByteBuf} from which to read data
         * @param out           the {@link List} to which decoded messages should be added
         */
        protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                // Keep calling decode in a loop as long as there is readable data
                while (in.isReadable()) {
                    int outSize = out.size();
    
                    if (outSize > 0) {
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
    
                        // Check if this handler was removed before continuing with decoding.
                        // If it was removed, it is not safe to continue to operate on the buffer.
                        //
                        // See:
                        // - https://github.com/netty/netty/issues/4635
                        if (ctx.isRemoved()) {
                            break;
                        }
                        outSize = 0;
                    }
    
                    int oldInputLength = in.readableBytes();
                    // Call the user-implemented decode method to assemble the data
                    // Business processing is done by adding more handlers to the pipeline
                    decode(ctx, in, out);
    
                    // Check if this handler was removed before continuing the loop.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See https://github.com/netty/netty/issues/1664
                    if (ctx.isRemoved()) {
                        break;
                    }
    
                    if (outSize == out.size()) {
                        if (oldInputLength == in.readableBytes()) {
                            break;
                        } else {
                            continue;
                        }
                    }
    
                    if (oldInputLength == in.readableBytes()) {
                        throw new DecoderException(
                                StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
                    }
    
                    if (isSingleDecode()) {
                        break;
                    }
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable cause) {
                throw new DecoderException(cause);
            }
        }
    
    
        /**
         * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
         */
        static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
            // fireChannelRead is called once for every decoded element
            for (int i = 0; i < numElements; i ++) {
                ctx.fireChannelRead(msgs.getUnsafe(i));
            }
        }

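    If we wrote the packet-assembly logic ourselves, it might look like this (it simply waits until all the data has arrived, then passes it to the next handler):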

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < 4) {
                return;
            }
            in.markReaderIndex();
            int dataLength = in.readInt();
            // If the whole packet has not arrived yet, wait for the next call
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
                return;
            }
            byte[] data = new byte[dataLength];
            in.readBytes(data);
    
            // `target` is not defined in this snippet; it is assumed to be the class to deserialize into
            Object obj = JSON.parseObject(data, target);
            out.add(obj);
        }
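The same length-prefix idea can be tried out without Netty. The sketch below is an illustration under assumptions, not the decoder used by dubbo or netty: LengthPrefixedFramer is a hypothetical class, the frame layout (4-byte big-endian length followed by a UTF-8 body) is chosen only for the demo, and java.nio.ByteBuffer stands in for ByteBuf. It handles both a half packet (wait for more bytes) and sticky packets (several frames in one chunk).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical plain-Java framer for [4-byte length][body] messages. */
final class LengthPrefixedFramer {
    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Feeds one network chunk and returns every frame it completes. */
    List<String> feed(byte[] chunk) {
        // Append the new chunk to the leftover bytes (the accumulation step).
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<String> frames = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (merged.remaining() < length) {
                // Half packet: rewind to before the length field and wait for more bytes.
                merged.reset();
                break;
            }
            byte[] body = new byte[length];
            merged.get(body);
            frames.add(new String(body, StandardCharsets.UTF_8));
        }
        pending = merged.slice(); // keep the unread tail for the next call
        return frames;
    }

    /** Encodes a message as a 4-byte big-endian length followed by the UTF-8 body. */
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    public static void main(String[] args) {
        LengthPrefixedFramer framer = new LengthPrefixedFramer();
        byte[] frame = encode("hello");
        // Deliver the frame in two pieces, as TCP may do.
        System.out.println(framer.feed(Arrays.copyOfRange(frame, 0, 6)));
        System.out.println(framer.feed(Arrays.copyOfRange(frame, 6, frame.length)));
    }
}
```

The while loop plays the role of callDecode: it keeps extracting frames until the remaining bytes are too few, so one chunk that contains several frames yields all of them in a single call.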
        

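    When the inbound method is invoked several times from outside, the data is assembled through the cumulate method:

        // Across repeated inbound calls, the data is assembled via cumulate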
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof ByteBuf) {
                CodecOutputList out = CodecOutputList.newInstance();
                try {
                    ByteBuf data = (ByteBuf) msg;
                    first = cumulation == null;
                    if (first) {
                        cumulation = data;
                    } else {
                        // Merge the data
                        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                    }
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Throwable t) {
                    throw new DecoderException(t);
                } finally {
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++ numReads >= discardAfterReads) {
                        // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                        // See https://github.com/netty/netty/issues/4275
                        numReads = 0;
                        discardSomeReadBytes();
                    }
    
                    int size = out.size();
                    decodeWasNull = !out.insertSinceRecycled();
                    fireChannelRead(ctx, out, size);
                    out.recycle();
                }
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        /**
         * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
         */
        public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
            @Override
            public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
                final ByteBuf buffer;
                if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                        || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                    // Expand cumulation (by replace it) when either there is not more room in the buffer
                    // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                    // duplicate().retain() or if its read-only.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/2327
                    // - https://github.com/netty/netty/issues/1764
                    buffer = expandCumulation(alloc, cumulation, in.readableBytes());
                } else {
                    buffer = cumulation;
                }
                buffer.writeBytes(in);
                in.release();
                return buffer;
            }
        };
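The merge step can be imitated with java.nio.ByteBuffer to see the two branches in isolation. This is a rough sketch under assumptions: MergeCumulatorSketch is a hypothetical name, reference counting does not exist here, and the doubling growth policy is picked for the demo rather than taken from Netty. Both buffers are expected in read mode, meaning position marks the first unread byte and limit marks the end of the data.

```java
import java.nio.ByteBuffer;

/** Hypothetical merge-style cumulator built on java.nio.ByteBuffer. */
final class MergeCumulatorSketch {
    /** Appends {@code in} to {@code cumulation} and returns the buffer that now holds both. */
    static ByteBuffer cumulate(ByteBuffer cumulation, ByteBuffer in) {
        int freeSpace = cumulation.capacity() - cumulation.limit();
        if (cumulation.isReadOnly() || freeSpace < in.remaining()) {
            // No room (or not writable): replace the buffer with a larger copy of the unread bytes.
            int needed = cumulation.remaining() + in.remaining();
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(needed, cumulation.capacity() * 2));
            bigger.put(cumulation).put(in);
            bigger.flip();
            return bigger;
        }
        // Enough room: write right behind the existing data, then restore read mode.
        int readPosition = cumulation.position();
        int writePosition = cumulation.limit();
        cumulation.limit(cumulation.capacity());
        cumulation.position(writePosition);
        cumulation.put(in);
        cumulation.limit(cumulation.position());
        cumulation.position(readPosition);
        return cumulation;
    }

    public static void main(String[] args) {
        ByteBuffer cumulation = ByteBuffer.allocate(8);
        cumulation.put(new byte[] {1, 2, 3});
        cumulation.flip();
        ByteBuffer merged = cumulate(cumulation, ByteBuffer.wrap(new byte[] {4, 5}));
        System.out.println("same buffer reused: " + (merged == cumulation));
        System.out.println("unread bytes: " + merged.remaining());
    }
}
```

Reusing the buffer avoids a copy on most reads; the copy only happens when the accumulated data outgrows the current capacity.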
        

    Next, let's look at how dubbo assembles its data packets (this is where NEED_MORE_INPUT comes in):

    // Decoder processing logic
        // org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter
        private class InternalDecoder extends ByteToMessageDecoder {
    
            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
    
                ChannelBuffer message = new NettyBackedChannelBuffer(input);
    
                NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    
                try {
                    // decode object.
                    do {
                        int saveReaderIndex = message.readerIndex();
                        Object msg = codec.decode(channel, message);
                        // Whenever NEED_MORE_INPUT comes back, this read is not treated as complete; wait for the next callback
                        // The data is first handed to a chain of codecs here
                        if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                            message.readerIndex(saveReaderIndex);
                            break;
                        } else {
                            //is it possible to go here ?
                            if (saveReaderIndex == message.readerIndex()) {
                                throw new IOException("Decode without read data.");
                            }
                            if (msg != null) {
                                out.add(msg);
                            }
                        }
                    } while (message.readable());
                } finally {
                    NettyChannel.removeChannelIfDisconnected(ctx.channel());
                }
            }
        }
        
        // org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec
        @Override
        public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
            int save = buffer.readerIndex();
            MultiMessage result = MultiMessage.create();
            do {
                Object obj = codec.decode(channel, buffer);
                if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                    buffer.readerIndex(save);
                    break;
                } else {
                    result.addMessage(obj);
                    logMessageLength(obj, buffer.readerIndex() - save);
                    save = buffer.readerIndex();
                }
            } while (true);
            if (result.isEmpty()) {
                return Codec2.DecodeResult.NEED_MORE_INPUT;
            }
            if (result.size() == 1) {
                return result.get(0);
            }
            return result;
        }
    
        // org.apache.dubbo.remoting.exchange.codec.ExchangeCodec
        @Override
        public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
            int readable = buffer.readableBytes();
            // As you can see, every packet has a header; once it is parsed, we know the packet's type and length
            byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
            buffer.readBytes(header);
            return decode(channel, buffer, readable, header);
        }
    
        @Override
        protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
            // check magic number.
            if (readable > 0 && header[0] != MAGIC_HIGH
                    || readable > 1 && header[1] != MAGIC_LOW) {
                int length = header.length;
                if (header.length < readable) {
                    header = Bytes.copyOf(header, readable);
                    buffer.readBytes(header, length, readable - length);
                }
                for (int i = 1; i < header.length - 1; i++) {
                    if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                        buffer.readerIndex(buffer.readerIndex() - header.length + i);
                        header = Bytes.copyOf(header, i);
                        break;
                    }
                }
                return super.decode(channel, buffer, readable, header);
            }
            // check length.
            if (readable < HEADER_LENGTH) {
                return DecodeResult.NEED_MORE_INPUT;
            }
    
            // get data length.
            int len = Bytes.bytes2int(header, 12);
            checkPayload(channel, len);
    
            // As long as the data is shorter than the required length, return NEED_MORE_INPUT
            int tt = len + HEADER_LENGTH;
            if (readable < tt) {
                return DecodeResult.NEED_MORE_INPUT;
            }
    
            // limit input stream.
            ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    
            try {
                return decodeBody(channel, is, header);
            } finally {
                if (is.available() > 0) {
                    try {
                        if (logger.isWarnEnabled()) {
                            logger.warn("Skip input stream " + is.available());
                        }
                        StreamUtils.skipUnusedStream(is);
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
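The completeness check in ExchangeCodec boils down to "read the length out of the header, then compare it with what has arrived". The sketch below isolates that check with java.nio.ByteBuffer. Several things are assumptions and not shown in the excerpt above: the header length of 16 bytes and the magic value 0xdabb are taken from dubbo's protocol as I understand it, the int sentinel NEED_MORE_INPUT and the class DubboHeaderSketch are hypothetical, and the other header fields (flags, status, request id) are left as zeros.

```java
import java.nio.ByteBuffer;

/** Hypothetical check for whether a buffer holds one complete header-plus-body frame. */
final class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;        // assumption: fixed header size
    static final short MAGIC = (short) 0xdabb;  // assumption: two magic bytes at offset 0
    static final int NEED_MORE_INPUT = -1;      // hypothetical sentinel for "wait for more bytes"

    /**
     * Returns the total frame length (header + body) when the buffer contains a whole frame,
     * or NEED_MORE_INPUT otherwise. The buffer's position is never moved.
     */
    static int completeFrameLength(ByteBuffer buffer) {
        int readable = buffer.remaining();
        if (readable < HEADER_LENGTH) {
            return NEED_MORE_INPUT; // the header itself is still incomplete
        }
        int start = buffer.position();
        if (buffer.getShort(start) != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        int bodyLength = buffer.getInt(start + 12); // body length sits at header offset 12
        if (bodyLength < 0) {
            throw new IllegalArgumentException("negative body length: " + bodyLength);
        }
        if (readable - HEADER_LENGTH < bodyLength) {
            return NEED_MORE_INPUT; // header is here, body is not
        }
        return HEADER_LENGTH + bodyLength;
    }

    /** Builds a demo frame: magic, zeroed middle fields, body length, then the body. */
    static byte[] frame(byte[] body) {
        ByteBuffer b = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        b.putShort(0, MAGIC);
        b.putInt(12, body.length);
        b.position(HEADER_LENGTH);
        b.put(body);
        return b.array();
    }

    public static void main(String[] args) {
        byte[] full = frame(new byte[] {1, 2, 3});
        System.out.println(completeFrameLength(ByteBuffer.wrap(full, 0, 10)));
        System.out.println(completeFrameLength(ByteBuffer.wrap(full)));
    }
}
```

Because the check never moves the read position, a caller can simply try again when more bytes arrive, which has the same effect as resetting readerIndex in InternalDecoder.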
        
        // org.apache.dubbo.remoting.telnet.codec.TelnetCodec
        @SuppressWarnings("unchecked")
        protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException {
            if (isClientSide(channel)) {
                return toString(message, getCharset(channel));
            }
            checkPayload(channel, readable);
            if (message == null || message.length == 0) {
                return DecodeResult.NEED_MORE_INPUT;
            }
    
            if (message[message.length - 1] == '\b') { // Windows backspace echo
                try {
                    boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char
                    channel.send(new String(doublechar ? new byte[]{32, 32, 8, 8} : new byte[]{32, 8}, getCharset(channel).name()));
                } catch (RemotingException e) {
                    throw new IOException(StringUtils.toString(e));
                }
                return DecodeResult.NEED_MORE_INPUT;
            }
    
            for (Object command : EXIT) {
                if (isEquals(message, (byte[]) command)) {
                    if (logger.isInfoEnabled()) {
                        logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[]) command)));
                    }
                    channel.close();
                    return null;
                }
            }
    
            boolean up = endsWith(message, UP);
            boolean down = endsWith(message, DOWN);
            if (up || down) {
                LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
                if (CollectionUtils.isEmpty(history)) {
                    return DecodeResult.NEED_MORE_INPUT;
                }
                Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
                Integer old = index;
                if (index == null) {
                    index = history.size() - 1;
                } else {
                    if (up) {
                        index = index - 1;
                        if (index < 0) {
                            index = history.size() - 1;
                        }
                    } else {
                        index = index + 1;
                        if (index > history.size() - 1) {
                            index = 0;
                        }
                    }
                }
                if (old == null || !old.equals(index)) {
                    channel.setAttribute(HISTORY_INDEX_KEY, index);
                    String value = history.get(index);
                    if (old != null && old >= 0 && old < history.size()) {
                        String ov = history.get(old);
                        StringBuilder buf = new StringBuilder();
                        for (int i = 0; i < ov.length(); i++) {
                            buf.append("\b");
                        }
                        for (int i = 0; i < ov.length(); i++) {
                            buf.append(" ");
                        }
                        for (int i = 0; i < ov.length(); i++) {
                            buf.append("\b");
                        }
                        value = buf.toString() + value;
                    }
                    try {
                        channel.send(value);
                    } catch (RemotingException e) {
                        throw new IOException(StringUtils.toString(e));
                    }
                }
                return DecodeResult.NEED_MORE_INPUT;
            }
            for (Object command : EXIT) {
                if (isEquals(message, (byte[]) command)) {
                    if (logger.isInfoEnabled()) {
                        logger.info(new Exception("Close channel " + channel + " on exit command " + command));
                    }
                    channel.close();
                    return null;
                }
            }
            byte[] enter = null;
            for (Object command : ENTER) {
                if (endsWith(message, (byte[]) command)) {
                    enter = (byte[]) command;
                    break;
                }
            }
            if (enter == null) {
                return DecodeResult.NEED_MORE_INPUT;
            }
            LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
            Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
            channel.removeAttribute(HISTORY_INDEX_KEY);
            if (CollectionUtils.isNotEmpty(history) && index != null && index >= 0 && index < history.size()) {
                String value = history.get(index);
                if (value != null) {
                    byte[] b1 = value.getBytes();
                    byte[] b2 = new byte[b1.length + message.length];
                    System.arraycopy(b1, 0, b2, 0, b1.length);
                    System.arraycopy(message, 0, b2, b1.length, message.length);
                    message = b2;
                }
            }
            String result = toString(message, getCharset(channel));
            if (result.trim().length() > 0) {
                if (history == null) {
                    history = new LinkedList<String>();
                    channel.setAttribute(HISTORY_LIST_KEY, history);
                }
                if (history.isEmpty()) {
                    history.addLast(result);
                } else if (!result.equals(history.getLast())) {
                    history.remove(result);
                    history.addLast(result);
                    if (history.size() > 10) {
                        history.removeFirst();
                    }
                }
            }
            return result;
        }

      So dubbo's way of handling split packets also relies on netty: it checks the data length to decide whether the whole packet has arrived.

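      Likewise, the data length also solves the sticky-packet problem: the length given in the header tells us where a packet's data ends, so packets that arrive stuck together can be separated.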

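      In fact, netty ships several ready-to-use frame decoders: FixedLengthFrameDecoder, LineBasedFrameDecoder, DelimiterBasedFrameDecoder and LengthFieldBasedFrameDecoder. The names say what they do. Still, writing your own is not hard either, so why not?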
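To show that delimiter-based framing is just as easy to write by hand, here is a minimal line-based splitter in plain Java. It is only an analogy to what a line-based decoder does, not Netty's code: LineFramerSketch is a hypothetical class, and it works on String chunks for brevity, whereas a real decoder works on bytes and would also cap the maximum line length.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical line-based framer: a frame ends at "\n" or "\r\n". */
final class LineFramerSketch {
    // Characters received so far that are not yet terminated by a newline.
    private final StringBuilder pending = new StringBuilder();

    /** Feeds one chunk and returns every complete line it finishes. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            // Drop an optional carriage return right before the newline.
            int end = (newline > 0 && pending.charAt(newline - 1) == '\r') ? newline - 1 : newline;
            lines.add(pending.substring(0, end));
            pending.delete(0, newline + 1);
        }
        return lines;
    }

    public static void main(String[] args) {
        LineFramerSketch framer = new LineFramerSketch();
        System.out.println(framer.feed("ab\r\ncd")); // "cd" stays buffered until its newline arrives
        System.out.println(framer.feed("e\nf\n"));
    }
}
```

The trade-off against a length prefix is that the delimiter must never appear inside a message body, which is why binary protocols such as dubbo's prefer an explicit length field.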

      

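      That covers how TCP data packets are handled on top of netty. It is also a simple application-layer protocol in action, which gives us a more direct view of how such protocols process data.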

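      Of course, deciding packet boundaries by data length, as above, has some problems:

        1. A very large packet blocks the other requests on the same connection;
        2. A very large packet takes up a lot of memory;
        3. Within one connection, packets cannot be interleaved or sent out of order. (TCP delivers the bytes of a connection in order, so one frame has to arrive in full before the next one begins.)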

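      The protocol above does not deal with these cases. For requests that carry a lot of data, we can split them into several smaller requests on the client side to relieve the pressure.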
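Client-side splitting can be as simple as cutting the payload into bounded chunks before sending each one as its own request. The sketch below shows only that cutting step; PayloadChunker and the chunk size are hypothetical, and how the server reassembles or processes the pieces (sequence numbers, a final marker, and so on) depends on the application and is not covered here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that cuts a large payload into bounded chunks. */
final class PayloadChunker {
    /** Splits {@code payload} into consecutive chunks of at most {@code maxChunkSize} bytes. */
    static List<byte[]> split(byte[] payload, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        int offset = 0;
        while (offset < payload.length) {
            // The last chunk may be shorter than the limit.
            int size = Math.min(maxChunkSize, payload.length - offset);
            chunks.add(Arrays.copyOfRange(payload, offset, offset + size));
            offset += size;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<byte[]> chunks = split(new byte[10], 4);
        for (byte[] chunk : chunks) {
            System.out.println("chunk of " + chunk.length + " bytes");
        }
    }
}
```

Each chunk then fits comfortably in one frame, so no single request can hold the connection or the server's buffer for long.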

    Parting words: see through to the essence.

  • Original article: https://www.cnblogs.com/yougewe/p/11515368.html