zoukankan      html  css  js  c++  java
  • Java IO浅析

    Java中IO主要分为字符流和字节流,字符流处理的单元为2个字节的Unicode的字符,而字节流处理的单元为一个字节。Java内用Unicode编码存储字符,字符流处理类负责将外部的其他编码的字符流和java内Unicode字符流之间的转换。而类InputStreamReader和OutputStreamWriter处理字符流和字节流的转换。

    Java IO按照阻塞/非阻塞,同步/异步又可以分为BIO,NIO,AIO,下面我们简单介绍下这几类IO。

    BIO

    下图展示了Java BIO的体系结构,主要相关部分分为流式结构、非流式结构(file相关)、其他(socket等):
    img
    字节流和字符流的常见类如下图所示,主要采用了装饰者及适配器设计模式:
    img

    字符流

    字符流的基础抽象类是InputStream和OutputStream,下面简单看看它们的主要方法和具体实现类

    InputStream

    1
    2
    3
    4
    5
    6
    7
    8
    public abstract int () throws IOException;
    public int (byte b[], int off, int len) throws IOException; //一次性读取内容到缓冲字节数组
    public long skip(long n) throws IOException; //从stream中跳过long类型参数个位置
    public void close() throws IOException; //关闭stream方法
    public int available() throws IOExceptionpublic synchronized void mark(int readlimit);//返回stream中的可读字节数
    public synchronized void mark(int readlimit) //用于标记stream的作用,readlimit表示调用mark最多可以读多少字节
    public synchronized void reset() throws IOException;//这个方法和mark方法一起使用的,让stream回到mark的位置判断
    public boolean markSupported();//stream是否可以调用mark方法和reset方法

    这里需要注意下:mark(int readlimit)的参数并不是说读取readlimit个字节标记位才失效,也和BufferedInputStream类的缓冲区大小有关。若BufferedInputStream类的缓冲区大小大于readlimit,在mark以后只有读取超过缓冲区大小的数据,mark标记才会失效,即是取readlimit和BufferedInputStream类的缓冲区大小(BufferedInputStream bis=new BufferedInputStream(in,2))两者中的最大值。
    下面介绍一些常用的具体实现类吧:
    1.ByteArrayInputStream :把内存中的一个缓冲区作为 InputStream 使用
    2.FileInputStream :把一个文件作为 InputStream
    3.PipedInputStream:实现了pipe,主要在线程中使用,管道输入流是指一个通讯管道的接收端。一个线程通过管道输出流发送数据,另一个线程通过管道输入流读取数据,这样可实现两个线程间的通讯。
    4.SequenceInputStream:把多个 InputStream 合并为一个 InputStream,使用Enumeration作为参数
    5.ObjectInputStream:用于操作Object的stream,这个在stream主要用在对象传输的过程中,其中牵涉到了序列化的知识
    6.FilterInputStream:是一个过滤的InputStream,有很多具体的子类
    (1)BufferedInputStream:使用缓冲区的stream
    (2)DataInputStream:数字格式化的stream(readInt,readFloat,readDouble)

    OutputStream

    1
    2
    3
    4
    5
    public abstract void write(int b) throws IOException; //写入一个字节
    public void write(byte b[]) throws IOException;//写入一个byte数组到stream中
    public void write(byte b[], int off, int len) throws IOException;//把byte数组中从offset开始处写入长度为len的数据
    public void flush() throws IOException;//这个方法是用来刷新stream中的数据,让缓冲区中的数据强制的输出
    public void close() throws IOException;//关闭流,这个是在操作完stream之后必须要调用的方法

    常见的具体实现类:
    (1)ByteArrayOutputStream: 把信息存入内存中的一个缓冲区中 . 该类实现一个以字节数组形式写入数据的输出流
    (2)FileOutputStream: 文件输出流是向 File 或 FileDescriptor 输出数据的一个输出流
    (3)PipedOutputStream: 管道输出流是指一个通讯管道的发送端。 一个线程通过管道输出流发送数据
    (4)ObjectOutputStream:输出Object对象的stream
    (5)BufferedOutputStream:带有缓冲区的stream
    (6)DataOutputStream:具有格式化的OutputStream
    (7)PrintStream:直接输出到控制台中:最熟悉的就是System.out是一个PrintStream

    字符流

    字符流的基础抽象类是Reader、Writer,下面看一下主要方法和具体实现类。
    Reader和InputStream的方法基本上一致,区别在于前者使用了char读取字符,而后者使用了byte读取字节
    (1)CharArrayReader:此类实现一个可用作字符输入流的字符缓冲区char[],与 ByteArrayInputStream 对应
    (2)PipedReader:与 PipedInputStream 对应
    (3) BufferReader: 与BufferInputStream对应
    (4)StringReader:从字符串读取
    (5)InputStreamReader:将InputStream转化成Reader,从字节流到字符流的桥梁:它读入字节,并根据指定的编码方式,将之转换为字符流。
    (6)FileReader:继承InputStreamReader
    Writer和OutputStream的方法基本上一致:
    (1)CharArrayWriter: 与 ByteArrayOutputStream 对应
    (2)PipedWriter:与 PipedOutputStream 对应
    (3)StringWriter:输出到StringBuffer
    (4)BufferWriter: 与BufferOutputStream对应
    (5)OutputStreamWriter: 将Writer转化为OutputStream,根据指定的字符编码将多个字符转换为字节,是字符流到字节流的桥梁
    (6)FileWriter:继承OutputStreamWriter

    字节流与字符流的选择Tips

    (1)按照输入输出 输入: Reader, InputStream 类型的子类;输出: Writer, OutputStream 类型的子类
    (2)按照数据格式 二进制格式(只要不能确定是纯文本的) : InputStream, OutputStream 及其所有带 Stream 结束的子类 ; 纯文本格式(含纯英文与汉字或其他编码方式); Reader, Writer 及其所有带 Reader, Writer 的子类
    (3)是否需要缓冲 要缓冲: BufferedInputStream, BufferedOutputStream,( 字节流 ) BufferedReader, BufferedWriter( 字符流 )
    (4)按数据来源(去向)分类:
    是文件: FileInputStream, FileOutputStream, ( 字节流 )FileReader, FileWriter( 字符 )
    是 byte[] : ByteArrayInputStream, ByteArrayOutputStream( 字节流 )
    是 Char[]: CharArrayReader, CharArrayWriter( 字符流 )
    是 String: StringReader, StringWriter( 字符流 )
    网络数据流: InputStream, OutputStream,( 字节流 ) Reader, Writer( 字符流 )
    (5)其他特殊功能
    从 Stream 到 Reader,Writer 的转换类: InputStreamReader, OutputStreamWriter
    对象输入输出: ObjectInputStream, ObjectOutputStream
    进程间通信: PipeInputStream, PipeOutputStream, PipeReader, PipeWriter
    合并输入: SequenceInputStream
    更特殊的需要: PushbackInputStream, PushbackReader, LineNumberInputStream, LineNumberReader

    Java IO性能优化

    IO的性能瓶颈主要体现在两个方面。

    错误的使用缓冲(buffer)

    (1)为内存IO类(In-memory IO class)添加缓冲(错误用法):如ByteArrayInput/OutputStream,根本没有使用到IO
    (2)为已添加buffer的IO类再次添加buffer(错误用法):多余的buffer只会引入更多的栈调用和垃圾创建
    (3)Buffer版的IO类和显式使用buffer(使用byte[]或char[])间的关系(概念上的误解):实际上显示使用数组使用buffer效率更高,Buffer版IO使用装饰者模式,效率较低; Buffer版IO方法有同步synchroized操作,同步导致效率低下

    过度的同步保护

    Java IO类存在很多同步方法,这样会导致效率低下

    NIO

    Java NIO是指New IO,相对于传统的IO支持非阻塞模式(文件除外),而且NIO是基于Buffer的,性能相比BIO有较大的提升,NIO中有三个主要的概念:Buffer、Channel、Selector.

    Buffer

    Buffer是一个对象,包含一些要写入或者读出的数据。在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。具体的缓存区有:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer,其中ByteBuffer支持堆外直接内存分配。
    每一个Buffer也有几个关键的信息如:position、limit、capacity、mark等等标记位来代表buffer数据状态情况(具体参考JDK源码)。

    Channel

    NIO 对数据的读取和写入要通过Channel。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。Channel主要分两大类:SelectableChannel用户网络读写,FileChannel用于文件操作。

    Selector

    Selector是NIO的基础,它提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。一个Selector可以同时轮询多个Channel,JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

    NIO聊天示例

    NIO相对BIO实现上复杂很多,下面我们简单写一个聊天功能吧:
    服务端代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    package nio;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.nio.charset.Charset;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;
    /**
    * Created by fuyang on 16/9/11.
    */
    public class NioServer {
    private Selector selector;
    private static final int PORT = 6060;
    private static Charset charset = Charset.forName("UTF-8");
    private static Set<String> userNames = new HashSet<>();
    public void initNioServer() {
    try {
    // 主线程注册等待连接事件
    selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(PORT));
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("NIO Server initial...Waiting for connecting on port 6060!");
    while (true) {
    int n = selector.select();
    if (n == 0) {
    continue;
    }
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = keys.iterator();
    while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    iterator.remove();
    handleMessage(key);
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    // 发送消息,发送到其他所有的通道
    public void sendMessages(Selector selector, SelectionKey sk, String content) {
    for (SelectionKey key : selector.keys()) {
    if (key == sk) {
    continue;
    }
    Channel channel = key.channel();
    if (channel instanceof SocketChannel) {
    try {
    ((SocketChannel) channel).write(charset.encode(content));
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }
    }
    //select监听到事件返回,然后解析具体的SelectionKey,读/写/连接,分别处理
    public void handleMessage(SelectionKey key) {
    if (key.isAcceptable()) {
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    try {
    SocketChannel socketChannel = ssc.accept();
    socketChannel.configureBlocking(false);
    socketChannel.register(selector, SelectionKey.OP_READ);
    System.out.println("client:" + socketChannel.getRemoteAddress() + " is connecting");
    socketChannel.write(ByteBuffer.wrap("welcome guys,please input your name!".getBytes()));
    } catch (IOException e) {
    e.printStackTrace();
    }
    } else if (key.isReadable()) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    try {
    socketChannel.read(buffer);
    buffer.flip();
    String content = charset.decode(buffer).toString();
    if (content != null && content.length() > 0) {
    String[] strings = content.split("_");
    if (strings.length == 1) {
    if (userNames.contains(strings[0])) {
    socketChannel.write(ByteBuffer.wrap( 大专栏  Java IO浅析4;users already exists!".getBytes()));
    } else {
    userNames.add(content);
    System.out.println(content + ", registered!");
    socketChannel.write(ByteBuffer.wrap(("welcome," + content).getBytes()));
    sendMessages(selector, key, strings[0] + ",enter the char room!");
    }
    } else {
    sendMessages(selector, key, content);
    }
    }
    } catch (IOException e) {
    key.cancel();
    if (socketChannel != null) {
    try {
    socketChannel.close();
    } catch (IOException e1) {
    e1.printStackTrace();
    }
    }
    e.printStackTrace();
    }
    }
    }
    public static void main(String[] args) throws IOException {
    new NioServer().initNioServer();
    }
    }

    客户端代码比较简单:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    public class NioClient {
    private Selector selector;
    private Charset charset = Charset.forName("UTF-8");
    private static final int PORT = 6060;
    private static String userName="";
    public void init() {
    try {
    selector = Selector.open();
    //连接远程主机的IP和端口
    SocketChannel sc = SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
    sc.configureBlocking(false);
    sc.register(selector, SelectionKey.OP_READ);
    //开辟一个新线程来读取从服务器端的数据
    new Thread(new ClientThread()).start();
    //在主线程中 从键盘读取数据输入到服务器端
    Scanner scan = new Scanner(System.in);
    while (scan.hasNextLine()) {
    String line = scan.nextLine();
    if(userName.length()==0){
    userName=line;
    sc.write(charset.encode(userName));
    }else {
    System.out.println("Me: "+line);
    sc.write(charset.encode(userName+"_"+line));
    }
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    //新起一个线程接受服务器的数据
    private class ClientThread implements Runnable {
    public void run() {
    try {
    while (true) {
    int k = selector.select();
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = keys.iterator();
    while (iterator.hasNext()) {
    SelectionKey sk = iterator.next();
    iterator.remove();
    if (sk.isReadable()) {
    //使用 NIO 读取 Channel中的数据
    SocketChannel sc = (SocketChannel) sk.channel();
    ByteBuffer buff = ByteBuffer.allocate(1024);
    String content = "";
    while (sc.read(buff) > 0) {
    buff.flip();
    content += charset.decode(buff);
    }
    if (content.length() > 0) {
    String[] strs = content.split("_");
    if(strs.length>1) {
    System.out.println(strs[0] + ": " + strs[1]);
    }else {
    if(content.equals("users already exists!")) {
    System.out.println(content);
    userName = "";
    }else {
    System.out.println(content);
    }
    }
    }
    }
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }
    public static void main(String[] args) throws IOException {
    new NioClient().init();
    }
    }

    由此可以看出,NIO服务端利用了IO多路复用,相比BIO使用了更少的线程,资源利用率更高!

    NIO的优势

    (1)事件驱动模型
    (2)单线程处理多任务,避免了多线程等待
    (3)非阻塞I/O,I/O读写不再阻塞,而是返回0
    (4)IO多路复用大大提高了Java网络应用的可伸缩性和实用性

    IO模式对比

    下面我们对比一下NIO与BIO的模式,然后再分析一下Java IO中的同步与异步、阻塞与非阻塞概念。

    BIO VS NIO

    BIO模式中,服务端由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行处理,通过输出流返回应答给客户端,处理完成后线程销毁。即典型的一请求一应答通宵模型,如下图所示:
    img
    BIO最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,当线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就down机,所以BIO主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理这条链路,无法满足高性能、高并发的场景。当然可以使用一个线程池来管理这写具体的数据处理,以此来达到线程服用,节省服务器资源,但是这种方法不能解决问题的根本,在大量线程处于等待情况下仍然不能很好利用资源。
    NIO模式的处理流程如下图所示:
    img
    可以看出NIO服务端由一个单独的线程来轮询等待事件到来,在没有事件时不会存在大量的线程阻塞,只有有事件时才会在新的线程中处理任务,因此不会造成大量线程阻塞,支持更高的并发,性能相比BIO有较大的提升。

    同步与阻塞

    同/异步以及阻塞/非阻塞概念

    首先一个IO操作其实分成了两个步骤:发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO复用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。阻塞IO也分为二步,等待数据就绪以及数据拷贝(核心态内存到用户态内存)。所以可以看出,同步异步主要是应用程序和内核的交互方式,若应用读写直到IO完成那么就是同步的,若由操作系统完成IO读写放入缓冲区然后会调通知应用直接取数据那么就是异步的;同样阻塞和非阻塞区别在于在应用访问数据的时候,根据IO操作的就绪状态来采取的不同方式,是一种读取或者写入操作函数的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入函数会立即返回一个状态值。
    下面我们结合下图看一下常见的几种模式:
    阻塞IO:阻塞在recvfrom调用
    img
    非阻塞IO:反复轮询recvfrom
    img
    IO多路复用:实质阻塞在select调用(select/poll,epoll轮询),数据准备好开始读取(核心态到用户态)
    img
    异步IO:发起aio-read请求,等待数据准备好并拷贝到用户态缓存区(依赖OS),然后读取
    img
    在这里简单了解下多路复用的概念以及select/poll,epoll的区别:
    I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符(fd)就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间

    poll与epoll对比

    select的缺点:
    1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
    2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大(fd_set类似数组的结构需要遍历)
    3)select支持的文件描述符数量太小了,默认是1024
    poll和select几本一致,区别在于其描述符FD定义不同,它使用的是pollfd;
    epoll是linux新出用于取代poll、select,处理上述三个问题时:
    1)次注册新的事件到epoll句柄中时(epoll_ctl函数)会把所有的fd拷贝进内核,不会在epoll_wait的时候重复拷贝
    2)epoll_ctl为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表,epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(也有人说用了红黑树所以比数组快)
    3)epoll没有最大FD这个限制

    BIO,NIO,AIO的模式对比

    BIO:同步阻塞方式,同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善
    NIO:同步非阻塞方式,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,由多路复用器轮询到连接有IO请求时才启动一个线程进行处理,所以是非阻塞的。用户进程也需要时不时的询问IO操作是否就绪,这就要求用户进程不停的去询问,但是从内核读取数据到用户缓冲区需要应用程序自己完成,因此是同步的。
    AIO:异步非阻塞,JDK1.7后出现,一个有效请求一个线程,用户进程只需要发起一个IO操作然后立即返回,等IO操作真正的完成以后,应用程序会得到IO操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行实际的IO读写操作,因为真正的IO读取或者写入操作已经由内核完成了。常见的类有AsynchronousSocketChannel,AsynchronousServerSocketChannel,AsynchronousFileChannel,AsynchronousDatagramChannel
    总结一下,BIO关注“我要读”,NIO关注"我可以读了",在AIO模型关注的是“读完了”,此外NIO一个重要的特点是:socket主要的读、写、注册和接收函数,在等待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。BIO,NIO,AIO也有不同的适用场景:
    BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。
    NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
    AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

    Reactor与Proactor

    Reactor

    Reactor与Proactor是两种常用的IO模式,其中Reactor是同步IO模式,被广泛应用于网络编程中,如Redis、Netty;而Proactor是一种异步IO模式,由于操作系统的原因,相关的开源产品也少;在这先里我们学习下其模型结构,重点对比下两者的异同点。首先我们先看一下reactor的结构:
    img
    Reactor包含如下角色:
    1.Handle 句柄;用来标识socket连接或是打开文件;
    2.Synchronous Event Demultiplexer:同步事件多路分解器:由操作系统内核实现的一个函数;用于阻塞等待发生在句柄集合上的一个或多个事件;(如select/epoll;)
    3.Event Handler:事件处理接口
    4.Concrete Event HandlerA:实现应用程序所提供的特定事件处理逻辑;
    其流程如下:
    1.应用启动,将关注的事件handle注册到Reactor中;
    2.调用Reactor,进入无限事件循环,等待注册的事件到来;
    3.事件到来,select返回,Reactor将事件分发到之前注册的回调函数中处理;

    Reactor也包含了常见三种类型:
    1.精典Reactor模式
    img
    可以看出Acceptor处理客户端连接请,Reactor将I/O事件发派给对应的Handler,由Handlers 执行非阻塞读/写及数据处理操作,这种模式下一个线程同时监控多个请求状态。
    2.多工作线程Reactor模式
    img
    经典Reactor模式中,尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理。因此可以引入多线程,可以使用线程池来并行处理多个读/写操作。
    3.多Reactor模式
    img
    多工作线程Reactor模式下,主线程也需要监听多种事件,当并发量过大时也会导致性能瓶颈,也存在单点故障问题,所以引出了多Reactor,即一个主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟,并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel提交给不同的子Reactor处理,每一个子Reactor包含一个Selector实例,用来监听监听事件,并且监听在子Reactor所在的线程中,保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换;同样每一个子Reactor中创建了一个静态的线程池,循环处理数据读取和操作,相比单一的Reactor大大提升了效率。

    Proactor

    Proactor的结构如下所示:
    img
    主要包含了如下角色:
    1.Handle 句柄;用来标识socket连接或是打开文件;
    2.Asynchronous Operation Processor:异步操作处理器;负责执行异步操作,一般由操作系统内核实现;
    3.Asynchronous Operation:异步操作
    4.Completion Event Queue:完成事件队列;异步操作完成的结果放到队列中等待后续使用
    5.Proactor:主动器;为应用程序进程提供事件循环;从完成事件队列中取出异步操作的结果,分发调用相应的后续处理逻辑;
    6.Completion Handler:完成事件接口;一般是由回调函数组成的接口;
    7.Concrete Completion Handler:完成事件处理逻辑;实现接口定义特定的应用处理逻辑;
    其工作流程如下:
    1.应用程序启动,调用异步操作处理器提供的异步操作接口函数,调用之后应用程序和异步操作处理就独立运行;应用程序可以调用新的异步操作,而其它操作可以并发进行;
    2.应用程序启动Proactor主动器,进行无限的事件循环,等待完成事件到来;
    3.异步操作处理器执行异步操作,完成后将结果放入到完成事件队列;
    4.主动器从完成事件队列中取出结果,分发到相应的完成事件回调函数处理逻辑中。

    总结

    Java IO中包含了BIO、NIO、AIO等多种模式,每一个模式有不同的特点和应用场景,这些都是IO的基础知识。此外有时间一定要学习一些开源的框架如Netty,它基于NIO,使用了Reactor模式,同时也要多了解一下AIO模式如Proactor(异步IO)等高性能的IO模式。

  • 相关阅读:
    vue实现通过链接跳转到页面
    vue-cli2-项目的创建
    平均数
    Spring-Spring简介
    vue + element-ui 表单校验封装公用方法
    Python(一)数据结构和算法的20个练习题问答
    Python包中__init__.py作用
    if __name__=="__main__":
    execute immediate
    oracle基础知识过一遍(原创)
  • 原文地址:https://www.cnblogs.com/lijianming180/p/12376243.html
Copyright © 2011-2022 走看看