zoukankan      html  css  js  c++  java
  • 多线程(11) — NIO

      Java NIO是new IO的简称,是一种可以替代Java IO的一套新的IO机制。它提供了一套不同于Java标准IO的操作机制,严格来说,NIO与并发并无直接关系,但是使用NIO技术可以大大提高线程的使用效率。Java NIO设计的基础内容有通道(Channel)、缓冲区(Buffer)、Selector(选择器)。下面说说这几个内容

    1)通道(Channel)

      Channel:Channel是一对象,可以通过它读取和写入数据。可以把它看做是IO中的流,不同的是:

    • Channel是双向的,既可以读又可以写,而流是单向的
    • Channel可以进行异步的读写
    • 对Channel的读写必须通过buffer对象

      正如上面提到的,所有数据都通过Buffer对象处理,所以不会将字节写入到Channel中,而是将数据写入到Buffer中;不会从Channel中读取字节,而是将数据从Channel读入Buffer,再从Buffer获取这个字节。Channel可以比流更好地反映出底层操作系统的真实情况。特别是在Unix模型中,底层操作系统通常都是双向的。在Java NIO中的Channel主要有如下几种类型:

    • FileChannel:从文件读取数据的
    • DatagramChannel:读写UDP网络协议数据
    • SocketChannel:读写TCP网络协议数据
    • ServerSocketChannel:可以监听TCP连接

    2)缓冲区(Buffer)

      Buffer是一对象,它包含一些要写入或者读到的Stream对象。应用程序不能直接对 Channel 进行读写操作,而必须通过 Buffer 来进行,即 Channel 是通过 Buffer 来读写数据的。在NIO中,所有的数据都是用Buffer处理的,它是NIO读写数据的中转池。Buffer实质上是一个数组,通常是一个字节数据,但也可以是其他类型的数组。但一个缓冲区不仅仅是一个数组,重要的是它提供了对数据的结构化访问,而且还可以跟踪系统的读写进程。使用 Buffer 读写数据一般遵循以下四个步骤:

    1. 写入数据到 Buffer;
    2. 调用 flip() 方法;
    3. 从 Buffer 中读取数据;
    4. 调用 clear() 方法或者 compact() 方法。

      当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用 clear() 或 compact() 方法。clear() 方法会清空整个缓冲区。compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。Buffer主要有如下几种:

    • ByteBuffer
    • CharBuffer
    • DoubleBuffer
    • FloatBuffer
    • IntBuffer
    • LongBuffer
    • ShortBuffer

      CopyFile执行三个基本的操作:创建一个Buffer,然后从源文件读取数据到缓冲区,然后再将缓冲区写入目标文件。

    public static void copyFileUseNIO(String src,String dst) throws IOException{
        //声明源文件和目标文件
        FileInputStream fi=new FileInputStream(new File(src));
        FileOutputStream fo=new FileOutputStream(new File(dst));
        //获得传输通道channel
        FileChannel inChannel=fi.getChannel();
        FileChannel outChannel=fo.getChannel();
        //获得容器buffer
        ByteBuffer buffer=ByteBuffer.allocate(1024);
        while(true){
            //判断是否读完文件
            int eof =inChannel.read(buffer);
            if(eof==-1){
                break;  
            }
            //重设一下buffer的position=0,limit=position
            buffer.flip();
            //开始写
            outChannel.write(buffer);
            //写完要重置buffer,重设position=0,limit=capacity
            buffer.clear();
        }
        inChannel.close();
        outChannel.close();
        fi.close();
        fo.close();
    }   

    三)Selector(选择器对象)

      Selector是一个对象,它可以注册到很多个Channel上,监听各个Channel上发生的事件,并且能够根据事件情况决定Channel读写。这样,通过一个线程管理多个Channel,就可以处理大量网络连接了。有了Selector,我们就可以利用一个线程来处理所有的channels。线程之间的切换对操作系统来说代价是很高的,并且每个线程也会占用一定的系统资源。所以,对系统来说使用的线程越少越好。Selector 就是注册对各种 I/O 事件的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。

    Selector selector = Selector.open();

      为了能让Channel和Selector配合使用,我们需要把Channel注册到Selector上。通过调用 channel.register()方法来实现注册:

    channel.configureBlocking(false);
    SelectionKey key =channel.register(selector,SelectionKey.OP_READ);

      注意,注册的Channel 必须设置成异步模式 才可以,否则异步IO就无法工作,这就意味着我们不能把一个FileChannel注册到Selector,因为FileChannel没有异步模式,但是网络编程中的SocketChannel是可以的。

      register()的调用的返回值是一个SelectionKey,代表这个通道在此 Selector 上注册。当某个 Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。

    SelectionKey中包含如下属性:

    (1)interestSet

      把Channel注册到Selector来监听感兴趣的事件,interestSet就是你要选择的感兴趣的事件的集合。可以通过SelectionKey对象来读写interest set:

    int interestSet = selectionKey.interestOps();
    boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
    boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
    boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
    boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE; 

      通过上面例子可以看到,我们可以通过用 & 和 SelectionKey 中的常量做运算,从SelectionKey中找到我们感兴趣的事件。

    (2)readySet

      readySet 是通道已经准备就绪进行操作的集合。在一次选Selection之后,你应该会首先访问这个readySet。Selection将在下一小节进行解释。可以这样访问ready集合,也可以用像检测interest集合那样的方法,来检测Channel中什么事件或操作已经就绪:

    int readySet = selectionKey.readyOps();
    selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();

    (3)Channel 和 Selector

    我们可以通过SelectionKey获得Selector和注册的Channel:

    Channel  channel  = selectionKey.channel();
    Selector selector = selectionKey.selector(); 

    (4)Attach一个对象

      可以将一个对象或者更多信息attach 到SelectionKey上,这样就能识别某个给定的通道。例如,可以附加与通道一起使用的Buffer,或包含聚集数据对象。使用方法如下:

    selectionKey.attach(theObject);
    Object attachedObj = selectionKey.attachment();

      还可以在用register()方法向Selector注册Channel的时候附加对象。如:

    SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

    NIO多路复用

    主要步骤和元素:

    • 首先,通过 Selector.open() 创建一个 Selector,作为类似调度员的角色。

    • 然后,创建一个 ServerSocketChannel,并且向 Selector 注册,通过指定 SelectionKey.OP_ACCEPT,告诉调度员,它关注的是新的连接请求。

    • 注意,为什么我们要明确配置非阻塞模式呢?这是因为阻塞模式下,注册操作是不允许的,会抛出 IllegalBlockingModeException 异常。

    • Selector 阻塞在 select 操作,当有 Channel 发生接入请求,就会被唤醒。

    • 在具体的方法中,通过 SocketChannel 和 Buffer 进行数据操作

      IO 都是同步阻塞模式,所以需要多线程以实现多任务处理。而 NIO 则是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高

    下面用NIO设计一个Echo服务器:

    首先定义一个Selector和线程池

    private Selector selector;
    private ExecutorService tp = Executors.newCachedThreadPool();

      selector处理所有的网络连接,tp线程池处理每一个客户端请求。为了统计服务器线程在客户端花费的时间,还需要定义一个时间统计有关的变量,用于统计在某一个Socket上花费的时间,time_stat的key为Socket,value为时间戳:

    public static Map<Socket,Long> time_stat = new HashMap<Socket,Long>(10240);

      下面来看一下NIO服务器的核心代码,startServer()方法用于启动NIO Server。

        private void startServer() throws IOException{
            this.selector = SelectorProvider.provider().openSelector();
            ServerSocketChannel ssc = ServerSocketChannel.open();                          // 服务端SocketChannel
            ssc.configureBlocking(false);                                                  // 设置为非阻塞模式
            InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(),8000);// 使用8000端口
            ssc.socket().bind(isa);
            SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);   // 将ServerSocketChannel绑定到Selector上,感兴趣的时间为Accept
            for(;;){                     // 主要任务是等待-分发网络消息
                this.selector.select(); // 阻塞方法,如果当前没有准备好的的数据,就会等待,如果有的话返回已经准备好的SelectionKey数量
                Set<SelectionKey> readyKeys = this.selector.selectedKeys(); // 获取准备好的SelectionKey
                Iterator<SelectionKey> i = readyKeys.iterator();
                long e = 0;
                while(i.hasNext()){
                    SelectionKey sk = i.next();
                    i.remove();// 处理一个删除一个,不然可能重复处理
                    if(sk.isAcceptable()){
                        doAccept(sk);
                    }else if(sk.isValid() && sk.isReadable()){// 判断是否可以读
                        if(!time_stat.containsKey(((SocketChannel) sk.channel()).socket())){
                            time_stat.put(((SocketChannel) sk.channel()).socket(), System.currentTimeMillis());
                        }
                        doRead(sk);
                    }else if(sk.isValid() && sk.isWritable()){ // 判断是否可以写
                        doWrite(sk);
                        e = System.currentTimeMillis();
                        long b = time_stat.remove(((SocketChannel) sk.channel()).socket());
                        System.out.println("spend: "+(b-e)+"ms");
                    }
                }
            }
        }

      在了解服务端整体框架后,下面从具体的方法中看看几个主要方法的使用:

        private void doAccept(SelectionKey sk) {
            ServerSocketChannel server = (ServerSocketChannel) sk.channel();
            SocketChannel clientChannel;
            try {
                clientChannel = server.accept();
                clientChannel.configureBlocking(false);// 非阻塞
                SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);//将Channel注册到Selector上,并告诉Selector对读感兴趣,Channel准备好读时给线程一个通知
                EchoClient ec = new EchoClient();
                clientKey.attach(ec);// 客户端实例作为附件,附加到表示这个连接的SelectionKey上,可以在整个连接过程共享ec
                InetAddress clientAddress  = clientChannel.socket().getInetAddress();
                System.out.println("Accepted connection from "+clientAddress.getHostAddress());
            } catch (Exception e) {}
        }

      EchoClient封装一个队列,保存在需要恢复给这个客户端所有信息上,这样再进行回复,只要outq对象中弹出元素即可。

    public class EchoClient {
        private LinkedList<ByteBuffer> outq;
        public EchoClient() {
            this.outq = new LinkedList<ByteBuffer>();
        }
        public LinkedList<ByteBuffer> getOutq() {
            return outq;
        }
        public void enqueue(ByteBuffer bb) {
            this.outq.addFirst(bb);
        }
    }

    下面看看doRead()方法的实现。

        private void doRead(SelectionKey sk) {
            SocketChannel c = (SocketChannel) sk.channel();
            ByteBuffer bb = ByteBuffer.allocate(8192);
            int len;
            try {
                len = c.read(bb);// 存放读取的数据
                if(len<0){
                    disconnect(sk);
                    return;
                }
            } catch (Exception e) {
                System.out.println("Failed to read from client!");
                e.printStackTrace();
                disconnect(sk);
                return;
            }
            bb.flip();
            tp.execute(new HandleMsg(sk,bb)); // 线程池处理数据
        }

      HandleMsg的实现很简单:

    public class HandleMsg implements Runnable{
    
        SelectionKey sk;
        ByteBuffer bb;
        public HandleMsg(SelectionKey sk,ByteBuffer bb){
            this.sk = sk;
            this.bb = bb;
        }
        @Override
        public void run() {
            EchoClient ec = (EchoClient) sk.attachment();
            ec.enqueue(bb);// 将收到的数据压入队列,业务逻辑也可以在这个地方处理了
            sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
            selector.wakeup();// 强迫Selector立即返回
        }
    }

      doWrite()代码如下,这个方法拿到的sk和doread()方法拿到的是同一个,通过这个sk可以操作共享的EchoClient

        private void doWrite(SelectionKey sk) {
            SocketChannel c = (SocketChannel) sk.channel();
            EchoClient ec = (EchoClient) sk.attachment();
            LinkedList<ByteBuffer> outq = ec.getOutq();
            ByteBuffer bb = outq.getLast();// 列表顶部元素,写回客户端
            try {
                int len = c.write(bb);
                if(len == -1){
                    disconnect(sk);
                    return;
                }
                if(bb.remaining()== 0){
                    outq.removeLast();// 缓冲区已经完成写,删除它
                }
            } catch (Exception e) {
                System.out.println("Failed to write to client.");
                e.printStackTrace();
                disconnect(sk);
                return;
            }
            if(outq.size()==0){
                sk.interestOps(SelectionKey.OP_READ);
            }
        }

    下面用NIO设计一个客户端

      首先初始化Selector和Channel

    private Selector selector;
    public void init(String ip,int port) throws IOException{
        SocketChannel s = SocketChannel.open();
        s.configureBlocking(false);
        this.selector = SelectorProvider.provider().openSelector();
        s.connect(new InetSocketAddress(ip,port));// 并不定连接成功,需要finishConnect()确认
        s.register(selector, SelectionKey.OP_CONNECT);
    }

      程序的工作执行逻辑,主要两件事,一个是链接就绪的Connect,一个是刻度的read()事件:

        public void working() throws IOException{
            while(true){
                if(!this.selector.isOpen()){
                    break;
                }
                this.selector.select();
                Iterator<SelectionKey> i = this.selector.selectedKeys().iterator();
                while(i.hasNext()){
                    SelectionKey key = i.next();
                    i.remove();
                    if(key.isConnectable()){
                        connect(key);// 判断有没有完成连接,没有的话使用finishConnect()方法完成连接,并向Channel中写入数据及感兴趣的事情
                    }else if(key.isReadable()){
                        read(key);
                    }
                }
            }
        }

    下面是read事件

        private void read(SelectionKey key) throws IOException {
            SocketChannel c = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(100);
            c.read(buffer);
            byte[] bs = buffer.array();
            String msg = new String(bs).trim();
            System.out.println("客户端收到信息:"+msg);
            c.close();
            key.selector().close();
        }
  • 相关阅读:
    网络管理 之 Fedora Core 网络配置工具systemconfignetwork介绍
    文件系统管理 之 在Fedora core 4.0 加载NTFS和FAT32分区详述
    系统引导管理 之 系统引导管理器GRUB,为初学者指南
    文件系统管理 之 reiserfs文件系统反删除(Undelete)操作的实践
    文件系统管理 之 Linux 文件系统概述
    安装配置管理 之 apt+synaptic 为Fedora core 4.0 中安装Nvida芯片显示卡及Ati 卡显示驱动
    安装配置管理 之 安装和配置 JPackage Java
    安装配置管理 之 Fedora 6.0 蓝牙bluebooth传送文件的问题解决方法
    软件包管理 之 关于Fedora Core 5.0 通过Yum在线升级说明
    软件包管理 之 文件解压缩
  • 原文地址:https://www.cnblogs.com/wangyongwen/p/11337420.html
Copyright © 2011-2022 走看看