1. 概述:
PipedReader 和 PipedWriter,意为管道读写流。所谓管道,那就是有进有出,所以这也是它们跟其它流对象最显著的区别:PipedReader和PipedWriter必须成对使用才有意义。
2. 角色扮演:
PipedWriter 扮演生产者的角色,将字符数据写入到管道;PipedReader扮演消费者的角色,负责将数据从管道取出消费掉。
3. 数据结构:
要实现一个管道,至少需要三个要素:一个线性存储结构、一个写入位置标记、一个读取位置标记。java中把这三个要素放在了PipedReader中:
从代码注释可以看出,buffer[] 就是存储数据的线性结构、in 是写入的位置标记、out是读取的位置标记。"The circular buffer" 说明buffer数组是循环使用,那么肯定会牵涉到数据覆盖的问题,后文中会讲到怎么解决。注释中还说:in<0 表示管道为空(没有数据),in==out 表示管道满了。
4. 构造:
4.1. PipedWriter有两个构造方法,一个无参构造方法,一个带有一个PipedReader对象的构造方法。使用前者时只是单单构造一个Writer对象,没有建立连接,需要后续手动调用connect方法建立连接才能write数据;使用后者时会在构造完成时调用connect方法建立管道连接:
4.2. PipedReader有4个构造方法,两个参数可选:需要建立连接的Writer对象和管道长度(默认1024):
可以看出,PipedWriter和PipedReader都提供了带有连接对象参数的构造函数,所以实际应用中,他们之间构造的先后顺序可以自己决定。
5. 建立连接:
PipedReader中保存着连接标志: boolean connected = false; PipedWriter中保存着对PipedReader的引用: private PipedReader sink; 两个类中都有 connect 方法,而其实 PipedReader 中的 conncet 方法最终也是通过调用 PipedWriter 中的 connect 方法,如下:
我们看到,连接方法做了以下几件事:
5.1. 异常检查,包括:为空检查、已连接检查、已关闭检查(通过同步机制,保证这些检查的结果是可信的);
5.2. 建立 PipedWriter 对 PipedReader 的引用;
5.3. 初始化管道位置标记;
5.4. 将连接标志置为 true;
6. 写入数据:
PipedWriter 有两个 write 方法,最终都是调用 PipedReader 的 receive 方法实现的(其实这很自然,因为 buffer 本身就是放在PipedReader中的),所以写入数据的核心方法是 synchronized void receive(int c) throws IOException :
从代码可以看出:
6.1. 写入方法是同步的。如果不同步,那么可能导致不同线程写入的数据相互覆盖;
6.2. 写入方法是阻塞的。当管道已满却要写入数据的时候,会首先唤醒监听当前对象锁的所有线程(当然也包括读取线程)让他们进入就绪态,准备竞争锁资源;然后让当前执行写入的线程放弃对象锁,沉睡1秒。这样一来,才有可能让该对象锁上的读取线程从管道读取数据,将管道腾出部分或者全部的空间,当前线程才能继续写入数据,结束阻塞状态。
7. 读取数据:
PipedReader有两个 read 方法,分别是读取单个字符和多个,大同小异,下面是读取单个字符的方法:
从代码可以看出:
7.1. read 方法是同步的。
7.2. read 方法是阻塞的。当管道已空却需要读取数据时,首先唤醒当前对象锁上的其它线程(包括写入线程);然后让当前执行读取的线程沉睡1秒,放弃锁资源,这样一来才有可能让该对象锁上的写入线程写入数据,当前线程才有数据可读,结束阻塞状态。
8. 写入结束和关闭管道:
可以从Writer或者Reader任意一方关闭管道, closeByReader 和 closeByWriter 会标记从哪一方关闭的,并且若是从 Writer 方关闭的,会调用 PipedReader 的 synchronized void receivedLast() 方法,从而唤醒所有线程。
9. 总结:
9.1. PipedReader 和 PipedWriter 需要成对使用,才能建立管道;
9.2. 管道是通过一个循环使用的字符数组实现的;
9.3. write 和 read 都是同步的;
9.4. write 和 read 都是阻塞的(write 时管道已满就阻塞,read时管道已空就阻塞);
10. 小小实践: