  • Netty (Part 3): How Direct Memory Works and How Netty Uses It

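      Netty is a popular application framework, and its strength comes down to two things. First, it performs very well and can easily handle tens of thousands of concurrent connections. Second, its programming model is simple and easy to pick up. Together, these open the door to high performance for everyone.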

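      In an article on RocketMQ I described the core idea behind high performance and how it is achieved. It mainly depends on the efficient I/O models and memory control that the operating system provides. If you are interested, see my earlier article: RocketMQ(七):高性能探秘之MappedFile (RocketMQ, Part 7: Exploring High Performance Through MappedFile)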


    1. Overview of the usual memory model

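      Generally, the operating system logically separates memory into kernel space and user space to keep itself safe and robust. The reason is easy to see. User code is too unpredictable, and exposing too much invites all kinds of creative usage that escapes the system's control. Some languages do let you manipulate memory directly. In C, for example, a pointer can reach almost any location in your process's address space, and assembly gives even lower-level control. Even so, user-mode code in either language is still confined by the protection the OS enforces. These low-level languages have drifted further and further from everyday work, and they matter little to most programmers today.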

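      Most of what our programs do happens in user space. Arithmetic such as Integer a = 2; Integer b = 3; Integer c = a * b; is carried out entirely in user space, without any involvement from the kernel. Some operations, however, must go through the kernel. Reading and writing files, for instance, means exchanging data between devices, which is I/O. These operations are hard to implement and need privileged access to the hardware, so the operating system always does the low-level work. As a result, the data first passes through kernel space. Our code runs in user space, so normally the data has to be copied from kernel space into user space. That is the read path. Writing is the reverse: the data is copied from user space into kernel space, and the kernel then performs the I/O.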
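To make the two directions concrete, here is a minimal Java sketch. It assumes a writable temp directory, writes a file through a FileChannel and reads it back, once into a heap buffer and once into a direct buffer. Both paths go through the kernel, and the comments mark where the copies described above happen. As far as I know, OpenJDK reads into a heap buffer via an internal temporary direct buffer, which adds one more copy. The names KernelCopyDemo, roundTrip and demo are made up for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class KernelCopyDemo {
    // Writes `data` to `file`, reads it back into `buf`, and returns the decoded text.
    static String roundTrip(Path file, String data, ByteBuffer buf) throws IOException {
        // Write path: user-space bytes are copied into the kernel, which then does the I/O
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer src = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
            while (src.hasRemaining()) {
                ch.write(src);
            }
        }
        // Read path: the kernel copies the file data into our user-space buffer.
        // (Assumption: for a heap buffer, OpenJDK goes through a temporary direct buffer first.)
        buf.clear();
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            while (buf.hasRemaining() && ch.read(buf) > 0) {
                // keep reading until the buffer is full or EOF is reached
            }
        }
        buf.flip();
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    // Creates a temp file, round-trips `data` through it, and deletes the file again.
    static String demo(String data, boolean direct) {
        ByteBuffer buf = direct ? ByteBuffer.allocateDirect(64) : ByteBuffer.allocate(64);
        try {
            Path file = Files.createTempFile("kernel-copy", ".txt");
            try {
                return roundTrip(file, data, buf);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("hello", false)); // heap buffer
        System.out.println(demo("hello", true));  // direct buffer
    }
}
```

Both calls return the same text. The difference lies in where the destination bytes live, not in what the program sees.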




    2. How direct memory works in Java






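        import java.nio.ByteBuffer;
        // Allocate a 1600-byte direct-memory buffer (outside the Java heap)
        ByteBuffer buffer = ByteBuffer.allocateDirect(1600);
        for (int i = 0; i < 90_0000; i++) {
            for (int j = 0; j < 199; j++) {
                // Write data: 199 ints = 796 bytes, which fits in 1600
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 199; j++) {
                // Read data
                buffer.getInt();
            }
            // Clear the buffer for the next round
            buffer.clear();
        }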
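The loop above can be checked with a small self-contained variant. This sketch runs one write / flip / read / clear round on a direct buffer and contrasts the buffer with a heap buffer. The class DirectBufferCycle and its helper writeReadClear are hypothetical names. A direct buffer reports isDirect() == true and has no accessible backing array, because its bytes live outside the Java heap.

```java
import java.nio.ByteBuffer;

class DirectBufferCycle {
    // Writes `count` ints, reads them back, clears the buffer, and returns the sum read.
    static long writeReadClear(ByteBuffer buf, int count) {
        for (int j = 0; j < count; j++) {
            buf.putInt(j);            // write: position advances by 4 bytes per int
        }
        buf.flip();                   // limit = position, position = 0: switch to reading
        long sum = 0;
        while (buf.hasRemaining()) {
            sum += buf.getInt();      // read back exactly what was written
        }
        buf.clear();                  // position = 0, limit = capacity; bytes are not zeroed
        return sum;
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(1600);
        ByteBuffer heap = ByteBuffer.allocate(1600);
        System.out.println(direct.isDirect() + " " + direct.hasArray()); // true false
        System.out.println(heap.isDirect() + " " + heap.hasArray());     // false true
        System.out.println(writeReadClear(direct, 199));                 // 19701
    }
}
```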

    3. Using direct memory in Netty


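      Direct memory comes into play in two scenarios: 1. when network data is passed up to the application (the read path); 2. when the application sends data to the remote peer (the write path).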
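Before going through Netty's source, the shape of the write path can be sketched with plain NIO. The steps are: allocate a buffer (direct by default), let a subclass encode the message into it, then pass the bytes on, or pass an empty buffer if nothing was written. SketchEncoder and LengthPrefixedEncoder are hypothetical classes that only mimic the structure of MessageToByteEncoder#write. They are not Netty's API, and the length-prefixed format is an assumed example protocol.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stdlib analogue of MessageToByteEncoder#write; not Netty's API.
abstract class SketchEncoder<I> {
    private final boolean preferDirect;

    SketchEncoder(boolean preferDirect) {
        this.preferDirect = preferDirect;
    }

    // Subclasses define the wire format ("private protocol") here
    protected abstract void encode(I msg, ByteBuffer out);

    // Direct buffer by default, like allocateBuffer(ctx, msg, preferDirect).
    // Fixed 256-byte capacity for brevity; a Netty ByteBuf would grow on demand.
    protected ByteBuffer allocateBuffer(I msg) {
        return preferDirect ? ByteBuffer.allocateDirect(256) : ByteBuffer.allocate(256);
    }

    // Encode, then hand on the readable bytes, or an empty buffer if nothing was written
    ByteBuffer write(I msg) {
        ByteBuffer buf = allocateBuffer(msg);
        encode(msg, buf);
        buf.flip();
        return buf.hasRemaining() ? buf : ByteBuffer.allocate(0);
    }
}

// Hypothetical protocol: 4-byte big-endian length followed by the UTF-8 bytes
class LengthPrefixedEncoder extends SketchEncoder<String> {
    LengthPrefixedEncoder() {
        super(true);
    }

    @Override
    protected void encode(String msg, ByteBuffer out) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        out.putInt(body.length);
        out.put(body);
    }

    public static void main(String[] args) {
        ByteBuffer frame = new LengthPrefixedEncoder().write("hi");
        System.out.println(frame.isDirect() + " " + frame.remaining()); // true 6
    }
}
```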

        // Write path: encode msg into binary data stored in direct memory
        // io.netty.handler.codec.MessageToByteEncoder#write
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ByteBuf buf = null;
            try {
                if (acceptOutboundMessage(msg)) {
                    I cast = (I) msg;
                    // preferDirect defaults to true
                    buf = allocateBuffer(ctx, cast, preferDirect);
                    try {
                        // Delegate to the subclass to encode the data, e.g. for a private protocol
                        encode(ctx, cast, buf);
                    } finally {
                        ReferenceCountUtil.release(cast);
                    }
                    if (buf.isReadable()) {
                        // Pass the encoded bytes on toward the remote peer
                        ctx.write(buf, promise);
                    } else {
                        buf.release();
                        ctx.write(Unpooled.EMPTY_BUFFER, promise);
                    }
                    buf = null;
                } else {
                    ctx.write(msg, promise);
                }
            } catch (EncoderException e) {
                throw e;
            } catch (Throwable e) {
                throw new EncoderException(e);
            } finally {
                if (buf != null) {
                    buf.release();
                }
            }
        }

        // io.netty.handler.codec.MessageToByteEncoder#allocateBuffer
        /**
         * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
         * Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}.
         */
        protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                                   boolean preferDirect) throws Exception {
            if (preferDirect) {
                // ctx.alloc() is typically a PooledByteBufAllocator
                return ctx.alloc().ioBuffer();
            } else {
                return ctx.alloc().heapBuffer();
            }
        }

        // io.netty.buffer.AbstractByteBufAllocator#ioBuffer()
        public ByteBuf ioBuffer() {
            if (PlatformDependent.hasUnsafe()) {
                return directBuffer(DEFAULT_INITIAL_CAPACITY);
            }
            return heapBuffer(DEFAULT_INITIAL_CAPACITY);
        }

        // io.netty.buffer.AbstractByteBufAllocator#directBuffer(int)
        public ByteBuf directBuffer(int initialCapacity) {
            return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
        }

        public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
            if (initialCapacity == 0 && maxCapacity == 0) {
                return emptyBuf;
            }
            validate(initialCapacity, maxCapacity);
            return newDirectBuffer(initialCapacity, maxCapacity);
        }

        // io.netty.buffer.PooledByteBufAllocator#newDirectBuffer
        protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
            PoolThreadCache cache = threadCache.get();
            PoolArena<ByteBuffer> directArena = cache.directArena;
            final ByteBuf buf;
            if (directArena != null) {
                buf = directArena.allocate(cache, initialCapacity, maxCapacity);
            } else {
                buf = PlatformDependent.hasUnsafe() ?
                        UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                        new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
            return toLeakAwareBuffer(buf);
        }

        // io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int)
        PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
            PooledByteBuf<T> buf = newByteBuf(maxCapacity);
            allocate(cache, buf, reqCapacity);
            return buf;
        }

            // io.netty.buffer.PoolArena.DirectArena#newByteBuf
            protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
                if (HAS_UNSAFE) {
                    return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
                } else {
                    return PooledDirectByteBuf.newInstance(maxCapacity);
                }
            }

        // io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf, int)
        private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
            final int normCapacity = normalizeCapacity(reqCapacity);
            if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
                int tableIdx;
                PoolSubpage<T>[] table;
                boolean tiny = isTiny(normCapacity);
                if (tiny) { // < 512
                    if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = tinyIdx(normCapacity);
                    table = tinySubpagePools;
                } else {
                    if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                        // was able to allocate out of the cache so move on
                        return;
                    }
                    tableIdx = smallIdx(normCapacity);
                    table = smallSubpagePools;
                }
                final PoolSubpage<T> head = table[tableIdx];
                /**
                 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
                 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
                 */
                synchronized (head) {
                    final PoolSubpage<T> s = head.next;
                    if (s != head) {
                        assert s.doNotDestroy && s.elemSize == normCapacity;
                        long handle = s.allocate();
                        assert handle >= 0;
                        s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                        return;
                    }
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
                return;
            }
            if (normCapacity <= chunkSize) {
                if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                synchronized (this) {
                    allocateNormal(buf, reqCapacity, normCapacity);
                }
            } else {
                // Huge allocations are never served via the cache so just call allocateHuge
                allocateHuge(buf, reqCapacity);
            }
        }

        // io.netty.util.internal.PlatformDependent0#newDirectBuffer
        static ByteBuffer newDirectBuffer(long address, int capacity) {
            ObjectUtil.checkPositiveOrZero(capacity, "capacity");
            try {
                // Reflectively invoke DirectByteBuffer(long addr, int cap) to wrap existing native memory
                return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
            } catch (Throwable cause) {
                // Not expected to ever throw!
                if (cause instanceof Error) {
                    throw (Error) cause;
                }
                throw new Error(cause);
            }
        }
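
The branching in PoolArena#allocate depends on the size class of the normalized request. The following is a hypothetical sketch of that classification. It mirrors the shape of normalizeCapacity in the Netty 4.1 versions quoted above (before the later jemalloc4-style rewrite) and ignores cache alignment. It assumes the classic defaults pageSize = 8 KiB and chunkSize = 16 MiB, which later Netty versions changed. SizeClasses, normalize and sizeClass are made-up names.

```java
// Hypothetical sketch of PoolArena's size classes; not Netty's code.
// Assumes the classic defaults: pageSize = 8 KiB, chunkSize = 16 MiB.
class SizeClasses {
    static final int PAGE_SIZE = 8192;
    static final int CHUNK_SIZE = 16 * 1024 * 1024;

    // Same shape as normalizeCapacity, without directMemoryCacheAlignment handling
    static int normalize(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("reqCapacity: " + reqCapacity);
        }
        if (reqCapacity >= CHUNK_SIZE) {
            return reqCapacity;                 // huge: used as-is
        }
        if (reqCapacity >= 512) {
            // small/normal: round up to the next power of two
            int n = reqCapacity - 1;
            n |= n >>> 1;
            n |= n >>> 2;
            n |= n >>> 4;
            n |= n >>> 8;
            n |= n >>> 16;
            return n + 1;
        }
        // tiny: round up to a multiple of 16
        return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
    }

    static String sizeClass(int reqCapacity) {
        int norm = normalize(reqCapacity);
        if (norm < 512) return "tiny";          // served from tinySubpagePools
        if (norm < PAGE_SIZE) return "small";   // served from smallSubpagePools
        if (norm <= CHUNK_SIZE) return "normal"; // one or more pages of a chunk
        return "huge";                          // unpooled, never cached
    }

    public static void main(String[] args) {
        for (int req : new int[] {1, 100, 600, 1600, 8192, 100_000, 20_000_000}) {
            System.out.println(req + " -> " + normalize(req) + " (" + sizeClass(req) + ")");
        }
    }
}
```

Rounding up to fixed sizes lets freed buffers of one size be reused for later requests of the same class. The price is some internal waste, for example 600 bytes occupying a 1024-byte slot.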

      Writing data into the ByteBuffer means writing into direct memory, which is not quite as simple as writing to an ordinary heap object.

        // io.netty.buffer.AbstractByteBuf#writeBytes(byte[])
        public ByteBuf writeBytes(byte[] src) {
            writeBytes(src, 0, src.length);
            return this;
        }

        public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
            ensureWritable(length);
            setBytes(writerIndex, src, srcIndex, length);
            writerIndex += length;
            return this;
        }

        // io.netty.buffer.PooledUnsafeDirectByteBuf#setBytes(int, byte[], int, int)
        public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
            // addr(index) yields the native memory address for this index
            UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
            return this;
        }

        // io.netty.buffer.PooledUnsafeDirectByteBuf#addr
        private long addr(int index) {
            // memoryAddress is the start address of this buffer's region of direct memory
            return memoryAddress + index;
        }

        // io.netty.buffer.UnsafeByteBufUtil#setBytes(io.netty.buffer.AbstractByteBuf, long, int, byte[], int, int)
        static void setBytes(AbstractByteBuf buf, long addr, int index, byte[] src, int srcIndex, int length) {
            buf.checkIndex(index, length);
            if (length != 0) {
                // Copy the byte[] contents into the direct buffer's memory
                PlatformDependent.copyMemory(src, srcIndex, addr, length);
            }
        }

        // io.netty.util.internal.PlatformDependent#copyMemory(byte[], int, long, long)
        public static void copyMemory(byte[] src, int srcIndex, long dstAddr, long length) {
            PlatformDependent0.copyMemory(src, BYTE_ARRAY_BASE_OFFSET + srcIndex, null, dstAddr, length);
        }

        // io.netty.util.internal.PlatformDependent0#copyMemory(java.lang.Object, long, java.lang.Object, long, long)
        static void copyMemory(Object src, long srcOffset, Object dst, long dstOffset, long length) {
            //UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, length);
            while (length > 0) {
                long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
                // The copy is finally done by a JVM native method. dst is null here, so dstOffset is
                // treated as an absolute native address; srcOffset is relative to the byte[] object,
                // i.e. BYTE_ARRAY_BASE_OFFSET + srcIndex
                UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
                length -= size;
                srcOffset += size;
                dstOffset += size;
            }
        }

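      As you can see, the write into direct memory is ultimately performed through the Unsafe class, which copies the bytes straight into native memory outside the Java heap.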
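The chunked loop in PlatformDependent0#copyMemory can be mimicked without Unsafe. To my knowledge, Netty caps each Unsafe.copyMemory call at UNSAFE_COPY_THRESHOLD bytes (1 MiB in Netty 4.1) so that one very large copy does not hold off JVM safepoints for too long. In this sketch, ChunkedCopy and COPY_THRESHOLD are hypothetical, and the threshold is deliberately tiny so the loop runs several times. A ByteBuffer bulk put stands in for the native copy.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ChunkedCopy {
    // Hypothetical stand-in for Netty's UNSAFE_COPY_THRESHOLD, kept tiny for the demo
    static final int COPY_THRESHOLD = 4;

    // Copies src[srcIndex, srcIndex + length) into dst at dstIndex, at most COPY_THRESHOLD bytes
    // per step, and returns the number of steps taken. dst's own position is left untouched.
    static int copy(byte[] src, int srcIndex, ByteBuffer dst, int dstIndex, int length) {
        int steps = 0;
        while (length > 0) {
            int size = Math.min(length, COPY_THRESHOLD);
            ByteBuffer view = dst.duplicate(); // shares dst's memory, has its own position
            view.position(dstIndex);
            view.put(src, srcIndex, size);     // bulk copy of one chunk
            length -= size;
            srcIndex += size;
            dstIndex += size;
            steps++;
        }
        return steps;
    }

    public static void main(String[] args) {
        byte[] src = "hello, direct".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer dst = ByteBuffer.allocateDirect(32);
        int steps = copy(src, 0, dst, 0, src.length);
        byte[] back = new byte[src.length];
        dst.get(back);                         // dst.position() is still 0
        System.out.println(steps + " " + new String(back, StandardCharsets.US_ASCII)); // 4 hello, direct
    }
}
```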


        // io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
        public ChannelFuture write(final Object msg, final ChannelPromise promise) {
            if (msg == null) {
                throw new NullPointerException("msg");
            }
            try {
                if (isNotValidPromise(promise, true)) {
                    ReferenceCountUtil.release(msg);
                    // cancelled
                    return promise;
                }
            } catch (RuntimeException e) {
                ReferenceCountUtil.release(msg);
                throw e;
            }
            write(msg, false, promise);
            return promise;
        }

        private void write(Object msg, boolean flush, ChannelPromise promise) {
            // Find the next outbound handler, walking from the tail toward the head
            AbstractChannelHandlerContext next = findContextOutbound();
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                // Not on the event loop: hand the write over as a task
                AbstractWriteTask task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                } else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                safeExecute(executor, task, promise, m);
            }
        }

        private void invokeWrite(Object msg, ChannelPromise promise) {
            if (invokeHandler()) {
                invokeWrite0(msg, promise);
            } else {
                write(msg, promise);
            }
        }

        private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }

            // io.netty.channel.DefaultChannelPipeline.HeadContext#write
            // The write finally reaches the head of the pipeline, which delegates to the channel's Unsafe
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                unsafe.write(msg, promise);
            }

            // io.netty.channel.AbstractChannel.AbstractUnsafe#write
            public final void write(Object msg, ChannelPromise promise) {
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    // If the outboundBuffer is null we know the channel was closed and so
                    // need to fail the future right away. If it is not null the handling of the rest
                    // will be done in flush0()
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                    // release message now to prevent resource-leak
                    ReferenceCountUtil.release(msg);
                    return;
                }
                int size;
                try {
                    // Convert msg to direct memory if necessary
                    msg = filterOutboundMessage(msg);
                    size = pipeline.estimatorHandle().size(msg);
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    ReferenceCountUtil.release(msg);
                    return;
                }
                // Append msg to outboundBuffer. As far as write() is concerned the data is now "written";
                // it only reaches the socket once flush() is called
                outboundBuffer.addMessage(msg, size, promise);
            }

        // io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage
        protected final Object filterOutboundMessage(Object msg) {
            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                if (buf.isDirect()) {
                    return msg;
                }
                // Heap buffer: copy it into a direct buffer (pooled or thread-local) when possible
                return newDirectBuffer(buf);
            }
            if (msg instanceof FileRegion) {
                return msg;
            }
            throw new UnsupportedOperationException(
                    "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
        }

        // io.netty.channel.ChannelOutboundBuffer#addMessage
        /**
         * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
         * the message was written.
         */
        public void addMessage(Object msg, int size, ChannelPromise promise) {
            Entry entry = Entry.newInstance(msg, size, total(msg), promise);
            if (tailEntry == null) {
                flushedEntry = null;
            } else {
                Entry tail = tailEntry;
                tail.next = entry;
            }
            tailEntry = entry;
            if (unflushedEntry == null) {
                unflushedEntry = entry;
            }
            // increment pending bytes after adding message to the unflushed arrays.
            // See https://github.com/netty/netty/issues/1619
            // If the pending bytes now exceed the write-buffer high water mark, the channel is marked
            // unwritable and channelWritabilityChanged is fired; the bytes themselves go out on flush()
            incrementPendingOutboundBytes(entry.pendingSize, false);
        }
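The bookkeeping behind addMessage and incrementPendingOutboundBytes can be modelled in a few lines. In this sketch, write() only queues data and counts pending bytes. The channel becomes unwritable once the count rises above a high water mark, and it becomes writable again only after flushed data drains below a low water mark. Netty's defaults are 32 KiB (low) and 64 KiB (high), set via WriteBufferWaterMark. OutboundModel is a hypothetical, heavily simplified class whose method names echo Netty's; it is not Netty's implementation, and it uses tiny water marks for the demo.

```java
import java.util.ArrayDeque;

// Hypothetical, heavily simplified model of ChannelOutboundBuffer's bookkeeping.
class OutboundModel {
    private final ArrayDeque<byte[]> unflushed = new ArrayDeque<>();
    private final ArrayDeque<byte[]> flushed = new ArrayDeque<>();
    private final long lowWaterMark;
    private final long highWaterMark;
    private long pendingBytes;
    private boolean writable = true;

    OutboundModel(long lowWaterMark, long highWaterMark) {
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    // write(): only queue the message and count its bytes; nothing touches the socket yet
    void addMessage(byte[] msg) {
        unflushed.addLast(msg);
        pendingBytes += msg.length;
        if (writable && pendingBytes > highWaterMark) {
            writable = false; // Netty would fire channelWritabilityChanged here
        }
    }

    // flush(): mark everything queued so far as ready to go to the socket
    void addFlush() {
        while (!unflushed.isEmpty()) {
            flushed.addLast(unflushed.pollFirst());
        }
    }

    // Simulate the socket accepting one flushed message
    boolean writeOne() {
        byte[] msg = flushed.pollFirst();
        if (msg == null) {
            return false;
        }
        pendingBytes -= msg.length;
        if (!writable && pendingBytes < lowWaterMark) {
            writable = true; // ...and fire it again once the backlog has drained
        }
        return true;
    }

    boolean isWritable() {
        return writable;
    }

    long pendingBytes() {
        return pendingBytes;
    }

    public static void main(String[] args) {
        OutboundModel out = new OutboundModel(32, 64);
        for (int i = 0; i < 3; i++) {
            out.addMessage(new byte[30]);
            System.out.println("pending=" + out.pendingBytes() + " writable=" + out.isWritable());
        }
        out.addFlush();
        while (out.writeOne()) {
            System.out.println("pending=" + out.pendingBytes() + " writable=" + out.isWritable());
        }
    }
}
```

The gap between the two marks acts as hysteresis, so writability does not flip back and forth on every small write.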



        // io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
            public final void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        // Allocate a ByteBuf; this is where direct memory comes into play
                        byteBuf = allocHandle.allocate(allocator);
                        // Read data from the socket into the ByteBuf
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            break;
                        }
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        // Pass each chunk downstream as soon as it is read, instead of waiting for all the data
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }

        // io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#allocate
            public ByteBuf allocate(ByteBufAllocator alloc) {
                // guess() is the predicted size of the next read (adaptive with AdaptiveRecvByteBufAllocator)
                return alloc.ioBuffer(guess());
            }

        // io.netty.buffer.AbstractByteBufAllocator#ioBuffer(int)
        public ByteBuf ioBuffer(int initialCapacity) {
            if (PlatformDependent.hasUnsafe()) {
                return directBuffer(initialCapacity);
            }
            return heapBuffer(initialCapacity);
        }
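The read loop above can be mimicked with plain NIO. In this sketch, ReadLoopSketch and readLoop are hypothetical names, and a ByteArrayInputStream stands in for the socket. Each iteration allocates a direct buffer, playing the role of allocHandle.allocate(allocator), reads into it, and hands the chunk downstream right away, as pipeline.fireChannelRead does. The loop stops on an empty read, at EOF, or after maxReads iterations, roughly like allocHandle.continueReading(). Unlike Netty, it allocates a fresh buffer per read instead of taking one from a pool.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

class ReadLoopSketch {
    // Reads at most maxReads times into fresh direct buffers of bufSize bytes and passes each
    // filled chunk downstream immediately. Returns true if end-of-stream was reached.
    static boolean readLoop(ReadableByteChannel ch, int bufSize, int maxReads,
                            Consumer<ByteBuffer> downstream) {
        try {
            for (int i = 0; i < maxReads; i++) {
                ByteBuffer buf = ByteBuffer.allocateDirect(bufSize);
                int n = ch.read(buf);
                if (n <= 0) {
                    return n < 0;       // nothing read: stop; a negative count means EOF
                }
                buf.flip();             // make the bytes just read readable
                downstream.accept(buf); // cf. pipeline.fireChannelRead(byteBuf)
            }
            return false;               // read budget used up; the rest waits for the next round
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = "0123456789".getBytes(StandardCharsets.US_ASCII);
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(data));
        boolean eof = readLoop(ch, 4, 16, chunk ->
                System.out.println("chunk of " + chunk.remaining() + " bytes, direct=" + chunk.isDirect()));
        System.out.println("eof=" + eof);
    }
}
```

Capping the number of reads per round keeps one busy connection from starving the other channels served by the same event loop.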


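      That, in outline, is how Netty works with direct memory. It looks somewhat complex, mainly because Netty's design philosophy shows up everywhere: a write event, a read event or a state-change event each runs through a long pipeline of handlers. What we care about here, though, is how Netty uses direct memory. It works through PooledUnsafeDirectByteBuf, which is ultimately backed by the JDK's DirectByteBuffer. PlatformDependent0 creates a probe buffer with direct = ByteBuffer.allocateDirect(1); and uses it to look up DirectByteBuffer's constructor DirectByteBuffer(long addr, int cap). Netty then creates direct-memory buffer objects through that constructor.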
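Pooling exists because allocating and freeing native memory is expensive. The following toy sketch shows the basic idea behind a per-thread cache such as PoolThreadCache: each thread keeps its own free list of fixed-size direct buffers, so reuse needs no locking. TinyDirectPool and its methods are hypothetical. The class is far simpler than Netty's arenas, chunks and subpages, and it is not Netty's algorithm.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical, greatly simplified per-thread pool of fixed-size direct buffers.
class TinyDirectPool {
    private final int bufferSize;
    private final int maxCachedPerThread;
    // One free list per thread, so acquire/release need no locking
    private final ThreadLocal<ArrayDeque<ByteBuffer>> freeList =
            ThreadLocal.withInitial(ArrayDeque::new);

    TinyDirectPool(int bufferSize, int maxCachedPerThread) {
        this.bufferSize = bufferSize;
        this.maxCachedPerThread = maxCachedPerThread;
    }

    // Reuse a cached buffer if this thread has one; otherwise allocate new native memory
    ByteBuffer acquire() {
        ByteBuffer buf = freeList.get().pollFirst();
        return buf != null ? buf : ByteBuffer.allocateDirect(bufferSize);
    }

    // Return a buffer for reuse; beyond the cap it is simply dropped, and its native
    // memory is freed later, after the buffer object is garbage-collected
    void release(ByteBuffer buf) {
        ArrayDeque<ByteBuffer> q = freeList.get();
        if (q.size() < maxCachedPerThread) {
            buf.clear();
            q.addFirst(buf);
        }
    }

    public static void main(String[] args) {
        TinyDirectPool pool = new TinyDirectPool(1024, 4);
        ByteBuffer first = pool.acquire();
        pool.release(first);
        ByteBuffer second = pool.acquire();
        System.out.println(first == second); // true: the same native memory was reused
    }
}
```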

    4. A few closing remarks





          Memory management is a very complex topic. It is also an important one, and it deserves a good deal of our time.

