zoukankan      html  css  js  c++  java
  • Java Nio

    1.简述

      Java NIO是一种同步非阻塞IO(指的是网络编程中客户端与服务端连接交换数据的过程是非阻塞的,普通的文件读写依然是阻塞的。)。NIO是为了弥补IO操作的不足而诞生的,NIO的一些新特性有:非阻塞I/O,选择器,缓冲以及管道。通道(Channel),缓冲(Buffer) ,选择器(Selector)是其主要特征。提供基于缓冲区(buffer)的块写入/读取,而以前的I/O是基于流(Stream)的方式,NIO基于块的IO操作,将最耗时的缓存区读取和填充交由底层操作系统实现,因此速度上要快得多。

    2.NIO三大核心

    (1)Buffer缓冲

      Buffer是一个可以读写数据的内存块,对象里提供了一组方法可以更轻松使用内存块,程序读写数据都必须经过Buffer。Buffer根据数据类型不同(boolean 除外),提供了相应类型的缓冲区。

      Buffer类定义了所有的缓冲区都具有的四个属性

    • position:位置,下一个要被读或写的元素的索引,每次读写缓存区数据时都会改变这个值,为下次读写作准备。
    • limit:缓冲区终点,不能对超过极限的位置进行读写操作,可以修改。
    • capacity:可以容纳的最大数据量,缓冲区创建时就被设定且不可改变。
    • mark:标记。
    • 这4个属性总会满足的关系:mark <= position <= limit <= capacity。
    • position和limit的意义依赖于当前Buffer是处于读模式还是写模式。capacity的含义无论读写模式都是相同的。

      Buffer是一个顶层父类,它是一个抽象类,常用的Buffer的抽象子类有

    • ByteBuffer(常用):存储字节数据到缓冲区
    • IntBuffer:存储整数数据到缓冲区
    • CharBuffer:存储字符数据到缓冲区
    • LongBuffer:存储长整型数据到缓冲区
    • DoubleBuffer:存储高精度小数到缓冲区
    • FloatBuffer:存储小数到缓冲区
    • ShortBuffer:存储字符串数据到缓冲区

      常用方法简介

    /**ByteBuffer类提供了4个静态工厂方法来获得ByteBuffer的实例
         */
        //从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,称为非直接缓冲区,缓冲区建立在JVM的内存中
        static ByteBuffer allocate(int capacity);
        
        //从堆外内存中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,称为直接缓冲区,缓冲区建立在操作系统的物理内存(系统磁盘)中
        static ByteBuffer allocateDirect(int capacity);
        
        //通过包装的方法创建的缓冲区保留了被包装数组内保存的数据
        static ByteBuffer wrap(byte[] array);
        
        //通过包装的方法创建的缓冲区保留了被包装数组内保存的数据,offset也就是包装后byteBuffer的position,而length呢就是limit-position的大小,从而我们可以得到limit的位置为length+position(offset)
        static ByteBuffer wrap(byte[] array, int offset, int length);
        
        /**常用的方法
         */
        //flip方法将Buffer从写入模式切换到读取模式,将 position设置回0,并将limit置为刚才的位置。
        final Buffer flip();
        
        //该方法做的很简单就是 position置为0,limit置为capacity,mark置为-1,buffer内容 并没有清空。
        final Buffer clear();
        
        //将所有未读的数据复制到Buffer的开头。
        ByteBuffer compact();
        
        //把position设置成mark的值,相当于之前做过一个标记,现在要退回到之前标记的地方。
        final Buffer reset();
        
        //把position设为0,mark设为-1,不改变limit的值
        final Buffer rewind();
        
        //limit和position之间相对位置差
        final int remaining();
        
        //是否还有未读内容
        final boolean hasRemaining();
        
        //相对读,从position位置读取一个byte,并将position+1,为下次读写作准备
        byte get();
        
        //绝对读,读取byteBuffer底层的bytes中下标为index的byte,不改变position
        byte get(int index);
        
        //从position位置开始相对读,读length个byte,并写入dst下标从offset到offset+length的区域
        ByteBuffer get(byte[] dst, int offset, int length);
        
        //相对写,向position的位置写入一个byte,并将postion+1,为下次读写作准备
        ByteBuffer put(byte b);
        
        //绝对写,向byteBuffer底层的bytes中下标为index的位置插入byte b,不改变position
        ByteBuffer put(int index, byte b);
        
        //相对写,把src中可读的部分(也就是position到limit)写入此byteBuffer
        ByteBuffer put(ByteBuffer src);
        
        //从src数组中的offset到offset+length区域读取数据并使用相对写写入此byteBuffer
        ByteBuffer put(byte[] src, int offset, int length);
        
        //获取缓冲区终点
        final int limit();
        
        //设置缓冲区终点
        final Buffer limit(int newLimit);
    View Code

      获得ByteBuffer的实例的示例如下

    /**
     * 测试类
     */
    public class Test{
        public static void main(String[] args) throws IOException {
            //不止需要多大的内容才会产生变化
            System.out.println("----------测试JVM、系统内存创建ByteBuffer--------");
            //JVM
            System.out.println("使用alocate创建前JVM剩余容量:" + Runtime.getRuntime().freeMemory());
            
            ByteBuffer buffer = ByteBuffer.allocate(204800);
            System.out.println("buffer内容:" + buffer);
            
            System.out.println("使用alocate创建后JVM剩余容量:" + Runtime.getRuntime().freeMemory());
            
            //系统内存
            File file = File.listRoots()[0];
            System.out.println("使用allocateDirect创建前系统磁盘剩余容量:" + file.getFreeSpace());
            
            ByteBuffer directBuffer = ByteBuffer.allocateDirect(204800);
            System.out.println("directBuffer内容:" + directBuffer);
            
            file = File.listRoots()[0];
            System.out.println("使用allocateDirect创建后系统磁盘剩余容量:" + file.getFreeSpace());
            
            System.out.println("----------使用wrap创建ByteBuffer--------");
            byte[] bytes = new byte[32];
            buffer = ByteBuffer.wrap(bytes);
            System.out.println("wrap(byte by)创建的内容:" + buffer);
            
            buffer = ByteBuffer.wrap(bytes, 5, 10);
            System.out.println("wrap(byte[] array, int offset, int length)创建的内容:" + buffer);
        }
    }
    View Code

      具体方法简介可以到https://blog.csdn.net/mrliuzhao/article/details/89453082查看

    (2)Channel通道

      Channel通道表示IO源与目标打开的连接,类似于传统的“流”。通道(Channel)不能单独存在,它永远需要绑定一个缓存区(Buffer),所有的数据只会存在于缓存区(Buffer)中,无论你是写或是读,必然是缓存区(Buffer)通过通道(Channel)到达磁盘文件,或是磁盘文件通过通道(Channel)到达缓存区(Buffer)。

      Channel通道和流非常相似,主要有以下几点区别

    • 通道可以读也可以写,流一般来说是单向的(只能读或者写)。
    • 通道可以异步读写。
    • 通道总是基于缓冲区Buffer来读写。

      Channel通道之间的数据传输

    • transferFrom:transferFrom方法把数据从通道源传输到FileChannel
    • transferTo:transferTo方法把FileChannel数据传输到另一个channel

      Channel接口提供的最主要实现类如下

    • FileChannel:用于读取、写入、映射和操作文件。
    • SocketChannel :通过TCP读写网络中的数据,客户端。
    • ServerSocketChannel:可以监听新进来的TCP连接,并对每个链接创建对应的SocketChannel,服务器端。
    • DatagramChannel :通过UDP读写网络中的数据。

      Scatter、Gather多个buffer读写

    • scattering read:是把数据从单个Channel写入到多个buffer。
    • gathering write:把多个buffer的数据写入到同一个channel中。

      FileChannel读写示例如下

    /**
     * 测试类
     */
    public class Data {
        public static void main(String[] args) throws IOException {
            //1.创建文件读写对象,这个类比较陈旧,正式项目中还是使用FileInputStream
            RandomAccessFile file = new RandomAccessFile("d:/Desktop/hello.txt", "rw");
            //2.获取文件读写通道
            FileChannel fileChannel = file.getChannel();
            //3.创建缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(512);
            /*System.out.println("开始文件写入...");
            buffer.put("FileChannel测试".getBytes("utf-8"));//将内容写入缓冲区
            buffer.flip();//切换缓冲区至可读模式
            fileChannel.write(buffer);//把内容写入文件
            System.out.println("完成文件写入...");*/
            
            //4.读取文件内容填充到缓冲区
            System.out.println("开始文件读取...");
            fileChannel.read(buffer);
            buffer.flip();//切换缓冲区至可写模式
            byte data [] = new byte[buffer.limit()];
            buffer.get(data);//将数据写至 byte数组
            System.out.println(new String(data));
            System.out.println("完成文件读取...");
            
            //关闭文件读写流和通道
            file.close();
            fileChannel.close();
        }
    }
    View Code

      通过上面的示例可以看出,FileChannel必须被打开 ,但是你无法直接打开FileChannel(FileChannel是抽象类)。需要通过InputStream,OutputStream或RandomAccessFile获取FileChannel。  

      SocketChannel读写示例如下

    /**
     * 测试类
     */
    public class Test{
        public static void main(String[] args) throws IOException {
            //1.通过SocketChannel的open()方法创建一个SocketChannel对象,客户端
            SocketChannel socketChannel = SocketChannel.open();
            //2.连接到远程服务器(连接此通道的socket)
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 3333));
            // 3.创建写数据缓存区对象,往服务器发送数据
            ByteBuffer writeBuffer = ByteBuffer.allocate(128);
            writeBuffer.put("SocketChannel测试".getBytes());
            writeBuffer.flip();
            socketChannel.write(writeBuffer);
            //创建读数据缓存区对象,读取传输数据
            ByteBuffer readBuffer = ByteBuffer.allocate(128);
            socketChannel.read(readBuffer);
            //String 字符串常量,不可变;StringBuffer 字符串变量(线程安全),可变;StringBuilder 字符串变量(非线程安全),可变
            StringBuilder stringBuffer = new StringBuilder();
            //4.将Buffer从写模式变为可读模式
            readBuffer.flip();
            while (readBuffer.hasRemaining()) {
                stringBuffer.append((char) readBuffer.get());
            }
            System.out.println("从服务端接收到的数据:"+stringBuffer);
            socketChannel.close();
        }
    }
    View Code

      通过上面的示例可以看出,SocketChannel用于创建基于tcp协议的客户端对象,因为SocketChannel中不存在accept()方法,所以它不能成为一个服务端程序。通过 connect()方法 ,SocketChannel对象可以连接到其他tcp服务器程序。

      ServerSocketChannel读写示例如下

    /**
     * 测试类
     */
    public class Test{
        public static void main(String[] args) throws IOException {
            try {
                //1.通过ServerSocketChannel 的open()方法创建一个ServerSocketChannel对象,open方法的作用:打开套接字通道
                ServerSocketChannel ssc = ServerSocketChannel.open();
                //2.通过ServerSocketChannel绑定ip地址和port(端口号)
                ssc.socket().bind(new InetSocketAddress("127.0.0.1", 3333));
                //通过ServerSocketChannelImpl的accept()方法创建一个SocketChannel对象用户从客户端读/写数据
                SocketChannel socketChannel = ssc.accept();
                //3.创建写数据的缓存区对象
                ByteBuffer writeBuffer = ByteBuffer.allocate(128);
                writeBuffer.put("ServerSocketChannel测试".getBytes());
                writeBuffer.flip();
                socketChannel.write(writeBuffer);
                //创建读数据的缓存区对象
                ByteBuffer readBuffer = ByteBuffer.allocate(128);
                //读取缓存区数据
                socketChannel.read(readBuffer);
                StringBuilder stringBuffer = new StringBuilder();
                //4.将Buffer从写模式变为可读模式
                readBuffer.flip();
                while (readBuffer.hasRemaining()) {
                    stringBuffer.append((char) readBuffer.get());
                }
                System.out.println("从客户端接收到的数据:"+stringBuffer);
                socketChannel.close();
                ssc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

      通过上面的示例可以看出,ServerSocketChannel用于创建基于tcp协议的服务器端对象,ServerSocketChannel允许我们监听TCP链接请求,通过ServerSocketChannelImpl的 accept()方法 可以创建一个SocketChannel对象用户从客户端读/写数据。

      DatagramChannel读写示例如下

    /**
     * 测试类
     */
    public class Test{
        public static void main(String[] args) throws IOException {
            try {
                //1.通过DatagramChannel 的open()方法创建一个DatagramChannel对象
                DatagramChannel channel = DatagramChannel.open();
                channel.socket().bind(new InetSocketAddress(9999));
                
                //2.接收服务器发送的数据
                ByteBuffer buf = ByteBuffer.allocate(1024);
                buf.clear();
                channel.receive(buf);
                
                //3.往指定服务器发送数据
                String newData = "New String to wrte to file...";
                buf.clear();
                buf.put(newData.getBytes());
                buf.flip();
                channel.send(buf, new InetSocketAddress("127.0.0.1", 8888));
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

      通过上面的示例可以看出,类似于java 网络编程的DatagramSocket类。使用UDP进行网络传输, UDP是无连接,面向数据报文段的协议,对传输的数据不保证安全与完整。

    (3)Selector选择器

      Selector称为选择器,当然你也可以翻译为多路复用器。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接

      为了实现Selector管理多个SocketChannel,必须将具体的SocketChannel对象注册到Selector,并声明需要监听的事件。

      一共有4种事件

    • OP_CONNECT:客户端连接服务端事件,对应值8
    • OP_ACCEPT:服务端接收客户端连接事件,对应值16
    • OP_READ:读事件,对应值1
    • OP_WRITE:写事件,对应值4

       这4种事件很好理解,可以理解为每次请求到达服务器,都是从connect开始,connect成功后,服务端开始准备accept,准备就绪,开始读数据,并处理,最后写回数据。所以,当SocketChannel有对应的事件发生时,Selector都可以观察到,并进行相应的处理。

      Selector、SelectableChannel、SelectionKey的含义

    • Selector(选择器):管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。
    • SelectableChannel(可选择通道):这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。因为FileChannel类没有继承SelectableChannel因此不是可选通道,而所有socket通道都是可选择的,包括从管道(Pipe)对象的中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以指定对那个选择器而言,哪种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。
    • SelectionKey(选择键):选择键封装了特定的通道与特定的选择器的注册关系。选择键对象被SelectableChannel.register()返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。

      常用方法:

    /**Selector的常用方法
     */
    //创建一个Selector对象,一定要使用configureBlocking(false)把Channel设置成非阻塞模式,否则会出现异常
    static Selector open();
    
    //获取Selector对象是否为开启
    abstract boolean isOpen();
    
    //是个阻塞方法,有通道就绪才会返回。
    abstract int select();
    
    //最多阻塞timeout毫秒,即使没有通道就绪也会返回,若超时返回,则当前线程中断标志位被设置。若阻塞时间内有通道就绪,就提前返回。
    abstract int select(long timeout);
    
    //是个非阻塞方法,即使没有通道就绪也是立即返回。
    abstract int selectNow();
    
    //获取已选择的键集合
    abstract Set<SelectionKey> selectedKeys();
    
    //该方法让处在阻塞状态的select()方法立刻返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
    abstract Selector wakeup();
    
    //该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。
    abstract void close();
    
    
    /**SelectionKey的常用方法
     */
    //返回此选择键所关联的通道,即使此key已经被取消,仍然会返回。
    abstract SelectableChannel channel();
    
    //返回此选择键所关联的选择器,即使此键已经被取消,仍然会返回。
    abstract Selector selector();
    
    //检测此key是否有效,当key被取消,或者通道被关闭,或者selector被关闭,都将导致此key无效。
    abstract boolean isValid();
    
    //请求将此键取消注册,一旦返回成功,那么该键就是无效的,被添加到selector的cancelledKeys中,
    abstract void cancel();
    
    //获得此键的interes集合数
    abstract int interestOps();
    
    //将此键的interst设置为指定值,此操作会对ops和channel、validOps进行校验。如果此ops不会当前channel支持,将抛出异常。
    abstract SelectionKey interestOps(int ops);
    
    //获取此键上ready操作集合,即在当前通道上已经就绪的事件。
    abstract int readyOps();
    
    //测试此键的通道是否已完成其套接字连接操作。
    final boolean isconnectable();
    
    //测试此键的通道是否已准备好接受新的套接字连接。
    final boolean isacceptable();
    
    //检测此键是否为read读事件
    final boolean isReadable();
    
    //检测此键是否为write写事件
    final boolean isWritable();
    
    //将给定的对象作为附件添加到此key上,在key有效期间,附件可以在多个ops事件中传递。
    final Object attach(Object ob);
    
    //获取一个channel的附件,可以再当前Channel(或者说是SelectionKey)生命周期中共享,但是attachment数据不会作为socket数据在网络中传输。
    final Object attachment();
    View Code

     3.NIO的实现示例

      先启动服务器程序,后启动客户端程序,可以看到结果。

    (1)简单示例

      服务端示例如下

    /**
     * 测试类
     */
    public class Server{
        public static void main(String[] args) throws IOException {
            startServer();
        }
    
        public static void startServer() {
            ServerSocketChannel serverChannel = null;
            Selector selector = null;
            try {
                // 1、获取Selector选择器
                selector = Selector.open();
                // 2、获取通道
                serverChannel = ServerSocketChannel.open();
                // 3.设置为非阻塞模式
                serverChannel.configureBlocking(false);
                // 4、绑定连接端口
                serverChannel.bind(new InetSocketAddress(8888));
                // 5、将通道注册到选择器上,并注册的操作为:接收操作
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                System.out.println("NIO服务器创建失败,失败原因为:"+e.getMessage());
                return;
            }
            while (true) {
                try {
                    //6.查询获取准备就绪的注册过的操作,如果等于0则不进行以下操作,因为调用该serverSocket的accept方法,会阻塞
                    if(selector.select() == 0)
                        continue;
                    
                    //7、获取当前选择器中所有注册的选择键(已经准备就绪的操作)
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        // 8、获取准备就绪SelectionKey的相关数据,并移除选择键
                        SelectionKey key = iter.next();
                        iter.remove();
    
                        // 9、判断key是具体的什么事件
                        if (key.isAcceptable()) {
                            // 10、若接受的事件是接收就绪操作,就获取客户端连接
                            SocketChannel socketChannel = serverChannel.accept();
                            // 11、切换为非阻塞模式
                            socketChannel.configureBlocking(false);
                            // 12、将该通道注册到selector选择器上
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isReadable()) {
                            // 13、获取该选择器上的读就绪状态的通道
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            // 14、读取数据
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            String content = "";
                            while (socketChannel.read(buffer) > 0) {
                                buffer.flip();
                                content += decode(buffer);
                                buffer.clear();
                                socketChannel.write(buffer);
                            }
                            System.out.println(content);
                            socketChannel.close();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            //关闭连接,因为此示例即便是异常也不会退出,所以不需要关闭
            /*try {
                serverChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }*/
        }
        
        /**数据字符编码处理
         */
        private static Charset charset = Charset.forName("UTF-8");
        public static String decode(ByteBuffer buffer) {
            try {
                return String.valueOf(charset.newDecoder().decode(buffer));
            } catch (Exception ex) {
                ex.printStackTrace();
                return null;
            }
        }
    }
    View Code

      客户端示例如下

    public class Client{
        public static void main(String[] args) {
            // 创建 20 个线程, 跟服务器发起连接
            for (int i = 0; i < 20; i++) {
                new Thread(new Runnable() {
                    
                    @Override
                    public void run() {
                        try {
                            Socket socket = new Socket();
                            socket.connect(new InetSocketAddress("127.0.0.1", 8888));
                            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                            writer.write ("当前线程名称为:" + Thread.currentThread().getName());
                            writer.flush ();
                            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                                System.out.println("接收服务器回复内容为: " + line);
                            }
                            socket.close();
                        } catch (IOException e) {
                            e.printStackTrace ();
                        }
                    }
                }).start ();
            }
        }
    }
    View Code

    (2)应用实例(简易版)

       服务端示例如下

    /**
     * 测试类
     */
    public class Server{
        public static void main(String[] args) throws IOException {
            NioTcpManager.getInstance().startServerPort();
        }
    }
    /**SelectionKey附件实体,用于存储写入客户端的数据
     */
    class KeyAttach {
        private String sn;//序号,唯一编码
        private int commandType;//命令类型
        private List<byte[]> commands = new ArrayList<byte[]>();//等待写的数据集合
    
        public String getSn() {
            return sn;
        }
    
        public void setSn(String sn) {
            this.sn = sn;
        }
    
        public int getCommandType() {
            return commandType;
        }
    
        public void setCommandType(int commandType) {
            this.commandType = commandType;
        }
        
        public List<byte[]> getCommands() {
            return commands;
        }
    
        public void setCommands(List<byte[]> commands) {
            this.commands = commands;
        }
    }
    /**NIO管理类
     */
    class NioTcpManager implements Runnable {
        private static ServerSocketChannel serverChannel = null;//服务器TCP通道
        private static Selector selector = null;//选择器对象
        private static int PORT = 4042;//tcp监听端口
        
        public static NioTcpManager nioTcpManager = new NioTcpManager();
        private static List<SelectionKey> rpool = new LinkedList<SelectionKey>(); //读取队列
        private static List<SelectionKey> wpool = new LinkedList<SelectionKey>(); //写队列
        
        private NioTcpManager() {
            //创建读线程
            for (int i = 0; i < 10; i++)
                new Thread(new NioTcpReader()).start();
    
            //创建写线程
            for (int i = 0; i < 10; i++)
                new Thread(new NioTcpWriter()).start();
        }
        public static NioTcpManager getInstance() {
            return nioTcpManager;
        }
        
        /**开启TCP端口监听
         */
        public void startServerPort() {
            if (serverChannel != null && serverChannel.isOpen())//检查TCP是否已经开启监听
                return;
            try {
                System.out.println("启动绑定TCP端口...");
                selector = Selector.open();//获取Selector选择器
                serverChannel = ServerSocketChannel.open();//获取通道
                serverChannel.socket().bind(new InetSocketAddress(PORT));//绑定连接端口
                serverChannel.configureBlocking(false);//设置为非阻塞模式
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);//将通道注册到选择器上,注册的操作为:接收操作
                new Thread(this).start();//监听线程启动
            } catch (IOException e) {
                System.out.println("该TCP端口已被使用...");
            }
        }
    
        public void run() {
            System.out.println("启动TCP监听线程...");
            while (true) {
                try {
                    int keyAdded = selector.select();//获取准备就绪且注册过的操作
                    if (keyAdded > 0) {//存在准备就绪操作
                        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();//获取当前选择器中已经准备就绪的选择键
                        while (iter.hasNext()) {//循环已就绪的选择键
                            SelectionKey key = (SelectionKey)iter.next();//取出一个就绪SelectionKey的相关数据
                            iter.remove();//移除已经取出选择键
                            
                            if (key.isAcceptable()) {//是否为接收就绪操作
                                ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();//获取客户端连接
                                SocketChannel socketChannel = serverChannel.accept();//获取连接通道
                                socketChannel.configureBlocking(false);//设置通道为非阻塞模式
                                socketChannel.register(selector, SelectionKey.OP_READ);//将通道注册到选择器上,注册的操作为:读操作
                                System.out.println("客户端发送请求连接,地址: " + socketChannel.getRemoteAddress());
                                selector.wakeup();
                            } else if (key.isReadable()) {//是否为读就绪操作
                                KeyAttach keyAttach = (KeyAttach)key.attachment();//SelectionKey附件实体对象
                                if (keyAttach != null && (!keyAttach.getCommands().isEmpty())) {//判断附件中是否存在需要写入客户端数据
                                    NioTcpWriter.processRequest(key);//添加到TCP写入队列处理
                                    key.cancel();//取消此键的注册
                                } else {
                                    NioTcpReader.processRequest(key);//添加到TCP读取队列处理
                                    key.cancel();//取消此键的注册
                                }
                            } else if (key.isWritable()) {//是否为写就绪操作
                                NioTcpWriter.processRequest(key);//添加到TCP写入队列处理
                                key.cancel();//取消此键的注册
                            }
                        }
                    } else {//不存在准备就绪操作,则注册新的通道
                        addRegister();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
        /**添加新的通道注册
         */
        private void addRegister() {
            synchronized (wpool) {
                while (!wpool.isEmpty()) {
                    SelectionKey key = wpool.remove(0);
                    SocketChannel schannel = (SocketChannel) key.channel();
                    try {
                        schannel.register(selector, SelectionKey.OP_WRITE, key.attachment());//将通道注册到选择器上,注册的操作为:写操作
                    } catch (Exception e) {
                        System.out.println("注册写入通道失败...");
                    }
                }
            }
            
            synchronized (rpool) {
                while (!rpool.isEmpty()) {
                    SelectionKey key = rpool.remove(0);
                    SocketChannel schannel = (SocketChannel) key.channel();
                    try {
                        schannel.register(selector, SelectionKey.OP_READ, key.attachment());//将通道注册到选择器上,注册的操作为:读操作
                    } catch (Exception e) {
                        System.out.println("注册读取通道失败...");
                    }
                }
            }
        }
        
        /**添加新的客户端读请求,并唤醒所有等待的读线程
         */
        public static void processReadRequest(SelectionKey key) {
            synchronized (rpool) {
                if (!rpool.contains(key)) {
                    rpool.add(rpool.size(), key);//添加新的SelectionKey到队列中,等待处理
                    rpool.notifyAll();//唤醒所有等待(对象的)读线程
                }
            }
            selector.wakeup();//解除selector的阻塞状态,以便注册新的通道
        }
    
        /**添加新的客户端写请求,并唤醒所有等待的写线程
         */
        public static void processWriteRequest(SelectionKey key) {
            synchronized (wpool) {
                if (!wpool.contains(key)) {
                    wpool.add(wpool.size(), key);//添加新的SelectionKey到队列中,等待处理
                    wpool.notifyAll();//唤醒所有等待(对象的)写线程
                }
            }
            selector.wakeup(); //解除selector的阻塞状态,以便注册新的通道
        }
    }
    /**NIO写入类
     */
    class NioTcpWriter implements Runnable {
        private static List<SelectionKey> pool = new LinkedList<SelectionKey>();
        private boolean busy = true; //当前线程是否正在处理写事务
        
        public void run() {
            while (true) {
                try {
                    SelectionKey key;
                    synchronized (pool) {
                        while (pool.isEmpty()) {
                            if (busy)
                                busy = false;
                            //释放当前的锁,进入等待状态。
                            pool.wait();
                        }
                        if (!busy)
                            busy = true;
                        //从队列中取出一个SelectionKey来进行处理
                        key = pool.remove(0);
                    }
                    //写入数据到客户端
                    write(key);
                } catch (Exception e) {
                    continue;
                }
            }
        }
        /**向客户端发送数据
         */
        public void write(SelectionKey key) {
            //获取SelectionKey中存储的自定义KeyAttach数据
            KeyAttach keyAttach = (KeyAttach) key.attachment();
            if (keyAttach != null && !keyAttach.getCommands().isEmpty()) {//发送命令的数据不为空
                try {
                    //获取通道
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    synchronized (keyAttach.getCommands()) {
                        for (byte[] command : keyAttach.getCommands()) {//循环写
                            //创建缓冲区,并把需要写入客户端数据存入缓冲区
                            ByteBuffer byteBuffer = ByteBuffer.allocate(command.length);
                            byteBuffer.put(command);
                            byteBuffer.flip();
                            //发送命令
                            socketChannel.write(byteBuffer);
                            if (byteBuffer.remaining() == 0)//判断缓存区数据是否写完,写完则清空
                                byteBuffer.clear();
                        }
                    }
                } catch (IOException e) {
                    // e.printStackTrace();
                    System.out.println("写入客户端数据失败...");
                } finally {
                    keyAttach.getCommands().clear();
                }
            }
            NioTcpManager.processReadRequest(key);
        }
    
        /**处理需要往客户端写入的数据队列
         */
        public static void processRequest(SelectionKey key) {
            synchronized (pool) {
                pool.add(pool.size(), key);//添加新的SelectionKey到队列中,等待处理
                pool.notify();//唤醒一个等待(对象的)写线程并使该线程开始执行
            }
        }
    }
    /**NIO读取类
     */
    class NioTcpReader implements Runnable {
        public static NioTcpAnalyzer nioTcpAnalyzer = NioTcpAnalyzer.getInstance();
        private static List<SelectionKey> pool = new LinkedList<SelectionKey>();
        private boolean busy = true; //当前线程是否正在处理读事务
        
        @Override
        public void run() {
            while (true) {
                try {
                    SelectionKey key;
                    synchronized (pool) {
                        while (pool.isEmpty()) {
                            if (busy)
                                busy = false;
                            //释放当前的锁,进入等待状态。
                            pool.wait();
                        }
                        if (!busy)
                            busy = true;
                        //从队列中取出一个SelectionKey来进行处理
                        key = pool.remove(0);
                    }
                    //读取客户端数据
                    read(key);
                } catch (Exception e) {
                    continue;
                }
            }
        }
        
        /**读取客户端发送数据
         */
        public void read(SelectionKey key) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);//创建缓冲区
            SocketChannel socketChannel = (SocketChannel) key.channel();//获取连接通道
            KeyAttach keyAttach = (KeyAttach) key.attachment();//获取SelectionKey中存储的自定义KeyAttach数据
            try {
                int size = socketChannel.read(buffer);//读取客户端发送内容
                String content = "";
                while (size == 1024) {
                    buffer.flip();
                    content += decode(buffer);
                    buffer.clear();
                    content = nioTcpAnalyzer.analyzerTcpData(key, socketChannel, content);//处理客户端数据
                    size = socketChannel.read(buffer);
                }
                buffer.flip();
                if (size > 0) {//存在未读完数据,但是不满足1024
                    content += decode(buffer);
                    buffer.clear();
                    nioTcpAnalyzer.analyzerTcpData(key, socketChannel, content);
                    //如果不需要回复,则重新注册读事件
                    keyAttach = (KeyAttach) key.attachment();//获取SelectionKey中存储的自定义KeyAttach数据
                    if (keyAttach == null) {
                        NioTcpManager.processReadRequest(key);
                    } else {
                        synchronized (keyAttach.getCommands()) {
                            if (keyAttach.getCommands().isEmpty()) {
                                NioTcpManager.processReadRequest(key);
                            }
                        }
                    }
                } else if (size < 0)//客户端的数据发送完毕,并且主动的关闭连接,则服务器也移除连接
                    nioTcpAnalyzer.removeChannel(key);//移除客户端连接
            } catch (Exception e) {
                nioTcpAnalyzer.removeChannel(key);//移除客户端连接
                System.out.println(keyAttach.getSn() + "客户端强迫关闭了一个连接 ...");
                //e.printStackTrace();
            }
        }
        
        /**数据编码处理
         */
        private Charset charset = Charset.forName("UTF-8");
        public String decode(ByteBuffer buffer) {
            try {
                return String.valueOf(charset.newDecoder().decode(buffer));
            } catch (Exception ex) {
                ex.printStackTrace();
                return null;
            }
        }
        
        /**处理需要读取客户端的数据队列
         */
        public static void processRequest(SelectionKey key) {
            synchronized (pool) {
                pool.add(pool.size(), key);//添加新的SelectionKey到队列中,等待处理
                pool.notify();//唤醒一个等待(对象的)读线程并使该线程开始执行
            }
        }
    }
    /**NIO数据解析类
     */
    class NioTcpAnalyzer {
        // 初始化本类
        public static NioTcpAnalyzer nioTcpAnalyzer = new NioTcpAnalyzer();
        public static NioTcpAnalyzer getInstance() {//获取本类实例
            return nioTcpAnalyzer;
        }
        
        /**解析收到的数据
         */
        public synchronized String analyzerTcpData(SelectionKey key, SocketChannel socketChannel, String content) {
            try {
                //以下为随意弄的一个测试解析,可以根据需求,修改
                System.out.println(content);
                String[] data = content.split(",");
                int commandType = Integer.valueOf(data[1].substring(data[1].length()-1, data[1].length()));//获取命令类型
                if(commandType == 1){
                    // 未识别(未知DataLog序列号)的Socket,全新的socket连接
                    if (key.attachment() == null) {
                        // 给当前通道添加附加对象
                        KeyAttach keyAttach = new KeyAttach();
                        keyAttach.setSn(data[0]);
                        keyAttach.setCommandType(commandType);
                        key.attach(keyAttach);
                    }
                    String info = "服务器回复:"+data[0].substring(3) + "," + data[1] + ",内容:连接正常";
                    writeByteData(info.getBytes("utf-8"), key);//回复客户端数据
                }
            } catch (Exception e) {
            }
            return content;
        }
        
        /**回复数据到对应通道(客户端)
         */
        private void writeByteData(byte[] data, SelectionKey key) {
            KeyAttach keyAttach = (KeyAttach) key.attachment();
            synchronized (keyAttach.getCommands()) {
                keyAttach.getCommands().add(data);
            }
            NioTcpManager.processWriteRequest(key);
        }
        
        /**移除TCP通道连接
         */
        public boolean removeChannel(SelectionKey key) {
            try {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                socketChannel.finishConnect();
                socketChannel.socket().close();
                socketChannel.close();
                key.cancel();
            } catch (Exception e) {
                // e.printStackTrace();
                System.out.println("移除远程连接失败 , 有可能并不存在此连接...");
                return false;
            }
            return true;
        }
    }
    View Code

      客户端示例如下

    public class Client{
        public static void main(String[] args) {
            // 创建 20 个线程, 跟服务器发起连接
            //for (int i = 0; i < 20; i++) {
                new Thread(new Runnable() {
                    
                    @Override
                    public void run() {
                        SocketChannel socketChannel = null;
                        Selector selector = null;
    
                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        
                        try {
                            socketChannel = SocketChannel.open();
                            socketChannel.configureBlocking(false);
                            socketChannel.connect(new InetSocketAddress("127.0.0.1", 4042));
                            selector = Selector.open();
                            socketChannel.register(selector, SelectionKey.OP_CONNECT);
                            if (socketChannel.finishConnect()) {
                                String info = "客户端" + Thread.currentThread().getName() + ",命令类型:1,内容:连接是否正常";
                                while (true) {
                                    writeBuffer.clear();
                                    writeBuffer.put(info.getBytes("utf-8"));
                                    writeBuffer.flip();
                                    while (writeBuffer.hasRemaining()) {
                                        socketChannel.write(writeBuffer);
                                    }
                                    
                                    int bytesRead = socketChannel.read(readBuffer);
                                    if (bytesRead > 0) {
                                        readBuffer.flip();
                                        byte[] bytes = new byte[bytesRead];
                                        readBuffer.get(bytes, 0, bytesRead);
                                        String str = new String(bytes);
                                        System.out.println(str);
                                        readBuffer.clear();
                                    }
                                    Thread.sleep(2000);
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                if (socketChannel != null) {
                                    socketChannel.close();
                                    System.out.println("服务器为启动...");
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start ();
           // }
        }
    }
    View Code
  • 相关阅读:
    【洛谷P4557】【JSOI2018】—战争(Minkowski Sum)
    【洛谷P4557】【JSOI2018】—战争(Minkowski Sum)
    【BZOJ3879】—SvT(后缀自动机+虚树/后缀自动机+单调栈)
    【BZOJ3879】—SvT(后缀自动机+虚树/后缀自动机+单调栈)
    多测师讲解pyhon__hashlib_高级讲师肖sir
    多测师讲解python函数 _open_高级讲师肖sir
    多测师讲解python函数 _zip_高级讲师肖sir
    多测师讲解内置函数 _format_高级讲师肖sir
    多测师讲解python _函数return_高级讲师肖sir
    多测师讲解python _函数中变量_高级讲师肖sir
  • 原文地址:https://www.cnblogs.com/bl123/p/13803479.html
Copyright © 2011-2022 走看看