zoukankan      html  css  js  c++  java
  • 【Java学习笔记】管道流

    作者:gnuhpc
    出处:http://www.cnblogs.com/gnuhpc/

    1.引言

    Java I/O系统是建立在数据流概念之上的,而在UNIX/Linux中有一个类似的概念,就是管道,它具有将一个程序的输出当作另一个程序的输入的能力。在Java中,可以使用管道流进行线程之间的通信,输入流和输出流必须相连接,这样的通信有别于一般的Shared Data通信,其不需要一个共享的数据空间。

     

    2.相关类及其关系

    1)字节流:

    分为管道输出流(PipedOutputStream)和管道输入流(PipedInputStream),利用 java.io.PipedOutputStream和java.io.PipedInputStream可以实现线程之间的二进制信息传输。如果要进行管道输出,则必须把输出流连在输入流上。 java.io.PipedOutputStream是java.io.OutputStream的直接子类,而java.io. PipedInputStream是java.io.InputStream的直接子类。PipedOutputStream和 PipedInputStream往往成对出现、配合使用。举例说明:

     

    TestPipe.Java

    import java.io.IOException;

    public class TestPipe {

        public static void main(String[] args) {

            Send s = new Send();
            Receive r = new Receive();
            try {
                s.getPos().connect(r.getPis()); // 连接管道
            } catch (IOException e) {
                e.printStackTrace();
            }
            new Thread(s).start(); // 启动线程
            new Thread(r).start(); // 启动线程
        }
    }

    Receive.java

    import java.io.IOException;
    import java.io.PipedInputStream;

    class Receive implements Runnable { // 实现Runnable接口
        private PipedInputStream pis = null;

        public Receive() {
            this.pis = new PipedInputStream(); // 实例化输入流
        }

        public void run() {
            byte b[] = new byte[1024];
            int len = 0;
            try {
                len = this.pis.read(b); // 接收数据
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                this.pis.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("接收的内容为:" + new String(b, 0, len));
        }

        public PipedInputStream getPis() {
            return pis;
        }
    }

    Send.java

    import java.io.IOException;
    import java.io.PipedOutputStream;
    class Send implements Runnable {
        // 实现Runnable接口
        private PipedOutputStream pos = null; // 管道输出流

        public Send() {
            this.pos = new PipedOutputStream();// 实例化输出流
        }

        public void run() {
            String str = "Hello World!!!";
            try {
                this.pos.write(str.getBytes()); // 输出信息
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                this.pos.close(); // 关闭输出流
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public PipedOutputStream getPos() { // 通过线程类得到输出流
            return pos;
        }
    }

    我们可以看到使用管道流,通过connect方法进行连接,实现了Send线程和Receive线程之间的通信。

    注意:

    PipedInputStream中实际是用了一个1024字节固定大小的循环缓冲区。写入PipedOutputStream的数据实际上保存到对应的 PipedInputStream的内部缓冲区。从PipedInputStream执行读操作时,读取的数据实际上来自这个内部缓冲区。如果对应的 PipedInputStream输入缓冲区已满,任何企图写入PipedOutputStream的线程都将被阻塞。而且这个写操作线程将一直阻塞,直至出现读取PipedInputStream的操作从缓冲区删除数据。这也就是说往PipedOutputStream写数据的线程Send若是和从PipedInputStream读数据的线程Receive是同一个线程的话,那么一旦Send线程发送数据过多(大于1024字节),它就会被阻塞,这就直接导致接受数据的线程阻塞而无法工作(因为是同一个线程嘛),那么这就是一个典型的死锁现象,这也就是为什么javadoc中关于这两个类的使用时告诉大家要在多线程环境下使用的原因了。

    JavaConsoleOutput_2

     

    应用:过滤器模式

    image

    使用这个模式的典型例子是Unix的shell命令。这个模式的好处在于过滤器无需知道它与何种东西进行连接,并且这可以实现并行,而且系统的可扩展性可以根据添加删除或者改变Filter进行增强。

    在这举一个不断计算平均值的例子,producer作为前端的数据源,不断产生随机数,通过pipe进入filter进行数据处理,然后通过第二个pipe就行后端处理。

    import java.util.*;
    import java.io.*;

    public class PipeTest
    /* 建立3个线程(Producer、Filter、Consumer)类和两组通信管道,通过多线程将管道1的数据传送到管道2中,实现管道的通信。
    * Producer => pout1->pin1 =>  Filter(pin1->pout2) => pout2->pin2 =>Consumer
    */
    {
        public static void main(String args[]) {
            try {
                PipedOutputStream pout1 = new PipedOutputStream();
                PipedInputStream pin1 = new PipedInputStream(pout1);

                PipedOutputStream pout2 = new PipedOutputStream();
                PipedInputStream pin2 = new PipedInputStream(pout2);

                /* construct threads */

                Producer prod = new Producer(pout1);
                Filter filt = new Filter(pin1, pout2);
                Consumer cons = new Consumer(pin2);

                /* start threads */

                prod.start();
                filt.start();
                cons.start();
            } catch (IOException e) {
            }
        }
    }

    // 前端:该类的作用是产生随机数,并将其放到管道1的输出流中
    class Producer extends Thread {
        private DataOutputStream out;// DataOutputStream是用于写入一些基本类型数据的类,此类的实例用于生成伪随机数流
        private Random rand = new Random();

        public Producer(OutputStream os) {
            out = new DataOutputStream(os);
        }

        public void run() {
            while (true) {
                try {
                    double num = rand.nextDouble();
                    // 将double值直接写入流
                    out.writeDouble(num);
                    System.out.println("写入流中的值是 :" + num);
                    out.flush();
                    sleep(Math.abs(rand.nextInt()%10));//随机休眠一段时间
                } catch (Exception e) {
                    System.out.println("Error:   " + e);
                }
            }
        }
    }

    // 过滤器,起数据处理作用,读取管道1中输入流的内容,并将其放到管道2的输出流中
    class Filter extends Thread {
        private DataInputStream in;
        private DataOutputStream out;
        private double total = 0;
        private int count = 0;

        public Filter(InputStream is, OutputStream os) {
            in = new DataInputStream(is);
            out = new DataOutputStream(os);
        }

        public void run() {
            for (;;) {
                try {
                    double x = in.readDouble(); // 读取流中的数据
                    total += x;
                    count++;
                    if (count != 0) {
                        double d = total / count;
                        out.writeDouble(d); // 将得到的数据平均值写入流
                    }
                } catch (IOException e) {
                    System.out.println("Error:   " + e);
                }
            }
        }
    }

    // 后端:读取管道2输入流的内容
    class Consumer extends Thread {
        private double old_avg = 0;
        private DataInputStream in;

        public Consumer(InputStream is) {
            in = new DataInputStream(is);
        }

        public void run() {
            for (;;) {
                try {
                    double avg = in.readDouble();
                    if (Math.abs(avg - old_avg) > 0.01) {
                        System.out.println("现在的平均值是:   " + avg);
                        System.out.println();
                        old_avg = avg;
                    }
                } catch (IOException e) {
                    System.out.println("Error:   " + e);
                }
            }
        }

    }

     

    2)字符流

    Java利用 java.io.PipedWriter和java.io.PipedReader在线程之间传输字符信息。与 java.io.PipedOutputStream和java.io.PipedInputStream类似,java.io.PipedWriter 是java.io.Writer的直接子类,java.io.PipedReader是java.io.Reader的直接子类。PipedWriter拥有一个允许指定输入管道字符流的构造方法,而PipedReader拥有一个允许指定输出管道字符流的构造方法。从而使得PipedWriter和PipedReader往往成对出现、配合使用。

     

    以典型KWIC系统为例,下边的代码演示了如何使用字符流并且使用了过滤器模式:ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PipedReader;
    import java.io.PipedWriter;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.StringTokenizer;

    public class KwicPipe {
        public static void main(String[] args) {
            try {
                //get the input and output path
                String src = args[0];
                String dest = args[1];


                //(writeToShiftThread => readFromShiftThread) = Pipe1
                PipedReader readFromShiftThread = new PipedReader();
                PipedWriter writeToShiftThread = new PipedWriter(readFromShiftThread);


                //(writeToSortLinesThread => readFromSortLinesThread) = Pipe2
                PipedReader readFromSortLinesThread = new PipedReader();
                PipedWriter writeToSortLinesThread = new PipedWriter(readFromSortLinesThread);


                //ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread
                ReadLineThread R1 = new ReadLineThread(writeToShiftThread,src);
                ShiftThread R2 = new ShiftThread(readFromShiftThread,writeToSortLinesThread);
                SortLinesThread R3 = new SortLinesThread(readFromSortLinesThread,dest);


                //Start the three processing thread
                R1.start();
                R2.start();
                R3.start();
            }
            catch (IOException e) {
                System.out.println("NO I/O");
            }
        }
    }

    // read the content of kwici.dat and send the lines to another thread
    class ReadLineThread extends Thread {
        PipedWriter PipeIn;
        String InputFilename= null;
        ReadLineThread(PipedWriter PlaceInPipe, String InputFilename) {
            PipeIn = PlaceInPipe;
            this.InputFilename = InputFilename;
        }
        private BufferedReader fileopen(String InputFilename) {
            BufferedReader input_file = null;
            try {
                input_file = new BufferedReader(new FileReader(InputFilename));
            } catch (IOException e) {
                System.err.println(("File not open" + e.toString()));
                System.exit(1);
            }
            return input_file;
        }
        public void run() {
            try {
                String Input;
                BufferedReader TheInput = fileopen(InputFilename);
                while ( (Input = TheInput.readLine()) != null) {
                    System.out.println(Input);
                    PipeIn.write(Input + "/n"); // Read from the file and then write to the pipe1
                }
            }
            catch (FileNotFoundException e) {
                System.out.println("NO FILE ");
            }
            catch (IOException e) {
                System.out.println("NO I/O");
            }
        }
    }

    // read the lines from ReadLineThread and shift them. Send all the shifted lines to SortLinesThread
    class ShiftThread extends Thread {
        PipedReader PipeOut;
        PipedWriter PipeIn;
        ShiftThread(PipedReader ReadFromPipe, PipedWriter WriteToPipe) {
            PipeOut = ReadFromPipe;
            PipeIn = WriteToPipe;
        }
        public void run() {
            char[] cbuf = new char[80];
            int i, j;
            StringBuffer linebuff = new StringBuffer();
            try {
                // read from ReadLineThread
                i = PipeOut.read(cbuf, 0, 80);
                while (i != -1) {               
                    for (j = 0; j < i; j++) {
                        //if new line
                        if (cbuf[j]=='/n'){
                            // When reach the end of line,shift it
                            shiftline(linebuff.toString());
                            // empty the buffer
                            linebuff.delete(0, linebuff.length());
                        }
                        else {
                            linebuff.append(cbuf[j]);
                        }
                    }
                    i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth
                }
            }
            catch (FileNotFoundException e) {
                System.out.println("NO FILE ");
            }
            catch (IOException e) {
                System.out.println("NO I/O or end of stream (ShiftThread terminated)");
            }
            /* BECAUSE
             * If a thread was providing data characters to the connected piped output,
             * but the thread is no longer alive, then an IOException is thrown. (javadoc)
             */
        }
        private void shiftline( String line )
        {
            String onetoken = new String ();
            StringTokenizer tokens =
                new StringTokenizer( line );
            ArrayList<String> Tokens = new ArrayList<String> ();
            int count = tokens.countTokens();
            for ( int i = 0; i < count; i++)
            {
                onetoken = tokens.nextToken();
                if (!((onetoken.compareTo( "a" ) == 0) && (onetoken.compareTo( "an" ) == 0) && (onetoken.compareTo( "and" ) == 0) && (onetoken.compareTo( "the" ) == 0)))
                {
                    Tokens.add(onetoken);
                }
            }
            for ( int tokencount = 0; tokencount < count; tokencount++ )
            {
                StringBuffer linebuffer = new StringBuffer ();
                int index = tokencount;
                for ( int i = 0; i< count; i++ )
                {
                    if (index >= count)
                        index = 0;
                    linebuffer.append ( Tokens.get(index)  );
                    linebuffer.append (" ");
                    index++;
                }  //for i
                line = linebuffer.toString();
                // send the line to the SortLinesThread
                try {
                    PipeIn.write(line+ "/n");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }  // for token count
            return;
        }
    }

    class SortLinesThread extends Thread {
        PipedReader PipeOut;
        String OutputFilename;
        ArrayList<String>  KwicList = new ArrayList<String>();
        SortLinesThread(PipedReader ReadFromPipe, String OutputFilename) {
            PipeOut = ReadFromPipe;
            this.OutputFilename = OutputFilename;
        }
        public void run() {
            char[] cbuf = new char[80];
            int i, j;
            StringBuffer linebuff = new StringBuffer();
            try {
                // read from ShiftLineThread
                i = PipeOut.read(cbuf, 0, 80);
                while (i != -1) { // I don't know we're using that (The method Read blocks until at least one character of input is available.)
                    for (j = 0; j < i; j++) {
                        //if new line
                        if (cbuf[j]=='/n'){
                            // add it to the ArrayList
                            KwicList.add(linebuff.toString());
                            // adn empty the buffer
                            linebuff.delete(0, linebuff.length());
                        }
                        else {
                            //append the character to the line
                            linebuff.append(cbuf[j]);
                        }
                    }           
                    i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth
                }
            }
            catch (FileNotFoundException e) {
                System.out.println("NO FILE ");
            }
            catch (IOException e) {
                System.out.println("NO I/O or end of stream (SortLinesThread terminated)");
            }
            /* BECAUSE
             * If a thread was providing data characters to the connected piped output,
             * but the thread is no longer alive, then an IOException is thrown. (javadoc)
             */
            // when the reading is finished, sort the ArrayList and diplay
            Collections.sort(KwicList);//sort when added
            displaylist ( KwicList );//Standard Output
            //Export to file
            try {
                export(KwicList, OutputFilename);
            } catch (Exception e) {
                System.out.println("Error Output File ");
            }
        }
        private void displaylist (ArrayList<String> KwicList )
        {
            System.out.println ("/nList : " );
            for ( int count = 0; count < KwicList.size(); count++ )
                System.out.println (KwicList.get (count) );
        }
        private void export(ArrayList<String> List, String oufFilename) throws Exception{
            BufferedWriter writer = null;
            try {
                writer = new BufferedWriter(new FileWriter(oufFilename));
            } catch (FileNotFoundException e) {
                System.err.println(("File not open" + e.toString()));
                System.exit(1);
            }

            for (int count = 0; count < List.size(); count++) {
                  writer.write(List.get(count));
                  writer.write("/r/n");
            }
            writer.flush();
            writer.close();
            System.out.println("Processed Finished");
        }
    }

    作者:gnuhpc
    出处:http://www.cnblogs.com/gnuhpc/

  • 相关阅读:
    git(1)-git关联GitHub-windows-转载
    jenkins(4)-jenkins配置邮件通知
    jenkins(3)-linux下安装jenkins(yum install方式)
    【PAT甲级】1090 Highest Price in Supply Chain (25 分)(DFS)
    【PAT甲级】1087 All Roads Lead to Rome (30 分)(MAP【int,string】,邻接表,DFS,模拟,SPFA)
    【PAT甲级】1018 Public Bike Management (30 分)(DFS,SPFA)
    Educational Codeforces Round 61 (Rated for Div. 2) G(线段树,单调栈)
    Atcoder Grand Contest 032C(欧拉回路,DFS判环)
    Educational Codeforces Round 62 (Rated for Div. 2)E(染色DP,构造,思维,组合数学)
    Atcoder Grand Contest 031C(构造,思维,异或,DFS)
  • 原文地址:https://www.cnblogs.com/gnuhpc/p/2844084.html
Copyright © 2011-2022 走看看