zoukankan      html  css  js  c++  java
  • Java

    最近学习时碰到事件驱动和非阻塞的相关知识,
    随之想到了Java中的Reactor、io与nio的一些东西;
    在前辈的博客上翻了翻、复习复习,在此记录一番。


    实在找不到比较大点的东西,于是随便弄了个压缩包,大小在1G左右;
    写个程序模拟一下下载,开两个客户端线程请求下载;
    结果会是:一个请求会一直阻塞,直到一个文件下载完成后另一个文件才开始下载。
    先看看服务端的代码:

    
    class DownLoadServer implements Runnable {
        @Override
        public void run() {
            try {
                @SuppressWarnings("resource")
                final ServerSocket ss = new ServerSocket(8989);
                while (true) {
                    Socket server = ss.accept();
                    byte[] bfile = new byte[1024];
                    try {
                        FileInputStream fis = new FileInputStream("D:/doc_backup.rar");
                        OutputStream os = server.getOutputStream();
                        while (fis.read(bfile) > -1) {
                            os.write(bfile);
                        }
                        fis.close();
                        server.close();
                    } catch (IOException e) {
                        System.out.println("server线程输出流我的天");
                    }
                }
            } catch (Exception e) {
                System.out.println("server线程 我的天~");
            }
        }
    }
    
    


    很简单,就是accept后开个inputStream和outputStream,边读边写。

    接着再看看客户端的代码:

    
    class DownlLoadClient implements Runnable {
        @SuppressWarnings("resource")
        @Override
        public void run() {
            try {
                Socket client = new Socket("127.0.0.1", 8989);
                InputStream is = client.getInputStream();
                FileOutputStream fos = new FileOutputStream(
                        "E:/testfolder/langchao" + Thread.currentThread().getId()
                                + ".txt");
                byte[] fromServer = new byte[1024];
                while (is.read(fromServer) > -1) {
                    fos.write(fromServer);
                }
                client.close();
            } catch (IOException e) {
                System.out.println("client线程我的天~");
            }
        }
    }
    
    


    输出的文件名是随便取的,也没什么特别,只是把读过来的输出去。
    结果当然是这个样子的:


    服务端只有一对inputStream和outputStream对象在受理请求,前面的没写完后面的别想写。
    那如果有很多inputStream和outputStream对象受理请求呢?
    想法不错,也就是说把服务端代码改成这样子:

    
    class DownLoadServer implements Runnable {
        @Override
        public void run() {
            try {
                @SuppressWarnings("resource")
                final ServerSocket ss = new ServerSocket(8989);
                while (true) {
                    final Socket server = ss.accept();
                    Thread t = new Thread() {
                        @Override
                        public void run() {
                            super.run();
                            byte[] bfile = new byte[1024];
                            try {
                                FileInputStream fis = new FileInputStream("D:/doc_backup.rar");
                                OutputStream os = server.getOutputStream();
                                while (fis.read(bfile) > -1) {
                                    os.write(bfile);
                                }
                                fis.close();
                                server.close();
                            } catch (IOException e) {
                                System.out.println("server线程输出流我的天");
                            }
                        }
                    };
                    t.start();
                }
            } catch (Exception e) {
                System.out.println("server线程 我的天~");
            }
        }
    }
    
    


    大概就是这个意思,每accept到就为客户端提供"一对一特殊服务";
    嗯,或者也可以算一下获取了多少下载请求,每N次请求开1次"特殊服务"。
    但无论如何都无法回避一个问题——"特殊服务"的成本很高,线程的切换和线程的资源都是开销。
    如果继续按照这个方法做下去,也只能是弄个Thread Pool。
    但如果请求数量超过了pool的maxActive数量,那问题又饶了一圈回来了。


    我们追求低成本高效率,于是早在JDK1.4就有了java.nio;
    nio怎么讲?有说是new io的、也有叫native io,或许叫non-block io...
    概念上也就是channel、buffer、selector、selectionKey...
    先看一下server代码:

    
    System.out.println("server start...");
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.bind(new InetSocketAddress(8989));
    serverChannel.configureBlocking(false);
    Selector sel = Selector.open();
    serverChannel.register(sel, SelectionKey.OP_ACCEPT);
    File file = new File("D:/doc_backup.rar");
    ByteBuffer buffer = ByteBuffer.allocate(100*1024);
    CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
    while(true){
        sel.select();
        Iterator selKeyItr = sel.selectedKeys().iterator();
        while(selKeyItr.hasNext()){
            SelectionKey key = selKeyItr.next();
            selKeyItr.remove();
                                                           
            String outputFilePath=StringUtils.EMPTY;
            if(key.isAcceptable()){
                System.out.println("server acceptable");
                SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
                channel.configureBlocking(false);
                                                                
                channel.register(sel, SelectionKey.OP_READ);
            }else if(key.isReadable()){
                System.out.println("server readable");
                SocketChannel channel = (SocketChannel) key.channel();
                channel.configureBlocking(false);
                channel.read(buffer);
                buffer.flip();
                CharBuffer clientBuffer = decoder.decode(buffer);
                outputFilePath = clientBuffer.toString();
                buffer.clear();
                SelectionKey writeKey = channel.register(sel, SelectionKey.OP_WRITE);
            }else if(key.isWritable()){
                                                           
                System.out.println("server writable");
                SocketChannel channel =(SocketChannel) key.channel();
                FileChannel fileChannel = new FileInputStream(file).getChannel();
                ByteBuffer fileByte = ByteBuffer.allocate(1024*100);
                while(fileChannel.read(fileByte)!=-1){
                    fileByte.flip();
                    channel.write(fileByte);
                    fileByte.clear();
                }
    
                channel.register(sel, SelectionKey.OP_READ);
            }
        }
    }
    
    

    代码贴出来有点乱,但也就开一个线程,监听与注册事件。
    select()方法必须,不然client的send根本recv不到。
    socketChannel将blocking设置为false,不然会在事件注册时出现java.nio.channels.IllegalBlockingModeException
    Unchecked exception thrown when a blocking-mode-specific operation is invoked upon a channel in the incorrect blocking mode.
    同样地,在write事件中把blocking设置为true或者使用阻塞的面向流的IO也会出现同样的异常。


    client继承Thread,run method如下:

    
    public void run() {
        try {
            System.out.println("client...");
            SocketAddress addr = new InetSocketAddress(8989);
            SocketChannel client = SocketChannel.open();
            client.configureBlocking(false);
            Selector sel = Selector.open();
            client.register(sel, SelectionKey.OP_CONNECT);
            CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();                                                                                                                            
            client.connect(addr);
            while (true) {
                sel.select();
                Iterator selKeyItr = sel.selectedKeys().iterator();
                while (selKeyItr.hasNext()) {
                    SelectionKey key = selKeyItr.next();
                    selKeyItr.remove();
                    if (key.isConnectable()) {
                        System.out.println("client connectble");
                        SocketChannel channel = (SocketChannel) key.channel();
                        String filePath = "E:/testfolder/channelTest"+Thread.currentThread().getId()+".rar";
                        channel.finishConnect();
                        channel.write(encoder.encode(CharBuffer.wrap(filePath)));
                        channel.register(sel, SelectionKey.OP_READ).attach(filePath);                                                                                                                                  
                    } else if (key.isReadable()) {
                        System.out.println("client readble...");                  
                        SocketChannel channel = (SocketChannel) key.channel();                          
                        if(key.attachment()!=null){
                            @SuppressWarnings("resource")
                            FileChannel fc = new FileOutputStream(key.attachment().toString()).getChannel();
                            ByteBuffer fileByte = ByteBuffer.allocate(1024*100);
                            while(channel.read(fileByte)!=-1){
                                fileByte.flip();
                                fc.write(fileByte);
                                fileByte.clear();
                            }
                        }                                                                                                                                           
                        channel.register(sel, SelectionKey.OP_CONNECT);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    

    在上面代码中的attach()并没有发挥太大用处,attach()可以为selectionKey对象添加任何一个object。
    但仅限一个,若没添加,attachment()会取出null。


    运行后发现事件都获取到了,但文件仍然是一个接一个的下载。
    原因是server触发write事件后创建fileChannel并一次写完。
    事件响应的执行体太大,影响后面的执行。
    非阻塞嘛,要得就是立即返回。
    解决方法是分多次事件去读写,每次事件继续读写上一次事件的缓冲。
    我可以好好使用一下这个attach()了。
    首先我加了一个resolver类,我打算把他的实例加到attachment中去:

    
    class ChannelResolver{
        private FileChannel channel;
        private ByteBuffer buffer;
        private FileInputStream fis;
                                                                                                                                                                                                                                                                                                    
        public ChannelResolver(String filePath){
            try {
                this.fis = new FileInputStream(filePath);
                this.channel = this.fis.getChannel();
                buffer = ByteBuffer.allocate(1024*100);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
        }
                                                                                                                                                                                                                                                                                                    
        ByteBuffer readInto(){
            try {
                buffer.clear();
                int i = channel.read(buffer);
                buffer.flip();
                if(i<0){
                    return null;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return buffer;
        }
    }
    
    

    将channel注册write事件后在return的selectionKey上attach一个实例。
    然后在write事件中获取attachment进行读写:

    
    public void run() {
        System.out.println("server start...");
        ServerSocketChannel serverChannel;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.bind(new InetSocketAddress(8989));
            serverChannel.configureBlocking(false);
            Selector sel = Selector.open();
            serverChannel.register(sel, SelectionKey.OP_ACCEPT);
            ByteBuffer buffer = ByteBuffer.allocate(100*1024);
            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            while(true){
                sel.select();
                Iterator selKeyItr = sel.selectedKeys().iterator();
                while(selKeyItr.hasNext()){
                    SelectionKey key = selKeyItr.next();
                    selKeyItr.remove();
                    if(key.isAcceptable()){
                        System.out.println("server acceptable");
                        SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
                        channel.configureBlocking(false);
                        channel.register(sel, SelectionKey.OP_READ);
                    }else if(key.isReadable()){
                        System.out.println("server readable"+Thread.currentThread().getName());
                        SocketChannel channel = (SocketChannel) key.channel();
                        if(channel.read(buffer)>0){
                            buffer.flip();
                            CharBuffer clientBuffer = decoder.decode(buffer);
                            System.out.println("from client::"+clientBuffer.toString());
                            buffer.clear();
                        }
                        channel.register(sel, SelectionKey.OP_WRITE).attach(new ChannelResolver("D:/doc_backup.rar"));
                    }else if(key.isWritable()){
                        SocketChannel channel =(SocketChannel) key.channel();
                        if(key.attachment()!=null){
                            ChannelResolver resolver = (ChannelResolver)key.attachment();
                            buffer = resolver.readInto();
                            if(buffer!=null){
                                channel.write(buffer);
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    

    在handler的readInto()中已经进行了flip(),在这里就不用再flip()了。
    相应地,client的读取也要改一下:

    
    public void run() {
        try {
            System.out.println("client...");
            SocketAddress addr = new InetSocketAddress(8989);
            SocketChannel client = SocketChannel.open();
            client.configureBlocking(false);
            Selector sel = Selector.open();
            client.register(sel, SelectionKey.OP_CONNECT);
            CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
            ByteBuffer buffer= ByteBuffer.allocate(1024*500);
            client.connect(addr);
            while (true) {
                sel.select();
                Iterator selKeyItr = sel.selectedKeys().iterator();
                while (selKeyItr.hasNext()) {
                    SelectionKey key = selKeyItr.next();
                    selKeyItr.remove();
                    if (key.isConnectable()) {
                        System.out.println("client connectble");
                        SocketChannel channel = (SocketChannel) key.channel();
                        channel.configureBlocking(false);
                        channel.finishConnect();
                        channel.write(encoder.encode(CharBuffer.wrap("start download")));
                        channel.register(sel, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        if(channel.read(buffer)>0){
                            buffer.flip();
                            fc.write(buffer);
                            buffer.clear();
                        }else{
                            channel.close();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    下面引用书本上的一段话:
    [是基于事件驱动思想的,实现上通常采用Reactor模式,从程序角度而言,当发起IO的读写操作时,是非阻塞的;当socket有流可读或可写入socket时,操作系统会相应地通知应用程序进行处理,应用再将流读取到缓冲区或写入操作系统。对于网络IO而言,主要有连接建立、流读取和流写入三种事件。
    AIO同样基于事件驱动思想,实现上通常采用Proactor模式。从程序角度而言,和NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。
    较之NIO而言,AIO一方面简化了程序的编写,流的读取和写入都由操作系统来代替完成;另一方面省去了NIO中程序要遍历事件通知队列(selector)的代价。windows基于iocp、Linux基于epoll。]

  • 相关阅读:
    算法训练 P1103
    算法训练 表达式计算
    算法训练 表达式计算
    基础练习 时间转换
    基础练习 字符串对比
    Codeforces 527D Clique Problem
    Codeforces 527C Glass Carving
    Codeforces 527B Error Correct System
    Codeforces 527A Glass Carving
    Topcoder SRM 655 DIV1 250 CountryGroupHard
  • 原文地址:https://www.cnblogs.com/kavlez/p/4014257.html
Copyright © 2011-2022 走看看