管道流主要用于在不同线程间直接传送数据
一个线程发送数据到输出管道,另一个线程从输入管道中读取数据,进而实现不同线程间的通信
字节管道流
-------------------------------------------------------------字节流-------------------------------------------------------------
1 package com.qf.test12.pipe.Stream; 2 3 import java.io.IOException; 4 import java.io.PipedOutputStream; 5 6 /** 7 * @author qf 8 * @create 2018-09-20 9:26 9 */ 10 public class WriteData { 11 public void testWrite(PipedOutputStream out){ 12 try { 13 System.out.println("write:"); 14 for (int i = 0; i < 50; i++) { 15 String str = " "+(i+1); 16 out.write(str.getBytes()); 17 System.out.print(str); 18 } 19 System.out.println(); 20 out.close(); 21 } catch (IOException e) { 22 e.printStackTrace(); 23 } 24 } 25 }
1 package com.qf.test12.pipe.Stream; 2 3 import java.io.IOException; 4 import java.io.PipedInputStream; 5 6 /** 7 * @author qf 8 * @create 2018-09-20 9:38 9 */ 10 public class ReadData { 11 public void testRead(PipedInputStream in){ 12 try { 13 System.out.println("read:"); 14 byte[] bytes = new byte[1024]; 15 int readLen = in.read(bytes); 16 while (readLen != -1){ 17 String str = new String(bytes,0,readLen); 18 System.out.print(str); 19 readLen = in.read(bytes); 20 } 21 System.out.println(); 22 in.close(); 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 } 27 }
-------------------------------------------------------------线程类-------------------------------------------------------------
1 package com.qf.test12.pipe.thread; 2 3 import com.qf.test12.pipe.Stream.WriteData; 4 5 import java.io.PipedOutputStream; 6 7 /** 8 * @author qf 9 * @create 2018-09-20 9:44 10 */ 11 public class ThreadW extends Thread { 12 private WriteData writeData; 13 private PipedOutputStream out; 14 15 public ThreadW(WriteData writeData, PipedOutputStream out) { 16 this.writeData = writeData; 17 this.out = out; 18 } 19 20 @Override 21 public void run() { 22 writeData.testWrite(out); 23 } 24 }
1 package com.qf.test12.pipe.thread; 2 3 import com.qf.test12.pipe.Stream.ReadData; 4 5 import java.io.PipedInputStream; 6 7 /** 8 * @author qf 9 * @create 2018-09-20 9:45 10 */ 11 public class ThreadR extends Thread { 12 private ReadData readData; 13 private PipedInputStream in; 14 15 public ThreadR(ReadData readData, PipedInputStream in) { 16 this.readData = readData; 17 this.in = in; 18 } 19 20 @Override 21 public void run() { 22 readData.testRead(in); 23 } 24 }
-------------------------------------------------------------测试类-------------------------------------------------------------
1 package com.qf.test12; 2 3 import com.qf.test12.pipe.Stream.ReadData; 4 import com.qf.test12.pipe.Stream.WriteData; 5 import com.qf.test12.pipe.thread.ThreadR; 6 import com.qf.test12.pipe.thread.ThreadW; 7 8 import java.io.IOException; 9 import java.io.PipedInputStream; 10 import java.io.PipedOutputStream; 11 12 /** 13 * @author qf 14 * @create 2018-09-20 9:47 15 */ 16 public class RunStream { 17 public static void main(String[] args) { 18 try { 19 WriteData write = new WriteData(); 20 ReadData read = new ReadData(); 21 22 PipedInputStream in = new PipedInputStream(); 23 PipedOutputStream out = new PipedOutputStream(); 24 25 out.connect(in);// 或者in.connect(out);作用是使得两个Stream流之间产生通信连接,才能对数据进行输入输出 26 27 ThreadR threadR = new ThreadR(read,in); 28 threadR.start(); 29 30 Thread.sleep(2000); 31 32 ThreadW threadW = new ThreadW(write,out); 33 threadW.start(); 34 35 } catch (IOException e) { 36 e.printStackTrace(); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 } 41 }
------------------------------------------------------------打印输出-----------------------------------------------------------
read: //休眠2000ms write: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
线程threadR先被启动,但是因为管道流中并没有数据,所以线程会阻塞在in.read(bytes)这里,直到有数据被写入,才会继续向下运行
字符管道流
将写入/读取数据的方式改为字符流的形式
-------------------------------------------------------------字符流-------------------------------------------------------------
1 package com.qf.test12.pipe.ReaderWriter; 2 3 import java.io.IOException; 4 import java.io.PipedReader; 5 6 /** 7 * @author qf 8 * @create 2018-09-20 9:38 9 */ 10 public class ReadData { 11 public void testRead(PipedReader in){ 12 try { 13 System.out.println("read:"); 14 char[] chars = new char[1024]; 15 int readLen = in.read(chars); 16 while (readLen != -1){ 17 String str = new String(chars,0,readLen); 18 System.out.print(str); 19 readLen = in.read(chars); 20 } 21 System.out.println(); 22 in.close(); 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 } 27 }
1 package com.qf.test12.pipe.ReaderWriter; 2 3 import java.io.IOException; 4 import java.io.PipedWriter; 5 6 /** 7 * @author qf 8 * @create 2018-09-20 9:26 9 */ 10 public class WriteData { 11 public void testWrite(PipedWriter out){ 12 try { 13 System.out.println("write:"); 14 for (int i = 0; i < 50; i++) { 15 String str = " "+(i+1); 16 out.write(str.toCharArray()); 17 System.out.print(str); 18 } 19 System.out.println(); 20 out.close(); 21 } catch (IOException e) { 22 e.printStackTrace(); 23 } 24 } 25 }
-------------------------------------------------------------线程类-------------------------------------------------------------
1 package com.qf.test12.pipe.ReaderWriter.thread; 2 3 4 import com.qf.test12.pipe.ReaderWriter.ReadData; 5 6 import java.io.PipedReader; 7 8 9 /** 10 * @author qf 11 * @create 2018-09-20 9:45 12 */ 13 public class ThreadR extends Thread { 14 private ReadData readData; 15 private PipedReader in; 16 17 public ThreadR(ReadData readData, PipedReader in) { 18 this.readData = readData; 19 this.in = in; 20 } 21 22 @Override 23 public void run() { 24 readData.testRead(in); 25 } 26 }
1 package com.qf.test12.pipe.ReaderWriter.thread; 2 3 4 import com.qf.test12.pipe.ReaderWriter.WriteData; 5 6 import java.io.PipedWriter; 7 8 /** 9 * @author qf 10 * @create 2018-09-20 9:44 11 */ 12 public class ThreadW extends Thread { 13 private WriteData writeData; 14 private PipedWriter out; 15 16 public ThreadW(WriteData writeData, PipedWriter out) { 17 this.writeData = writeData; 18 this.out = out; 19 } 20 21 @Override 22 public void run() { 23 writeData.testWrite(out); 24 } 25 }
-------------------------------------------------------------测试类-------------------------------------------------------------
1 package com.qf.test12; 2 3 import com.qf.test12.pipe.ReaderWriter.ReadData; 4 import com.qf.test12.pipe.ReaderWriter.WriteData; 5 import com.qf.test12.pipe.ReaderWriter.thread.ThreadR; 6 import com.qf.test12.pipe.ReaderWriter.thread.ThreadW; 7 8 9 import java.io.*; 10 11 /** 12 * @author qf 13 * @create 2018-09-20 9:47 14 */ 15 public class RunReaderWriter { 16 public static void main(String[] args) { 17 try { 18 WriteData write = new WriteData(); 19 ReadData read = new ReadData(); 20 21 PipedWriter out = new PipedWriter(); 22 PipedReader in = new PipedReader(); 23 24 out.connect(in);// 或者in.connect(out);作用是使得两个Stream流之间产生通信连接,才能对数据进行输入输出 25 26 ThreadR threadR = new ThreadR(read,in); 27 threadR.start(); 28 29 Thread.sleep(2000); 30 31 ThreadW threadW = new ThreadW(write,out); 32 threadW.start(); 33 34 } catch (IOException e) { 35 e.printStackTrace(); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } 39 } 40 }
------------------------------------------------------------打印输出-----------------------------------------------------------
read: //睡眠2000ms write: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50