zoukankan      html  css  js  c++  java
  • 二、Java NIO通信基础

    Java NIO属于第三种模型--IO多路复用模型,主要有三个核心组件组成:

    • Channel 通道:在NIO中,同一个网络连接使用一个通道表示,所有的NIO的IO操作都是从这个通道开始的。一个通道类似于OIO中的两个流的结合体,既可以从通道中读取,也可以向通道中写入。
    • Buffer缓冲区:应用程序与通道的主要交互操作就是数据的读写,通道的读取和写入就是将数据与缓冲区的读取和写入。
    • Selector选择器:(什么是IO多路复用?----指的是一个线程/进程可以同时监视多个文件描述符,一旦其中一个或者多个文件描述符可写或者可读,系统内核就通知该线程/进程)Selector是一个事件的查询器,通过选择器,一个线程可以查询多个通道的IO事件的就绪状态。

    一、NIO Buffer类及其属性

           NIO的Buffer本质上是一个内存块,可读也可写。Java中NIO的Buffer类是一个抽象类,其内部是一个内存块(数组)。它提供了一组更加有效的方法来进行写入和读取的交替访问。

    Buffer类在其内部有一个byte[]数组内存块作为内存缓冲区。为了记录读写的状态和位置,有以下

    1.几个重要的属性:

     // Invariants: mark <= position <= limit <= capacity
        private int mark = -1;
        private int position = 0;
        private int limit;
        private int capacity;
    • capacity表示内部容量的大小,Buffer类的capacity一旦初始化就不能再改变了。
    • position表示当前的位置(下一个要被读和写的位置)。与缓冲区的读写模式有关,在读写模式切换时会进行调整。写入时:最初position为0,每当一个数据写入,position会向后移动一个位置,最大可谓limit-1。当position到达limit时,则表示缓冲区已满,无法继续写入。读取时:position重置为0,读取数据后position相应向后移动,当position达到limit是表示无数据可读。 
    • limit表示读写的最大上限。与缓冲区的读写模式有关,写模式:刚写入时limit会被置为capacity值。读模式:flip()方法调用后,limit的值会被置为写模式的position值,position被置为0。表示可以从0开始读取,读取数据大小为limit。
    • mark相当于一个暂存属性,暂时保存position的值,方便后面重复使用position值。调用mark()方法来是指mark=position,在调用reset()可以让position恢复到mark标记的位置。

    2.几个重要的方法:

    • allocate()创建缓冲区。
    • put()写入到缓冲区。写入数据时,position根据写入的数据量向后移动。
    • flip()翻转,向缓冲区写入数据后并不是可以直接读取数据,需要调用该方法,limit被置为原position值,position被置为0,表示从头开始读取。
    • get()从缓冲区读取。
    • rewind()倒带,缓冲区的数据是否可以重复读呢,调用该方法position置为0,limit不变,之前的mark被清理。
    • mark()和reset(),mark先记录当前读取的position值,继续向后读取,当调用reset方法时,postion将被置为刚才保存在mark的旧的position值。读取数据也将从mark值标记的位置开始读取。
    • clear()清空缓冲区,调用该方法缓冲区切换为写入模式,position置为0,limit被置为capacity值。

    3.使用Buffer类的基本步骤

    • 使用创建子类实例对象的allocate()方法,创建Buffer类的实例对象。
    • 调用put()方法,将数据写入到缓冲区。
    • 写入完成后,在开始读取数据之前,调用buffer.flip()方法,将缓冲区变为读模式。
    • 调用get()方法,从缓冲区获取数据。
    • 读取完成后调用clear()或者compact()方法,将缓冲区转换为写入模式。

    一个简单的示例:

    public class IntBufferDemo {
        public static void main(String[] args) {
            IntBuffer buffer = IntBuffer.allocate(5);
            System.out.println("初始化时:"+buffer);
            buffer.put(3);
            buffer.put(2);
            buffer.mark();
            buffer.put(4);
            System.out.println("mark调用后,reset方法之前:"+buffer+"
    值为"+Arrays.toString(buffer.array()));
            buffer.reset();
            buffer.put(5);
            System.out.println("reset方法调用后第三个值被替换"+Arrays.toString(buffer.array()));
            buffer.put(6);
            System.out.println("调用flip方法前:"+buffer);
            buffer.flip();
            System.out.println("调用flip方法后:"+buffer);
            for (int i = 0; i <buffer.limit() ; i++) {
                System.out.println(buffer.get());
                System.out.println("读取第"+(i+1)+"个数据后:"+buffer);
            }
            buffer.clear();
            System.out.println("清空缓存后:"+buffer);
        }
    }

    结果如下:

    初始化时:java.nio.HeapIntBuffer[pos=0 lim=5 cap=5]
    mark调用后,reset方法之前:java.nio.HeapIntBuffer[pos=3 lim=5 cap=5]
    值为[3, 2, 4, 0, 0]
    reset方法调用后第三个值被替换[3, 2, 5, 0, 0]
    调用flip方法前:java.nio.HeapIntBuffer[pos=4 lim=5 cap=5]
    调用flip方法后:java.nio.HeapIntBuffer[pos=0 lim=4 cap=5]
    3
    读取第1个数据后:java.nio.HeapIntBuffer[pos=1 lim=4 cap=5]
    2
    读取第2个数据后:java.nio.HeapIntBuffer[pos=2 lim=4 cap=5]
    5
    读取第3个数据后:java.nio.HeapIntBuffer[pos=3 lim=4 cap=5]
    6
    读取第4个数据后:java.nio.HeapIntBuffer[pos=4 lim=4 cap=5]
    清空缓存后:java.nio.HeapIntBuffer[pos=0 lim=5 cap=5]

    二、NIO Channel通道

    Java中主要的四种Channel为:

    • FileChannel文件通道,用于文件的数据读写。
    • SocketChannel套接字通道,用于Socket套接字TCP连接的数据读写。
    • ServerSocketChannel服务器监听通道,允许我们监听TCP连接请求,为每个监听到的请求创建一个SocketChannel套接字通道。
    • DatagramChannel数据报通道,用于UDP协议的数据读写。

    下面是一个使用FileChannel完成复制文件的简单案例代码:

    public class FileChannelCopyDemo {
        private static final Logger log = LoggerFactory.getLogger(FileChannelCopyDemo.class);
    
        public static void main(String[] args) {
            nioCopyFile();
        }
    
        /**
         * 复制资源文件
         */
        private static void nioCopyFile() {
            long startTime = System.currentTimeMillis();
            File sourceFile = new File("");
            File destFile = new File("");
            try {
                boolean fileExsits = destFile.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
            FileInputStream fileInputStream = null;
            FileOutputStream fileOutputStream = null;
            FileChannel inChannel = null;
            FileChannel outChannel = null;
            try {
                fileInputStream = new FileInputStream(sourceFile);
                fileOutputStream = new FileOutputStream(destFile);
                inChannel=fileInputStream.getChannel();
                outChannel=fileOutputStream.getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //从输入通道读取到buffer
                //以下也可以用  outChannel.transferFrom(inChannel, pos, srcFile.length());替换
                while ((inChannel.read(buffer))!=-1){
                    //第一次切换(buffer读取满了,需要写入到输出通道中),翻转buffer,变成读取模式
                    buffer.flip();
                    int outLength;
                    while ((outLength=outChannel.write(buffer))!=0){
                        System.out.println("写入的字节数:"+outLength);
                    }
                    //第二次切换(数据写完,读取下一部分):清除buffer,变成写入模式
                    buffer.clear();
                }
                //强制刷新到磁盘
                outChannel.force(true);
                long endTime = System.currentTimeMillis();
                log.debug("复制耗时毫秒数:" + (endTime - startTime));
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    fileInputStream.close();
                    fileOutputStream.close();
                    inChannel.close();
                    outChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    三、NIO Selector选择器

    选择器的使命就是完成IO的多路复用。一个通道代表一个连接通道,通过选择器可以同时监控多个通道的IO状况。选择器和t通道的关系,是监控和被监控的关系。选择器提供了一系列API能选择出所监控的通道那些拥有已经准备好的就绪的IO操作事件。

    这里有一个很重要的概念--->SelectionKey选择键:简单来说,SelectionKey就是那些被选择器选中的IO事件,其中包含了Channel。主要有以下四种事件:

    public static final int OP_READ = 1 << 0;
    public static final int OP_WRITE = 1 << 2;
    public static final int OP_CONNECT = 1 << 3;
    public static final int OP_ACCEPT = 1 << 4;

    Tips:注册到选择器的通道必须处于非阻塞模式下,否则将抛出IllegalBlockingModeExpection异常。并且服务器监听通道ServerSocketChannel仅仅支持Accept事件,而SocketChannel传输通道不支持Accept(接收到新链接)IO事件。

    NIO处理的大致流程图如下:

    • ServerSocketChannel会被先注册到Selector上,后台线程会轮询感兴趣的IO事件,即SelectionKey.OP_ACCEPT 事件时,就会自动处理。
    • 当有客户端进行连接时,会被ServerSocketChannel监听到并生成SocketChannel,通过Register()方法将SocketChannel注册到Selector上,注册后返回一个SelectionKey。
    • Selector会调用select方法返回就绪的IO事件,得到SelectionKey并进行处理。

    下面是一个简单的NIO代码:

    服务端代码:

    public class NIOServerDemo {
        public static void main(String[] args) throws IOException {
            //1.获取选择器
            Selector selector = Selector.open();
            //2.获取通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //3.设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            //4.绑定连接
            serverSocketChannel.socket().bind(new InetSocketAddress(8878));
            //5.将通道注册到选择器上,并注册的IO事件为:"接收新链接"
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("注册后的SelectionKey 数量=" + selector.keys().size()); // 1
            //6.选择感兴趣的IO就绪事件(选择键集合)
            //如果返回的>0, 就获取到相关的 selectionKey集合
            while (true) {
                if (selector.select(1000) == 0) {
                    //无事件发生,返回继续等待
                    continue;
                }
                //7.获取选择键集合
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    //8.获取单个的选择键并处理
                    SelectionKey key = iterator.next();
                    //9.判断key是什么事件,并处理
                    if (key.isAcceptable()) {
                        //10.如果是新链接事件,就获取客户端新链接
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = server.accept();
                        if (socketChannel == null) continue;
                        //11.客户端新链接切换为非阻塞模式
                        socketChannel.configureBlocking(false);
                        //12.将客户端新链接注册到选择器上
                        SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("客户端连接后 ,注册的SelectionKey 数量=" + selector.keys().size());
                    } else if (key.isReadable()) {
                        //通过key 反向获取到对应channel
                        SocketChannel channel = (SocketChannel) key.channel();
                        //获取到该channel关联的buffer
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        //此处连接关闭会造成死循环??Tips
                        if (channel.read(byteBuffer) > -1) {
                            Charset charset = Charset.forName("UTF-8");
                            String message = new String(byteBuffer.array(), 0, byteBuffer.limit(), charset);
                            System.out.println("从客户端收到消息:" + message);
                        }else{
                            System.out.println("远程连接关闭");
                            key.cancel();
                        }
                    }
    
                    iterator.remove();
                }
            }
        }
    }

    客户端可以用Telnet进行测试,下面是个简单的Java代码:

    public class NIOClientDemo {
        public static void main(String[] args) throws IOException {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.socket().connect(new InetSocketAddress("127.0.0.1", 8878));
            socketChannel.configureBlocking(false);
            while (!socketChannel.finishConnect()) {
                //等待链接完成
            }
            System.out.println("成功连接服务器");
            //如果连接成功,就发送数据
            String str = "hello, world";
            ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
            //发送数据,将 buffer 数据写入 channel
            socketChannel.write(buffer);
            socketChannel.shutdownOutput();
            socketChannel.close();
        }
    }

    四、基于NIO的简单群聊系统

    业务逻辑如下:服务端代码做转发功能,某一个客户端发送消息到服务端后,服务端将消息转发给所有在线的其他用户。

    服务端代码如下:

    public class GroupChatServer {
        //定义属性
        private Selector selector;
        private ServerSocketChannel listenChannel;
        private static final int PORT = 6667;
    
        //初始化工作
        private GroupChatServer() {
            try {
                //得到选择器
                selector = Selector.open();
                //ServerSocketChannel
                listenChannel = ServerSocketChannel.open();
                //绑定端口
                listenChannel.socket().bind(new InetSocketAddress(PORT));
                //设置非阻塞模式
                listenChannel.configureBlocking(false);
                //将该listenChannel 注册到selector
                listenChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        //监听
        private void listen() {
            System.out.println("服务端已启动监听");
            try {
                //循环处理
                while (true) {
                    int count = selector.select();
                    if (count > 0) {//有事件处理
                        //遍历得到selectionKey 集合
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            //取出selectionKey
                            SelectionKey key = iterator.next();
    
                            //监听到accept
                            if (key.isAcceptable()) {
                                SocketChannel sc = listenChannel.accept();
                                sc.configureBlocking(false);
                                //将该 sc 注册到selector
                                sc.register(selector, SelectionKey.OP_READ);
                                //提示
                                System.out.println(sc.getRemoteAddress() + " 上线 ");
                            }
                            if (key.isReadable()) { //通道发送read事件,即通道是可读的状态
                                //处理读 (专门写方法..)
                                readData(key);
                            }
                            //当前的key 删除,防止重复处理
                            iterator.remove();
                        }
                    } else {
                        System.out.println("等待....");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //发生异常处理....
                System.out.println("服务端发生异常");
            }
        }
    
        //读取客户端消息
        private void readData(SelectionKey key) {
            //取到关联的channel
            SocketChannel channel = null;
            try {
                //得到channel
                channel = (SocketChannel) key.channel();
                //创建buffer
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int count = channel.read(buffer);
                //根据count的值做处理
                if (count > 0) {
                    //把缓存区的数据转成字符串
                    String msg = new String(buffer.array());
                    //输出该消息
                    System.out.println("form 客户端: " + msg);
                    //向其它的客户端转发消息(去掉自己)
                    sendInfoToOtherClients(msg, channel);
                }
            } catch (IOException e) {
                try {
                    System.out.println(channel.getRemoteAddress() + " 离线了..");
                    //取消注册
                    key.cancel();
                    //关闭通道
                    channel.close();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
            }
        }
    
        //转发消息给其它客户(通道)
        private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
            System.out.println("服务器转发消息");
            //遍历 所有注册到selector 上的 SocketChannel,并排除 self
            for (SelectionKey key : selector.keys()) {
                //通过 key  取出对应的 SocketChannel
                Channel targetChannel = key.channel();
                //排除自己
                if (targetChannel instanceof SocketChannel && targetChannel != self) {
                    SocketChannel dest = (SocketChannel) targetChannel;
                    //将msg 存储到buffer
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    //将buffer 的数据写入 通道
                    dest.write(buffer);
                }
            }
        }
    
        public static void main(String[] args) {
            //创建服务器对象
            GroupChatServer groupChatServer = new GroupChatServer();
            groupChatServer.listen();
        }
    }

    客户端代码:

    public class GroupChatClient {
    
        private Selector selector;
        private SocketChannel socketChannel;
        private String username;
    
        //构造器, 完成初始化工作
        private GroupChatClient() throws IOException {
    
            selector = Selector.open();
            //连接服务器
            socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6667));
            //设置非阻塞
            socketChannel.configureBlocking(false);
            //将channel 注册到selector
            socketChannel.register(selector, SelectionKey.OP_READ);
            //得到username
            username = socketChannel.getLocalAddress().toString().substring(1);
            System.out.println(username + " is ok...");
        }
    
        //向服务器发送消息
        private void sendInfo(String info) {
            info = username + " 说:" + info;
            try {
                socketChannel.write(ByteBuffer.wrap(info.getBytes()));
            }catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        //读取从服务器端回复的消息
        private void readInfo() {
            try {
                int readChannels = selector.select();
                if(readChannels > 0) {//有可以用的通道
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
    
                        SelectionKey key = iterator.next();
                        if(key.isReadable()) {
                            //得到相关的通道
                            SocketChannel sc = (SocketChannel) key.channel();
                            //得到一个Buffer
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            //读取
                            sc.read(buffer);
                            //把读到的缓冲区的数据转成字符串
                            String msg = new String(buffer.array());
                            System.out.println(msg.trim());
                        }
                    }
                    iterator.remove();
                } else {
                    //System.out.println("没有可以用的通道...");
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws Exception {
            //启动客户端
            GroupChatClient chatClient = new GroupChatClient();
            //启动一个线程读取服务器发送数据
            new Thread(chatClient::readInfo).start();
            //发送数据给服务器端
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String s = scanner.nextLine();
                chatClient.sendInfo(s);
            }
        }
    }

    效果如下,

    服务器端:

    服务端启动监听 
    /127.0.0.1:58965 上线 
    /127.0.0.1:58977 上线 
    form 客户端: 127.0.0.1:58965 说:你好 
    服务器转发消息
    form 客户端: 127.0.0.1:58977 说:你也好

    客户端1:

    127.0.0.1:58965 is ok...
    你好
    127.0.0.1:58977 说:你也好

    客户端2:

    127.0.0.1:58977 is ok...
    127.0.0.1:58965 说:你好
    你也好
  • 相关阅读:
    BZOJ3575 [Hnoi2014]道路堵塞
    BZOJ4456/UOJ184 [Zjoi2016]旅行者
    BZOJ4455/UOJ185 [Zjoi2016]小星星
    BZOJ1036 [ZJOI2008]树的统计Count
    BZOJ2594 [Wc2006]水管局长数据加强版
    BZOJ3669/UOJ3 魔法森林(LCT)
    BZOJ1012:[JSOI2008]最大数
    洛谷【P1175】表达式的转换
    HDU4699:Editor
    BZOJ3039:玉蟾宫
  • 原文地址:https://www.cnblogs.com/demo-alen/p/13547225.html
Copyright © 2011-2022 走看看