zoukankan      html  css  js  c++  java
  • Java NIO

    Java NIO

      以前写过一篇Java Socket的用法,不过觉得介绍的不够细致也不够全面,因此今天想在细谈一下Java NIO,也算是对上一篇博客的补充吧。在以前的博客中提到Java NIO的三个核心部分Buffers、Channels、Selectors,这里不再赘述三者之间的关系,接下来我们重点看看这三个核心部分。

    Buffer

      该区域本质是一块可以读写的数据的内存区,这组内存区被包装成NIO Buffer对象,并提供了一组方法,方便访问该块内存。为了更清楚的理解Buffer的工作原理,需要熟悉它的三个属性capacity、position、limit。capacity表示缓冲区大小。而position和limit的含义取决于Buffer处在读模式还是写模式下。在读模式下,position表示开始读的位置,limit表示最后能读的数据位置。在写模式下,position表示当前数据需要写入的位置,最大值为capacity-1。当由写模式切换到读模式时,position=0,limit=position。

      抽象类Buffer具体实现类有ByteBuffer、MappedByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer。接下来我们以ByteBuffer为例来了解一下Buffer的具体用法。

    public class TestByteBuffer {
        public static void test(){
            readFromChannel();   //从channel读取数据到Buffer中
            readFromPut();   //put方法放入数据
        }
        public static void readFromChannel(){
            try {
                RandomAccessFile aFile = new RandomAccessFile("data/byte.txt","rw");
                FileChannel channel = aFile.getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(64);  //设置buffer缓冲区的大小
                int bytesRead = channel.read(buffer);   //read to buffer
                while(bytesRead != -1){
                    System.out.println("write mode position is " + buffer.position());
                    System.out.println("write mode limit is " + buffer.limit());
                    buffer.flip();  //切换到读模式,limit=posit,position=0,
                    System.out.println("Read mode position is " + buffer.position());
                    System.out.println("Read mode limit is " + buffer.limit());
                    while(buffer.hasRemaining()){
                        System.out.print((char)buffer.get());  //1byte的读数据
                    }
                    System.out.println();
                    buffer.clear();  //将position设置为0,limit设置成capacity值
                    bytesRead = channel.read(buffer);
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        public static void readFromPut(){
            ByteBuffer buffer = ByteBuffer.allocate(48);
            for(int i = 0; i < 12; i++){
                buffer.putInt(i);
            }
            buffer.flip();
            while(buffer.hasRemaining()){
                System.out.print(buffer.get());
            }
        }
    }
    View Code

    Channels

      Channel充当的其实是搬运工的角色,它负责把数据搬运到Buffer中,也可以从Buffer中把数据搬运出去。具体的实现Channel接口的类有:FileChannel、DatagramChannel、SocketChannel、ServerSocketChannel。FileChannel从文件中读取数据到缓冲区(已经在Buffer中介绍过了),DatagramChannel能通过UDP读写网络中的数据,SocketChannel能通过TCP读写网络中的数据,ServerSocketChannel可以监听新进来的TCP连接,对每个新进来的连接都会创建一个SocketChannel。如下是利用SocketChannel和ServerSocketChannel实现客户端和服务器端(IP地址192.168.6.42)通信:

    public class Server {
        public static void main(String[] args){
            try {
                //创建一个ServerSocketChannel
                ServerSocketChannel ssc = ServerSocketChannel.open();
                //监听8080端口
                ssc.socket().bind(new InetSocketAddress(8080));
                SocketChannel socketChannel = ssc.accept();  //阻塞,开始监听8080端口
                ByteBuffer buffer = ByteBuffer.allocate(100); //设置buffer的capacity为100
                int readBytes = socketChannel.read(buffer);  //利用channel将数据写入buffer
                while(readBytes != -1){
                    buffer.flip();  //切换为读模式
                    while(buffer.hasRemaining()){ //检查buffer是否读完
                        System.out.print((char)buffer.get());  //1byte的读数据
                    }
                    buffer.clear();  //清空buffer缓冲区
                    readBytes = socketChannel.read(buffer);
                }
                socketChannel.close(); //关闭socketChannel
                ssc.close();  //关闭ServerSocketChannel
                System.out.println("It it over");
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

     客户端程序如下:

    public class TestSocketChannel {
        public static void main(String args){
            SocketChannel socketChannel;
            try {
                //创建SocketChannel
                socketChannel = SocketChannel.open();
                //连接到某台服务器的某个端口
                socketChannel.connect(new InetSocketAddress("192.168.6.42",8080));
                String sendString = "This is a message from client, Please read it carefully. Thanke you very much";
                ByteBuffer buffer = ByteBuffer.wrap(sendString.getBytes());
    socketChannel.write(buffer);
    //关闭通道 socketChannel.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

    Selector

      Selector是Java NIO中能够检测到一到多个NIO通道,并能够知晓通道是否为诸如读写时间做好准备的组件。这样就可以实现一个单独的线程可以管理多个Channel,从而管理多个网络连接。我们从如下服务器端和客户端的程序介绍Selector吧。

    客户端程序如下,首先是一个创建SocketChannel的类如下:

    public class TestSocketChannel {
        /**
         * 创建一个SocketChannel,其中指定连接的IP地址和端口号
         */
        public static SocketChannel createSocketChannel(String ip, int port){
            SocketChannel socketChannel = null;
            try {
                //创建SocketChannel
                socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);  //设置为非阻塞模式
                //连接到某台服务器的某个端口
                socketChannel.connect(new InetSocketAddress(ip,port));
                //判断是否连接完成,若未完成则等待连接
                while(!socketChannel.finishConnect()){
                    System.out.println("It is connecting>>>>>>>>>>>>>>>>>>>>>");
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            //连接完成返回该SocketChannel
            return socketChannel;
        }
    }

    客户端主程序通过调用该类的createSocketChannel()方法创建一个SocketChannel对象,主程序如下:

    public class Main {
        public static void main(String[] args){
            try {
                //创建SocketChannel,连接192.168.6.42服务器的8080端口
                SocketChannel sc8080 = TestSocketChannel.createSocketChannel("192.168.6.42",8080);
                
                //创建SocketChannel,连接192.168.6.42服务器的8090端口
                SocketChannel sc8090 = TestSocketChannel.createSocketChannel("192.168.6.42",8090);
                
                 //创建selector
                Selector selector = Selector.open();
                //向通道注册选择器,并设置selector监听Channel时对读操作感兴趣
                sc8080.register(selector, SelectionKey.OP_READ);
                sc8090.register(selector, SelectionKey.OP_READ);
                //启动线程,监听是否从服务器端有数据传过来
                Thread thread = new Thread(new MyRunnable(selector));
                thread.start();
                //分别向服务器的8080和8090端口发送数据
                sendString(sc8080,"This message is going to send to server 8080 port");
                sendString(sc8090,"This message is going to send to server 8090 port");
                
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        private static void sendString(SocketChannel sc, String str){
            ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
            try {
                //将buffer中的数据写入sc通道
                sc.write(buffer);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    class MyRunnable implements Runnable{
        private Selector selector;
        public MyRunnable(Selector s){
            this.selector =s;
        }
        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                while(true){
                    //阻塞2000ms,判断是否有通道在注册的事件上就绪了,如果有则该返回值就绪通道的个数
                    if(selector.select(2000) == 0){
                        System.out.println("please waiting.....");
                        continue;
                    }else{
                        //当有通道就绪时,获取SelectionKey,并遍历
                        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                        while(keys.hasNext()){
                            SelectionKey key = keys.next();
                            //判断通道中是否可读事件就绪了,如果是则isReadable()方法返回TRUE
                            if(key.isReadable()){
                                SocketChannel socketChannel = (SocketChannel) key.channel();
                                //默认服务器端发送的数据都小于1024byte,因此一次可以读完
                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                socketChannel.read(buffer);  //利用通道将数据读入buffer中
                                buffer.flip();   //将buffer切换为读模式
                                String receiveString = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
                                System.out.println(receiveString);
                                buffer.clear();  //清空缓冲区buffer
                            }
                            //设置通道对什么时间感兴趣,该设置是对“读”和“写”感兴趣
                            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                            //移除当前已经处理过的SelectionKey
                            keys.remove();
                        }
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
    }

    服务器端程序如下:

    public class TestServerSocketChannel {
        public ServerSocketChannel createServerSocketChannel(int port){
            ServerSocketChannel ssc = null;
            try {
                ssc = ServerSocketChannel.open();
                ssc.socket().bind(new InetSocketAddress(port));
                ssc.configureBlocking(false);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return ssc;
        }
    }
    public class Server {
        private static Selector selector = null;
        public static void main(String[] args){
            
            try {
                //创建一个ServerSocketChannel,监听8080端口,非阻塞模式
                ServerSocketChannel ssc8080 = (new TestServerSocketChannel()).createServerSocketChannel(8080);
                //创建一个ServerSocketChannel,监听8090端口,非阻塞模式
                ServerSocketChannel ssc8090 = (new TestServerSocketChannel()).createServerSocketChannel(8090);
                //创建监听器
                selector = Selector.open();
                //向通道注册监听器
                ssc8080.register(selector, SelectionKey.OP_ACCEPT);
                ssc8090.register(selector, SelectionKey.OP_ACCEPT);
                //开启线程,监听客户端发送过来的数据
                Thread thread = new Thread(new MyRunnable());
                thread.start();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        static class MyRunnable implements Runnable {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                while (true) {
                    try {
                        //阻塞3s后判断selector注册的通道是否有就绪的
                        if(selector.select(3000) == 0){
                            System.out.println("正在等待请求......");
                            continue;
                        }else{
                            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                            while (keys.hasNext()) {
                                SelectionKey key = keys.next();
                                //判断是否有新的连接
                                if (key.isAcceptable()) {
                                    HandleRequest.handleAccept(key);
                                } else if (key.isReadable()) { //判断是否有读操作
                                    HandleRequest.handleRead(key);
                                } else if (key.isValid() && key.isWritable()) {  //判断是否对写操作感兴趣
                                    HandleRequest.handleWrite(key);
                                }
                                keys.remove(); // 移除处理过的键
                            }
                        }
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    public class HandleRequest {
        //当有新的连接时
        public static void handleAccept(SelectionKey key){
            try {
                //通过SelectionKey对象key创建SocketChannel对象
                SocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();
                //设置socketChannel为非阻塞模式
                socketChannel.configureBlocking(false);
                //向通道注册选择器和感兴趣事件
                socketChannel.register(key.selector(), SelectionKey.OP_READ);
                //输出数据从服务器的哪个端口传入
                System.out.println("receive data from port:" + socketChannel.socket().getLocalPort());
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        public static void handleRead(SelectionKey key){
            SocketChannel socketChannel = (SocketChannel) key.channel();
            System.out.println("receive data from port:" + socketChannel.socket().getLocalPort());
            ByteBuffer buffer = ByteBuffer.allocate(32);
            try {
                int readBytes = socketChannel.read(buffer);
                //输出数据从哪个客户端地址传入
                System.out.println("receive data from " + socketChannel.socket().getRemoteSocketAddress() + ", the data are ");
                //读取缓冲区中的数据
                while(readBytes != 0){
                    buffer.flip();
                    String receiveString = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
                    System.out.print(receiveString);
                    buffer.clear();
                    readBytes = socketChannel.read(buffer);
                }
                //更改通道感兴趣的事件为“读操作”和“写操作”
                key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        public static void handleWrite(SelectionKey key){
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer writeBuffer = ByteBuffer.wrap("This message is from server".getBytes());
            try {
                socketChannel.write(writeBuffer);
                System.out.println("The message is writen in channel");
                key.interestOps(SelectionKey.OP_READ);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    哥德尔不完备定理
    关于欧拉公式证明的一个延拓
    关于贝叶斯定理的一个延拓
    贝克莱悖论
    自然数的公理化理论体系定义的新方法
    关于Spring中的<context:annotation-config/>配置
    <mvc:default-servlet-handler/>的作用
    web.xml context-param配置
    Spring JDBC框架操作mysql数据库
    Spring + JDBC example
  • 原文地址:https://www.cnblogs.com/zhanglei93/p/6653923.html
Copyright © 2011-2022 走看看