zoukankan      html  css  js  c++  java
  • 【Java学习笔记】管道流

    作者:gnuhpc
    出处:http://www.cnblogs.com/gnuhpc/

    1.引言

    Java I/O系统是建立在数据流概念之上的,而在UNIX/Linux中有一个类似的概念,就是管道,它具有将一个程序的输出当作另一个程序的输入的能力。在Java中,可以使用管道流进行线程之间的通信,输入流和输出流必须相连接,这样的通信有别于一般的Shared Data通信,其不需要一个共享的数据空间。

     

    2.相关类及其关系

    1)字节流:

    分为管道输出流(PipedOutputStream)和管道输入流(PipedInputStream),利用 java.io.PipedOutputStream和java.io.PipedInputStream可以实现线程之间的二进制信息传输。如果要进行管道输出,则必须把输出流连在输入流上。 java.io.PipedOutputStream是java.io.OutputStream的直接子类,而java.io. PipedInputStream是java.io.InputStream的直接子类。PipedOutputStream和 PipedInputStream往往成对出现、配合使用。举例说明:

     

    TestPipe.Java

    import java.io.IOException;

    public class TestPipe {

        public static void main(String[] args) {

            Send s = new Send();
            Receive r = new Receive();
            try {
                s.getPos().connect(r.getPis()); // 连接管道
            } catch (IOException e) {
                e.printStackTrace();
            }
            new Thread(s).start(); // 启动线程
            new Thread(r).start(); // 启动线程
        }
    }

    Receive.java

    import java.io.IOException;
    import java.io.PipedInputStream;

    class Receive implements Runnable { // 实现Runnable接口
        private PipedInputStream pis = null;

        public Receive() {
            this.pis = new PipedInputStream(); // 实例化输入流
        }

        public void run() {
            byte b[] = new byte[1024];
            int len = 0;
            try {
                len = this.pis.read(b); // 接收数据
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                this.pis.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("接收的内容为:" + new String(b, 0, len));
        }

        public PipedInputStream getPis() {
            return pis;
        }
    }

    Send.java

    import java.io.IOException;
    import java.io.PipedOutputStream;
    class Send implements Runnable {
        // 实现Runnable接口
        private PipedOutputStream pos = null; // 管道输出流

        public Send() {
            this.pos = new PipedOutputStream();// 实例化输出流
        }

        public void run() {
            String str = "Hello World!!!";
            try {
                this.pos.write(str.getBytes()); // 输出信息
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                this.pos.close(); // 关闭输出流
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public PipedOutputStream getPos() { // 通过线程类得到输出流
            return pos;
        }
    }

    我们可以看到使用管道流,通过connect方法进行连接,实现了Send线程和Receive线程之间的通信。

    注意:

    PipedInputStream中实际是用了一个1024字节固定大小的循环缓冲区。写入PipedOutputStream的数据实际上保存到对应的 PipedInputStream的内部缓冲区。从PipedInputStream执行读操作时,读取的数据实际上来自这个内部缓冲区。如果对应的 PipedInputStream输入缓冲区已满,任何企图写入PipedOutputStream的线程都将被阻塞。而且这个写操作线程将一直阻塞,直至出现读取PipedInputStream的操作从缓冲区删除数据。这也就是说往PipedOutputStream写数据的线程Send若是和从PipedInputStream读数据的线程Receive是同一个线程的话,那么一旦Send线程发送数据过多(大于1024字节),它就会被阻塞,这就直接导致接受数据的线程阻塞而无法工作(因为是同一个线程嘛),那么这就是一个典型的死锁现象,这也就是为什么javadoc中关于这两个类的使用时告诉大家要在多线程环境下使用的原因了。

    JavaConsoleOutput_2

     

    应用:过滤器模式

    image

    使用这个模式的典型例子是Unix的shell命令。这个模式的好处在于过滤器无需知道它与何种东西进行连接,并且这可以实现并行,而且系统的可扩展性可以根据添加删除或者改变Filter进行增强。

    在这举一个不断计算平均值的例子,producer作为前端的数据源,不断产生随机数,通过pipe进入filter进行数据处理,然后通过第二个pipe就行后端处理。

    import java.util.*;
    import java.io.*;

    public class PipeTest
    /* 建立3个线程(Producer、Filter、Consumer)类和两组通信管道,通过多线程将管道1的数据传送到管道2中,实现管道的通信。
    * Producer => pout1->pin1 =>  Filter(pin1->pout2) => pout2->pin2 =>Consumer
    */
    {
        public static void main(String args[]) {
            try {
                PipedOutputStream pout1 = new PipedOutputStream();
                PipedInputStream pin1 = new PipedInputStream(pout1);

                PipedOutputStream pout2 = new PipedOutputStream();
                PipedInputStream pin2 = new PipedInputStream(pout2);

                /* construct threads */

                Producer prod = new Producer(pout1);
                Filter filt = new Filter(pin1, pout2);
                Consumer cons = new Consumer(pin2);

                /* start threads */

                prod.start();
                filt.start();
                cons.start();
            } catch (IOException e) {
            }
        }
    }

    // 前端:该类的作用是产生随机数,并将其放到管道1的输出流中
    class Producer extends Thread {
        private DataOutputStream out;// DataOutputStream是用于写入一些基本类型数据的类,此类的实例用于生成伪随机数流
        private Random rand = new Random();

        public Producer(OutputStream os) {
            out = new DataOutputStream(os);
        }

        public void run() {
            while (true) {
                try {
                    double num = rand.nextDouble();
                    // 将double值直接写入流
                    out.writeDouble(num);
                    System.out.println("写入流中的值是 :" + num);
                    out.flush();
                    sleep(Math.abs(rand.nextInt()%10));//随机休眠一段时间
                } catch (Exception e) {
                    System.out.println("Error:   " + e);
                }
            }
        }
    }

    // 过滤器,起数据处理作用,读取管道1中输入流的内容,并将其放到管道2的输出流中
    class Filter extends Thread {
        private DataInputStream in;
        private DataOutputStream out;
        private double total = 0;
        private int count = 0;

        public Filter(InputStream is, OutputStream os) {
            in = new DataInputStream(is);
            out = new DataOutputStream(os);
        }

        public void run() {
            for (;;) {
                try {
                    double x = in.readDouble(); // 读取流中的数据
                    total += x;
                    count++;
                    if (count != 0) {
                        double d = total / count;
                        out.writeDouble(d); // 将得到的数据平均值写入流
                    }
                } catch (IOException e) {
                    System.out.println("Error:   " + e);
                }
            }
        }
    }

    // 后端:读取管道2输入流的内容
    class Consumer extends Thread {
        private double old_avg = 0;
        private DataInputStream in;

        public Consumer(InputStream is) {
            in = new DataInputStream(is);
        }

        public void run() {
            for (;;) {
                try {
                    double avg = in.readDouble();
                    if (Math.abs(avg - old_avg) > 0.01) {
                        System.out.println("现在的平均值是:   " + avg);
                        System.out.println();
                        old_avg = avg;
                    }
                } catch (IOException e) {
                    System.out.println("Error:   " + e);
                }
            }
        }

    }

     

    2)字符流

    Java利用 java.io.PipedWriter和java.io.PipedReader在线程之间传输字符信息。与 java.io.PipedOutputStream和java.io.PipedInputStream类似,java.io.PipedWriter 是java.io.Writer的直接子类,java.io.PipedReader是java.io.Reader的直接子类。PipedWriter拥有一个允许指定输入管道字符流的构造方法,而PipedReader拥有一个允许指定输出管道字符流的构造方法。从而使得PipedWriter和PipedReader往往成对出现、配合使用。

     

    以典型KWIC系统为例,下边的代码演示了如何使用字符流并且使用了过滤器模式:ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PipedReader;
    import java.io.PipedWriter;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.StringTokenizer;

    public class KwicPipe {
        public static void main(String[] args) {
            try {
                //get the input and output path
                String src = args[0];
                String dest = args[1];


                //(writeToShiftThread => readFromShiftThread) = Pipe1
                PipedReader readFromShiftThread = new PipedReader();
                PipedWriter writeToShiftThread = new PipedWriter(readFromShiftThread);


                //(writeToSortLinesThread => readFromSortLinesThread) = Pipe2
                PipedReader readFromSortLinesThread = new PipedReader();
                PipedWriter writeToSortLinesThread = new PipedWriter(readFromSortLinesThread);


                //ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread
                ReadLineThread R1 = new ReadLineThread(writeToShiftThread,src);
                ShiftThread R2 = new ShiftThread(readFromShiftThread,writeToSortLinesThread);
                SortLinesThread R3 = new SortLinesThread(readFromSortLinesThread,dest);


                //Start the three processing thread
                R1.start();
                R2.start();
                R3.start();
            }
            catch (IOException e) {
                System.out.println("NO I/O");
            }
        }
    }

    // read the content of kwici.dat and send the lines to another thread
    class ReadLineThread extends Thread {
        PipedWriter PipeIn;
        String InputFilename= null;
        ReadLineThread(PipedWriter PlaceInPipe, String InputFilename) {
            PipeIn = PlaceInPipe;
            this.InputFilename = InputFilename;
        }
        private BufferedReader fileopen(String InputFilename) {
            BufferedReader input_file = null;
            try {
                input_file = new BufferedReader(new FileReader(InputFilename));
            } catch (IOException e) {
                System.err.println(("File not open" + e.toString()));
                System.exit(1);
            }
            return input_file;
        }
        public void run() {
            try {
                String Input;
                BufferedReader TheInput = fileopen(InputFilename);
                while ( (Input = TheInput.readLine()) != null) {
                    System.out.println(Input);
                    PipeIn.write(Input + "/n"); // Read from the file and then write to the pipe1
                }
            }
            catch (FileNotFoundException e) {
                System.out.println("NO FILE ");
            }
            catch (IOException e) {
                System.out.println("NO I/O");
            }
        }
    }

    // read the lines from ReadLineThread and shift them. Send all the shifted lines to SortLinesThread
    class ShiftThread extends Thread {
        PipedReader PipeOut;
        PipedWriter PipeIn;
        ShiftThread(PipedReader ReadFromPipe, PipedWriter WriteToPipe) {
            PipeOut = ReadFromPipe;
            PipeIn = WriteToPipe;
        }
        public void run() {
            char[] cbuf = new char[80];
            int i, j;
            StringBuffer linebuff = new StringBuffer();
            try {
                // read from ReadLineThread
                i = PipeOut.read(cbuf, 0, 80);
                while (i != -1) {               
                    for (j = 0; j < i; j++) {
                        //if new line
                        if (cbuf[j]=='/n'){
                            // When reach the end of line,shift it
                            shiftline(linebuff.toString());
                            // empty the buffer
                            linebuff.delete(0, linebuff.length());
                        }
                        else {
                            linebuff.append(cbuf[j]);
                        }
                    }
                    i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth
                }
            }
            catch (FileNotFoundException e) {
                System.out.println("NO FILE ");
            }
            catch (IOException e) {
                System.out.println("NO I/O or end of stream (ShiftThread terminated)");
            }
            /* BECAUSE
             * If a thread was providing data characters to the connected piped output,
             * but the thread is no longer alive, then an IOException is thrown. (javadoc)
             */
        }
        private void shiftline( String line )
        {
            String onetoken = new String ();
            StringTokenizer tokens =
                new StringTokenizer( line );
            ArrayList<String> Tokens = new ArrayList<String> ();
            int count = tokens.countTokens();
            for ( int i = 0; i < count; i++)
            {
                onetoken = tokens.nextToken();
                if (!((onetoken.compareTo( "a" ) == 0) && (onetoken.compareTo( "an" ) == 0) && (onetoken.compareTo( "and" ) == 0) && (onetoken.compareTo( "the" ) == 0)))
                {
                    Tokens.add(onetoken);
                }
            }
            for ( int tokencount = 0; tokencount < count; tokencount++ )
            {
                StringBuffer linebuffer = new StringBuffer ();
                int index = tokencount;
                for ( int i = 0; i< count; i++ )
                {
                    if (index >= count)
                        index = 0;
                    linebuffer.append ( Tokens.get(index)  );
                    linebuffer.append (" ");
                    index++;
                }  //for i
                line = linebuffer.toString();
                // send the line to the SortLinesThread
                try {
                    PipeIn.write(line+ "/n");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }  // for token count
            return;
        }
    }

    class SortLinesThread extends Thread {
        PipedReader PipeOut;
        String OutputFilename;
        ArrayList<String>  KwicList = new ArrayList<String>();
        SortLinesThread(PipedReader ReadFromPipe, String OutputFilename) {
            PipeOut = ReadFromPipe;
            this.OutputFilename = OutputFilename;
        }
        public void run() {
            char[] cbuf = new char[80];
            int i, j;
            StringBuffer linebuff = new StringBuffer();
            try {
                // read from ShiftLineThread
                i = PipeOut.read(cbuf, 0, 80);
                while (i != -1) { // I don't know we're using that (The method Read blocks until at least one character of input is available.)
                    for (j = 0; j < i; j++) {
                        //if new line
                        if (cbuf[j]=='/n'){
                            // add it to the ArrayList
                            KwicList.add(linebuff.toString());
                            // adn empty the buffer
                            linebuff.delete(0, linebuff.length());
                        }
                        else {
                            //append the character to the line
                            linebuff.append(cbuf[j]);
                        }
                    }           
                    i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth
                }
            }
            catch (FileNotFoundException e) {
                System.out.println("NO FILE ");
            }
            catch (IOException e) {
                System.out.println("NO I/O or end of stream (SortLinesThread terminated)");
            }
            /* BECAUSE
             * If a thread was providing data characters to the connected piped output,
             * but the thread is no longer alive, then an IOException is thrown. (javadoc)
             */
            // when the reading is finished, sort the ArrayList and diplay
            Collections.sort(KwicList);//sort when added
            displaylist ( KwicList );//Standard Output
            //Export to file
            try {
                export(KwicList, OutputFilename);
            } catch (Exception e) {
                System.out.println("Error Output File ");
            }
        }
        private void displaylist (ArrayList<String> KwicList )
        {
            System.out.println ("/nList : " );
            for ( int count = 0; count < KwicList.size(); count++ )
                System.out.println (KwicList.get (count) );
        }
        private void export(ArrayList<String> List, String oufFilename) throws Exception{
            BufferedWriter writer = null;
            try {
                writer = new BufferedWriter(new FileWriter(oufFilename));
            } catch (FileNotFoundException e) {
                System.err.println(("File not open" + e.toString()));
                System.exit(1);
            }

            for (int count = 0; count < List.size(); count++) {
                  writer.write(List.get(count));
                  writer.write("/r/n");
            }
            writer.flush();
            writer.close();
            System.out.println("Processed Finished");
        }
    }

    作者:gnuhpc
    出处:http://www.cnblogs.com/gnuhpc/

  • 相关阅读:
    自定义lync状态
    实现SharePoint2010企业内容管理资源中心
    用代码实现Sharepoint2010的个人信息的照片上传(原创)
    用代码实现Sharepoint2010的个人信息的照片上传(2)(原创)
    window.open的例子和使用方法以及参数说明(完整版)
    一些困惑的事情
    建立三层结构的ASP应用程序
    ASP.net常用函数
    ASP.NET数据库使用精典读取数据库中数据
    配置你的ASP.NET运行环境
  • 原文地址:https://www.cnblogs.com/gnuhpc/p/2844084.html
Copyright © 2011-2022 走看看