zoukankan      html  css  js  c++  java
  • 通过管道进行线程间通信

      在Java语言中提供了各种各样的输入/输出流Stream,使我们能够方便地对数据进行操作,其中管道流(pipeStream)是一种特殊的流,用于在不同线程间直接传送数据。一个发送数据到输出管道,另一个线程从输入管道中读数据。通过使用管道,实现不同线程间的通信,而无需借助于临时文件之类的动西。

      在Java的JDK中提供了4个类来使线程间可以通信:

      (1)PipedInputStream和PipedOutputStream

      (2)PipedReader和PipedWriter

    1. 通过字节流 Stream实现线程间通信

    用 PipedInputStream和PipedOutputStream实现线程间通信:

    package cn.qlq.thread.eight;
    
    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import sun.misc.IOUtils;
    
    /**
     * 通过管道进行线程间通信:字节流
     * 
     * @author Administrator
     *
     */
    public class Demo1 {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(Demo1.class);
    
        public static void main(String[] args) throws IOException {
            final PipedInputStream inputStream = new PipedInputStream();
            final PipedOutputStream outputStream = new PipedOutputStream();
    
            // 使管道建立连接
            // inputStream.connect(outputStream);
            outputStream.connect(inputStream);
    
            // 开启线程读和写入
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threamName ->{} 进入run ", Thread.currentThread().getName());
                    try {
                        for (int i = 0; i < 10; i++) {
                            String data = String.valueOf(i);
                            outputStream.write(data.getBytes());
                            LOGGER.info("threadName->{} write data ->{}", Thread.currentThread().getName(), data);
                            outputStream.flush();
                        }
                        outputStream.close();
                        LOGGER.info("threamName ->{} 结束run ", Thread.currentThread().getName());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "writeThread").start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        LOGGER.info("threamName ->{} 进入run ", Thread.currentThread().getName());
                        byte[] buffer = new byte[20];
                        int readLength = inputStream.read(buffer);
                        while (readLength != -1) {
                            String data = new String(buffer, 0, readLength);
                            LOGGER.info("threadName->{} read data ->{}", Thread.currentThread().getName(), data);
                            readLength = inputStream.read(buffer);
                        }
                        inputStream.close();
                        LOGGER.info("threamName ->{} 结束run ", Thread.currentThread().getName());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "readThread").start();
        }
    }

    结果:

    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threamName ->readThread 进入run
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threamName ->writeThread 进入run
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->0
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->1
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->0
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->2
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->12
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->3
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->3
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->4
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->4
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->5
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->5
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->6
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->6
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->7
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->7
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->8
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->8
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->writeThread write data ->9
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threadName->readThread read data ->9
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threamName ->writeThread 结束run
    12:11:33 [cn.qlq.thread.eight.Demo1]-[INFO] threamName ->readThread 结束run

      使用代码outputStream.connect(inputStream); (或者inputStream.connect(outputStream);)是为了使两个管道建立通信连接,这样才可以将数据进行输入与输出。

      readThread 启动后,由于当时没有数据被写入,所以线程阻塞在int readLength = inputStream.read(buffer);  直到有数据被写入时,才继续向下运行。

    2. 通过字符流 ReaderWriter实现线程间通信

      例子与上面一样:

    package cn.qlq.thread.eight;
    
    import java.io.IOException;
    import java.io.PipedReader;
    import java.io.PipedWriter;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 通过管道进行线程间通信:字符流
     * 
     * @author Administrator
     *
     */
    public class Demo2 {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);
    
        public static void main(String[] args) throws IOException {
            final PipedWriter writer = new PipedWriter();
            final PipedReader reader = new PipedReader();
    
            // 使管道建立连接
            // writer.connect(reader);
            reader.connect(writer);
    
            // 开启线程读和写入
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threamName ->{} 进入run ", Thread.currentThread().getName());
                    try {
                        for (int i = 0; i < 5; i++) {
                            String data = String.valueOf(i);
                            writer.write(data);
                            LOGGER.info("threadName->{} write data ->{}", Thread.currentThread().getName(), data);
                        }
                        writer.close();
                        LOGGER.info("threamName ->{} 结束run ", Thread.currentThread().getName());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "writerThread").start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        LOGGER.info("threamName ->{} 进入run ", Thread.currentThread().getName());
                        char[] buffer = new char[20];
                        int read = reader.read(buffer);
                        while (read != -1) {
                            String data = new String(buffer, 0, read);
                            LOGGER.info("threadName->{} read data ->{}", Thread.currentThread().getName(), data);
                            read = reader.read(buffer);
                        }
                        reader.close();
                        LOGGER.info("threamName ->{} 结束run ", Thread.currentThread().getName());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "readThread").start();
        }
    }

    结果:

    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threamName ->writerThread 进入run
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threamName ->readThread 进入run
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->0
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->0
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->1
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->1
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->2
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->2
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->3
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->3
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->4
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->4
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->5
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->5
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->6
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->6
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->7
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->7
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->8
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->8
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->writerThread write data ->9
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threadName->readThread read data ->9
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threamName ->writerThread 结束run
    13:29:52 [cn.qlq.thread.eight.Demo2]-[INFO] threamName ->readThread 结束run

      

      代码    // writer.connect(reader); (或者 reader.connect(writer);) 是使两个管道建立通信连接。

  • 相关阅读:
    配置Kickstart无人值守安装centos5.9 天高地厚
    数据库是什么,它是做什么用的? 天高地厚
    Mysql主从复制 天高地厚
    android开发中eclipse里xml的自动提示
    "error: device not found" and "error:device offline"
    gentoo中emerge失效:File "/usr/bin/emerge", line 43
    android:修改preference中view属性
    gerrit上利用sshkeygen公钥
    git 基本命令介绍
    prebuilt/linuxx86/toolchain/armeabi4.4.3/bin/armeabigcc: /lib/libc.so.6: version `GLIBC_2.11' not found:解决办法
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/10118733.html
Copyright © 2011-2022 走看看