zoukankan      html  css  js  c++  java
  • IO模型

    1. IO复用

    进程需要这样一种功能:内核一旦发现进程指定的一个或多个IO条件(事件)就绪(输入准备好被读取或者输出准备好被输出),它通知进程,这个就叫做I/O复用。

    IO复用的典型使用场景

    • 客户处理多个描述符号(通常是交互式输入和网络套接字)时候,必须使用I/O复用,例如实际的网络聊天,进程既要等待用户的输入,又要处理来自网络套接字的输入,采用I/O复用可以大幅度提高并发度和节省资源;

    • TCP服务器既要处理监听套接字,又要处理已经连接的套接字,这时候需要I/O复用以提高并发性能;实际上常见的并发连接的客户数多,但是每个连接的任务轻(CPU时间短)比如多人在线聊天室;连接并发数多,但是每个连接不都是长期处于活跃状态,可能是偶尔I/O一些数据,这时候需要I/O复用,让系统别被这些Lazy的客户端拖垮;

    • 如果一个服务器需要同时处理多个服务或协议,一般使用IO复用;

    其实,不难看出I/O复用的目的是为了让计算机的资源得到尽可能最大化的利用,提高并行化程度达到这种目的的一种有效途径。

    I/O复用在某种程度上和操作系统的中断处理机制何其的相似。

    操作系统作为计算机的核心软件,管理着众多的硬件设备,具有极大的并发性能,我们可以同时听音乐,同时拷文件,同时跑程序等等;如此高的并发能力得益与中断机制给操作系统带来的异步能力,当某个硬件设备特定的事件发生时(比如磁盘就绪),就引发一个硬件中断,操作系统能捕获到这些中断,将当前运行的进程挂起,然后转而执行该中断对应的中断处理程序。

    这种机制有点贪心的思想,操作系统总是尽可能的少等待,有活就干,没活就离开;这种机制中,众多硬件设备就类似与IO复用里面的众多Lazy的客户,他们需要与服务器(这里是操作系统)保持连接(做到随叫随到),但是又总是需要操作系统。中断是操作系统运行的动力源泉,这是一个典型的异步模型。I/O复用依赖于内核提供的异步能力,其抽象程度与操作系统和硬件设备的模型相似,这样能充分利用计算机的资源(软件充分利用操作系统,操作系统充分利用硬件)。

    2. I/O模型

    • 阻塞IO
    • 非阻塞IO
    • IO复用
    • 信号驱动IO
    • 异步IO

    应用程序典型的I/O包含两个不同的阶段:

    以输入为例

    1. IO请求就绪阶段 应用程序向内核发出请求,检查硬件设备是否准备好数据(例如等待数据从网络到达、用户键盘敲击);若硬件设备就绪,产生中断;
    2. IO实际读写阶段 内核捕获处理中断,然后内核空间将数据拷贝到用户的进程空间中,CPU切换到用户态;

    这两个阶段的产生主要是由于计算机目前的体系结构决定的(最顶层用户程序运行在用户态,中间层内核工作在内核态,最底层是硬件);这两个阶段都有可能产生阻塞,在内核与硬件之间,应付这种阻塞采用了基于事件的中断机制以提高并发性;而在内核与用户进程之间,异步IO机制也将为用户进程并发程度提高带来曙光。

    2.1 阻塞IO

    最普遍的IO模型都是阻塞模型,这种模型在IO的的第一阶段(向内核发出I/O请求的系统调用)时,被阻塞直到I/O就绪,再执行实际I/O操作

    2.2 非阻塞IO

    进程通知内核,进程所发起的IO请求I/O未就绪的条件下,也不能把进程阻塞(挂起),而是返回一个错误;该模型可能需要进程不断的去轮询某个FD是否IO就绪,这样的轮询模型比较少见,轮询会额外的浪费大量的CPU资源。

    2.3 IO复用

    该模型本质上属于非阻塞模型I/O,常见的是selectpoll机制。对于普通的非阻塞模型,一个进程或者线程始终轮询同一个FD,这样效率较低,特别是对于同时需要处理很多FD的程序;

    而IO复用模型,可以让进程或者线程同时管理(轮询)多个FD,以select为例,select能够管理许多FD上的不同事件,select调用在所有FD都没有IO事件(就绪、可读、可写等)产生时会阻塞,虽然select调用会阻塞,但是其管理轮询的FD不是阻塞的,因此该模型依然算是非阻塞IO模型

    该模型通常和阻塞IO模型+多线程作对比,在典型的socket服务器端,该模型和阻塞IO模型+多线程可以实现相同的功能,大多数情况下效果性能也不会差太离谱;

    但是考虑在某些特定的场景下,例如客户端连接数目众多,但是每个连接今次进行IO次数比较多,IO的数据比较少(典型的就是多人在线聊天,大量的客户端同服务器保持连接,但是每一个连接“走走停停”,隔一会来几个字节的数据),在阻塞IO模型+多线程模型中,每一个连接对应于一个线程,大量的长连接会很快消耗掉计算机中(也可能是线程池)中所有线程资源,从而拒绝了额外的新的客户连接请求,但是此时计算机的CPU却是闲置的,因为很多线程其实是在阻塞状态,因此这种场景下,该模型没有很好的提高系统的并发性;

    而采用I/O复用模型,用一个或多个线程去select或者poll检查多个连接上的IO事件,当IO事件(可读、可写等)发生,就利用新开线程池中的线程来处理IO,由于每一次的IO的任务很轻,并且IO数据已经来到了内核或者进程的缓冲区,这样新开的线程很快就会被线程池回收,这样当新来的客户连接请求进来,系统仍然有足够的线程资源来处理请求,极大的提高了该场景下系统的吞吐量,IO复用模型要远优于阻塞IO模型+多线程

    实际应用中,辨证的选择IO复用模型阻塞IO模型+多线程

    2.4 信号驱动IO

    进程不会阻塞在第一阶段,当IO就绪时,内核会通过信号的形式通知进程;

    2.5 异步IO

    进程在整个IO过程(IO请求和IO读写阶段)都不会阻塞,而是在IO完成后由内核通知进程;

    2.6 同步IO与异步IO

    在某些术语中,阻塞和非阻塞用于区分IO的第一个阶段,即IO的请求阶段
    同步和异步用于区分IO的第二个阶段,即IO的读写阶段

    因此,按照这个来重新分类上述五种IO模型,就可以分为

    • 同步IO

      • 阻塞IO (最普遍的同步阻塞BIO)
      • 非阻塞IO (Java NIO, 需要轮询,不停的轮询某个FD是否就绪)
      • IO复用 (Java NIO的Select机制,同时监听多个FD,虽然单个FD不阻塞,但是当所有FD都没有就绪时,select调用会阻塞;一个阻塞换来多个不阻塞)
      • 信号驱动IO (同同步非阻塞IO,不用轮询,而是直接接收内核发来的IO信号)
    • 异步IO (Java AIO, IO的两个阶段都不会阻塞,进程只需要发起IO请求,在IO完成两个阶段后,由内核负责通知进程,期间进程都不会挂起)

    2.7 5种IO模型的Java示例

    • (1)阻塞IO + 多线程
    
    
    package iomodel;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.Reader;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    public class BIODemo {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            Executor pools = Executors.newFixedThreadPool(5);
            try {
                ServerSocket sS = new ServerSocket(8888);
                for (;;) {
                    Socket s = sS.accept();//blocking
                    pools.execute(new BIOWorker(s));
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
    
    class BIOWorker implements Runnable {
        Socket client = null;
    
        public BIOWorker(Socket client) {
            super();
            this.client = client;
        }
    
        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                BufferedReader is = new BufferedReader(new InputStreamReader(client.getInputStream()));
                String line = null;
                while ((line = is.readLine()) != null)//blocking
                    System.out.println(line);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
    
    
    
    • (2)非阻塞IO模型
    package iomodel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    public class NBIODemo {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            try {
                Charset ascii = Charset.forName("us-ascii");
                CharsetDecoder decoder = ascii.newDecoder();
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.configureBlocking(false);
                ServerSocket ss = ssc.socket();
                ss.bind(new InetSocketAddress("localhost", 8888));
    
                SocketChannel sC = null;
    
                while ((sC = ssc.accept()) == null)
                    System.out.println("i am pooling for new connection!");// poll until new connection was established.non-blocking
                sC.configureBlocking(false);
    
                ByteBuffer buffer = ByteBuffer.allocate(1024);
    
                while (true) {//pool for reading
                    int r = sC.read(buffer);//non-blocking
                    System.out.println("i am pooling for new data!");
                    if (r > 0) {
                        buffer.flip();
                        System.err.println("recv " + r +" bytes from " + sC);
                        System.err.print(decoder.decode(buffer));
                        buffer.compact();
                    }
                }
    
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
    
    
    
    • (3)I/O复用
    package iomodel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    public class MultiplexingIODemo {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            Executor pool = Executors.newFixedThreadPool(5);
            try {
                Selector selector = Selector.open();
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.configureBlocking(false);// listening socket channel is
                                                // non-blocking for accept()
    
                ServerSocket ss = ssc.socket();
                ss.bind(new InetSocketAddress("localhost", 8888));
    
                ssc.register(selector, SelectionKey.OP_ACCEPT);
    
                for (;;) {
    
                    // System.out.println("I am selecting..");
                    selector.select();// may block until there occurred at least one
                                        // IO event.
    
                    Set<SelectionKey> selectKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectKeys.iterator();
    
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                            ServerSocketChannel ssc1 = (ServerSocketChannel) key.channel();
                            SocketChannel sc = ssc1.accept();
                            sc.configureBlocking(false);
                            sc.register(selector, SelectionKey.OP_READ);
                            iterator.remove();
                            System.out.println("Got connection from " + sc);
                        } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                            SocketChannel sc = (SocketChannel) key.channel();
                            if (key.attachment() == null) {
                                ByteBuffer buff = ByteBuffer.allocate(1024);
                                key.attach(buff);
                                System.out.println("first recv data from " + sc);
                            }
                            pool.execute(new IOWorker(sc, (ByteBuffer) key.attachment()));
                            iterator.remove();
                        }
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
    
    class IOWorker implements Runnable {
        ByteBuffer buffer = null;
        SocketChannel sC = null;
        Charset gb2312 = Charset.forName("gb2312");
        CharsetDecoder decoder = gb2312.newDecoder();
    
        public IOWorker(SocketChannel sC, ByteBuffer buffer) {
            super();
            this.sC = sC;
            this.buffer = buffer;
    
        }
    
        @Override
        public void run() {
            // TODO Auto-generated method stub
            int r;
            try {
                r = this.sC.read(buffer);
                if (r > 0) {
                    System.err.println("recv " + r + " new bytes from " + sC);
                    buffer.flip();
                    // get first byte
                    // decode bytes stream
                    for (; buffer.hasRemaining();) {
                        byte fisrt = buffer.get(0);
                        if (Byte.toUnsignedInt(fisrt) <= 127) {// 高字节的大小判断
                            fisrt = buffer.get();
                            System.err.println("[to " + sC.socket().getPort() + " ] "+ (char) fisrt);
                        } else if (buffer.remaining() >= 2) {
                            byte[] b = new byte[2];
                            b[0] = buffer.get();
                            b[1] = buffer.get();
                            String s = new String(b, "gbk");
                            System.err.println("[to " + sC.socket().getPort() + " ] " + s);
                        } else
                            break;
                    }
                    buffer.compact();
                }
    
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }
    • (4)异步I/O
    package iomodel;
    
    import java.io.File;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    public class AIODemo {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            try {
                AsynchronousFileChannel aFC = AsynchronousFileChannel.open(Paths.get("a.txt"), StandardOpenOption.READ);
                System.out.println("main " + Thread.currentThread().getId());
                ByteBuffer buffer = ByteBuffer.allocate((int) new File("a.txt").length());
                Future<Integer> result = aFC.read(buffer, 0);   
                while (!result.isDone()) {
                    System.out.println("i am doing other useful work...");
                }
                buffer.flip();
                System.out.println(result.get());
    
                System.out.println("read is done");
    
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
    }
    
    class FileReadHandler implements CompletionHandler<Integer, AsynchronousFileChannel> {
    
        @Override
        public void completed(Integer result, AsynchronousFileChannel attachment) {
            System.out.println("new connection from " + result);
        }
    
        @Override
        public void failed(Throwable exc, AsynchronousFileChannel attachment) {
            // TODO Auto-generated method stub
    
        }
    
    }

    3. 总结

    • 几种IO模型的优缺点,模型的阻塞、非阻塞以及同步、异步指的是什么?
    • select模型与阻塞+多线程模型的区别以及使用场景;

    4. References

    [1] 史蒂文斯,芬纳,鲁道夫.UNIX网络编程卷一[M].北京:人民邮电出版社,2015:122-148

  • 相关阅读:
    acm课程练习2--1002
    acm课程练习2--1001
    SDAU课程练习--problemQ(1016)
    SDAU课程练习--problemG(1006)
    SDAU课程练习--problemO(1014)
    SDAU课程练习--problemB(1001)
    SDAU课程练习--problemA(1000)
    SDAU课程练习--problemC
    SDAU课程练习--problemE
    不安全函数(转)
  • 原文地址:https://www.cnblogs.com/Spground/p/8536153.html
Copyright © 2011-2022 走看看