zoukankan      html  css  js  c++  java
  • Java Nio

    1.简述

      Java NIO是一种同步非阻塞IO(指的是网络编程中客户端与服务端连接交换数据的过程是非阻塞的,普通的文件读写依然是阻塞的。)。NIO是为了弥补IO操作的不足而诞生的,NIO的一些新特性有:非阻塞I/O,选择器,缓冲以及管道。通道(Channel),缓冲(Buffer) ,选择器(Selector)是其主要特征。提供基于缓冲区(buffer)的块写入/读取,而以前的I/O是基于流(Stream)的方式,NIO基于块的IO操作,将最耗时的缓存区读取和填充交由底层操作系统实现,因此速度上要快得多。

    2.NIO三大核心

    (1)Buffer缓冲

      Buffer是一个可以读写数据的内存块,对象里提供了一组方法可以更轻松使用内存块,程序读写数据都必须经过Buffer。Buffer根据数据类型不同(boolean 除外),提供了相应类型的缓冲区。

      Buffer类定义了所有的缓冲区都具有的四个属性

    • position:位置,下一个要被读或写的元素的索引,每次读写缓存区数据时都会改变这个值,为下次读写作准备。
    • limit:缓冲区终点,不能对超过极限的位置进行读写操作,可以修改。
    • capacity:可以容纳的最大数据量,缓冲区创建时就被设定且不可改变。
    • mark:标记。
    • 这4个属性总会满足的关系:mark <= position <= limit <= capacity。
    • position和limit的意义依赖于当前Buffer是处于读模式还是写模式。capacity的含义无论读写模式都是相同的。

      Buffer是一个顶层父类,它是一个抽象类,常用的Buffer的抽象子类有

    • ByteBuffer(常用):存储字节数据到缓冲区
    • IntBuffer:存储整数数据到缓冲区
    • CharBuffer:存储字符数据到缓冲区
    • LongBuffer:存储长整型数据到缓冲区
    • DoubleBuffer:存储高精度小数到缓冲区
    • FloatBuffer:存储小数到缓冲区
    • ShortBuffer:存储字符串数据到缓冲区

      常用方法简介

    /**ByteBuffer类提供了4个静态工厂方法来获得ByteBuffer的实例
         */
        //从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,称为非直接缓冲区,缓冲区建立在JVM的内存中
        static ByteBuffer allocate(int capacity);
        
        //从堆外内存中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,称为直接缓冲区,缓冲区建立在操作系统的物理内存(系统磁盘)中
        static ByteBuffer allocateDirect(int capacity);
        
        //通过包装的方法创建的缓冲区保留了被包装数组内保存的数据
        static ByteBuffer wrap(byte[] array);
        
        //通过包装的方法创建的缓冲区保留了被包装数组内保存的数据,offset也就是包装后byteBuffer的position,而length呢就是limit-position的大小,从而我们可以得到limit的位置为length+position(offset)
        static ByteBuffer wrap(byte[] array, int offset, int length);
        
        /**常用的方法
         */
        //flip方法将Buffer从写入模式切换到读取模式,将 position设置回0,并将limit置为刚才的位置。
        final Buffer flip();
        
        //该方法做的很简单就是 position置为0,limit置为capacity,mark置为-1,buffer内容 并没有清空。
        final Buffer clear();
        
        //将所有未读的数据复制到Buffer的开头。
        ByteBuffer compact();
        
        //把position设置成mark的值,相当于之前做过一个标记,现在要退回到之前标记的地方。
        final Buffer reset();
        
        //把position设为0,mark设为-1,不改变limit的值
        final Buffer rewind();
        
        //limit和position之间相对位置差
        final int remaining();
        
        //是否还有未读内容
        final boolean hasRemaining();
        
        //相对读,从position位置读取一个byte,并将position+1,为下次读写作准备
        byte get();
        
        //绝对读,读取byteBuffer底层的bytes中下标为index的byte,不改变position
        byte get(int index);
        
        //从position位置开始相对读,读length个byte,并写入dst下标从offset到offset+length的区域
        ByteBuffer get(byte[] dst, int offset, int length);
        
        //相对写,向position的位置写入一个byte,并将postion+1,为下次读写作准备
        ByteBuffer put(byte b);
        
        //绝对写,向byteBuffer底层的bytes中下标为index的位置插入byte b,不改变position
        ByteBuffer put(int index, byte b);
        
        //相对写,把src中可读的部分(也就是position到limit)写入此byteBuffer
        ByteBuffer put(ByteBuffer src);
        
        //从src数组中的offset到offset+length区域读取数据并使用相对写写入此byteBuffer
        ByteBuffer put(byte[] src, int offset, int length);
        
        //获取缓冲区终点
        final int limit();
        
        //设置缓冲区终点
        final Buffer limit(int newLimit);
    View Code

      获得ByteBuffer的实例的示例如下

    /**
     * 测试类
     */
    public class Test{
        public static void main(String[] args) throws IOException {
            //不止需要多大的内容才会产生变化
            System.out.println("----------测试JVM、系统内存创建ByteBuffer--------");
            //JVM
            System.out.println("使用alocate创建前JVM剩余容量:" + Runtime.getRuntime().freeMemory());
            
            ByteBuffer buffer = ByteBuffer.allocate(204800);
            System.out.println("buffer内容:" + buffer);
            
            System.out.println("使用alocate创建后JVM剩余容量:" + Runtime.getRuntime().freeMemory());
            
            //系统内存
            File file = File.listRoots()[0];
            System.out.println("使用allocateDirect创建前系统磁盘剩余容量:" + file.getFreeSpace());
            
            ByteBuffer directBuffer = ByteBuffer.allocateDirect(204800);
            System.out.println("directBuffer内容:" + directBuffer);
            
            file = File.listRoots()[0];
            System.out.println("使用allocateDirect创建后系统磁盘剩余容量:" + file.getFreeSpace());
            
            System.out.println("----------使用wrap创建ByteBuffer--------");
            byte[] bytes = new byte[32];
            buffer = ByteBuffer.wrap(bytes);
            System.out.println("wrap(byte by)创建的内容:" + buffer);
            
            buffer = ByteBuffer.wrap(bytes, 5, 10);
            System.out.println("wrap(byte[] array, int offset, int length)创建的内容:" + buffer);
        }
    }
    View Code

      具体方法简介可以到https://blog.csdn.net/mrliuzhao/article/details/89453082查看

    (2)Channel通道

      Channel通道表示IO源与目标打开的连接,类似于传统的“流”。通道(Channel)不能单独存在,它永远需要绑定一个缓存区(Buffer),所有的数据只会存在于缓存区(Buffer)中,无论你是写或是读,必然是缓存区(Buffer)通过通道(Channel)到达磁盘文件,或是磁盘文件通过通道(Channel)到达缓存区(Buffer)。

      Channel通道和流非常相似,主要有以下几点区别

    • 通道可以读也可以写,流一般来说是单向的(只能读或者写)。
    • 通道可以异步读写。
    • 通道总是基于缓冲区Buffer来读写。

      Channel通道之间的数据传输

    • transferFrom:transferFrom方法把数据从通道源传输到FileChannel
    • transferTo:transferTo方法把FileChannel数据传输到另一个channel

      Channel接口提供的最主要实现类如下

    • FileChannel:用于读取、写入、映射和操作文件。
    • SocketChannel :通过TCP读写网络中的数据,客户端。
    • ServerSocketChannel:可以监听新进来的TCP连接,并对每个链接创建对应的SocketChannel,服务器端。
    • DatagramChannel :通过UDP读写网络中的数据。

      Scatter、Gather多个buffer读写

    • scattering read:是把数据从单个Channel写入到多个buffer。
    • gathering write:把多个buffer的数据写入到同一个channel中。

      FileChannel读写示例如下

    /**
     * 测试类
     */
    public class Data {
        public static void main(String[] args) throws IOException {
            //1.创建文件读写对象,这个类比较陈旧,正式项目中还是使用FileInputStream
            RandomAccessFile file = new RandomAccessFile("d:/Desktop/hello.txt", "rw");
            //2.获取文件读写通道
            FileChannel fileChannel = file.getChannel();
            //3.创建缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(512);
            /*System.out.println("开始文件写入...");
            buffer.put("FileChannel测试".getBytes("utf-8"));//将内容写入缓冲区
            buffer.flip();//切换缓冲区至可读模式
            fileChannel.write(buffer);//把内容写入文件
            System.out.println("完成文件写入...");*/
            
            //4.读取文件内容填充到缓冲区
            System.out.println("开始文件读取...");
            fileChannel.read(buffer);
            buffer.flip();//切换缓冲区至可写模式
            byte data [] = new byte[buffer.limit()];
            buffer.get(data);//将数据写至 byte数组
            System.out.println(new String(data));
            System.out.println("完成文件读取...");
            
            //关闭文件读写流和通道
            file.close();
            fileChannel.close();
        }
    }
    View Code

      通过上面的示例可以看出,FileChannel必须被打开 ,但是你无法直接打开FileChannel(FileChannel是抽象类)。需要通过InputStream,OutputStream或RandomAccessFile获取FileChannel。  

      SocketChannel读写示例如下

    /**
     * 测试类
     */
    public class Test{
        public static void main(String[] args) throws IOException {
            //1.通过SocketChannel的open()方法创建一个SocketChannel对象,客户端
            SocketChannel socketChannel = SocketChannel.open();
            //2.连接到远程服务器(连接此通道的socket)
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 3333));
            // 3.创建写数据缓存区对象,往服务器发送数据
            ByteBuffer writeBuffer = ByteBuffer.allocate(128);
            writeBuffer.put("SocketChannel测试".getBytes());
            writeBuffer.flip();
            socketChannel.write(writeBuffer);
            //创建读数据缓存区对象,读取传输数据
            ByteBuffer readBuffer = ByteBuffer.allocate(128);
            socketChannel.read(readBuffer);
            //String 字符串常量,不可变;StringBuffer 字符串变量(线程安全),可变;StringBuilder 字符串变量(非线程安全),可变
            StringBuilder stringBuffer = new StringBuilder();
            //4.将Buffer从写模式变为可读模式
            readBuffer.flip();
            while (readBuffer.hasRemaining()) {
                stringBuffer.append((char) readBuffer.get());
            }
            System.out.println("从服务端接收到的数据:"+stringBuffer);
            socketChannel.close();
        }
    }
    View Code

      通过上面的示例可以看出,SocketChannel用于创建基于tcp协议的客户端对象,因为SocketChannel中不存在accept()方法,所以它不能成为一个服务端程序。通过 connect()方法 ,SocketChannel对象可以连接到其他tcp服务器程序。

      ServerSocketChannel读写示例如下

    /**
     * 测试类
     */
    public class Test{
        public static void main(String[] args) throws IOException {
            try {
                //1.通过ServerSocketChannel 的open()方法创建一个ServerSocketChannel对象,open方法的作用:打开套接字通道
                ServerSocketChannel ssc = ServerSocketChannel.open();
                //2.通过ServerSocketChannel绑定ip地址和port(端口号)
                ssc.socket().bind(new InetSocketAddress("127.0.0.1", 3333));
                //通过ServerSocketChannelImpl的accept()方法创建一个SocketChannel对象用户从客户端读/写数据
                SocketChannel socketChannel = ssc.accept();
                //3.创建写数据的缓存区对象
                ByteBuffer writeBuffer = ByteBuffer.allocate(128);
                writeBuffer.put("ServerSocketChannel测试".getBytes());
                writeBuffer.flip();
                socketChannel.write(writeBuffer);
                //创建读数据的缓存区对象
                ByteBuffer readBuffer = ByteBuffer.allocate(128);
                //读取缓存区数据
                socketChannel.read(readBuffer);
                StringBuilder stringBuffer = new StringBuilder();
                //4.将Buffer从写模式变为可读模式
                readBuffer.flip();
                while (readBuffer.hasRemaining()) {
                    stringBuffer.append((char) readBuffer.get());
                }
                System.out.println("从客户端接收到的数据:"+stringBuffer);
                socketChannel.close();
                ssc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

      通过上面的示例可以看出,ServerSocketChannel用于创建基于tcp协议的服务器端对象,ServerSocketChannel允许我们监听TCP链接请求,通过ServerSocketChannelImpl的 accept()方法 可以创建一个SocketChannel对象用户从客户端读/写数据。

      DatagramChannel读写示例如下

    /**
     * 测试类
     */
    public class Test{
        public static void main(String[] args) throws IOException {
            try {
                //1.通过DatagramChannel 的open()方法创建一个DatagramChannel对象
                DatagramChannel channel = DatagramChannel.open();
                channel.socket().bind(new InetSocketAddress(9999));
                
                //2.接收服务器发送的数据
                ByteBuffer buf = ByteBuffer.allocate(1024);
                buf.clear();
                channel.receive(buf);
                
                //3.往指定服务器发送数据
                String newData = "New String to wrte to file...";
                buf.clear();
                buf.put(newData.getBytes());
                buf.flip();
                channel.send(buf, new InetSocketAddress("127.0.0.1", 8888));
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

      通过上面的示例可以看出,类似于java 网络编程的DatagramSocket类。使用UDP进行网络传输, UDP是无连接,面向数据报文段的协议,对传输的数据不保证安全与完整。

    (3)Selector选择器

      Selector称为选择器,当然你也可以翻译为多路复用器。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接

      为了实现Selector管理多个SocketChannel,必须将具体的SocketChannel对象注册到Selector,并声明需要监听的事件。

      一共有4种事件

    • OP_CONNECT:客户端连接服务端事件,对应值8
    • OP_ACCEPT:服务端接收客户端连接事件,对应值16
    • OP_READ:读事件,对应值1
    • OP_WRITE:写事件,对应值4

       这4种事件很好理解,可以理解为每次请求到达服务器,都是从connect开始,connect成功后,服务端开始准备accept,准备就绪,开始读数据,并处理,最后写回数据。所以,当SocketChannel有对应的事件发生时,Selector都可以观察到,并进行相应的处理。

      Selector、SelectableChannel、SelectionKey的含义

    • Selector(选择器):管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。
    • SelectableChannel(可选择通道):这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。因为FileChannel类没有继承SelectableChannel因此不是可选通道,而所有socket通道都是可选择的,包括从管道(Pipe)对象的中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以指定对那个选择器而言,哪种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。
    • SelectionKey(选择键):选择键封装了特定的通道与特定的选择器的注册关系。选择键对象被SelectableChannel.register()返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。

      常用方法:

    /**Selector的常用方法
     */
    //创建一个Selector对象,一定要使用configureBlocking(false)把Channel设置成非阻塞模式,否则会出现异常
    static Selector open();
    
    //获取Selector对象是否为开启
    abstract boolean isOpen();
    
    //是个阻塞方法,有通道就绪才会返回。
    abstract int select();
    
    //最多阻塞timeout毫秒,即使没有通道就绪也会返回,若超时返回,则当前线程中断标志位被设置。若阻塞时间内有通道就绪,就提前返回。
    abstract int select(long timeout);
    
    //是个非阻塞方法,即使没有通道就绪也是立即返回。
    abstract int selectNow();
    
    //获取已选择的键集合
    abstract Set<SelectionKey> selectedKeys();
    
    //该方法让处在阻塞状态的select()方法立刻返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
    abstract Selector wakeup();
    
    //该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。
    abstract void close();
    
    
    /**SelectionKey的常用方法
     */
    //返回此选择键所关联的通道,即使此key已经被取消,仍然会返回。
    abstract SelectableChannel channel();
    
    //返回此选择键所关联的选择器,即使此键已经被取消,仍然会返回。
    abstract Selector selector();
    
    //检测此key是否有效,当key被取消,或者通道被关闭,或者selector被关闭,都将导致此key无效。
    abstract boolean isValid();
    
    //请求将此键取消注册,一旦返回成功,那么该键就是无效的,被添加到selector的cancelledKeys中,
    abstract void cancel();
    
    //获得此键的interes集合数
    abstract int interestOps();
    
    //将此键的interst设置为指定值,此操作会对ops和channel、validOps进行校验。如果此ops不会当前channel支持,将抛出异常。
    abstract SelectionKey interestOps(int ops);
    
    //获取此键上ready操作集合,即在当前通道上已经就绪的事件。
    abstract int readyOps();
    
    //测试此键的通道是否已完成其套接字连接操作。
    final boolean isconnectable();
    
    //测试此键的通道是否已准备好接受新的套接字连接。
    final boolean isacceptable();
    
    //检测此键是否为read读事件
    final boolean isReadable();
    
    //检测此键是否为write写事件
    final boolean isWritable();
    
    //将给定的对象作为附件添加到此key上,在key有效期间,附件可以在多个ops事件中传递。
    final Object attach(Object ob);
    
    //获取一个channel的附件,可以再当前Channel(或者说是SelectionKey)生命周期中共享,但是attachment数据不会作为socket数据在网络中传输。
    final Object attachment();
    View Code

     3.NIO的实现示例

      先启动服务器程序,后启动客户端程序,可以看到结果。

    (1)简单示例

      服务端示例如下

    /**
     * 测试类
     */
    public class Server{
        public static void main(String[] args) throws IOException {
            startServer();
        }
    
        public static void startServer() {
            ServerSocketChannel serverChannel = null;
            Selector selector = null;
            try {
                // 1、获取Selector选择器
                selector = Selector.open();
                // 2、获取通道
                serverChannel = ServerSocketChannel.open();
                // 3.设置为非阻塞模式
                serverChannel.configureBlocking(false);
                // 4、绑定连接端口
                serverChannel.bind(new InetSocketAddress(8888));
                // 5、将通道注册到选择器上,并注册的操作为:接收操作
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                System.out.println("NIO服务器创建失败,失败原因为:"+e.getMessage());
                return;
            }
            while (true) {
                try {
                    //6.查询获取准备就绪的注册过的操作,如果等于0则不进行以下操作,因为调用该serverSocket的accept方法,会阻塞
                    if(selector.select() == 0)
                        continue;
                    
                    //7、获取当前选择器中所有注册的选择键(已经准备就绪的操作)
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        // 8、获取准备就绪SelectionKey的相关数据,并移除选择键
                        SelectionKey key = iter.next();
                        iter.remove();
    
                        // 9、判断key是具体的什么事件
                        if (key.isAcceptable()) {
                            // 10、若接受的事件是接收就绪操作,就获取客户端连接
                            SocketChannel socketChannel = serverChannel.accept();
                            // 11、切换为非阻塞模式
                            socketChannel.configureBlocking(false);
                            // 12、将该通道注册到selector选择器上
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isReadable()) {
                            // 13、获取该选择器上的读就绪状态的通道
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            // 14、读取数据
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            String content = "";
                            while (socketChannel.read(buffer) > 0) {
                                buffer.flip();
                                content += decode(buffer);
                                buffer.clear();
                                socketChannel.write(buffer);
                            }
                            System.out.println(content);
                            socketChannel.close();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            //关闭连接,因为此示例即便是异常也不会退出,所以不需要关闭
            /*try {
                serverChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }*/
        }
        
        /**数据字符编码处理
         */
        private static Charset charset = Charset.forName("UTF-8");
        public static String decode(ByteBuffer buffer) {
            try {
                return String.valueOf(charset.newDecoder().decode(buffer));
            } catch (Exception ex) {
                ex.printStackTrace();
                return null;
            }
        }
    }
    View Code

      客户端示例如下

    public class Client{
        public static void main(String[] args) {
            // 创建 20 个线程, 跟服务器发起连接
            for (int i = 0; i < 20; i++) {
                new Thread(new Runnable() {
                    
                    @Override
                    public void run() {
                        try {
                            Socket socket = new Socket();
                            socket.connect(new InetSocketAddress("127.0.0.1", 8888));
                            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                            writer.write ("当前线程名称为:" + Thread.currentThread().getName());
                            writer.flush ();
                            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                                System.out.println("接收服务器回复内容为: " + line);
                            }
                            socket.close();
                        } catch (IOException e) {
                            e.printStackTrace ();
                        }
                    }
                }).start ();
            }
        }
    }
    View Code

    (2)应用实例(简易版)

       服务端示例如下

    /**
     * 测试类
     */
    public class Server{
        public static void main(String[] args) throws IOException {
            NioTcpManager.getInstance().startServerPort();
        }
    }
    /**SelectionKey附件实体,用于存储写入客户端的数据
     */
    class KeyAttach {
        private String sn;//序号,唯一编码
        private int commandType;//命令类型
        private List<byte[]> commands = new ArrayList<byte[]>();//等待写的数据集合
    
        public String getSn() {
            return sn;
        }
    
        public void setSn(String sn) {
            this.sn = sn;
        }
    
        public int getCommandType() {
            return commandType;
        }
    
        public void setCommandType(int commandType) {
            this.commandType = commandType;
        }
        
        public List<byte[]> getCommands() {
            return commands;
        }
    
        public void setCommands(List<byte[]> commands) {
            this.commands = commands;
        }
    }
    /**NIO管理类
     */
    class NioTcpManager implements Runnable {
        private static ServerSocketChannel serverChannel = null;//服务器TCP通道
        private static Selector selector = null;//选择器对象
        private static int PORT = 4042;//tcp监听端口
        
        public static NioTcpManager nioTcpManager = new NioTcpManager();
        private static List<SelectionKey> rpool = new LinkedList<SelectionKey>(); //读取队列
        private static List<SelectionKey> wpool = new LinkedList<SelectionKey>(); //写队列
        
        private NioTcpManager() {
            //创建读线程
            for (int i = 0; i < 10; i++)
                new Thread(new NioTcpReader()).start();
    
            //创建写线程
            for (int i = 0; i < 10; i++)
                new Thread(new NioTcpWriter()).start();
        }
        public static NioTcpManager getInstance() {
            return nioTcpManager;
        }
        
        /**开启TCP端口监听
         */
        public void startServerPort() {
            if (serverChannel != null && serverChannel.isOpen())//检查TCP是否已经开启监听
                return;
            try {
                System.out.println("启动绑定TCP端口...");
                selector = Selector.open();//获取Selector选择器
                serverChannel = ServerSocketChannel.open();//获取通道
                serverChannel.socket().bind(new InetSocketAddress(PORT));//绑定连接端口
                serverChannel.configureBlocking(false);//设置为非阻塞模式
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);//将通道注册到选择器上,注册的操作为:接收操作
                new Thread(this).start();//监听线程启动
            } catch (IOException e) {
                System.out.println("该TCP端口已被使用...");
            }
        }
    
        public void run() {
            System.out.println("启动TCP监听线程...");
            while (true) {
                try {
                    int keyAdded = selector.select();//获取准备就绪且注册过的操作
                    if (keyAdded > 0) {//存在准备就绪操作
                        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();//获取当前选择器中已经准备就绪的选择键
                        while (iter.hasNext()) {//循环已就绪的选择键
                            SelectionKey key = (SelectionKey)iter.next();//取出一个就绪SelectionKey的相关数据
                            iter.remove();//移除已经取出选择键
                            
                            if (key.isAcceptable()) {//是否为接收就绪操作
                                ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();//获取客户端连接
                                SocketChannel socketChannel = serverChannel.accept();//获取连接通道
                                socketChannel.configureBlocking(false);//设置通道为非阻塞模式
                                socketChannel.register(selector, SelectionKey.OP_READ);//将通道注册到选择器上,注册的操作为:读操作
                                System.out.println("客户端发送请求连接,地址: " + socketChannel.getRemoteAddress());
                                selector.wakeup();
                            } else if (key.isReadable()) {//是否为读就绪操作
                                KeyAttach keyAttach = (KeyAttach)key.attachment();//SelectionKey附件实体对象
                                if (keyAttach != null && (!keyAttach.getCommands().isEmpty())) {//判断附件中是否存在需要写入客户端数据
                                    NioTcpWriter.processRequest(key);//添加到TCP写入队列处理
                                    key.cancel();//取消此键的注册
                                } else {
                                    NioTcpReader.processRequest(key);//添加到TCP读取队列处理
                                    key.cancel();//取消此键的注册
                                }
                            } else if (key.isWritable()) {//是否为写就绪操作
                                NioTcpWriter.processRequest(key);//添加到TCP写入队列处理
                                key.cancel();//取消此键的注册
                            }
                        }
                    } else {//不存在准备就绪操作,则注册新的通道
                        addRegister();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
        /**添加新的通道注册
         */
        private void addRegister() {
            synchronized (wpool) {
                while (!wpool.isEmpty()) {
                    SelectionKey key = wpool.remove(0);
                    SocketChannel schannel = (SocketChannel) key.channel();
                    try {
                        schannel.register(selector, SelectionKey.OP_WRITE, key.attachment());//将通道注册到选择器上,注册的操作为:写操作
                    } catch (Exception e) {
                        System.out.println("注册写入通道失败...");
                    }
                }
            }
            
            synchronized (rpool) {
                while (!rpool.isEmpty()) {
                    SelectionKey key = rpool.remove(0);
                    SocketChannel schannel = (SocketChannel) key.channel();
                    try {
                        schannel.register(selector, SelectionKey.OP_READ, key.attachment());//将通道注册到选择器上,注册的操作为:读操作
                    } catch (Exception e) {
                        System.out.println("注册读取通道失败...");
                    }
                }
            }
        }
        
        /**添加新的客户端读请求,并唤醒所有等待的读线程
         */
        public static void processReadRequest(SelectionKey key) {
            synchronized (rpool) {
                if (!rpool.contains(key)) {
                    rpool.add(rpool.size(), key);//添加新的SelectionKey到队列中,等待处理
                    rpool.notifyAll();//唤醒所有等待(对象的)读线程
                }
            }
            selector.wakeup();//解除selector的阻塞状态,以便注册新的通道
        }
    
        /**添加新的客户端写请求,并唤醒所有等待的写线程
         */
        public static void processWriteRequest(SelectionKey key) {
            synchronized (wpool) {
                if (!wpool.contains(key)) {
                    wpool.add(wpool.size(), key);//添加新的SelectionKey到队列中,等待处理
                    wpool.notifyAll();//唤醒所有等待(对象的)写线程
                }
            }
            selector.wakeup(); //解除selector的阻塞状态,以便注册新的通道
        }
    }
    /**NIO写入类
     */
    class NioTcpWriter implements Runnable {
        private static List<SelectionKey> pool = new LinkedList<SelectionKey>();
        private boolean busy = true; //当前线程是否正在处理写事务
        
        public void run() {
            while (true) {
                try {
                    SelectionKey key;
                    synchronized (pool) {
                        while (pool.isEmpty()) {
                            if (busy)
                                busy = false;
                            //释放当前的锁,进入等待状态。
                            pool.wait();
                        }
                        if (!busy)
                            busy = true;
                        //从队列中取出一个SelectionKey来进行处理
                        key = pool.remove(0);
                    }
                    //写入数据到客户端
                    write(key);
                } catch (Exception e) {
                    continue;
                }
            }
        }
        /**向客户端发送数据
         */
        public void write(SelectionKey key) {
            //获取SelectionKey中存储的自定义KeyAttach数据
            KeyAttach keyAttach = (KeyAttach) key.attachment();
            if (keyAttach != null && !keyAttach.getCommands().isEmpty()) {//发送命令的数据不为空
                try {
                    //获取通道
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    synchronized (keyAttach.getCommands()) {
                        for (byte[] command : keyAttach.getCommands()) {//循环写
                            //创建缓冲区,并把需要写入客户端数据存入缓冲区
                            ByteBuffer byteBuffer = ByteBuffer.allocate(command.length);
                            byteBuffer.put(command);
                            byteBuffer.flip();
                            //发送命令
                            socketChannel.write(byteBuffer);
                            if (byteBuffer.remaining() == 0)//判断缓存区数据是否写完,写完则清空
                                byteBuffer.clear();
                        }
                    }
                } catch (IOException e) {
                    // e.printStackTrace();
                    System.out.println("写入客户端数据失败...");
                } finally {
                    keyAttach.getCommands().clear();
                }
            }
            NioTcpManager.processReadRequest(key);
        }
    
        /**处理需要往客户端写入的数据队列
         */
        public static void processRequest(SelectionKey key) {
            synchronized (pool) {
                pool.add(pool.size(), key);//添加新的SelectionKey到队列中,等待处理
                pool.notify();//唤醒一个等待(对象的)写线程并使该线程开始执行
            }
        }
    }
    /**NIO读取类
     */
    class NioTcpReader implements Runnable {
        public static NioTcpAnalyzer nioTcpAnalyzer = NioTcpAnalyzer.getInstance();
        private static List<SelectionKey> pool = new LinkedList<SelectionKey>();
        private boolean busy = true; //当前线程是否正在处理读事务
        
        @Override
        public void run() {
            while (true) {
                try {
                    SelectionKey key;
                    synchronized (pool) {
                        while (pool.isEmpty()) {
                            if (busy)
                                busy = false;
                            //释放当前的锁,进入等待状态。
                            pool.wait();
                        }
                        if (!busy)
                            busy = true;
                        //从队列中取出一个SelectionKey来进行处理
                        key = pool.remove(0);
                    }
                    //读取客户端数据
                    read(key);
                } catch (Exception e) {
                    continue;
                }
            }
        }
        
        /**读取客户端发送数据
         */
        public void read(SelectionKey key) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);//创建缓冲区
            SocketChannel socketChannel = (SocketChannel) key.channel();//获取连接通道
            KeyAttach keyAttach = (KeyAttach) key.attachment();//获取SelectionKey中存储的自定义KeyAttach数据
            try {
                int size = socketChannel.read(buffer);//读取客户端发送内容
                String content = "";
                while (size == 1024) {
                    buffer.flip();
                    content += decode(buffer);
                    buffer.clear();
                    content = nioTcpAnalyzer.analyzerTcpData(key, socketChannel, content);//处理客户端数据
                    size = socketChannel.read(buffer);
                }
                buffer.flip();
                if (size > 0) {//存在未读完数据,但是不满足1024
                    content += decode(buffer);
                    buffer.clear();
                    nioTcpAnalyzer.analyzerTcpData(key, socketChannel, content);
                    //如果不需要回复,则重新注册读事件
                    keyAttach = (KeyAttach) key.attachment();//获取SelectionKey中存储的自定义KeyAttach数据
                    if (keyAttach == null) {
                        NioTcpManager.processReadRequest(key);
                    } else {
                        synchronized (keyAttach.getCommands()) {
                            if (keyAttach.getCommands().isEmpty()) {
                                NioTcpManager.processReadRequest(key);
                            }
                        }
                    }
                } else if (size < 0)//客户端的数据发送完毕,并且主动的关闭连接,则服务器也移除连接
                    nioTcpAnalyzer.removeChannel(key);//移除客户端连接
            } catch (Exception e) {
                nioTcpAnalyzer.removeChannel(key);//移除客户端连接
                System.out.println(keyAttach.getSn() + "客户端强迫关闭了一个连接 ...");
                //e.printStackTrace();
            }
        }
        
        /**数据编码处理
         */
        private Charset charset = Charset.forName("UTF-8");
        public String decode(ByteBuffer buffer) {
            try {
                return String.valueOf(charset.newDecoder().decode(buffer));
            } catch (Exception ex) {
                ex.printStackTrace();
                return null;
            }
        }
        
        /**处理需要读取客户端的数据队列
         */
        public static void processRequest(SelectionKey key) {
            synchronized (pool) {
                pool.add(pool.size(), key);//添加新的SelectionKey到队列中,等待处理
                pool.notify();//唤醒一个等待(对象的)读线程并使该线程开始执行
            }
        }
    }
    /**NIO数据解析类
     */
    class NioTcpAnalyzer {
        // 初始化本类
        public static NioTcpAnalyzer nioTcpAnalyzer = new NioTcpAnalyzer();
        public static NioTcpAnalyzer getInstance() {//获取本类实例
            return nioTcpAnalyzer;
        }
        
        /**解析收到的数据
         */
        public synchronized String analyzerTcpData(SelectionKey key, SocketChannel socketChannel, String content) {
            try {
                //以下为随意弄的一个测试解析,可以根据需求,修改
                System.out.println(content);
                String[] data = content.split(",");
                int commandType = Integer.valueOf(data[1].substring(data[1].length()-1, data[1].length()));//获取命令类型
                if(commandType == 1){
                    // 未识别(未知DataLog序列号)的Socket,全新的socket连接
                    if (key.attachment() == null) {
                        // 给当前通道添加附加对象
                        KeyAttach keyAttach = new KeyAttach();
                        keyAttach.setSn(data[0]);
                        keyAttach.setCommandType(commandType);
                        key.attach(keyAttach);
                    }
                    String info = "服务器回复:"+data[0].substring(3) + "," + data[1] + ",内容:连接正常";
                    writeByteData(info.getBytes("utf-8"), key);//回复客户端数据
                }
            } catch (Exception e) {
            }
            return content;
        }
        
        /**回复数据到对应通道(客户端)
         */
        private void writeByteData(byte[] data, SelectionKey key) {
            KeyAttach keyAttach = (KeyAttach) key.attachment();
            synchronized (keyAttach.getCommands()) {
                keyAttach.getCommands().add(data);
            }
            NioTcpManager.processWriteRequest(key);
        }
        
        /**移除TCP通道连接
         */
        public boolean removeChannel(SelectionKey key) {
            try {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                socketChannel.finishConnect();
                socketChannel.socket().close();
                socketChannel.close();
                key.cancel();
            } catch (Exception e) {
                // e.printStackTrace();
                System.out.println("移除远程连接失败 , 有可能并不存在此连接...");
                return false;
            }
            return true;
        }
    }
    View Code

      客户端示例如下

    public class Client{
        public static void main(String[] args) {
            // 创建 20 个线程, 跟服务器发起连接
            //for (int i = 0; i < 20; i++) {
                new Thread(new Runnable() {
                    
                    @Override
                    public void run() {
                        SocketChannel socketChannel = null;
                        Selector selector = null;
    
                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        
                        try {
                            socketChannel = SocketChannel.open();
                            socketChannel.configureBlocking(false);
                            socketChannel.connect(new InetSocketAddress("127.0.0.1", 4042));
                            selector = Selector.open();
                            socketChannel.register(selector, SelectionKey.OP_CONNECT);
                            if (socketChannel.finishConnect()) {
                                String info = "客户端" + Thread.currentThread().getName() + ",命令类型:1,内容:连接是否正常";
                                while (true) {
                                    writeBuffer.clear();
                                    writeBuffer.put(info.getBytes("utf-8"));
                                    writeBuffer.flip();
                                    while (writeBuffer.hasRemaining()) {
                                        socketChannel.write(writeBuffer);
                                    }
                                    
                                    int bytesRead = socketChannel.read(readBuffer);
                                    if (bytesRead > 0) {
                                        readBuffer.flip();
                                        byte[] bytes = new byte[bytesRead];
                                        readBuffer.get(bytes, 0, bytesRead);
                                        String str = new String(bytes);
                                        System.out.println(str);
                                        readBuffer.clear();
                                    }
                                    Thread.sleep(2000);
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                if (socketChannel != null) {
                                    socketChannel.close();
                                    System.out.println("服务器为启动...");
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start ();
           // }
        }
    }
    View Code
  • 相关阅读:
    LintCode "Maximum Gap"
    LintCode "Wood Cut"
    LintCode "Expression Evaluation"
    LintCode "Find Peak Element II"
    LintCode "Remove Node in Binary Search Tree"
    LintCode "Delete Digits"
    LintCode "Binary Representation"
    LeetCode "Game of Life"
    LintCode "Coins in a Line"
    LintCode "Word Break"
  • 原文地址:https://www.cnblogs.com/bl123/p/13803479.html
Copyright © 2011-2022 走看看