上一篇介绍了关于字节输入输出流的Java类框架,同时也简单介绍了一下各个类的作用,下面就来具体看一下这些类是怎么实现这些功能的。
1、InputStream和OutputStream
InputStream类的源代码如下:
public abstract class InputStream implements Closeable { private static final int MAX_SKIP_BUFFER_SIZE = 2048;//最多可以跳过字节的数量 // 获取下一个字节数据并返回int值(范围0~255),如果流结束,返回-1 public abstract int read() throws IOException; public int read(byte b[]) throws IOException {//读取一个字节,返回值为所读得字节 return read(b, 0, b.length); } //读取len个字节,放置到以下标off开始字节数组b中,返回值为实际 读取的字节的数量 public int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } int c = read(); if (c == -1) { return -1; } b[off] = (byte)c; int i = 1; try { for (; i < len ; i++) { c = read(); if (c == -1) { break; } b[off + i] = (byte)c; } } catch (IOException ee) { } return i; } //读指针跳过n个字节不读,返回值为实际跳过的字节数量 public long skip(long n) throws IOException { long remaining = n; int nr; if (n <= 0) { return 0; } int size = (int)Math.min(MAX_SKIP_BUFFER_SIZE, remaining); byte[] skipBuffer = new byte[size]; while (remaining > 0) { nr = read(skipBuffer, 0, (int)Math.min(size, remaining)); if (nr < 0) { break; } remaining -= nr; } return n - remaining; } // 返回值为流中尚未读取的字节的数量,这个方法应该被子类覆写 public int available() throws IOException { return 0; } public void close() throws IOException {} // 纪录当前指针的所在位置. // readlimit参数表示读指针读出的readlimit个字节后 所标记的指针位置才实效。 public synchronized void mark(int readlimit) {} //把读指针重新指向用mark方法所记录的位置 public synchronized void reset() throws IOException { throw new IOException("mark/reset not supported"); } //当前的流是否支持读指针的记录功能 public boolean markSupported() { return false; } }
来解释一下mark()和reset()方法,如下图所示。
讲到具体的类时会进行详细的解说。
下面来看InputStream的源代码,如下:
public abstract class OutputStream implements Closeable, Flushable { /** The byte to be written is the eight low-order bits of the argument b. The 24 * high-order bits of b are ignored. */ public abstract void write(int b) throws IOException; public void write(byte b[]) throws IOException { write(b, 0, b.length); } public void write(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } for (int i = 0 ; i < len ; i++) { write(b[off + i]); } } public void flush() throws IOException { } public void close() throws IOException { } }
如上主要就是将字节写入到输出流中,具体的写入方法write(int b)是一个抽象方法,取决于具体的实现类的实现。
2、PipedInputStream和PipedOutputStream
PipedInputStream类与PipedOutputStream类用于在应用程序中创建管道通信.一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道.PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据.这两个类主要用来完成线程之间的通信.一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据.
PipedInputStream和PipedOutputStream的实现原理类似于"生产者-消费者"原理,PipedOutputStream是生产者,PipedInputStream是消费者,在PipedInputStream中有一个buffer字节数组,默认大小为1024,作为缓冲区,存放"生产者"生产出来的东西.还有两个变量in和out。in是用来记录"生产者"生产了多少,out是用来记录"消费者"消费了多少,in为-1表示消费完了,in==out表示生产满了.当消费者没东西可消费的时候,也就是当in为-1的时候,消费者会一直等待,直到有东西可消费.
在两者的构造函数中,都相互提供了连接的构造方法,分别用于接收对方的管道实例,然后调用各自的connect()方法进行连接,如PipedInputStream:
// PipedInputStream public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe Size <= 0"); } buffer = new byte[pipeSize]; } public void connect(PipedOutputStream src) throws IOException { src.connect(this); }
同时还可以指定缓冲区的大小,看PipedOutputStream:
private PipedInputStream sink; public PipedOutputStream(PipedInputStream snk) throws IOException { connect(snk); } public PipedOutputStream() { } // PipedOutputStream public synchronized void connect(PipedInputStream snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; }
没有谁连接谁的规定,只要连接上,效果是一样的。来看输出管道中的write()方法,如下:
public void write(int b) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } sink.receive(b); } public void write(byte b[], int off, int len) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } else if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } sink.receive(b, off, len); }
方法在写入到byte[]数组缓存区数据后,就会调用输出管道中的receive方法。输出管道中的receive()方法如下:
/** * Receives a byte of data. This method will block if no input is * available. */ protected synchronized void receive(int b) throws IOException { checkStateForReceive(); writeSide = Thread.currentThread(); if (in == out)//in==out implies the buffer is full awaitSpace(); if (in < 0) {//输入管道无数据 in = 0; out = 0; } buffer[in++] = (byte)(b & 0xFF); if (in >= buffer.length) { in = 0;// 缓冲区已经满了,等待下一次从头写入 } } /** * Receives data into an array of bytes. This method will * block until some input is available. */ synchronized void receive(byte b[], int off, int len) throws IOException { checkStateForReceive(); writeSide = Thread.currentThread(); int bytesToTransfer = len; while (bytesToTransfer > 0) { if (in == out) awaitSpace(); int nextTransferAmount = 0; if (out < in) { nextTransferAmount = buffer.length - in; } else if (in < out) { if (in == -1) { in = out = 0; nextTransferAmount = buffer.length - in; } else { nextTransferAmount = out - in; } } if (nextTransferAmount > bytesToTransfer) nextTransferAmount = bytesToTransfer; assert(nextTransferAmount > 0); System.arraycopy(b, off, buffer, in, nextTransferAmount); bytesToTransfer -= nextTransferAmount; off += nextTransferAmount; in += nextTransferAmount; if (in >= buffer.length) { in = 0; } } }
输入管理通过如上的对应方法接收到数据并保存到输入缓冲区后,下面就可以使用read()方法读出这些数据了,如下:
public synchronized int read() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; while (in < 0) { if (closedByWriter) { /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++] & 0xFF; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; } /** * Reads up to len bytes of data from this piped input stream into an array of bytes. Less than len bytes * will be read if the end of the data stream is reached or if len exceeds the pipe's buffer size. * If len is zero, then no bytes are read and 0 is returned;otherwise, the method blocks until * at least 1 byte of input is available, end of the stream has been detected, or an exception is */ public synchronized int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read(); if (c < 0) { return -1; } b[off] = (byte) c; int rlen = 1; while ((in >= 0) && (len > 1)) { int available; if (in > out) { available = Math.min((buffer.length - out), (in - out)); } else { available = buffer.length - out; } // A byte is read beforehand outside the loop if (available > (len - 1)) { available = len - 1; } System.arraycopy(buffer, out, b, off + rlen, available); out += available; rlen += available; len -= available; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; }
下面来具体举一个例子,如下:
public class test04 { public static void main(String [] args) { Sender sender = new Sender(); Receiver receiver = new Receiver(); PipedOutputStream outStream = sender.getOutStream(); PipedInputStream inStream = receiver.getInStream(); try { //inStream.connect(outStream); // 与下一句一样 outStream.connect(inStream); } catch (Exception e) { e.printStackTrace(); } sender.start(); receiver.start(); } } class Sender extends Thread { private PipedOutputStream outStream = new PipedOutputStream(); public PipedOutputStream getOutStream() { return outStream; } public void run() { String info = "hello, receiver"; try { outStream.write(info.getBytes()); outStream.close(); } catch (Exception e) { e.printStackTrace(); } } } class Receiver extends Thread { private PipedInputStream inStream = new PipedInputStream(); public PipedInputStream getInStream() { return inStream; } public void run() { byte[] buf = new byte[1024]; try { int len = inStream.read(buf); System.out.println("receive message from sender : " + new String(buf, 0, len)); inStream.close(); } catch (Exception e) { e.printStackTrace(); } } }
最后运行后输出的结果如下:receive message from sender : hello, receiver
3、ByteArrayInputStream和ByteArrayOutputStream
先来看ByteArrayOutputStream类中write()方法:
// Writes the specified byte to this byte array output stream. public synchronized void write(int b) { ensureCapacity(count + 1); buf[count] = (byte) b; count += 1; } /** * Writes len bytes from the specified byte array * starting at offset off to this byte array output stream. */ public synchronized void write(byte b[], int off, int len) { if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) { throw new IndexOutOfBoundsException(); } ensureCapacity(count + len); System.arraycopy(b, off, buf, count, len); count += len; }
这个类也是通过向数组中定入值来进行数值传递的,而字节数组的大小可以在创建ByteArrayOutputStream时通过构造函数指定,默认的大小为32.如果在调用write()方法时,都会确保字节数组的容量。如果过小,会自动进行扩容操作。这样就可以把需要的数据写到字节数组中去了。还可以通过调用writeTo()方法可以写入到其他输出流中,源代码如下:
/** * Writes the complete contents of this byte array output stream to * the specified output stream argument, as if by calling the output * stream's write method using out.write(buf, 0, count). */ public synchronized void writeTo(OutputStream out) throws IOException { out.write(buf, 0, count); }
继续来看ByteArrayInputStream类,这个类可以从字节数组中读出数据,具体的源代码如下:
public synchronized int read() { return (pos < count) ? (buf[pos++] & 0xff) : -1; } public synchronized int read(byte b[], int off, int len) { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } if (pos >= count) { return -1; } int avail = count - pos; if (len > avail) { len = avail; } if (len <= 0) { return 0; } System.arraycopy(buf, pos, b, off, len); pos += len; return len; }
同时也提供了其他的一些方法,如可以跳读字节的skip()方法、查看剩余有效字节的avaible()方法等等,有兴趣的可以自己去看。下面来举一个具体应用的例子,如下:
byte[] bytes = { 0,2, 3, 4, 5 }; try (ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayInputStream in = new ByteArrayInputStream(bytes);){ out.write(bytes); System.out.println(out.size());//5 System.out.println(in.read());//解 in.skip(1);//2 in.mark(4); System.out.println(in.read());//3 in.reset();// 从索引为2的地方重新开始读 System.out.println(in.read());//3 System.out.println(in.read()); } catch (IOException e) { e.printStackTrace(); }
4、StringBufferInputStream
这个类现在已经不提倡使用了,个人觉得是因为编码的原因吧。查看这个类后的源代码,如下:
public synchronized int read() { return (pos < count) ? (buffer.charAt(pos++) & 0xFF) : -1; } public synchronized int read(byte b[], int off, int len) { if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } if (pos >= count) { return -1; } if (pos + len > count) { len = count - pos; } if (len <= 0) { return 0; } String s = buffer; int cnt = len; while (--cnt >= 0) { b[off++] = (byte)s.charAt(pos++); } return len; }
发现,其实这个类是将字符串中的字符转换为字节进行读取的,如果这个字符串的字符全部为ISO-8859-1编码所能表示的,那肯定能正常读取。但是通常Java都是Unicode编码,两个字符,所以如果其中出现了Unicode字符的时候,例如中文,读取就会不准确。如下举例:
String str = "马智AB"; StringBufferInputStream st = new StringBufferInputStream(str); byte[] j = new byte[16]; st.read(j); System.out.println(new String(j)); //lzAB
原因可能大家也知道了,两个read()方法在获取到这个字符串的字符(s.charAt())后,强制转换为byte或与0xff相与,这样的结果只能导致取到低8位的编码,而对于两个字节编码的汉字来说,肯定会产生错误。