zoukankan      html  css  js  c++  java
  • Java

    最近学习时碰到事件驱动和非阻塞的相关知识,
    随之想到了Java中的Reactor、io与nio的一些东西;
    在前辈的博客上翻了翻、复习复习,在此记录一番。


    实在找不到比较大点的东西,于是随便弄了个压缩包,大小在1G左右;
    写个程序模拟一下下载,开两个客户端线程请求下载;
    结果会是:一个请求会一直阻塞,直到一个文件下载完成后另一个文件才开始下载。
    先看看服务端的代码:

    
    class DownLoadServer implements Runnable {
        @Override
        public void run() {
            try {
                @SuppressWarnings("resource")
                final ServerSocket ss = new ServerSocket(8989);
                while (true) {
                    Socket server = ss.accept();
                    byte[] bfile = new byte[1024];
                    try {
                        FileInputStream fis = new FileInputStream("D:/doc_backup.rar");
                        OutputStream os = server.getOutputStream();
                        while (fis.read(bfile) > -1) {
                            os.write(bfile);
                        }
                        fis.close();
                        server.close();
                    } catch (IOException e) {
                        System.out.println("server线程输出流我的天");
                    }
                }
            } catch (Exception e) {
                System.out.println("server线程 我的天~");
            }
        }
    }
    
    


    很简单,就是accept后开个inputStream和outputStream,边读边写。

    接着再看看客户端的代码:

    
    class DownlLoadClient implements Runnable {
        @SuppressWarnings("resource")
        @Override
        public void run() {
            try {
                Socket client = new Socket("127.0.0.1", 8989);
                InputStream is = client.getInputStream();
                FileOutputStream fos = new FileOutputStream(
                        "E:/testfolder/langchao" + Thread.currentThread().getId()
                                + ".txt");
                byte[] fromServer = new byte[1024];
                while (is.read(fromServer) > -1) {
                    fos.write(fromServer);
                }
                client.close();
            } catch (IOException e) {
                System.out.println("client线程我的天~");
            }
        }
    }
    
    


    输出的文件名是随便取的,也没什么特别,只是把读过来的输出去。
    结果当然是这个样子的:


    服务端只有一对inputStream和outputStream对象在受理请求,前面的没写完后面的别想写。
    那如果有很多inputStream和outputStream对象受理请求呢?
    想法不错,也就是说把服务端代码改成这样子:

    
    class DownLoadServer implements Runnable {
        @Override
        public void run() {
            try {
                @SuppressWarnings("resource")
                final ServerSocket ss = new ServerSocket(8989);
                while (true) {
                    final Socket server = ss.accept();
                    Thread t = new Thread() {
                        @Override
                        public void run() {
                            super.run();
                            byte[] bfile = new byte[1024];
                            try {
                                FileInputStream fis = new FileInputStream("D:/doc_backup.rar");
                                OutputStream os = server.getOutputStream();
                                while (fis.read(bfile) > -1) {
                                    os.write(bfile);
                                }
                                fis.close();
                                server.close();
                            } catch (IOException e) {
                                System.out.println("server线程输出流我的天");
                            }
                        }
                    };
                    t.start();
                }
            } catch (Exception e) {
                System.out.println("server线程 我的天~");
            }
        }
    }
    
    


    大概就是这个意思,每accept到就为客户端提供"一对一特殊服务";
    嗯,或者也可以算一下获取了多少下载请求,每N次请求开1次"特殊服务"。
    但无论如何都无法回避一个问题——"特殊服务"的成本很高,线程的切换和线程的资源都是开销。
    如果继续按照这个方法做下去,也只能是弄个Thread Pool。
    但如果请求数量超过了pool的maxActive数量,那问题又饶了一圈回来了。


    我们追求低成本高效率,于是早在JDK1.4就有了java.nio;
    nio怎么讲?有说是new io的、也有叫native io,或许叫non-block io...
    概念上也就是channel、buffer、selector、selectionKey...
    先看一下server代码:

    
    System.out.println("server start...");
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.bind(new InetSocketAddress(8989));
    serverChannel.configureBlocking(false);
    Selector sel = Selector.open();
    serverChannel.register(sel, SelectionKey.OP_ACCEPT);
    File file = new File("D:/doc_backup.rar");
    ByteBuffer buffer = ByteBuffer.allocate(100*1024);
    CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
    while(true){
        sel.select();
        Iterator selKeyItr = sel.selectedKeys().iterator();
        while(selKeyItr.hasNext()){
            SelectionKey key = selKeyItr.next();
            selKeyItr.remove();
                                                           
            String outputFilePath=StringUtils.EMPTY;
            if(key.isAcceptable()){
                System.out.println("server acceptable");
                SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
                channel.configureBlocking(false);
                                                                
                channel.register(sel, SelectionKey.OP_READ);
            }else if(key.isReadable()){
                System.out.println("server readable");
                SocketChannel channel = (SocketChannel) key.channel();
                channel.configureBlocking(false);
                channel.read(buffer);
                buffer.flip();
                CharBuffer clientBuffer = decoder.decode(buffer);
                outputFilePath = clientBuffer.toString();
                buffer.clear();
                SelectionKey writeKey = channel.register(sel, SelectionKey.OP_WRITE);
            }else if(key.isWritable()){
                                                           
                System.out.println("server writable");
                SocketChannel channel =(SocketChannel) key.channel();
                FileChannel fileChannel = new FileInputStream(file).getChannel();
                ByteBuffer fileByte = ByteBuffer.allocate(1024*100);
                while(fileChannel.read(fileByte)!=-1){
                    fileByte.flip();
                    channel.write(fileByte);
                    fileByte.clear();
                }
    
                channel.register(sel, SelectionKey.OP_READ);
            }
        }
    }
    
    

    代码贴出来有点乱,但也就开一个线程,监听与注册事件。
    select()方法必须,不然client的send根本recv不到。
    socketChannel将blocking设置为false,不然会在事件注册时出现java.nio.channels.IllegalBlockingModeException
    Unchecked exception thrown when a blocking-mode-specific operation is invoked upon a channel in the incorrect blocking mode.
    同样地,在write事件中把blocking设置为true或者使用阻塞的面向流的IO也会出现同样的异常。


    client继承Thread,run method如下:

    
    public void run() {
        try {
            System.out.println("client...");
            SocketAddress addr = new InetSocketAddress(8989);
            SocketChannel client = SocketChannel.open();
            client.configureBlocking(false);
            Selector sel = Selector.open();
            client.register(sel, SelectionKey.OP_CONNECT);
            CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();                                                                                                                            
            client.connect(addr);
            while (true) {
                sel.select();
                Iterator selKeyItr = sel.selectedKeys().iterator();
                while (selKeyItr.hasNext()) {
                    SelectionKey key = selKeyItr.next();
                    selKeyItr.remove();
                    if (key.isConnectable()) {
                        System.out.println("client connectble");
                        SocketChannel channel = (SocketChannel) key.channel();
                        String filePath = "E:/testfolder/channelTest"+Thread.currentThread().getId()+".rar";
                        channel.finishConnect();
                        channel.write(encoder.encode(CharBuffer.wrap(filePath)));
                        channel.register(sel, SelectionKey.OP_READ).attach(filePath);                                                                                                                                  
                    } else if (key.isReadable()) {
                        System.out.println("client readble...");                  
                        SocketChannel channel = (SocketChannel) key.channel();                          
                        if(key.attachment()!=null){
                            @SuppressWarnings("resource")
                            FileChannel fc = new FileOutputStream(key.attachment().toString()).getChannel();
                            ByteBuffer fileByte = ByteBuffer.allocate(1024*100);
                            while(channel.read(fileByte)!=-1){
                                fileByte.flip();
                                fc.write(fileByte);
                                fileByte.clear();
                            }
                        }                                                                                                                                           
                        channel.register(sel, SelectionKey.OP_CONNECT);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    

    在上面代码中的attach()并没有发挥太大用处,attach()可以为selectionKey对象添加任何一个object。
    但仅限一个,若没添加,attachment()会取出null。


    运行后发现事件都获取到了,但文件仍然是一个接一个的下载。
    原因是server触发write事件后创建fileChannel并一次写完。
    事件响应的执行体太大,影响后面的执行。
    非阻塞嘛,要得就是立即返回。
    解决方法是分多次事件去读写,每次事件继续读写上一次事件的缓冲。
    我可以好好使用一下这个attach()了。
    首先我加了一个resolver类,我打算把他的实例加到attachment中去:

    
    class ChannelResolver{
        private FileChannel channel;
        private ByteBuffer buffer;
        private FileInputStream fis;
                                                                                                                                                                                                                                                                                                    
        public ChannelResolver(String filePath){
            try {
                this.fis = new FileInputStream(filePath);
                this.channel = this.fis.getChannel();
                buffer = ByteBuffer.allocate(1024*100);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
        }
                                                                                                                                                                                                                                                                                                    
        ByteBuffer readInto(){
            try {
                buffer.clear();
                int i = channel.read(buffer);
                buffer.flip();
                if(i<0){
                    return null;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return buffer;
        }
    }
    
    

    将channel注册write事件后在return的selectionKey上attach一个实例。
    然后在write事件中获取attachment进行读写:

    
    public void run() {
        System.out.println("server start...");
        ServerSocketChannel serverChannel;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.bind(new InetSocketAddress(8989));
            serverChannel.configureBlocking(false);
            Selector sel = Selector.open();
            serverChannel.register(sel, SelectionKey.OP_ACCEPT);
            ByteBuffer buffer = ByteBuffer.allocate(100*1024);
            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            while(true){
                sel.select();
                Iterator selKeyItr = sel.selectedKeys().iterator();
                while(selKeyItr.hasNext()){
                    SelectionKey key = selKeyItr.next();
                    selKeyItr.remove();
                    if(key.isAcceptable()){
                        System.out.println("server acceptable");
                        SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
                        channel.configureBlocking(false);
                        channel.register(sel, SelectionKey.OP_READ);
                    }else if(key.isReadable()){
                        System.out.println("server readable"+Thread.currentThread().getName());
                        SocketChannel channel = (SocketChannel) key.channel();
                        if(channel.read(buffer)>0){
                            buffer.flip();
                            CharBuffer clientBuffer = decoder.decode(buffer);
                            System.out.println("from client::"+clientBuffer.toString());
                            buffer.clear();
                        }
                        channel.register(sel, SelectionKey.OP_WRITE).attach(new ChannelResolver("D:/doc_backup.rar"));
                    }else if(key.isWritable()){
                        SocketChannel channel =(SocketChannel) key.channel();
                        if(key.attachment()!=null){
                            ChannelResolver resolver = (ChannelResolver)key.attachment();
                            buffer = resolver.readInto();
                            if(buffer!=null){
                                channel.write(buffer);
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    

    在handler的readInto()中已经进行了flip(),在这里就不用再flip()了。
    相应地,client的读取也要改一下:

    
    public void run() {
        try {
            System.out.println("client...");
            SocketAddress addr = new InetSocketAddress(8989);
            SocketChannel client = SocketChannel.open();
            client.configureBlocking(false);
            Selector sel = Selector.open();
            client.register(sel, SelectionKey.OP_CONNECT);
            CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
            ByteBuffer buffer= ByteBuffer.allocate(1024*500);
            client.connect(addr);
            while (true) {
                sel.select();
                Iterator selKeyItr = sel.selectedKeys().iterator();
                while (selKeyItr.hasNext()) {
                    SelectionKey key = selKeyItr.next();
                    selKeyItr.remove();
                    if (key.isConnectable()) {
                        System.out.println("client connectble");
                        SocketChannel channel = (SocketChannel) key.channel();
                        channel.configureBlocking(false);
                        channel.finishConnect();
                        channel.write(encoder.encode(CharBuffer.wrap("start download")));
                        channel.register(sel, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        if(channel.read(buffer)>0){
                            buffer.flip();
                            fc.write(buffer);
                            buffer.clear();
                        }else{
                            channel.close();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    下面引用书本上的一段话:
    [是基于事件驱动思想的,实现上通常采用Reactor模式,从程序角度而言,当发起IO的读写操作时,是非阻塞的;当socket有流可读或可写入socket时,操作系统会相应地通知应用程序进行处理,应用再将流读取到缓冲区或写入操作系统。对于网络IO而言,主要有连接建立、流读取和流写入三种事件。
    AIO同样基于事件驱动思想,实现上通常采用Proactor模式。从程序角度而言,和NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。
    较之NIO而言,AIO一方面简化了程序的编写,流的读取和写入都由操作系统来代替完成;另一方面省去了NIO中程序要遍历事件通知队列(selector)的代价。windows基于iocp、Linux基于epoll。]

  • 相关阅读:
    关于might_sleep的一点说明---CONFIG_DEBUG_ATOMIC_SLEEP【转】
    让你的软件飞起来:RGB转为YUV【转】
    Linux终端彩色打印+终端进度条【转】
    Linux中实现一个简单的进度条【转】
    Linux内核官方文档atomic_ops.txt【摘自Linux 内核文档】
    Linux 内核链表的使用及深入分析【转】
    Linux2.6.32内核笔记(5)在应用程序中移植使用内核链表【转】
    spin_lock & mutex_lock的区别? 【转】
    Linux c括号作用域【原创笔记】
    linux C 中的volatile使用【转】
  • 原文地址:https://www.cnblogs.com/kavlez/p/4014257.html
Copyright © 2011-2022 走看看