zoukankan      html  css  js  c++  java
  • JDK源码阅读之BufferedInputStream

    前言:

      BufferedInputStream 的作用是为另一个输入流添加一些功能,例如,提供“缓冲功能”以及支持“mark()标记”和“reset()重置方法”

    BufferedInputStream

      现在看源码分析

      1 public
      2 class BufferedInputStream extends FilterInputStream {
      3 
      4     //默认的缓存数组大小
      5     private static int DEFAULT_BUFFER_SIZE = 8192;
      6 
      7     /*
      8                      要分配的最大数组大小。
      9        有些VM会在数组中保留一些头信息。
     10        尝试分配更大的数组可能会导致
     11        OutOfMemoryError:请求的数组大小超过VM限制
     12      */
     13     private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
     14 
     15     //缓存数组,变化对线程可见
     16     protected volatile byte buf[];
     17 
     18     /*  原子更新操作
     19                        与buf数组的volatile关键字共同组成了buf数组的原子更新功能实现
     20      */
     21     private static final
     22         AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
     23         AtomicReferenceFieldUpdater.newUpdater
     24         (BufferedInputStream.class,  byte[].class, "buf");
     25 
     26    //当前缓冲区的有效字节数
     27    //比缓冲区中最后一个有效字节的索引大 1 的索引。此值始终处于 0 到 buf.length 的范围内;
     28         //从 buf[0] 到 buf[count-1] 的元素包含从底层输入流中获取的缓冲输入数据。
     29     protected int count;
     30 
     31    /*
     32     *   缓冲区中的当前位置。这是将从 buf 数组中读取的下一个字符的索引。
     33          此值始终处于 0 到 count 的范围内。如果此值小于 count,则 buf[pos] 将作为下一个输入字节;
     34          如果此值等于 count,则下一次 read 或 skip 操作需要从包含的输入流中读取更多的字节。
     35     */
     36     protected int pos;
     37 
     38     /*
     39      * 最后一次调用 mark 方法时 pos 字段的值。
     40        此值始终处于 -1 到 pos 的范围内。如果输入流中没有被标记的位置,则此字段为 -1。
     41        如果输入流中有被标记的位置,则 buf[markpos] 将用作 reset 操作后的第一个输入字节。
     42        如果 markpos 不是 -1,则从位置 buf[markpos] 到 buf[pos-1] 之间的所有字节都必须保留在缓冲区数组中(尽管对 count、pos 和 markpos 的值进行适当调整后,
     43        这些字节可能移动到缓冲区数组中的其他位置);除非 pos 与 markpos 的差超过 marklimit,否则不能将其丢弃。
     44      */
     45     protected int markpos = -1;
     46 
     47 /*
     48  * 调用 mark 方法后,在后续调用 reset 方法失败之前所允许的最大提前读取量。
     49  * 只要 pos 与 markpos 之差超过 marklimit,就可以通过将 markpos 设置为 -1 来删除该标记。
     50  */
     51     protected int marklimit;
     52 
     53    //获取输入流
     54     private InputStream getInIfOpen() throws IOException {
     55         InputStream input = in;
     56         if (input == null)
     57             throw new IOException("Stream closed");
     58         return input;
     59     }
     60 
     61     //获取缓存数组
     62     private byte[] getBufIfOpen() throws IOException {
     63         byte[] buffer = buf;
     64         if (buffer == null)
     65             throw new IOException("Stream closed");
     66         return buffer;
     67     }
     68 
     69   //创建默认大小的缓存输入流,包装指定的输入流
     70     //关于装饰器模式即装饰器模式在Java I/O里的应用,请看下一篇博文
     71     public BufferedInputStream(InputStream in) {
     72         this(in, DEFAULT_BUFFER_SIZE);
     73     }
     74 
     75   //创建指定大小的缓存输入流,包装指定的输入流
     76     public BufferedInputStream(InputStream in, int size) {
     77         super(in);
     78         if (size <= 0) {
     79             throw new IllegalArgumentException("Buffer size <= 0");
     80         }
     81         buf = new byte[size];
     82     }
     83 
     84  /*
     85   * fill()函数是整个 BufferedInputStream类的核心所在,我们重点来分析这个函数
     86   */
     87     private void fill() throws IOException {
     88         byte[] buffer = getBufIfOpen();
     89         
     90         if (markpos < 0)
     91             pos = 0;            /* no mark: throw away the buffer */
     92         else if (pos >= buffer.length)
     93             /* no room left in buffer */
     94                 if (markpos > 0) {  /* can throw away early part of the buffer */
     95                 int sz = pos - markpos;
     96                 System.arraycopy(buffer, markpos, buffer, 0, sz);
     97                 pos = sz;
     98                 markpos = 0;
     99                } 
    100          else if (buffer.length >= marklimit) {
    101                 markpos = -1;   /* buffer got too big, invalidate mark */
    102                 pos = 0;        /* drop buffer contents */
    103                }
    104          else if (buffer.length >= MAX_BUFFER_SIZE) {
    105                 throw new OutOfMemoryError("Required array size too large");
    106                }
    107        
    108          else {            /* grow buffer */
    109                 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
    110                         pos * 2 : MAX_BUFFER_SIZE;
    111                                 if (nsz > marklimit)
    112                     nsz = marklimit;
    113                 byte nbuf[] = new byte[nsz];
    114                 System.arraycopy(buffer, 0, nbuf, 0, pos);
    115                 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) 
    116                  {
    117                     // Can't replace buf if there was an async close.
    118                     // Note: This would need to be changed if fill()
    119                     // is ever made accessible to multiple threads.
    120                     // But for now, the only way CAS can fail is via close.
    121                     // assert buf == null;
    122                     throw new IOException("Stream closed");
    123                 }
    124                 buffer = nbuf;
    125             }
    126         
    127         
    128         count = pos;
    129         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
    130         if (n > 0)
    131             count = n + pos;
    132     }
    133 
    134    
    135     public synchronized int read() throws IOException {
    136         // 若已经读完缓冲区中的数据,则调用fill()从输入流读取下一部分数据来填充缓冲区
    137         if (pos >= count) {
    138             fill();
    139             //如果填充操作执行完后pos>=count,这时证明已经无数据向数组中填充
    140             if (pos >= count)
    141                 return -1;
    142         }
    143         return getBufIfOpen()[pos++] & 0xff;//保持二进制数据一致性
    144     }
    145 
    146     private int read1(byte[] b, int off, int len) throws IOException {
    147        
    148         int avail = count - pos;
    149         if (avail <= 0) {
    150             /* 如果请求的长度至少与缓冲区一样大,并且
    151                 如果没有标记/重置活动,那么就不要费心去复制
    152                 字节进入本地缓冲区。 而是直接从流中读取,这其实是一种加快读取的机制*/
    153             if (len >= getBufIfOpen().length && markpos < 0) {
    154                 return getInIfOpen().read(b, off, len);
    155             }
    156             // 若已经读完缓冲区中的数据,则调用fill()从输入流读取下一部分数据来填充缓冲区
    157             fill();
    158             avail = count - pos;
    159             //流中已经没有数可读
    160             if (avail <= 0) return -1;
    161         }
    162         int cnt = (avail < len) ? avail : len;
    163         System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
    164         pos += cnt;
    165         return cnt;
    166     }
    167 
    168     // 将缓冲区中的数据写入到字节数组b中。off是字节数组b的起始位置,len是写入长度
    169     public synchronized int read(byte b[], int off, int len)
    170         throws IOException
    171     {
    172         getBufIfOpen(); // Check for closed stream
    173         if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
    174             throw new IndexOutOfBoundsException();
    175         } else if (len == 0) {
    176             return 0;
    177         }
    178 
    179         int n = 0;
    180         for (;;) {//不断循环,直到满足读取的长度再跳出
    181             int nread = read1(b, off + n, len - n);
    182             if (nread <= 0)
    183                 return (n == 0) ? nread : n;
    184             n += nread;
    185             if (n >= len)
    186                 return n;
    187             // 如果真的没有可以读取的数据了,此时也会返回
    188             InputStream input = in;
    189             if (input != null && input.available() <= 0)
    190                 return n;
    191         }
    192     }
    193 
    194    
    195     public synchronized long skip(long n) throws IOException {
    196         getBufIfOpen(); // Check for closed stream
    197         if (n <= 0) {
    198             return 0;
    199         }
    200         long avail = count - pos;
    201 
    202         if (avail <= 0) {
    203             // If no mark position set then don't keep in buffer
    204             if (markpos <0)
    205                 return getInIfOpen().skip(n);
    206 
    207             // Fill in buffer to save bytes for reset
    208             fill();
    209             avail = count - pos;
    210             if (avail <= 0)
    211                 return 0;
    212         }
    213 
    214         long skipped = (avail < n) ? avail : n;
    215         pos += skipped;
    216         return skipped;
    217     }
    218 
    219    
    220     public synchronized int available() throws IOException {
    221         int n = count - pos;
    222         int avail = getInIfOpen().available();
    223         return n > (Integer.MAX_VALUE - avail)
    224                     ? Integer.MAX_VALUE
    225                     : n + avail;
    226     }
    227 
    228  
    229     public synchronized void mark(int readlimit) {
    230         marklimit = readlimit;
    231         markpos = pos;
    232     }
    233 
    234     public synchronized void reset() throws IOException {
    235         getBufIfOpen(); // Cause exception if closed
    236         if (markpos < 0)
    237             throw new IOException("Resetting to invalid mark");
    238         pos = markpos;
    239     }
    240 
    241   
    242     public boolean markSupported() {
    243         return true;
    244     }
    245 
    246     
    247     public void close() throws IOException {
    248         byte[] buffer;
    249         while ( (buffer = buf) != null) {
    250             if (bufUpdater.compareAndSet(this, buffer, null)) {
    251                 InputStream input = in;
    252                 in = null;
    253                 if (input != null)
    254                     input.close();
    255                 return;
    256             }
    257             // Else retry in case a new buf was CASed in fill()
    258         }
    259     }
    260 }
    View Code

    对于fill()函数

     1 private void fill() throws IOException {
     2         byte[] buffer = getBufIfOpen();
     3         
     4         if (markpos < 0)
     5             pos = 0;            /* no mark: throw away the buffer */
     6         else if (pos >= buffer.length)
     7             /* no room left in buffer */
     8                 if (markpos > 0) {  /* can throw away early part of the buffer */
     9                 int sz = pos - markpos;
    10                 System.arraycopy(buffer, markpos, buffer, 0, sz);
    11                 pos = sz;
    12                 markpos = 0;
    13                } 
    14          else if (buffer.length >= marklimit) {
    15                 markpos = -1;   /* buffer got too big, invalidate mark */
    16                 pos = 0;        /* drop buffer contents */
    17                }
    18          else if (buffer.length >= MAX_BUFFER_SIZE) {
    19                 throw new OutOfMemoryError("Required array size too large");
    20                }
    21          else {            /* grow buffer */
    22                 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
    23                         pos * 2 : MAX_BUFFER_SIZE;
    24                 if (nsz > marklimit)
    25                     nsz = marklimit;
    26                 byte nbuf[] = new byte[nsz];
    27                 System.arraycopy(buffer, 0, nbuf, 0, pos);
    28                 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) 
    29                  {
    30                     // Can't replace buf if there was an async close.
    31                     // Note: This would need to be changed if fill()
    32                     // is ever made accessible to multiple threads.
    33                     // But for now, the only way CAS can fail is via close.
    34                     // assert buf == null;
    35                     throw new IOException("Stream closed");
    36                 }
    37                 buffer = nbuf;
    38             }
    39         
    40         
    41         count = pos;
    42         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
    43         if (n > 0)
    44             count = n + pos;
    45 }
    View Code

       现在我们先整体观察一下fill方法,这个方法主要是对缓存数组进行操作,可以看到这个方法存在5条判断分支,并且除了抛出异常外,都会执行最后一段代码,所以这里我们可以将fill方法分解成5个小方法分别进行分析!暂时以fill+序号的方式命名这五个函数。

     一

    1   private void fill0() throws IOException {
    2         byte[] buffer = getBufIfOpen();
    3         
    4         if (markpos < 0)
    5             pos = 0;    
    6          count = pos;
    7         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
    8         if (n > 0)
    9             count = n + pos;
    View Code

     这种情况指的是我们每次将输入流中的数据读取一部分到缓存数组中,当缓存数组中的数据被读完后,此时也没有进行任何标记,那么我们继续将输入流中的数据读取一部分到缓存数组中,结合read方法,我们通过pos >= count来判断缓存数组中的数据是否已经读完,通过markpos<0来判断是否存在标记!getInIfOpen().read用来向缓存数组中放入数据,最后通过count变量记录放入的数据个数。

     1 private void fill1() throws IOException {
     2 if (pos >= buffer.length)
     3             /* no room left in buffer */
     4                 if (markpos > 0) {  /* can throw away early part of the buffer */
     5                 int sz = pos - markpos;
     6                 System.arraycopy(buffer, markpos, buffer, 0, sz);
     7                 pos = sz;
     8                 markpos = 0;
     9                }
    10 count = pos;
    11         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
    12         if (n > 0)
    13             count = n + pos;
    14 }
    View Code

    这种情况指的是我们读完了缓存数组中的数据(通过pos >=count判断),数组中也没有多余的空间(通过pos >= buffer.length判断),而缓存中存在标记,我们需要将标记位置到数组结尾的数据保存下来已供读取,如下图:数组下标从0到7.

    -1

    0  data

    1  data

    2  data

    markpos

    3 data

    4 data

    5  data

    6 data

    7  data

    pos

    标记及其后面的 数据都需要保留

    -1

    0 data

    1data

    2 data

    3 data

    4 data

    5 data

    pos

     

     

    getInIfOpen().read方法; 从输入流中读取出“buffer.length - pos”的数据,然后填充到buffer中

     

     1 private void fill2() throws IOException {
     2 byte[] buffer = getBufIfOpen();
     3 if (buffer.length >= marklimit) {
     4                 markpos = -1;   /* buffer got too big, invalidate mark */
     5                 pos = 0;        /* drop buffer contents */
     6                }
     7   count = pos;
     8         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
     9         if (n > 0)
    10             count = n + pos;
    11     }
    View Code

    这种情况指的是读取完buffer中的数据,buffer被标记位置=0,buffer中没有多余的空间,并且buffer.length>=marklimit,这时缓存区太大了,标记无效,重置markpos和pos即可!

     1 private void fill3() throws IOException {
     2         byte[] buffer = getBufIfOpen();
     3 
     4 if (markpos >= 0 && pos >= buffer.length)
     5 {            /* grow buffer */
     6                 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
     7                         pos * 2 : MAX_BUFFER_SIZE;
     8                 if (nsz > marklimit)
     9                     nsz = marklimit;
    10                 byte nbuf[] = new byte[nsz];
    11                 System.arraycopy(buffer, 0, nbuf, 0, pos);
    12                 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) 
    13                  {
    14                  
    15                     throw new IOException("Stream closed");
    16                 }
    17                 buffer = nbuf;
    18             }
    View Code

    这种情况指的是读取完buffer中的数据,buffer被标记位置=0,buffer中没有多余的空间,并且buffer.length<marklimit,这时我们需要扩充数组,对于

    int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?

                            pos * 2 : MAX_BUFFER_SIZE;

    这条语句,为方便观察,我们改一下

    int nsz=(pos<=MAX_BUFFER_SIZE/2) ?pos*2:MAX_BUFFER_SIZE;

    很明显,我们扩充的数组不能超过MAX_BUFFER_SIZE我们现在来考虑一种情况

    ,如果再执行fill()方法的过程中一直要执行fill3()方法,那么缓存数组就会越来越大,需要的内存越来越多,所以我们需要一个变量来控制数组的增长,而这个变量就是marklimit, 当buffer>=marklimit时,就不再保存markpos的值了

  • 相关阅读:
    算法与数据结构基础
    算法与数据结构基础
    算法与数据结构基础
    分布式系统理论进阶
    分布式系统理论进阶
    分布式系统理论基础
    分布式系统理论进阶
    分布式系统理论基础
    dht 分布式hash 一致性hash区别
    排期模板
  • 原文地址:https://www.cnblogs.com/lls101/p/11143400.html
Copyright © 2011-2022 走看看