前言:
BufferedInputStream 的作用是为另一个输入流添加一些功能,例如,提供“缓冲功能”以及支持“mark()标记”和“reset()重置方法”
BufferedInputStream
现在看源码分析
1 public 2 class BufferedInputStream extends FilterInputStream { 3 4 //默认的缓存数组大小 5 private static int DEFAULT_BUFFER_SIZE = 8192; 6 7 /* 8 要分配的最大数组大小。 9 有些VM会在数组中保留一些头信息。 10 尝试分配更大的数组可能会导致 11 OutOfMemoryError:请求的数组大小超过VM限制 12 */ 13 private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; 14 15 //缓存数组,变化对线程可见 16 protected volatile byte buf[]; 17 18 /* 原子更新操作 19 与buf数组的volatile关键字共同组成了buf数组的原子更新功能实现 20 */ 21 private static final 22 AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater = 23 AtomicReferenceFieldUpdater.newUpdater 24 (BufferedInputStream.class, byte[].class, "buf"); 25 26 //当前缓冲区的有效字节数 27 //比缓冲区中最后一个有效字节的索引大 1 的索引。此值始终处于 0 到 buf.length 的范围内; 28 //从 buf[0] 到 buf[count-1] 的元素包含从底层输入流中获取的缓冲输入数据。 29 protected int count; 30 31 /* 32 * 缓冲区中的当前位置。这是将从 buf 数组中读取的下一个字符的索引。 33 此值始终处于 0 到 count 的范围内。如果此值小于 count,则 buf[pos] 将作为下一个输入字节; 34 如果此值等于 count,则下一次 read 或 skip 操作需要从包含的输入流中读取更多的字节。 35 */ 36 protected int pos; 37 38 /* 39 * 最后一次调用 mark 方法时 pos 字段的值。 40 此值始终处于 -1 到 pos 的范围内。如果输入流中没有被标记的位置,则此字段为 -1。 41 如果输入流中有被标记的位置,则 buf[markpos] 将用作 reset 操作后的第一个输入字节。 42 如果 markpos 不是 -1,则从位置 buf[markpos] 到 buf[pos-1] 之间的所有字节都必须保留在缓冲区数组中(尽管对 count、pos 和 markpos 的值进行适当调整后, 43 这些字节可能移动到缓冲区数组中的其他位置);除非 pos 与 markpos 的差超过 marklimit,否则不能将其丢弃。 44 */ 45 protected int markpos = -1; 46 47 /* 48 * 调用 mark 方法后,在后续调用 reset 方法失败之前所允许的最大提前读取量。 49 * 只要 pos 与 markpos 之差超过 marklimit,就可以通过将 markpos 设置为 -1 来删除该标记。 50 */ 51 protected int marklimit; 52 53 //获取输入流 54 private InputStream getInIfOpen() throws IOException { 55 InputStream input = in; 56 if (input == null) 57 throw new IOException("Stream closed"); 58 return input; 59 } 60 61 //获取缓存数组 62 private byte[] getBufIfOpen() throws IOException { 63 byte[] buffer = buf; 64 if (buffer == null) 65 throw new IOException("Stream closed"); 66 return buffer; 67 } 68 69 //创建默认大小的缓存输入流,包装指定的输入流 70 //关于装饰器模式即装饰器模式在Java I/O里的应用,请看下一篇博文 71 public BufferedInputStream(InputStream in) { 72 this(in, DEFAULT_BUFFER_SIZE); 73 } 74 75 //创建指定大小的缓存输入流,包装指定的输入流 76 public BufferedInputStream(InputStream in, int size) { 77 super(in); 78 if (size <= 0) { 79 throw new IllegalArgumentException("Buffer size <= 0"); 80 } 81 buf = new byte[size]; 82 } 83 84 /* 85 * fill()函数是整个 BufferedInputStream类的核心所在,我们重点来分析这个函数 86 */ 87 private void fill() throws IOException { 88 byte[] buffer = getBufIfOpen(); 89 90 if (markpos < 0) 91 pos = 0; /* no mark: throw away the buffer */ 92 else if (pos >= buffer.length) 93 /* no room left in buffer */ 94 if (markpos > 0) { /* can throw away early part of the buffer */ 95 int sz = pos - markpos; 96 System.arraycopy(buffer, markpos, buffer, 0, sz); 97 pos = sz; 98 markpos = 0; 99 } 100 else if (buffer.length >= marklimit) { 101 markpos = -1; /* buffer got too big, invalidate mark */ 102 pos = 0; /* drop buffer contents */ 103 } 104 else if (buffer.length >= MAX_BUFFER_SIZE) { 105 throw new OutOfMemoryError("Required array size too large"); 106 } 107 108 else { /* grow buffer */ 109 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? 110 pos * 2 : MAX_BUFFER_SIZE; 111 if (nsz > marklimit) 112 nsz = marklimit; 113 byte nbuf[] = new byte[nsz]; 114 System.arraycopy(buffer, 0, nbuf, 0, pos); 115 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) 116 { 117 // Can't replace buf if there was an async close. 118 // Note: This would need to be changed if fill() 119 // is ever made accessible to multiple threads. 120 // But for now, the only way CAS can fail is via close. 121 // assert buf == null; 122 throw new IOException("Stream closed"); 123 } 124 buffer = nbuf; 125 } 126 127 128 count = pos; 129 int n = getInIfOpen().read(buffer, pos, buffer.length - pos); 130 if (n > 0) 131 count = n + pos; 132 } 133 134 135 public synchronized int read() throws IOException { 136 // 若已经读完缓冲区中的数据,则调用fill()从输入流读取下一部分数据来填充缓冲区 137 if (pos >= count) { 138 fill(); 139 //如果填充操作执行完后pos>=count,这时证明已经无数据向数组中填充 140 if (pos >= count) 141 return -1; 142 } 143 return getBufIfOpen()[pos++] & 0xff;//保持二进制数据一致性 144 } 145 146 private int read1(byte[] b, int off, int len) throws IOException { 147 148 int avail = count - pos; 149 if (avail <= 0) { 150 /* 如果请求的长度至少与缓冲区一样大,并且 151 如果没有标记/重置活动,那么就不要费心去复制 152 字节进入本地缓冲区。 而是直接从流中读取,这其实是一种加快读取的机制*/ 153 if (len >= getBufIfOpen().length && markpos < 0) { 154 return getInIfOpen().read(b, off, len); 155 } 156 // 若已经读完缓冲区中的数据,则调用fill()从输入流读取下一部分数据来填充缓冲区 157 fill(); 158 avail = count - pos; 159 //流中已经没有数可读 160 if (avail <= 0) return -1; 161 } 162 int cnt = (avail < len) ? avail : len; 163 System.arraycopy(getBufIfOpen(), pos, b, off, cnt); 164 pos += cnt; 165 return cnt; 166 } 167 168 // 将缓冲区中的数据写入到字节数组b中。off是字节数组b的起始位置,len是写入长度 169 public synchronized int read(byte b[], int off, int len) 170 throws IOException 171 { 172 getBufIfOpen(); // Check for closed stream 173 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { 174 throw new IndexOutOfBoundsException(); 175 } else if (len == 0) { 176 return 0; 177 } 178 179 int n = 0; 180 for (;;) {//不断循环,直到满足读取的长度再跳出 181 int nread = read1(b, off + n, len - n); 182 if (nread <= 0) 183 return (n == 0) ? nread : n; 184 n += nread; 185 if (n >= len) 186 return n; 187 // 如果真的没有可以读取的数据了,此时也会返回 188 InputStream input = in; 189 if (input != null && input.available() <= 0) 190 return n; 191 } 192 } 193 194 195 public synchronized long skip(long n) throws IOException { 196 getBufIfOpen(); // Check for closed stream 197 if (n <= 0) { 198 return 0; 199 } 200 long avail = count - pos; 201 202 if (avail <= 0) { 203 // If no mark position set then don't keep in buffer 204 if (markpos <0) 205 return getInIfOpen().skip(n); 206 207 // Fill in buffer to save bytes for reset 208 fill(); 209 avail = count - pos; 210 if (avail <= 0) 211 return 0; 212 } 213 214 long skipped = (avail < n) ? avail : n; 215 pos += skipped; 216 return skipped; 217 } 218 219 220 public synchronized int available() throws IOException { 221 int n = count - pos; 222 int avail = getInIfOpen().available(); 223 return n > (Integer.MAX_VALUE - avail) 224 ? Integer.MAX_VALUE 225 : n + avail; 226 } 227 228 229 public synchronized void mark(int readlimit) { 230 marklimit = readlimit; 231 markpos = pos; 232 } 233 234 public synchronized void reset() throws IOException { 235 getBufIfOpen(); // Cause exception if closed 236 if (markpos < 0) 237 throw new IOException("Resetting to invalid mark"); 238 pos = markpos; 239 } 240 241 242 public boolean markSupported() { 243 return true; 244 } 245 246 247 public void close() throws IOException { 248 byte[] buffer; 249 while ( (buffer = buf) != null) { 250 if (bufUpdater.compareAndSet(this, buffer, null)) { 251 InputStream input = in; 252 in = null; 253 if (input != null) 254 input.close(); 255 return; 256 } 257 // Else retry in case a new buf was CASed in fill() 258 } 259 } 260 }
对于fill()函数
1 private void fill() throws IOException { 2 byte[] buffer = getBufIfOpen(); 3 4 if (markpos < 0) 5 pos = 0; /* no mark: throw away the buffer */ 6 else if (pos >= buffer.length) 7 /* no room left in buffer */ 8 if (markpos > 0) { /* can throw away early part of the buffer */ 9 int sz = pos - markpos; 10 System.arraycopy(buffer, markpos, buffer, 0, sz); 11 pos = sz; 12 markpos = 0; 13 } 14 else if (buffer.length >= marklimit) { 15 markpos = -1; /* buffer got too big, invalidate mark */ 16 pos = 0; /* drop buffer contents */ 17 } 18 else if (buffer.length >= MAX_BUFFER_SIZE) { 19 throw new OutOfMemoryError("Required array size too large"); 20 } 21 else { /* grow buffer */ 22 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? 23 pos * 2 : MAX_BUFFER_SIZE; 24 if (nsz > marklimit) 25 nsz = marklimit; 26 byte nbuf[] = new byte[nsz]; 27 System.arraycopy(buffer, 0, nbuf, 0, pos); 28 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) 29 { 30 // Can't replace buf if there was an async close. 31 // Note: This would need to be changed if fill() 32 // is ever made accessible to multiple threads. 33 // But for now, the only way CAS can fail is via close. 34 // assert buf == null; 35 throw new IOException("Stream closed"); 36 } 37 buffer = nbuf; 38 } 39 40 41 count = pos; 42 int n = getInIfOpen().read(buffer, pos, buffer.length - pos); 43 if (n > 0) 44 count = n + pos; 45 }
现在我们先整体观察一下fill方法,这个方法主要是对缓存数组进行操作,可以看到这个方法存在5条判断分支,并且除了抛出异常外,都会执行最后一段代码,所以这里我们可以将fill方法分解成5个小方法分别进行分析!暂时以fill+序号的方式命名这五个函数。
一
1 private void fill0() throws IOException { 2 byte[] buffer = getBufIfOpen(); 3 4 if (markpos < 0) 5 pos = 0; 6 count = pos; 7 int n = getInIfOpen().read(buffer, pos, buffer.length - pos); 8 if (n > 0) 9 count = n + pos;
这种情况指的是我们每次将输入流中的数据读取一部分到缓存数组中,当缓存数组中的数据被读完后,此时也没有进行任何标记,那么我们继续将输入流中的数据读取一部分到缓存数组中,结合read方法,我们通过pos >= count来判断缓存数组中的数据是否已经读完,通过markpos<0来判断是否存在标记!getInIfOpen().read用来向缓存数组中放入数据,最后通过count变量记录放入的数据个数。
二
1 private void fill1() throws IOException { 2 if (pos >= buffer.length) 3 /* no room left in buffer */ 4 if (markpos > 0) { /* can throw away early part of the buffer */ 5 int sz = pos - markpos; 6 System.arraycopy(buffer, markpos, buffer, 0, sz); 7 pos = sz; 8 markpos = 0; 9 } 10 count = pos; 11 int n = getInIfOpen().read(buffer, pos, buffer.length - pos); 12 if (n > 0) 13 count = n + pos; 14 }
这种情况指的是我们读完了缓存数组中的数据(通过pos >=count判断),数组中也没有多余的空间(通过pos >= buffer.length判断),而缓存中存在标记,我们需要将标记位置到数组结尾的数据保存下来已供读取,如下图:数组下标从0到7.
-1 |
0 data |
1 data |
2 data markpos |
3 data |
4 data |
5 data |
6 data |
7 data |
pos |
标记及其后面的 数据都需要保留
-1 |
0 data |
1data |
2 data |
3 data |
4 data |
5 data |
pos |
|
|
getInIfOpen().read方法; 从输入流中读取出“buffer.length - pos”的数据,然后填充到buffer中
三
1 private void fill2() throws IOException { 2 byte[] buffer = getBufIfOpen(); 3 if (buffer.length >= marklimit) { 4 markpos = -1; /* buffer got too big, invalidate mark */ 5 pos = 0; /* drop buffer contents */ 6 } 7 count = pos; 8 int n = getInIfOpen().read(buffer, pos, buffer.length - pos); 9 if (n > 0) 10 count = n + pos; 11 }
这种情况指的是读取完buffer中的数据,buffer被标记位置=0,buffer中没有多余的空间,并且buffer.length>=marklimit,这时缓存区太大了,标记无效,重置markpos和pos即可!
四
1 private void fill3() throws IOException { 2 byte[] buffer = getBufIfOpen(); 3 4 if (markpos >= 0 && pos >= buffer.length) 5 { /* grow buffer */ 6 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? 7 pos * 2 : MAX_BUFFER_SIZE; 8 if (nsz > marklimit) 9 nsz = marklimit; 10 byte nbuf[] = new byte[nsz]; 11 System.arraycopy(buffer, 0, nbuf, 0, pos); 12 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) 13 { 14 15 throw new IOException("Stream closed"); 16 } 17 buffer = nbuf; 18 }
这种情况指的是读取完buffer中的数据,buffer被标记位置=0,buffer中没有多余的空间,并且buffer.length<marklimit,这时我们需要扩充数组,对于
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
这条语句,为方便观察,我们改一下
int nsz=(pos<=MAX_BUFFER_SIZE/2) ?pos*2:MAX_BUFFER_SIZE;
很明显,我们扩充的数组不能超过MAX_BUFFER_SIZE,我们现在来考虑一种情况
,如果再执行fill()方法的过程中一直要执行fill3()方法,那么缓存数组就会越来越大,需要的内存越来越多,所以我们需要一个变量来控制数组的增长,而这个变量就是marklimit, 当buffer>=marklimit时,就不再保存markpos的值了