zoukankan      html  css  js  c++  java
  • JDK源码阅读之PipedInoutStream与PipedOutputStream

    前言:

         在java中,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。如果使用同一个线程处理两个相关联的管道流时,read()方法和write()方法调用时会导致流阻塞,可能会导致线程死锁。

    PipedOutputStream

     1 public class PipedOutputStream extends OutputStream {
     2 //持有一个PipedInputStream对象,PipedOutputStream类里后续很多操作都需要用到此对象
     3     private PipedInputStream sink;
     4 
     5 //构造函数,将本类的对象与一个特定的PipedInputStream对象关联
     6 public PipedOutputStream(PipedInputStream snk)  throws IOException {
     7     connect(snk);
     8 }
     9 
    10 //默认构造函数,创建对象后必须调用connect(PipedInputStream snk)方法才能正常工作
    11 public PipedOutputStream() {
    12 }
    13 
    14 //将“管道输出流” 和 “管道输入流”连接。
    15 public synchronized void connect(PipedInputStream snk) throws IOException {
    16     if (snk == null) {//传入的对象不能为空,否则就抛出异常
    17         throw new NullPointerException();
    18     } else if (sink != null || snk.connected) {//不能重复连接
    19         throw new IOException("Already connected");
    20     }
    21     sink = snk;
    22     //修改连接的PipedInputStream的成员变量, 使其处于已连接状态.以下//三个变量是在PipedInputStream中定义的,将在PipedInputStream中详//细介绍
    23     snk.in = -1;
    24     snk.out = 0;
    25     snk.connected = true;
    26 }
    27 
    28 //
    29 public void write(int b)  throws IOException {
    30     if (sink == null) {
    31          // 确保已经连接
    32         throw new IOException("Pipe not connected");
    33     }
    34     //调用PipedInputStream里的方法
    35     sink.receive(b);
    36 }
    37 
    38 //将字节数组b写入“管道输出流”中。
    39 // 将数组b写入“管道输出流”之后,它会将其传输给“管道输入流”
    40 public void write(byte b[], int off, int len) throws IOException {
    41     if (sink == null) {
    42         throw new IOException("Pipe not connected");
    43     } else if (b == null) {
    44         throw new NullPointerException();
    45     } else if ((off < 0) || (off > b.length) || (len < 0) ||
    46                ((off + len) > b.length) || ((off + len) < 0)) {
    47         throw new IndexOutOfBoundsException();
    48     } else if (len == 0) {
    49         return;
    50     }
    51     /*
    52                以上代码 保证
    53        1. 已经连接
    54        2. 输出数组b不为空
    55        3. off和len不会导致数组越界
    56      */
    57     sink.receive(b, off, len);
    58 }
    59 /*
    60  * 从上可以看出, 两个write方法, 最后都调用了响应的PipedInputStream#receive方法, 这表明
    61 数据存储的地方和写数据的具体逻辑都在PipedInputStream中
    62  */
    63 
    64 
    65 
    66 
    67  /*清空“管道输出流”。
    68          这里会调用“管道输入流”的notifyAll();
    69          目的是让“管道输入流”放弃对当前资源的占有,让其它的等待线程(等待读取管道输出流的线程)读取“管道输出流”的值。
    70      */
    71 public synchronized void flush() throws IOException {
    72     if (sink != null) {
    73         synchronized (sink) {
    74             sink.notifyAll();
    75         }
    76     }
    77 }
    78 
    79 /*
    80  * 这个方法就是简单的调用了PipedInputStream的receivedLast()方法, 
    81  * 从方法名可以判断出, 这个方法就是通知PipedInputStream, 数据已经填充完毕.
    82  * 关闭之后,会调用receivedLast()通知“管道输入流”它已经关闭
    83  */
    84 public void close()  throws IOException {
    85     if (sink != null) {
    86         sink.receivedLast();
    87     }
    88 }
    89 }
    View Code

    总结:

     从上面的分析可以看出, PipedOutputStream不会对数据进行实际的操作, 也不承担具体的职责, 只负责把数据交给PipedInputStream处理.

    下面我们接着分析最关键的PipedInputStream的源码

    PipedInputStream

      1 public class PipedInputStream extends InputStream {
      2      // “管道输出流”是否关闭的标记
      3     boolean closedByWriter = false;
      4     // “管道输入流”是否关闭的标记
      5     volatile boolean closedByReader = false;
      6     //是否已经连接的标记
      7     boolean connected = false;
      8     //读线程
      9     Thread readSide;
     10   
     11     /*readSide和writeSide是一种简单的标记读写线程的方式, 源码注释中也有说明这种方式并不可靠, 
     12                  这 种方式针对的应该是两条线程的情况, 所以我们使用的时候应该尽量按照设计意图来使用
     13                 在两条线程中建立"管道"传递数据, 写线程写数据, 读线程读数据.
     14      */
     15     
     16     //写线程
     17     Thread writeSide;
     18 
     19     //管道循环输入缓冲区的默认大小
     20     private static final int DEFAULT_PIPE_SIZE = 1024;
     21 
     22   
     23     protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
     24     
     25     // 放置传入数据的循环缓冲区
     26     protected byte buffer[];
     27 
     28    /* 循环缓冲区中位置的索引,当从连接的管道输出流中接收到下一个数据字节时,会将其存储到该位置。
     29     *  in<0 意味着缓冲区为空, in==out 意味着缓冲区已满,具体原因后面详细解释
     30     */
     31     protected int in = -1;
     32     
     33     
     34     //循环缓冲区中位置的索引,此管道输入流将从该位置读取下一个数据字节
     35     protected int out = 0;
     36 
     37    
     38     
     39     //创建 PipedInputStream,使其连接到管道输出流 src。写入 src 的数据字节可用作此流的输入
     40     public PipedInputStream(PipedOutputStream src) throws IOException {
     41         this(src, DEFAULT_PIPE_SIZE);
     42     }
     43 
     44     
     45    /*创建一个 PipedInputStream,使其连接到管道输出流 src,
     46     * 并对管道缓冲区使用指定的管道大小。 写入 src 的数据字节可用作此流的输入。
     47    */
     48     public PipedInputStream(PipedOutputStream src, int pipeSize)
     49             throws IOException {
     50          initPipe(pipeSize);
     51          connect(src);
     52     }
     53 
     54   //创建尚未 连接的 PipedInputStream。在使用前必须将其 连接到 PipedOutputStream
     55     public PipedInputStream() {
     56         initPipe(DEFAULT_PIPE_SIZE);
     57     }
     58     
     59 
     60    /*创建一个尚未 连接的 PipedInputStream,
     61                并对管道缓冲区使用指定的管道大小。在使用前必须将其 连接到 PipedOutputStream。
     62     */
     63     public PipedInputStream(int pipeSize) {
     64         initPipe(pipeSize);
     65     }
     66     
     67      
     68     
     69     //对byte数组buffer变量进行赋值, 也就是初始化缓冲区域
     70     private void initPipe(int pipeSize) {
     71          if (pipeSize <= 0) {
     72             throw new IllegalArgumentException("Pipe Size <= 0");
     73          }
     74          buffer = new byte[pipeSize];
     75     }
     76 
     77  /*直接调用了PipedOutputStream的connect, 上面已经分析过了, 最终效果就是指明PipedOutputStream的连接对象, 
     78   * 改变connected变量的值, 使得PipedInputStream处于连接状态.
     79   */
     80     public void connect(PipedOutputStream src) throws IOException {
     81         src.connect(this);
     82     }
     83 
     84     
     85     
     86     
     87     /*通过上面PipedOutputStream的分析可以知道, 写数据的方法会调用PipedInputStream的reveive方法
     88      * 
     89      */
     90     protected synchronized void receive(int b) throws IOException {
     91         //检查当前"管道"状态, 确保能够读写数据
     92         checkStateForReceive();
     93         
     94         //本方法由PipedOutputStream所在的线程调用, 所以线程是写线程, 记录该线程
     95         writeSide = Thread.currentThread();
     96         
     97         // in == out表示缓存数组已经满了, 阻塞写线程
     98         // 这里确保了未读的缓存数据不会丢失
     99         if (in == out)
    100             awaitSpace();
    101         
    102         // 当检测到缓存数组有空间, 等待结束后, 会继续执行以下代码
    103         if (in < 0) {//小于0表示缓存中无数据,此时设置读与写的位置,
    104             in = 0;//设置为0是因为要从0号索引开始往缓存中写入数据
    105             out = 0;//设置为0是因为要从0号索引开从缓存中读取数据
    106         }
    107         // 写操作
    108         // 1. 把数据写到目标位置(in)
    109         // 2. 后移in, 指明下一个写数据的位置
    110         buffer[in++] = (byte)(b & 0xFF);//&0xff是为了保证二进制数据的一致性,具体原因跟反码,int和byte的位数有关
    111         if (in >= buffer.length) {// 如果in超出缓存长度, 回到0, 循环利用缓存数组
    112             in = 0;
    113         }
    114     }
    115 
    116   
    117     synchronized void receive(byte b[], int off, int len)  throws IOException {
    118         ///检查当前"管道"状态, 确保能够读写数据
    119         checkStateForReceive();
    120         
    121         //因为这个放在是由PipedOutputStream的对象调用的,所以当前线程为写入线程
    122         writeSide = Thread.currentThread();
    123         
    124         // len是需要写进缓存数据的总长度
    125         // bytesToTransfer用来记录剩余个数
    126         int bytesToTransfer = len;
    127         //循环写入
    128         while (bytesToTransfer > 0) {
    129             //in==out表示缓冲已满,调用awaitSpace()阻塞此线程
    130             if (in == out)
    131                 awaitSpace();
    132             
    133             //记录本次写入过程中写进缓冲中的个数
    134             int nextTransferAmount = 0;
    135             if (out < in) {
    136                 // 因为out必然大于等于0, 所以这里 0 <= out < int
    137                 // out < in 表示[in, buffer.length)和[0, out)两个区间可以写数据
    138                 // 先写数据进[in, buffer.length)区间, 避免处理头尾连接的逻辑, 如果还有数据剩余, 留到下一个循环处理
    139                 nextTransferAmount = buffer.length - in;
    140             } else if (in < out) {
    141                 if (in == -1) {
    142                     //in==-1这表示缓存数组为空
    143                     in = out = 0;//将in和out设为0表示写入数据从0开始,读取也要从零开始
    144                     nextTransferAmount = buffer.length - in;
    145                 } else {
    146                     // in < out 表示[in, out)区间可以写数据
    147                     nextTransferAmount = out - in;
    148                 }
    149             }
    150             /*
    151              * 本次可以写入到缓存中的数据个数比还需要的数据个数要多,修改nextTransferAmount,
    152              * 比如缓存数组中还有5个位置可以写入数据,但此时只需2个数据b[]数组就满了,所以重置
    153              * nextTransferAmount=2,让他再写入2个数据。
    154 
    155              */
    156             if (nextTransferAmount > bytesToTransfer)
    157                 nextTransferAmount = bytesToTransfer;
    158             assert(nextTransferAmount > 0);
    159             //把数据写进缓存
    160             System.arraycopy(b, off, buffer, in, nextTransferAmount);
    161            // 计算剩余个数
    162             bytesToTransfer -= nextTransferAmount;
    163             // 移动数据起点
    164             off += nextTransferAmount;
    165             //移动in
    166             in += nextTransferAmount;
    167             // 如果in超出缓存长度, 回到0
    168             if (in >= buffer.length) {
    169                 in = 0;
    170             }
    171         }
    172     }
    173 
    174     /*
    175      *在写数据前会先通过checkStateForReceive检查"管道"状态, 确保
    176                   当前处于连接状态
    177                   管道读写两端都没有被关闭
    178                   读线程状态正常
    179      */
    180     private void checkStateForReceive() throws IOException {
    181         if (!connected) {
    182             throw new IOException("Pipe not connected");
    183         } else if (closedByWriter || closedByReader) {
    184             throw new IOException("Pipe closed");
    185         } else if (readSide != null && !readSide.isAlive()) {
    186             throw new IOException("Read end dead");
    187         }
    188     }
    189     
    190     
    191       /*
    192                      判断目标位置(in), 如果in == out表明当前缓存数组已经满了, 
    193                    不能再写数据了, 所以会通过awaitSpace()方法阻塞写线程;
    194        */
    195     private void awaitSpace() throws IOException {
    196         // 只有缓存数组已满才需要等待
    197         while (in == out) {
    198             // 检查管道状态, 防止在等待的过程中状态发生变化
    199             checkStateForReceive();
    200             //因为Java推荐仅使用读写两条线程,所以这里可以来理解为唤醒读线程
    201             notifyAll();
    202             try {
    203                  // 释放对象锁, 等待读线程读数据, 调用后就会阻塞写线程
    204                  // 1s后取消等待是为了再次检查管道状态
    205                  // 注意等待结束后, 锁仍然在写线程
    206                 wait(1000);
    207             } catch (InterruptedException ex) {
    208                 throw new java.io.InterruptedIOException();
    209             }
    210         }
    211     }
    212 
    213     //当输入端关闭时(调用PipedOutputStream#close()), 会调用receivedLast()
    214     //该方法使用变量标记输入端已经关闭, 表示不会有新数据写入了.
    215     synchronized void receivedLast() {
    216         closedByWriter = true;//此方法由PipedOutputStream对象调用,代表由writer线程关闭
    217         notifyAll();
    218     }
    219 
    220    
    221     public synchronized int read()  throws IOException {
    222         
    223         //检查状态
    224         if (!connected) {
    225             throw new IOException("Pipe not connected");
    226         } else if (closedByReader) {
    227             throw new IOException("Pipe closed");
    228         } else if (writeSide != null && !writeSide.isAlive()
    229                    && !closedByWriter && (in < 0)) {
    230              // 为什么是in<0?因为如果in >= 0, 表示还有数据没有读, 所以不抛出异常
    231             // 这个判断表明了, 即使输入端已经调用了close, 也能继续读已经写入的数据
    232             throw new IOException("Write end dead");
    233         }
    234         //由PipedInoutStream对象所在的线程调用,所以此时当前线程为读取线程
    235         readSide = Thread.currentThread();
    236         int trials = 2;
    237         while (in < 0) {
    238               // in<0表示缓存区域为空, 只要输入端没有被关闭, 阻塞线程等待数据写入, 即等待in >= 0
    239             if (closedByWriter) {
    240                 /* closed by writer, return EOF */
    241                 return -1;
    242             }
    243             if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
    244                 throw new IOException("Pipe broken");
    245             }
    246             /* 可以理解为等待写入线程 */
    247             notifyAll();
    248             try {
    249                 // 阻塞线程, 等待1s, 这里会释放锁, 给机会写线程获取锁, 写数据
    250                 wait(1000);
    251             } catch (InterruptedException ex) {
    252                 throw new java.io.InterruptedIOException();
    253             }
    254         }
    255         // 执行到这里证明in >= 0, 即缓存数组中有数据
    256         // 关键的读操作
    257         // 1. 读取out指向的byte数据
    258         // 2. 后移out
    259         // 3. 把byte转成int, 高位补0,保证数据的一致性
    260         int ret = buffer[out++] & 0xFF;
    261         if (out >= buffer.length) {
    262             out = 0;
    263         }
    264         if (in == out) {
    265              // 读取的数据追上了输入的数据, 则当前缓存区域为空, 所以设置in = -1
    266             in = -1;
    267         }
    268 
    269         return ret;
    270     }
    271 
    272    
    273     public synchronized int read(byte b[], int off, int len)  throws IOException {
    274         
    275         //执行判断,确保可以正常读写
    276         if (b == null) {
    277             throw new NullPointerException();
    278         } else if (off < 0 || len < 0 || len > b.length - off) {
    279             throw new IndexOutOfBoundsException();
    280         } else if (len == 0) {
    281             return 0;
    282         }
    283 
    284         /* 先读取一个数据是为了确保有数据可以,如果此时无数据可读,就会阻塞当前线程,唤醒写线程 */
    285         int c = read();
    286         if (c < 0) {//其实如果c<0,c就只能等于-1
    287             return -1;
    288         }
    289         b[off] = (byte) c;//这里不&0xff是因为已经在read()方法里转换了
    290         int rlen = 1;
    291         // in >= 0确保还有数据可以读
    292         // len > 1确保只读取外部请求的数据长度, 因为上面已经读了1个数据, 所以是大于1, 而不是大于0
    293         while ((in >= 0) && (len > 1)) {
    294             
    295              // available用来记录当前可以读取的数据
    296             int available;
    297 
    298             if (in > out) {
    299                   // in > out表示[out, in)区间数据可读,感觉这里有点多余,因为在receive方法中,只要in>length
    300                 //in就会被设为0
    301                 available = Math.min((buffer.length - out), (in - out));
    302             } else {
    303                 // 首先in是不会等于out的, 因为如果相等, 在上面读第一个数据的时候就会把in赋值-1, 也就不会进入这个循环
    304                 // 当in < out表示[out, buffer.length)和[0, in)两个区间的数据可读
    305                 // 和receive方法类似, 为了不处理跨边界的情况, 先读[out, buffer.length)区间数据
    306                 available = buffer.length - out;
    307             }
    308 
    309             //   外部已经读了一个数据, 所以只需要读(len - 1)个数据了
    310             if (available > (len - 1)) {
    311                 available = len - 1;
    312             }
    313             System.arraycopy(buffer, out, b, off + rlen, available);
    314             out += available;
    315             rlen += available;
    316             len -= available;
    317 
    318             if (out >= buffer.length) {
    319                 out = 0;
    320             }
    321             if (in == out) {
    322                 /* now empty */
    323                 in = -1;
    324             }
    325         }
    326         return rlen;
    327     }
    328 
    329  
    330     public synchronized int available() throws IOException {
    331         if(in < 0)
    332             return 0;
    333         else if(in == out)
    334             return buffer.length;
    335         else if (in > out)
    336             return in - out;
    337         else
    338             return in + buffer.length - out;
    339     }
    340 
    341   
    342     public void close()  throws IOException {
    343         closedByReader = true;
    344         synchronized (this) {
    345             in = -1;
    346         }
    347     }
    348 }
    View Code

    总结:

          现在来解释一下receive和read方法交替执行中缓存数组发生的变化,如下图,这是一个大小为9的缓存数组!下标从0到8

       

    -1

    0

    1

    2

    3

    4

    5

    6

    7

    8

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

         在初始状态,int==-1,out==0

     

    -1

    0

    1

    2

    3

    4

    5

    6

    7

    8

     

    out

     

     

     

     

     

     

     

     

    In

     

     

     

     

     

     

     

     

     

         当我们第一次调用receive方法时,将执行此方法里的以下代码

     

     1 if (in < 0) {//小于0表示缓存中无数据,此时设置读与写的位置,
     2             in = 0;//设置为0是因为要从0号索引开始往缓存中写入数据
     3             out = 0;//设置为0是因为要从0号索引开从缓存中读取数据
     4         }
     5         // 写操作
     6         // 1. 把数据写到目标位置(in)
     7         // 2. 后移in, 指明下一个写数据的位置
     8         buffer[in++] = (byte)(b & 0xFF);//&0xff是为了保证二进制数据的一致性,具体原因跟反码,int和byte的位数有关
     9         if (in >= buffer.length) {// 如果in超出缓存长度, 回到0, 循环利用缓存数组
    10             in = 0;
    11         }
    View Code

        

    -1

    0

    1

    2

    3

    4

    5

    6

    7

    8

     

    out

     

     

     

     

     

     

     

     

     

    in

     

     

     

     

     

     

     

     

    设置为0是因为要从0号索引开始往缓存中写入数据,然后写入数据

    -1

    0

    1

    2

    3

    4

    5

    6

    7

    8

     

    out

     

     

     

     

     

     

     

     

     

    data

    data

    data

    data

    data

    data

    dat

    data

    in

    如果in超出缓存长度, 回到0,如果此时再调用receive方法,就会执行此方法的以下代码,(假设不会发生异常)

     if (in == out)
                awaitSpace();

    这也就是为什么in==out表示数据已经写满缓存数组了,awaitSpace()会阻塞此进程,唤醒读线程,让他读取数组中的数据。

    接下来观察read方法执行过程,当read所在的线程被唤醒后,因为此时数组中存在数据,那么就会执行方法内的以下代码:

     int ret = buffer[out++] & 0xFF;
            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                 // 读取的数据追上了输入的数据, 则当前缓存区域为空, 所以设置in = -1
                in = -1;
            }

    -1

    0

    1

    2

    3

    4

    5

    6

    7

    8

     

     

     

     

     

     

    out

     

     

     

     

     

     

     

     

     

    data

    dat

    data

    in

    当out==in时,此时说明数据已经被读取完,将in设为-1是为了接下来调用receive方法来继续往缓存中写入数据,如果继续调用read方法,就会执行此方法内的以下代码:

    //不考虑异常,假设所有线程均正常工作

     while (in < 0) {
                  // in<0表示缓存区域为空, 只要输入端没有被关闭, 阻塞线程等待数据写入, 即等待in >= 0
                if (closedByWriter) {
                    /* closed by writer, return EOF */
                    return -1;
                }
                if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                    throw new IOException("Pipe broken");
                }
                /* 可以理解为等待写入线程 */
                notifyAll();
                try {
                    // 阻塞线程, 等待1s, 这里会释放锁, 给机会写线程获取锁, 写数据
                    wait(1000);
                } catch (InterruptedException ex) {
                    throw new java.io.InterruptedIOException();
                }

    此后唤醒写线程,执行receive方法。

  • 相关阅读:
    关键路径 图论 ——数据结构课程
    vue+flvjs实现flv格式视频流在线播放
    antdvue时间选择范围TimePicker 的使用,实现对应时间的禁用
    SVN(Subversion)中文站相关网址
    系统重启后ircdircu无法启动问题解决
    在Windows上安装Python+MySQL 的常见问题及解决方法
    windows 配置 pygraphviz
    C# 多线程操作TreeView
    ubuntu下解压rar文件乱码问题的解决
    将jar文件生成maven相关文件
  • 原文地址:https://www.cnblogs.com/lls101/p/11130928.html
Copyright © 2011-2022 走看看