zoukankan      html  css  js  c++  java
  • 从PipedInputStream/PipedOutputStream谈起

    本篇主要从分析PipeInputStremPipedOutputStream谈起。谈及软件设计的变化,以及如何将软件拆分、组合,适配……

    源代码分析

           下面将详细分析PipedInputStreamPipedOutputStream的源代码。

    1.1 PipedInputStream

     

    package java.io;

    //PipedInputStream必须和PipedOutputStream联合使用。即必须连接输入部分。

    //其原理为:PipedInputStream内部有一个Buffer

    //PipedInputStream可以使用InputStream的方法读取其Buffer中的字节。

    //PipedInputStreamBuffer中的字节是PipedOutputStream调用PipedInputStream的方法放入的。

     

    public class PipedInputStream extends InputStream {

        boolean closedByWriter = false;                                                             //标识有读取方或写入方关闭

        volatile boolean closedByReader = false;

        boolean connected = false;                                                                     //是否建立连接

        Thread readSide;                                                                                             //标识哪个线程

        Thread writeSide;

     

        protected static final int PIPE_SIZE = 1024;                         //缓冲区的默认大小

        protected byte buffer[] = new byte[PIPE_SIZE];                  //缓冲区

        protected int in = -1;               //下一个写入字节的位置。0代表空,in==out代表满

        protected int out = 0;               //下一个读取字节的位置

     

        public PipedInputStream(PipedOutputStream src) throws IOException {                //给定源的输入流

                       connect(src);

        }

     

        public PipedInputStream() {    }                                                //默认构造器,下部一定要connect

     

        public void connect(PipedOutputStream src) throws IOException {               //连接输入源

                       src.connect(this);                                                                           //调用源的connect方法连接当前对象

        }

     

        protected synchronized void receive(int b) throws IOException {                   //只被PipedOuputStream调用

            checkStateForReceive();                                                                                 //检查状态,写入

            writeSide = Thread.currentThread();                                                      //永远是PipedOuputStream

            if (in == out)     awaitSpace();                                                           //输入和输出相等,等待空间

             if (in < 0) {

                 in = 0;

                 out = 0;

             }

             buffer[in++] = (byte)(b & 0xFF);                                                             //放入buffer相应的位置

             if (in >= buffer.length) {      in = 0;         }                                             //in0表示buffer已空

        }

     

        synchronized void receive(byte b[], int off, int len)  throws IOException {

            checkStateForReceive();

            writeSide = Thread.currentThread();                                   //PipedOutputStream可以看出

            int bytesToTransfer = len;

            while (bytesToTransfer > 0) {

                if (in == out)    awaitSpace();                                 //满了,会通知读取的;空会通知写入

                int nextTransferAmount = 0;

                if (out < in) {

                    nextTransferAmount = buffer.length - in;

                } else if (in < out) {

                    if (in == -1) {

                        in = out = 0;

                        nextTransferAmount = buffer.length - in;

                    } else {

                        nextTransferAmount = out - in;

                    }

                }

                if (nextTransferAmount > bytesToTransfer)     nextTransferAmount = bytesToTransfer;

                assert(nextTransferAmount > 0);

                System.arraycopy(b, off, buffer, in, nextTransferAmount);

                bytesToTransfer -= nextTransferAmount;

                off += nextTransferAmount;

                in += nextTransferAmount;

                if (in >= buffer.length) {     in = 0;      }

            }

        }

     

        private void checkStateForReceive() throws IOException {                           //检查当前状态,等待输入

            if (!connected) {

                throw new IOException("Pipe not connected");

            } else if (closedByWriter || closedByReader) {

                 throw new IOException("Pipe closed");

             } else if (readSide != null && !readSide.isAlive()) {

                throw new IOException("Read end dead");

            }

        }

     

        private void awaitSpace() throws IOException {                                              //Buffer已满,等待一段时间

             while (in == out) {                                                                                             //in==out表示满了,没有空间

                 checkStateForReceive();                                                                       //检查接受端的状态

                 notifyAll();                                                                                  //通知读取端

                 try {

                     wait(1000);

                 } catch (InterruptedException ex) {

                       throw new java.io.InterruptedIOException();

                 }

             }

        }

     

        synchronized void receivedLast() {                  //通知所有等待的线程()已经接受到最后的字节

             closedByWriter = true;                             //

             notifyAll();

        }

     

        public synchronized int read()  throws IOException {

            if (!connected) {                                                                              //检查一些内部状态

                throw new IOException("Pipe not connected");

            } else if (closedByReader) {

                 throw new IOException("Pipe closed");

             } else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) {

                throw new IOException("Write end dead");

             }

            readSide = Thread.currentThread();                                            //当前线程读取

             int trials = 2;                                                                                             //重复两次????

             while (in < 0) {

                 if (closedByWriter) {              return -1;        }                 //输入断关闭返回-1

                 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {          //状态错误

                       throw new IOException("Pipe broken");

                 }

                 notifyAll();                                                             // 空了,通知写入端可以写入

                 try {

                     wait(1000);

                 } catch (InterruptedException ex) {

                       throw new java.io.InterruptedIOException();

                 }

            }

             int ret = buffer[out++] & 0xFF;                                                        //

             if (out >= buffer.length) {             out = 0;                }

             if (in == out) {           in = -1;                 }                             //没有任何字节

             return ret;

        }

     

        public synchronized int read(byte b[], int off, int len)  throws IOException {

         if (b == null) {                                                                                 //检查输入参数的正确性

             throw new NullPointerException();

         } else if (off < 0 || len < 0 || len > b.length - off) {

             throw new IndexOutOfBoundsException();

         } else if (len == 0) {

             return 0;

         }

         int c = read();                                                                                 //读取下一个

         if (c < 0) {    return -1;       }                                             //已经到达末尾了,返回-1

         b[off] = (byte) c;                                                                    //放入外部buffer

         int rlen = 1;                                                                            //return-len

         while ((in >= 0) && (--len > 0)) {                                          //下一个in存在,且没有到达len

             b[off + rlen] = buffer[out++];                                         //依次放入外部buffer

             rlen++;

             if (out >= buffer.length) {         out = 0;           }        //读到buffer的末尾,返回头部

             if (in == out) {     in = -1;      }               //读、写位置一致时,表示没有数据

         }

         return rlen;                                                                            //返回填充的长度

        }

     

        public synchronized int available() throws IOException {             //返回还有多少字节可以读取

             if(in < 0)

                 return 0;                                                                                         //到达末端,没有字节

             else if(in == out)

                 return buffer.length;                                                               //写入的和读出的一致,表示满

             else if (in > out)

                 return in - out;                                                                                 //写入的大于读出

             else

                 return in + buffer.length - out;                                                //写入的小于读出的

        }

     

        public void close()  throws IOException {                //关闭当前流,同时释放与其相关的资源

             closedByReader = true;                                             //表示由输入流关闭

            synchronized (this) {     in = -1;    }        //同步化当前对象,in-1

        }

    }

     

    1.2 PipedOutputStream

    // PipedOutputStream一般必须和一个PipedInputStream连接。共同构成一个pipe

    //它们的职能是:

     

    package java.io;

    import java.io.*;

     

    public class PipedOutputStream extends OutputStream {

        private PipedInputStream sink;                //包含一个PipedInputStream

     

        public PipedOutputStream(PipedInputStream snk)throws IOException {       //带有目的地的构造器

                       connect(snk);

        }

       

        public PipedOutputStream() {  }                      //默认构造器,必须使用下面的connect方法连接

       

        public synchronized void connect(PipedInputStream snk) throws IOException {

            if (snk == null) {                                                                    //检查输入参数的正确性

                throw new NullPointerException();

            } else if (sink != null || snk.connected) {

                 throw new IOException("Already connected");

             }

             sink = snk;                                                                           //一系列初始化工作

             snk.in = -1;

             snk.out = 0;

            snk.connected = true;

        }

     

        public void write(int b) throws IOException {                        //向流中写入数据

            if (sink == null) {    throw new IOException("Pipe not connected");      }

             sink.receive(b);            //本质上是,调用PipedInputStreamreceive方法接受此字节

        }

     

        public void write(byte b[], int off, int len) throws IOException {

            if (sink == null) {                                                                   //首先检查输入参数的正确性

                throw new IOException("Pipe not connected");

            } else if (b == null) {

                 throw new NullPointerException();

             } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {

                 throw new IndexOutOfBoundsException();

             } else if (len == 0) {

                 return;

             }

             sink.receive(b, off, len);                                                                 //调用PipedInputStreamreceive方法接受

        }

     

        public synchronized void flush() throws IOException {                 //flush输出流

             if (sink != null) {

                synchronized (sink) {     sink.notifyAll();     } //本质是通知输入流,可以读取

             }

        }

     

        public void close()  throws IOException {                         //关闭流同时释放相关资源

             if (sink != null) {    sink.receivedLast();         }

        }

    }

     

    2 Buffer的状态

           上图是PipedInputStream中缓存的状态图。在程序中我们利用了byte数组,循环地向其中写入数据,写入有一个cursorin),读出也有一个cursorout)。上图表示inout不同位置时,buffer中的各个位置的状态。蓝色的代表可以读取的字节。白色的表示此位置没有字节,或者此位置已经被PipedInputStream读取了。

    交互简图

           下图是从源代码部分转换过来的关于PipedInputStreamPipedOutputStream的交互图。

     

     

           从图中可以看出:

    1.         整个PipedInputStream是这对管道的核心。管道本身是一个byte的数组。

    2.         PipedOutputStream对象通过Delegate方法复用PipedInputStream,同时屏蔽了其中的读取的方法,我们仅仅可以构造PipedOutputStream对象。(从这一点可以看出Delegate复用比继承复用的优越性了!)从设计模式的角度更象Adapter――PipedInputStream本身提供读取和写入的功能,将写入的功能适配到OutputStream,就成为一个PipedOutputStream。这样就形成一个类,适配后形成两种功能的类。

    3.         调用PipedOutputStream的连接方法实际就是调用PipedInputStream的连接方法。

    4.         调用PipedOutputStream的写相关的方法实际就是调用PipedInputStream的对应方法。

    以上也是一种适配,将管道的概念适配到流的概念,同时将两者的职能分开。

     

    Chanel放入PipedOutputStream

           上面的例子中,Chanel放在PipedInputStream中,我们仔细思考后可以顺理成章地将其Chanel放入PipedOutputStream中。请注意synchronized方法是得到哪个字节流的锁!!

    5 Chanel移出的一个例子

           在上面两个例子中Buffer要么在写入对象的内部,要么在读取对象的内部。主要通过适配该对象的方法,达到自己的需求而已。下面是一个一般的例子――将Chanel移出,Chanel提供了写入与读取的功能。这也完全合乎OO的“Single Responsibility Protocol――SRP”。输入部分使用Delegate复用此Chanel,将其适配至InputStreamOutputStream。下面是简单的Source code

    //PipedChanel.java

           import java.io.IOException ;

     

    public class PipedChanel {

        protected static final int PIPE_SIZE = 1024;

        protected byte buffer[] = new byte[PIPE_SIZE];   

        protected int in = -1;

    protected int out = 0;

       

        public PipedChanel(){  }       

        public PipedChanel(int size){

               buffer = new byte[size]  ; 

        }   

       

        public synchronized int read() throws IOException {    }       

        public synchronized int read(byte b[], int off, int len)  throws IOException {    }   

        public synchronized int available() throws IOException {}   

        public synchronized void close()  throws IOException {}       

       

    public synchronized void write(int b)  throws IOException {}

    public synchronized void write(byte b[]) throws IOException {}

        public synchronized void write(byte b[], int off, int len) throws IOException {}         

    public synchronized void flush() throws IOException {}       

     

        public void waitWhileFull(){    }            //Chanel已经满了,写线程等待

    public void waitWhileEmpty{    }        //Chanel为空,读取线程等待

    //以上是两个操作Chanel时的状态相关的方法。

    //是一致性编程部分,典型的设计模式。

    //这两个方法,包含在对应读或写方法的最前面。

    }

     

     

           // PipedChanelInputStream.java

    import java.io.*;

     

    public class PipedChanelInputStream extends InputStream {

           private PipedChanel chanel ;

          

           public PipedChanelInputStream(PipedChanel chanel){

                  this.chanel = chanel ;

           }

          

           public int read() throws IOException {

                  return chanel.read();

           }   

       

        public  int read(byte b[], int off, int len)  throws IOException {

               return chanel.read(b,off,len);

        }

       

        public  int available() throws IOException {

               return chanel.available();

        }

       

        public  void close()  throws IOException {

               chanel.close();      

        }        

          

    }

     

     

           // PipedChanelOutputStream.java

    import java.io.*;

     

    public class PipedChanelOutputStream extends OutputStream {

           private PipedChanel chanel ;

          

           public PipedChanelOutputStream(PipedChanel chanel){

                  this.chanel = chanel ;

           }

       

        public synchronized void write(int b)  throws IOException {

               chanel.write(b);     

        }

        public synchronized void write(byte b[]) throws IOException {

               chanel.write(b);     

        }

        public synchronized void write(byte b[], int off, int len) throws IOException {

               chanel.write(b,off,len); 

        }    

        public synchronized void flush() throws IOException {

               chanel.flush();

        }

        public synchronized void close()  throws IOException {

               chanel.close();      

        }   

    }

     

           很简单的例子。我们可以体会适配器模式,可以体会软件设计的灵活性……

           上面的关于PipedInputStreamPipedOutputStream的例子,本质上是对一个Chanel的几个不同的适配。Chanel作为一种编程模式,在软件设计中有极其广泛的应用。下面一节是JMS的简洁阐述!

           以上的例子其实是一个典型的使用适配器。

     

    6 JMS的架构

           JMSJ2EE部分的面向消息中间件的APIJMSQueueTopic某种意义上就是我们上面Chanel移到网络的其它一段――服务器上的一个例子。同时该Chanel得到了很多强化。如:1.支持交易;2.支持持久化……

           J2EEJMS是一个比较重要的方向,大型的企业应用中都会使用。不过J2EE中给出了其API,背后的理念还是相当丰富的!(具体细节以后会有相关文章!!唉,还是因为忙!!) 

  • 相关阅读:
    1101-Trees on the Level
    1099-移动小球
    1096-组合数
    Windows环境配置Apache+Mysql+PHP
    ArtDialog简单使用示例
    实现数字与字母的随机数
    SQLServer2005:在执行批处理时出现错误。错误消息为: 目录名无效
    sql语句总结
    在SQL SErver中实现数组功能
    aspnet_regiis.exe 的用法
  • 原文地址:https://www.cnblogs.com/daichangya/p/12959540.html
Copyright © 2011-2022 走看看