zoukankan      html  css  js  c++  java
  • Netty零拷贝技术在RocketMQ中的实践

     

    零拷贝技术

    实现零拷贝有2种方式实现

    1 mmap+write

    系统调用函数会直接把内核缓冲区里的数据「映射」到用户空间,这样,操作系统内核与用户空间就不需要再进行任何的数据拷贝操作。

     

    public static void mappedByteBufferTest() {
            try (RandomAccessFile randomAccessFile = new RandomAccessFile("netty/src/main/resources/1.txt", "rw");) {
    
                final FileChannel channel = randomAccessFile.getChannel();
                /**
                 * 参数1 FileChannel.MapMode.READ_WRITE 读写模式
                 * 参数2 0: 可以直接修改的起始位置
                 * 参数3 5: 是映射到内存的大小,即将1.txt的多少个字节映射到内存
                 * 实际类型:directByteBuffer
                 */
                final MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
    
                map.put(0, (byte) 'H');
                map.put(3, (byte) '9');
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }

    2 sendfile

     可以直接把内核缓冲区里的数据拷贝到 socket 缓冲区里,不再拷贝到用户态。

    FileChannel类型
    
    public abstract long transferTo(long position, long count, WritableByteChannel target)throws IOException;
    

      

    netty零拷贝实现

    CompositeByteBuf 

    • 指在 Java 之上(user space)允许 CompositeByteBuf 使用单个 ByteBuf 一样操作多个 ByteBuf 而不需要任何 copy。
    • 以及允许使用slice,切分单个ByteBuf为多个,而实际上操作的还是同一个ByteBuf,不需要cotpy。
    package netty.zerocopy;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    
    import java.nio.charset.StandardCharsets;
    
    /**
     * netty 零拷贝
     * @author huangyichun
     * @date 2020/12/9
     */
    public class CompositeDemo {
    
        public static void main(String[] args) {
            ByteBuf buf1 = Unpooled.copiedBuffer("hello, world", StandardCharsets.UTF_8);
    
            ByteBuf buf2 = Unpooled.copiedBuffer("let's go", StandardCharsets.UTF_8);
    
            ByteBuf compositeBuf = Unpooled.wrappedBuffer(buf1, buf2);
    
            compositeBuf.setBytes(1, "my name".getBytes());
    
            System.out.println(buf1.toString(StandardCharsets.UTF_8));
            System.out.println(buf2.toString(StandardCharsets.UTF_8));
            System.out.println(compositeBuf.toString(StandardCharsets.UTF_8));
        }
    }
    

      

     FileRegion 

      如果你所在的系统支持 zero copy,则可以使用 FileRegion 来写入 Channel,实际是就是调用上文Nio零拷贝中的transferTo方法进行传输。

     public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            RandomAccessFile raf = null;
    
            long length = -1;
            try {
                raf = new RandomAccessFile(msg, "r");
                length = raf.length();
            } catch (Exception e) {
                ctx.writeAndFlush("ERR: " + e.getClass().getSimpleName() + ": " + e.getMessage() + '
    ');
                return;
            } finally {
                if (length < 0 && raf != null) {
                    raf.close();
                }
            }
    
            ctx.write("OK: " + raf.length() + '
    ');
            if (ctx.pipeline().get(SslHandler.class) == null) {
                // SSL not enabled - can use zero-copy file transfer.
                ctx.write(new DefaultFileRegion(raf.getChannel(), 0, length));
            } else {
                // SSL enabled - cannot use zero-copy file transfer.
                ctx.write(new ChunkedFile(raf));
            }
            ctx.writeAndFlush("
    ");
        }

    RocketMQ使用FileRegion实现零拷贝

     在broker的拉取消息处理器PullMessageProcessor中,如果不使用堆内存,则使用Netty提供的零拷贝方案:

    FileRegion fileRegion =
               new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
               
    channel.writeAndFlush(fileRegion).addListener(
    new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { getMessageResult.release(); if (!future.isSuccess()) { log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause()); } } });

    创建 ManyMessageTransfer   extends AbstractReferenceCounted implements FileRegion ,具体看一下transferTo方法

        @Override
        public long transferTo(WritableByteChannel target, long position) throws IOException {
            if (this.byteBufferHeader.hasRemaining()) {
                transferred += target.write(this.byteBufferHeader);
                return transferred;
            } else {
                List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();
                for (ByteBuffer bb : messageBufferList) {
                    if (bb.hasRemaining()) {
                        transferred += target.write(bb);
                        return transferred;
                    }
                }
            }
    
            return 0;
        }

    实际上,将多个缓冲区直接写入到socketchannel里面,避免了在内存中copy到一个缓冲区。

    Ref:https://www.jianshu.com/p/3f9f56235d49

  • 相关阅读:
    2.谈谈算法
    1.数据结构和算法笔记
    初次使用博客
    Unity中关于在一个场景中使用多个摄像机
    基于unity的单例设计模式写法
    unity3D读取Txt文件中信息
    转载雨松的unity中使用ITween插件和ITweenPath
    Unity3D游戏开发之数据持久化PlayerPrefs的使用
    [转载]Unity3d更改3d Text的字体的材质球的shader,使字体不显示
    C#写的Socket Server端在unity运行时和关闭时没事,但是在打开直接unity崩溃问题
  • 原文地址:https://www.cnblogs.com/gaojy/p/15074253.html
Copyright © 2011-2022 走看看