zoukankan      html  css  js  c++  java
  • NIO

    NIO

      -- Channels  通道

        -- FileChannel -- DatagramChannel  --SocketChannel --ServerSocketChannel

      -- Buffers  缓冲区

        -- ByteBuffer 基本类型 CharBuffer 。。。。   Mappedyteuffer 直接内存映射文件

      --Selectors   选择器

    要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。

    IO 与NIO

    IO是面向流的Stream ,NIO是面向缓冲区的Buffer 

    阻塞与非阻塞IO 

    其中Channel对应以前的流,Buffer不是什么新东西,Selector是因为nio可以使用异步的非堵塞模式才加入的东西。

    以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上。

    nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各个水管里流出来的水,都可以得到妥善接纳,这个关键之处就是增加了一个接水工,也就是Selector,他负责协调,也就是看哪根水管有水了的话,在当前水管的水接到一定程度的时候,就切换一下:临时关上当前水龙头,试着打开另一个水龙头(看看有没有水)。

    当其他人需要用水的时候,不是直接去接水,而是事前提了一个水桶给接水工,这个水桶就是Buffer。也就是,其他人虽然也可能要等,但不会在现场等,而是回家等,可以做其它事去,水接满了,接水工会通知他们。

    这其实也是非常接近当前社会分工细化的现实,也是统分利用现有资源达到并发效果的一种很经济的手段,而不是动不动就来个并行处理,虽然那样是最简单的,但也是最浪费资源的方式

    选择器(Selectors) 

    可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。 

    Channel

      getChannel()   open()   Files的newByteChannel

    Buffer

      mark 回读标志  position 读取定位  limit   读取几个   capacity  容量

      flip()  读取后切换到写模式  limit = position   position = 0

      clear() 定位数清空 compact()方法只会清除已经读过的数据,未读的续着

    写数据到Buffer:                                               从Buffer中读取数据:
      inchannel.read(Buffer)                                        outChannel.write(buffer)

      buffer.put()                                                            buffer.get();

      rewind()   重读position= 0 

    分散(Scatter)/聚集(Gather)

    ByteBuffer[] bufferArray = { header, body }; 

    通道之间的数据传输

    toChannel.transferFrom(position, count, fromChannel)

    选择器(Selector)

    一个单独的线程可以管理多个channel,从而管理多个网络连接。 

    Selector创建 Selector selector = Selector.open();

    向Selector注册通道  channel.configureBlocking(false); 通道设为非阻塞 

     SelectionKey key = channel.register(selector, Selectionkey.OP_READ); (Connect Accept, Read,Write)  注册多个int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

    int interestSet = selectionKey.interestOps();    interestSet &SelectionKey.OP_CONNECT == isConnectable(); 类似  或

    isAcceptable();   isReadable();  isWritable();   监测什么事件已经就绪

      从SelectionKey访问Channel和Selector       selectionKey.channel();     selectionKey.selector();  

      附加对象到SelectionKey 上 selectionKey.attach(theObject);     Object attachedObj = selectionKey.attachment();  

    select()阻塞到至少有一个通道在你注册的事件上就绪了

     select()方法返回的int值表示有多少通道已经就绪

    selectedKeys() 

    一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。如下所示: 循环遍历

    keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中   

    SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。 

    public class Server {
        public static void server(String[] args) throws IOException {
            ServerSocket ss = null;
           
            
            try {
                ss = new ServerSocket(1111);
            } catch (IOException e) {
                e.printStackTrace();
            }
            Socket s = null;
            while (true) {
                s = ss.accept();
                new Thread(
                    () -> {
                        PrintWriter pw = null;
                        BufferedReader br = null;
                        pw = new PrintWriter(s.getOutputStream(), true);
                        br = new BufferedReader(new InputStreamReader(s.getInputStream()));
                        
                        pw.println("Hello 
    ");
                        pw.flush();
                        
                        while (true) {
                            String str = br.readLine();
                            System.out.println(str);
                            if (str == "") {
                                break;
                            } else {
                                pw.println("echo"+str);
                                pw.flush();
                            }
                        }
                        pw.close();
                        br.close();
                        s.close();
                        ss.close();
                    }).start();
                }
                
            }
        
        public static void Client(String[] args) throws IOException
        {
            Socket socket = null;
            PrintWriter pw = null;
            BufferedReader br = null;
            
            try
            {
                socket = new Socket("localhost", 1111);
                pw = new PrintWriter(socket.getOutputStream(), true);
                br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            }
            catch (UnknownHostException e)
            {
                System.err.println("Don't know abount host:localhost");
                System.exit(1);
            }
            System.out.println(br.readLine());
            System.out.println(br.readLine());
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
            String userInput;
            // 将客户端Socket输入流(即服务器端Socket的输出流)输出到标准输出上
            while ((userInput = stdIn.readLine()) != null)
            {
                pw.println(userInput);
                System.out.println(br.readLine());
            }
            // 同样的,将该关闭的资源给关闭掉
            pw.close();
            br.close();
            socket.close();
        }
        }
    多线程BIOSOCKET
    package nio;/*作者:马兴争
     *日期: 2018年7月16日
     *时间: 下午11:49:38
     **/
    /**
     * @author mxz
     *一'使用NIO 完成网络通信的三个核心
     *    1.通道:负责连接
     *    2.缓冲区(Buffer):负责数据的存取
     *            SelectableChannel
     *                SocketChannel
     *                ServerSocketChannel
     *                DatagramChannel
     *    3.选择器(Selector):是SelectableChannel的多路复用器,用于监控SelectableChannel的IO状况
     */
    public class TestBlockingNIO {
        
        //客户端
        @Test
        public void client() throws IOException {
            //1.获取通道
            SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 80));
            
            //2.切换到非阻塞模式
            sChannel.configureBlocking(false);
            
            //3.分配指定大小缓冲区
            ByteBuffer buf = ByteBuffer.allocate(1024);
            
            Scanner scan = new Scanner(System.in);
            
            while (scan.hasNext()) {
                String str = scan.next();
                //4.发送数据给服务端
                buf.put((LocalDateTime.now().toString() + "
    " + str).getBytes());
                buf.flip();
                sChannel.write(buf);
                buf.clear();
            }
            
            //5.关闭通道
            sChannel.close();
        }
        
        @Test
        public void server() throws IOException {
            //1.获取通道
            ServerSocketChannel ssChannel = ServerSocketChannel.open();
            
            //2.切换非阻塞
            ssChannel.configureBlocking(false);
            //绑定连接
            ssChannel.bind(new InetSocketAddress(80));
            
            //4.获取选择器
            Selector selector = Selector.open();
            //5.将通道注册到选择器上  监控什么状态及事件  (接收事件)
            ssChannel.register(selector, SelectionKey.OP_ACCEPT);
            
            //6.轮询式的获取选择器上已经准备就绪的事件
            while (selector.select() > 0) {
                //获取当前选择器中所有注册的选择键(已就绪的监听事件)
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                
                while (it.hasNext()) {
                    //8.获取准备就绪的事件
                    SelectionKey sk = it.next();
                    //9.判断是什么事件准备就绪
                    if(sk.isAcceptable()) {
                        //10.若接受就绪,获取客户端连接
                        SocketChannel sChannel = ssChannel.accept();
                        
                        //11.切换非阻塞模式
                        sChannel.configureBlocking(false);
                        
                        //12.将该通道注册到选择器上
                        sChannel.register(selector, SelectionKey.OP_READ);
                    } else if (sk.isReadable()) {
                        //13 获取当前选择器上读就绪'状态'的通道
                        SocketChannel sChannel = (SocketChannel) sk.channel();
                        
                        //14 读取数据
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        int len = 0;
                        while ((len = sChannel.read(buf)) > 0) {
                            buf.flip();
                            System.err.println(new String(buf.array(), 0, len));
                            buf.clear();
                        }
                    }
                    //15.取消选择键SelectionKey
                    it.remove();
                }
            }
        }
    }
    //客户端
        @Test
        public void client() throws IOException {
            //1.获取通道
            SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 80));
            
            FileChannel inChannel = FileChannel.open(Paths.get("biji.txt"), StandardOpenOption.READ);
            //2.分配制定大小的缓冲区
            ByteBuffer buf = ByteBuffer.allocate(1024);
            //3.读取本地文件,并发到服务端
    //        while (inChannel.read(buf) != -1) {
    //            buf.flip();
    //            sChannel.write(buf);
    //            buf.clear();
    //        }
            Scanner sc = new Scanner(System.in);
            while (sc.hasNext()) {
                String s = sc.next();
                buf.put(s.getBytes());
                buf.flip();
                sChannel.write(buf);
                buf.clear();
            }
            //通知服务端,我发完了
            sChannel.shutdownOutput();
            
            //接收服务端的反馈
            int len =0;
            while((len = sChannel.read(buf))!= -1) {
                buf.flip();
                System.out.println(new String(buf.array(), 0, len));
                buf.clear();
            }
            //4.关闭通道
            inChannel.close();
            sChannel.close();
        }
        
        @Test
        public void server() throws IOException {
            //1.获取通道
            ServerSocketChannel ssChannel = ServerSocketChannel.open();
            
            FileChannel outChannel = FileChannel.open(Paths.get("2.txt"), StandardOpenOption.WRITE,StandardOpenOption.CREATE);
            //2.绑定连接
            ssChannel.bind(new InetSocketAddress(80));
            
            //3.获取客户端连接的通道
            SocketChannel sChannel = ssChannel.accept();
            
            ByteBuffer buf = ByteBuffer.allocate(1024);
            
            //4.接收客户端的数据并保存到本地
            while (sChannel.read(buf) != -1) {
                buf.flip();
                outChannel.write(buf);
                buf.clear();
            }
            buf.put("服务端接收数据成功".getBytes());
            sChannel.write(buf);
            
            //关闭通道
            sChannel.close();
            outChannel.close();
            ssChannel.close();
         
        }
    普通Channel 无使用selector

    文件通道

    FileChannel无法设置为非阻塞模式,它总是运行在阻塞模式下

    //利用通道完成文件的复制(非直接缓冲区)
        @Test
        public void test2() throws IOException {
            
            FileInputStream fis = new FileInputStream("20180715-033358-002.jpg");
            FileOutputStream fos = new FileOutputStream("2.jpg");
            //1 获取通道
            FileChannel inChannel = fis.getChannel();
            FileChannel outChannel = fos.getChannel();
            
            //2分配指定大小的缓冲区
            ByteBuffer buf = ByteBuffer.allocate(1024);
            
            //3.将通道中的数据存入缓冲区中
            while (inChannel.read(buf) != -1) {
                buf.flip();
                //4 将缓冲中的数据写入通道中
                outChannel.write(buf);
                buf.clear();//清空缓冲区
            }
            
            outChannel.close();
            inChannel.close();
            fis.close();
            fos.close();
        }
    利用通道完成文件的复制
    //利用直接缓冲区完成文件的复制(内存映射文件)
            @Test
            public void test3() throws IOException {
                FileChannel inChanel = FileChannel.open(Paths.get("20180715-033358-002.jpg"), StandardOpenOption.READ);
                //CREATE_NEW 存在报异常  CREATE 直接覆盖
                FileChannel outChanel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ,StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
                
                //内存映射文件
                MappedByteBuffer inMappedBuf = inChanel.map(MapMode.READ_ONLY, 0, inChanel.size());
                MappedByteBuffer outMappedBuf = outChanel.map(MapMode.READ_WRITE, 0, inChanel.size());
                
                //直接对缓冲区进行数据的读写操作
                byte[] dst = new byte[inMappedBuf.limit()];
                inMappedBuf.get(dst);
                outMappedBuf.put(dst);
                
                inChanel.close();
                outChanel.close();
            }
    利用直接缓冲区完成文件的复制(内存映射文件)
            //通道之间的数据传输
                    @Test
                    public void test4() throws IOException {
                        FileChannel inChanel = FileChannel.open(Paths.get("20180715-033358-002.jpg"), StandardOpenOption.READ);
                        //CREATE_NEW 存在报异常  CREATE 直接覆盖
                        FileChannel outChanel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ,StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
                        inChanel.transferTo(0 , inChanel.size(), outChanel);
                        
                        inChanel.close();
                        outChanel.close();
                    }
    通道间数据传输

    管道

        public void test() throws IOException {
            //1.获取管道
            Pipe pipe = Pipe.open();
            
            //2.将缓冲区中的数据写入管道
            ByteBuffer buf = ByteBuffer.allocate(1024);
            
            Pipe.SinkChannel sinkChannel = pipe.sink();
            
            buf.put("通过单项管道发送数据".getBytes());
            buf.flip();
            sinkChannel.write(buf);
            
            //读取缓冲区的数据
            SourceChannel sourceChannel = pipe.source();
            buf.flip();
            int len = sourceChannel.read(buf);
            System.out.println(new String(buf.array(), 0, len));
            sourceChannel.close();
            sinkChannel.close();
        }

    缓冲区 负责数据的存取 就是数组 ByteBuffer CharBuffer 都是通过allocate()获取缓冲区 ByteBuffer.allocate(1024)
    allocate (缓冲区建立在jvm上 ) allocateDirect (直接缓冲区建立在物理硬盘上)
    put flip(切换到读取模式 将postion 和limit设定) get rewind(切换到再读一遍) clear 清空缓冲区(其实还在,只是吧position,limit,capity设为)
    mark reset 0<=mark<=position <=limit<=capacity hasRemaing(看缓冲区还有数据妈)
    通道Channel
    FileChannel SocketChannel ServerSocketChannel DatagramChannel(udp)
    获取通道 getChannel() open() Files的newByteChannel

    transferFrom

  • 相关阅读:
    20161101学习笔记
    20161031学习笔记
    20161028学习笔记
    20161027学习笔记
    ReentrantLock Condition
    ReentrantLock 重入锁
    CountDownLatch用法与原理
    场景化解释 AQS原理
    Atomic
    多线程工具类
  • 原文地址:https://www.cnblogs.com/mxz1994/p/9323677.html
Copyright © 2011-2022 走看看