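  • Inter-thread communication through pipes

      Java provides many kinds of input/output streams for working with data. Piped streams are a special kind of stream, used to pass data directly between threads: one thread writes data to the output end of the pipe, and another thread reads it from the input end. With a pipe, threads can communicate without resorting to something like a temporary file.

      The JDK provides four classes for this kind of inter-thread communication:

      (1) PipedInputStream and PipedOutputStream

      (2) PipedReader and PipedWriter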
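
      Before the full demos, a minimal sketch of the idea may help. It assumes Java 8 or later (for the lambda); the class name PipeQuickStart and the method roundTrip are made up for this illustration. It connects the two ends through the PipedInputStream(PipedOutputStream) constructor, writes a string from one thread, and reads it back in the calling thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only.
class PipeQuickStart {

    // Sends `message` from a writer thread to the calling thread through a pipe.
    static String roundTrip(String message) {
        try {
            PipedOutputStream out = new PipedOutputStream();
            // Passing the output end to the constructor connects the pipe.
            PipedInputStream in = new PipedInputStream(out);

            Thread writer = new Thread(() -> {
                try {
                    out.write(message.getBytes(StandardCharsets.UTF_8));
                    out.close(); // closing the write end lets the reader see end of stream
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, "writer");
            writer.start();

            // Read until read() returns -1, i.e. until the writer has closed its end.
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] buffer = new byte[64];
            int n;
            while ((n = in.read(buffer)) != -1) {
                collected.write(buffer, 0, n);
            }
            in.close();
            writer.join();
            return new String(collected.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello pipe"));
    }
}
```

      The writer and the reader run in different threads on purpose: the pipe has a bounded internal buffer, so using both ends from a single thread risks blocking forever.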

    1. Inter-thread communication with byte streams

    Using PipedInputStream and PipedOutputStream for inter-thread communication:

    package cn.qlq.thread.eight;
    
    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Inter-thread communication through a pipe: byte streams
     * 
     * @author Administrator
     *
     */
    public class Demo1 {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(Demo1.class);
    
        public static void main(String[] args) throws IOException {
            final PipedInputStream inputStream = new PipedInputStream();
            final PipedOutputStream outputStream = new PipedOutputStream();
    
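            // Connect the two ends of the pipe; a single call in either direction is enough
            // inputStream.connect(outputStream);
            outputStream.connect(inputStream);
    
            // Start the writer thread and the reader thread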
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threamName ->{} 进入run ", Thread.currentThread().getName());
                    try {
                        for (int i = 0; i < 10; i++) {
                            String data = String.valueOf(i);
                            outputStream.write(data.getBytes());
                            LOGGER.info("threadName->{} write data ->{}", Thread.currentThread().getName(), data);
                            outputStream.flush();
                        }
                        outputStream.close();
                        LOGGER.info("threamName ->{} 结束run ", Thread.currentThread().getName());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "writeThread").start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        LOGGER.info("threamName ->{} 进入run ", Thread.currentThread().getName());
                        byte[] buffer = new byte[20];
                        int readLength = inputStream.read(buffer);
                        while (readLength != -1) {
                            String data = new String(buffer, 0, readLength);
                            LOGGER.info("threadName->{} read data ->{}", Thread.currentThread().getName(), data);
                            readLength = inputStream.read(buffer);
                        }
                        inputStream.close();
                        LOGGER.info("threamName ->{} 结束run ", Thread.currentThread().getName());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "readThread").start();
        }
    }

    Output:

    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threamName ->readThread 进入run
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threamName ->writeThread 进入run
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->0
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->1
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->0
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->2
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->12
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->3
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->3
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->4
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->4
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->5
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->5
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->6
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->6
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->7
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->7
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->8
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->8
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->9
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->9
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threamName ->writeThread 结束run
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threamName ->readThread 结束run
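
      In the output above, one read returned "12". A pipe carries a stream of bytes and does not preserve the boundaries of individual write calls, so a single read may return the data of several writes. If each message has to arrive separately, one option is to frame the messages explicitly. The sketch below is not part of the original demo; the names FramedPipe and sendMessages are hypothetical, and Java 8 or later is assumed. It wraps both ends in DataOutputStream and DataInputStream so that writeUTF and readUTF carry a length prefix with every string.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, for illustration only.
class FramedPipe {

    // Sends each string as its own length-prefixed frame and returns what the reader received.
    static List<String> sendMessages(List<String> messages) {
        try {
            PipedOutputStream pipeOut = new PipedOutputStream();
            PipedInputStream pipeIn = new PipedInputStream(pipeOut);

            Thread writer = new Thread(() -> {
                // Closing the DataOutputStream also closes the write end of the pipe.
                try (DataOutputStream out = new DataOutputStream(pipeOut)) {
                    for (String message : messages) {
                        out.writeUTF(message); // length prefix followed by the encoded string
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, "writer");
            writer.start();

            List<String> received = new ArrayList<>();
            try (DataInputStream in = new DataInputStream(pipeIn)) {
                while (true) {
                    received.add(in.readUTF()); // reads exactly one frame
                }
            } catch (EOFException endOfStream) {
                // The writer closed its end: no more frames.
            }
            writer.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendMessages(Arrays.asList("0", "1", "2")));
    }
}
```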

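      The call outputStream.connect(inputStream); (or, equivalently, inputStream.connect(outputStream);) connects the two ends of the pipe. Only one of the two calls is needed, and data can be written and read only once the connection exists.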
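
      The connection rules can be checked with a small experiment. The sketch below uses made-up names (PipeConnectDemo and its two methods) and only shows that an IOException is thrown in each case, without relying on the exception message: writing to an unconnected pipe fails, and connecting an already connected pipe a second time fails as well.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper class, for illustration only.
class PipeConnectDemo {

    // Returns true if writing to a pipe that was never connected throws IOException.
    static boolean writeWithoutConnectFails() {
        PipedOutputStream out = new PipedOutputStream();
        try {
            out.write(1);
            return false;
        } catch (IOException expected) {
            return true;
        }
    }

    // Returns true if a second connect() on an already connected pipe throws IOException.
    static boolean secondConnectFails() {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream();
        try {
            out.connect(in); // first connection: succeeds
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            in.connect(out); // same pair, other direction: already connected
            return false;
        } catch (IOException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("write without connect fails: " + writeWithoutConnectFails());
        System.out.println("second connect fails: " + secondConnectFails());
    }
}
```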

      When readThread starts, no data has been written yet, so the thread blocks at int readLength = inputStream.read(buffer); and continues only once data has been written.
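
      The blocking behaviour can be made visible by delaying the writer. The following sketch is an illustration under assumptions: the names BlockingReadDemo and millisBlocked are hypothetical, Java 8 or later is assumed, and the measured time depends on thread scheduling, so it is only a lower-bound check. The writer sleeps before writing a single byte, and the reader measures how long read() waited.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper class, for illustration only.
class BlockingReadDemo {

    // Returns roughly how many milliseconds read() waited for the first byte.
    static long millisBlocked(long writerDelayMillis) {
        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out);

            Thread writer = new Thread(() -> {
                try {
                    Thread.sleep(writerDelayMillis); // the pipe stays empty during this time
                    out.write('x');
                    out.flush(); // notifies readers that bytes are waiting in the pipe
                    out.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "writer");

            long start = System.nanoTime();
            writer.start();
            in.read(); // blocks here until the writer has delivered a byte
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

            in.close();
            writer.join();
            return elapsedMillis;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read() waited about " + millisBlocked(300) + " ms");
    }
}
```

      The flush() call after the write matters for latency: it wakes a reader that is already waiting, instead of leaving it to notice the new data on its own.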

    2. Inter-thread communication with character streams (Reader/Writer)

      The example follows the same pattern as the one above:

    package cn.qlq.thread.eight;
    
    import java.io.IOException;
    import java.io.PipedReader;
    import java.io.PipedWriter;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Inter-thread communication through a pipe: character streams
     * 
     * @author Administrator
     *
     */
    public class Demo2 {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);
    
        public static void main(String[] args) throws IOException {
            final PipedWriter writer = new PipedWriter();
            final PipedReader reader = new PipedReader();
    
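            // Connect the two ends of the pipe; a single call in either direction is enough
            // writer.connect(reader);
            reader.connect(writer);
    
            // Start the writer thread and the reader thread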
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threamName ->{} 进入run ", Thread.currentThread().getName());
                    try {
                        for (int i = 0; i < 10; i++) {
                            String data = String.valueOf(i);
                            writer.write(data);
                            LOGGER.info("threadName->{} write data ->{}", Thread.currentThread().getName(), data);
                        }
                        writer.close();
                        LOGGER.info("threamName ->{} 结束run ", Thread.currentThread().getName());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "writerThread").start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        LOGGER.info("threamName ->{} 进入run ", Thread.currentThread().getName());
                        char[] buffer = new char[20];
                        int read = reader.read(buffer);
                        while (read != -1) {
                            String data = new String(buffer, 0, read);
                            LOGGER.info("threadName->{} read data ->{}", Thread.currentThread().getName(), data);
                            read = reader.read(buffer);
                        }
                        reader.close();
                        LOGGER.info("threamName ->{} 结束run ", Thread.currentThread().getName());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "readThread").start();
        }
    }

    Output:

    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threamName ->writerThread 进入run
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threamName ->readThread 进入run
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->0
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->0
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->1
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->1
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->2
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->2
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->3
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->3
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->4
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->4
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->5
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->5
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->6
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->6
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->7
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->7
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->8
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->8
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->9
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->9
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threamName ->writerThread 结束run
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threamName ->readThread 结束run

      

      The call reader.connect(writer); (or, equivalently, writer.connect(reader);) connects the two ends of the pipe.
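
      With character pipes, a common way to exchange whole messages is one message per line. The sketch below is an illustration with made-up names (CharPipeLines, sendLines) and assumes Java 8 or later. It connects the pipe through the PipedReader(PipedWriter) constructor and wraps the two ends in BufferedWriter and BufferedReader, so the reader gets each line back as a separate string.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, for illustration only.
class CharPipeLines {

    // Sends each string as one line and returns the lines the reader received.
    static List<String> sendLines(List<String> lines) {
        try {
            PipedWriter pipedWriter = new PipedWriter();
            // Passing the writer to the constructor connects the pipe.
            PipedReader pipedReader = new PipedReader(pipedWriter);

            Thread writerThread = new Thread(() -> {
                // Closing the BufferedWriter flushes it and closes the write end of the pipe.
                try (BufferedWriter out = new BufferedWriter(pipedWriter)) {
                    for (String line : lines) {
                        out.write(line);
                        out.newLine();
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, "writerThread");
            writerThread.start();

            List<String> received = new ArrayList<>();
            try (BufferedReader in = new BufferedReader(pipedReader)) {
                String line;
                while ((line = in.readLine()) != null) { // null means the writer closed its end
                    received.add(line);
                }
            }
            writerThread.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendLines(Arrays.asList("first", "second", "third")));
    }
}
```

      This only works for messages that contain no line breaks themselves.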

  • Original article: https://www.cnblogs.com/qlqwjy/p/10118733.html