zoukankan      html  css  js  c++  java
  • 盘一盘 NIO (二)—— Channel解析

    Channel是个啥?

    Channel,顾名思义,它就是一个通道。NIO中的所有IO都是从 Channel 开始的。
    Channel通道和流非常类似,主要有以下几点区别:
    1、流是单向的,通道是双向的,可读可写。
    2、流读写是阻塞的,通道可以异步读写。
    3、流中的数据可以选择性的先读到缓存中,通道的数据总是要先读到一个缓存Buffer中,或从缓存Buffer中写入。
     

    继承关系图

    Channel有两种分类方式。一种是按同步Channel和异步Channel划分,还有一种是按功能划分。
    后面我们会主要讲解Channel的几个重要实现,如下所示:
    FileChannel: 从文件中读写数据
    DatagramChannel: 通过UDP读写网络中的
    SocketChannel: 通过TCP读写网络中的,一般是客户端实现
    ServerSocketChannel: 允许我们监听TCP链接请求,每个请求会创建会一个SocketChannel,一般是服务器实现
     
     

    接口方法

    public interface Channel extends Closeable {
        
        // 判断Channel的开关状态
        public boolean isOpen();
    
        // 关闭此Channel
        public void close() throws IOException;
    
    }

    FileChanel

    FileChannel是一个连接到文件的通道,可以通过文件通道读写文件。众所周知文件通道总是阻塞式的,因此FileChannel无法设置为非阻塞模式。
    FileChannel中重要方法,read、write通过其实现类FileChannelImpl实现
        // 从这个通道读入一个字节序列到给定的缓冲区
        public abstract int read(ByteBuffer dst) throws IOException;
    
        // 从这个通道读入指定开始位置和长度的字节序列到给定的缓冲区
        public abstract long read(ByteBuffer[] dsts, int offset, int length)
            throws IOException;
    
        // 从这个通道读入一个字节序列到给定的缓冲区
        public final long read(ByteBuffer[] dsts) throws IOException {
            return read(dsts, 0, dsts.length);
        }
    
    
        // 从给定的缓冲区写入字节序列到这个通道
        public abstract int write(ByteBuffer src) throws IOException;
    
           // 从给定缓冲区的子序列向该信道写入字节序列
        public abstract long write(ByteBuffer[] srcs, int offset, int length)
            throws IOException;
    
           // 从给定的缓冲区写入字节序列到这个通道
        public final long write(ByteBuffer[] srcs) throws IOException {
            return write(srcs, 0, srcs.length);
        }

    FileChannelImpl中read、write方法实现

    // read 方法实现
    public int read(ByteBuffer var1) throws IOException {
        this.ensureOpen();
        if (!this.readable) {
            throw new NonReadableChannelException();
        } else {
            synchronized(this.positionLock) {
                int var3 = 0;
                int var4 = -1;
    
                try {
                    this.begin();
                    var4 = this.threads.add();
                    if (!this.isOpen()) {
                        byte var12 = 0;
                        return var12;
                    } else {
                        do {
                            var3 = IOUtil.read(this.fd, var1, -1L, this.nd);
                        } while(var3 == -3 && this.isOpen());
    
                        int var5 = IOStatus.normalize(var3);
                        return var5;
                    }
                } finally {
                    this.threads.remove(var4);
                    this.end(var3 > 0);
    
                    assert IOStatus.check(var3);
                }
            }
        }
    }
    
    // write方法实现
    public int write(ByteBuffer var1) throws IOException {
        this.ensureOpen();
        if (!this.writable) {
            throw new NonWritableChannelException();
        } else {
            synchronized(this.positionLock) {
                int var3 = 0;
                int var4 = -1;
    
                byte var5;
                try {
                    this.begin();
                    var4 = this.threads.add();
                    if (this.isOpen()) {
                        do {
                            var3 = IOUtil.write(this.fd, var1, -1L, this.nd);
                        } while(var3 == -3 && this.isOpen());
    
    
                        int var12 = IOStatus.normalize(var3);
                        return var12;
                    }
                    var5 = 0;
                } finally {
                    this.threads.remove(var4);
                    this.end(var3 > 0);
    
                    assert IOStatus.check(var3);
                }
                return var5;
            }
        }
    }

    IOUtil中read、write实现:

    //  read方法实现
    static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1.isReadOnly()) {
            throw new IllegalArgumentException("Read-only buffer");
        } else if (var1 instanceof DirectBuffer) {
            return readIntoNativeBuffer(var0, var1, var2, var4);
        } else {
            // 申请一块和缓存同大小的ByteBuffer var5
            ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
    
            int var7;
            try {
                // 读取数据到缓存,底层由NativeDispatcher的read实现。
                int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
                var5.flip();
                if (var6 > 0) {
                    // 把数据读取到var1(用户定义的缓存,在jvm中分配内存)
                    var1.put(var5);
                }
    
                var7 = var6;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var5);
            }
    
            return var7;
        }
    }
    
    // write 方法实现
    static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1 instanceof DirectBuffer) {
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {
            int var5 = var1.position();
            int var6 = var1.limit();
            assert var5 <= var6;
    
            int var7 = var5 <= var6 ? var6 - var5 : 0;
            // 申请一块ByteBuffer,大小为byteBuffer中的limit - position
            ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
    
            int var10;
            try {
                // 复制byteBuffer中的数据
                var8.put(var1);
                var8.flip();
                var1.position(var5);
    
                // 把数据写入到文件,底层由NativeDispatcher的write实现
                int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
                if (var9 > 0) {
                    var1.position(var5 + var9);
                }
                var10 = var9;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var8);
            }
            return var10;
        }
    }
     

    小结

    1、文件通道不能直接创建,只能通过InputStream、OutputStream或RandomAccessFile等创建对应的文件通道
    2、文件通道FileChannel从缓冲区中读取数据,使用read方法
    3、文件通道FileChannel的read方法只能读ByteBuffer缓冲区
     

    DatagramChannel

    DatagramChannel,使用UDP协议来进行传输。由于不需要建立连接,其实没有客户端服务端的概念,为了便于理解,我们定义其中一端为客户端,一端为服务端

    客户端

    public static void main(String[] args) throws Exception {
        // 打开DatagramChannel
        DatagramChannel datagramChannel = DatagramChannel.open();
        // 绑定一个端口发送数据
        ByteBuffer byteBuffer = ByteBuffer.wrap("A".getBytes());
        int byteSent = datagramChannel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 8000));
        System.out.println("Byte sent is: " + byteSent);
    }

    服务端

    public static void main(String[] args) throws Exception { 
        // 打开DatagramChannel,绑定一个端口
        DatagramChannel datagramChannel = DatagramChannel.open();
        datagramChannel.socket().bind(new InetSocketAddress(8000));
        
        while (true) {
            // 接收数据并输出
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            datagramChannel.receive(byteBuffer);
            byteBuffer.flip();
            if(byteBuffer.hasRemaining()) {
                System.out.print((char) byteBuffer.get());
            }
        }
    }

    ServerSocketChannel和SocketChannel

    ServerSocketChannel是一个可以监听新进来的TCP连接的通道。ServerSocketChannel本身不具备传输数据的能力,而只是负责监听传入的连接和创建新的SocketChannel。
    SocketChannel是一个连接到TCP网络套接字的通道。通常SocketChannel在客户端向服务器发起连接请求,每个SocketChannel对象创建时都关联一个对等的Socket对象。同样SocketChannel也可以运行在非阻塞模式下。
    可以通过以下2种方式创建SocketChannel:
    1、打开一个SocketChannel并连接到互联网上的某台服务器
    2、一个新连接到达ServerSocketChannel时,会创建一个SocketChannel
     

    服务端

    public static void main(String[] args) throws Exception {
        // 服务端首先打开ServerSocketChannel,然后绑定一个端口
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(8000));
    
        // 服务端ServerSocketChannel收到连接请求时,返回一个SocketChannel对象
        SocketChannel socketChannel = serverSocketChannel.accept();
        while(true) {
            // 把数据从channel中读出来,然后写入到buffer中然后打印
            ByteBuffer buffer = ByteBuffer.allocate(128);
            socketChannel.read(buffer);
            buffer.flip();
            if(buffer.hasRemaining()) {
                System.out.println((char) buffer.get());
            }
        }
    }

    客户端

    public static void main(String[] args) throws Exception {
        // 客户端建立连接的过程,首先打开SocketChannel,然后连接到服务端
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));
    
        //连接是否建立成功 
        boolean isConnect = socketChannel.isConnected();
        
        while (true) {
            // 通过buffer,向channel中写入数据
            ByteBuffer buffer = ByteBuffer.allocate(128);
            buffer.clear();
            buffer.put(("A").getBytes());
            buffer.flip();
            socketChannel.write(buffer);
            Thread.sleep(1000);
        }
    }
  • 相关阅读:
    tomact与eclipces的配置
    IDEA中Flink环境pom.xml文件配置
    python读写mysql
    用pyspark实现Wordcount
    python计算相似度
    Spark读取mysql
    Spark之WordCount
    IDEA中Spark环境pom.xml文件配置
    Spark之HelloWorld
    (转)二叉树的中序遍历与后序遍历构造二叉树思路(递归方法,含图片说明)
  • 原文地址:https://www.cnblogs.com/LemonFive/p/11406408.html
Copyright © 2011-2022 走看看