zoukankan      html  css  js  c++  java
  • java网络编程中的BIO,NIO,AIO

    BIO,是指Blocking IO阻塞的io,是针对socket编程从网络上读写操作而言的。

    对于传统的IO,在socket编程中,read,write,connect服务端的accept都会产生阻塞。

    因此,在jdk1.4之前,如果遇到并发请求。那么就需要开启多个线程,每个线程处理一个网络请求。

    这就会造成问题

    线程的增多,会造成cpu压力增大

    线程增大,会导致线程栈增多,因此会占用大量的内存。

    然而这些线程,并不是都在请求,所以会造成cpu,内存的浪费。

    利用线程池确实可以解决,但是线程池也是有数量的。假设有5个线程,那如果这5个线程同时阻塞了。那么另外的线程就又在等待了。

    基于上诉问题。NIO出现了。

    NIo首先设计了Channel,使得socket的accept方法不在阻塞,然后将所有的通道(这里是socket在NIO中是通道)注册在一个多路复用器中,Selector。这个多路复用器会阻塞调用select方法。这个方法会返回所有有事件的通道个数,如果个数为0,则阻塞。直到有活跃通道。这里的事件是指,连接,读,写等等。然后再根据事件,处理通道。因此,这个可以单线程处理所有客户端。第一,服务器监听客户端连接,不在阻塞。第二,采用了事件触发。只有当有事件的通道,cpu才会去执行。没有事件,就一直轮训。这里的事件包括客户端连接服务器。即accept。

    但是这模式也有弊端。第一,如果有长连接,单线程下,在处理这个长连接的同时,是不能处理其他通道的读写事件和监听事件的。如果通道的连接处理时间过长,仍然会产生阻塞。这时就产生了多线程NIO,将通道处理的逻辑放在线程池中去做。

    这种方法仍然有弊端,就是select轮询所有的注册通道,因为有的通道注册了,但是并没有事件,如果连接数很多,依然会造成性能浪费。依然不能最大化的压榨服务器性能。所以产生了另外的react方式。

     bio是传统的阻塞,等待通信,实现多客户端需要采用循环监听加多线程读写。子线程阻塞不影响主线程监听。主线程监听阻塞不影响子线程读写。否则,监听的阻塞,在没监听到客户端连接的情况下,程序阻塞,将无法读取已监听的客户端的读写。或者在读写已经连接的客户端时候,程序阻塞,因此无法监听新的客户端

    nio是采用了事件触发模式。首先监听不在阻塞。此时在没有监听到客户端连接的情况下,不影响已连接客户端的通信。第二,把客户端通道设为非阻塞,在客户端没有读到内容时,将不再阻塞,因此程序会继续循环监听。所以,如果直接读取内容,一旦客户端没有发送数据,程序将继续进行,不在阻塞等待。此时如果另外的客户端连接上来,那么前一个通道将被覆盖。因此,需要不断的循环监听同时,将连接到的客户端放入List。然后轮询list,每当拿出一个channel,就read,如果读到内容就处理,读不到就继续下一个,因为是非阻塞的。这时就实现了多路复用。

    上面的多路复用是我们自己实现的。效率比较低。jvm采用了poll,select,epoll来让操作系统实现这个轮询。一旦发现有读写操作的channel,就会返回活跃的通道个数,并且有selectedkeys返回活跃通道集合,然后我们根据通道注册的事件,来进行下一步的处理。就不用轮询所有的channel了。

    package NIO;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    
    public class ServerTest {
        private Selector selector=null;
        private int port=9999;
        public void init(){
            try {
                selector=Selector.open();
                System.out.println("多路复用器准备就绪。。。");
                ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
                serverSocketChannel.bind(new InetSocketAddress(port));
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("服务端通道已注册。。。");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public void run(){
            while (true) {
                try {
                    if (!(selector.select()>0)) {
                        System.out.println("没有活跃通道!");
                        break;
                    }
                    System.out.println("有"+selector.select()+"条活跃通道");
                    for(SelectionKey sk:selector.selectedKeys()){
                        selector.selectedKeys().remove(sk);
                        if(sk.isAcceptable()){
                            System.out.println("服务端通道有连接事件");
    
                            SocketChannel socketChannel=((ServerSocketChannel)sk.channel()).accept();
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector,SelectionKey.OP_READ);
                            System.out.println("客户端通道注册完成!");
                        }
                        if(sk.isReadable()){
                            System.out.println("有活跃通道读事件");
                            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                            String content="";
                            try {
                                while (((SocketChannel) sk.channel()).read(byteBuffer) > 0) {
                                    byteBuffer.flip();
                                    content += Charset.defaultCharset().decode(byteBuffer);
                                }
                                System.out.println(content);
                            }catch (Exception e){
                                sk.cancel();
                                if(sk.channel()!=null){
                                    sk.channel().close();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        public static void main(String[] args) {
            ServerTest ser=new ServerTest();
            ser.init();
            ser.run();
        }
    }
    

      代码中的

    ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
    的open方法,

     

     

     

    而产生provider需要调用create方法,create方法实现如下

     

     

     

     最终调了poll0方法,poll0在操作系统中调用了Windows的select方法。这是在Windows系统下的

    如果是在linux下,则会掉epoll。

    这两个方法,可以将注册在多路复用器中的channel,交给操作系统内核,然后他的轮询就交由操作系统来做。因此效率比较高。

    另外NIO还有一个特性是0拷贝,也是效率比较高。

     上面介绍的BIO和NIO都是同步IO,因为IO分为两部分,1,程序发出IO请求。2完成实际的IO操作。对于前面的两种阻塞和非阻塞,是针对于第一步,划分的。如果发出请求会阻塞线程,则为阻塞IO,如果没有阻塞线程,则为非阻塞IO。但是同步IO与异步IO的区别在第二步,如果实际的IO操作由操作系统完成,将结果返回给应用程序,这就是异步IO。如果实际的IO需要应用程序本身区执行,会阻塞线程,那就是同步IO。BIO和NIO都是同步IO,NIO是同步非阻塞IO

    异步IO是采用

    AsynchronousServerSocketChannel ,
    AsynchronousSocketChannel
    两个通道来完成。服务端通道的accept方法有两种一种是普通,另一种需要传递
    AsynchronousChannelGroup 
    例子如下
    public class server02 {
        static List<AsynchronousSocketChannel>channelList=new ArrayList<AsynchronousSocketChannel>();
        public static void main(String[] args) {
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
            ExecutorService executorService= Executors.newFixedThreadPool(80);
            try {
                AsynchronousChannelGroup channelGroup=AsynchronousChannelGroup.withThreadPool(executorService);
                AsynchronousServerSocketChannel serverSocketChannel=
                        AsynchronousServerSocketChannel.open(channelGroup);
                serverSocketChannel.bind(new InetSocketAddress(8888));
                System.out.println("开启监听。。。。");
                serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                    @Override
                    public void completed(AsynchronousSocketChannel sc, Object o) {
                        System.out.println("连接成功,接收数据。。。");
                        channelList.add(sc);
                        serverSocketChannel.accept(null,this);
                       sc.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
                           @Override
                           public void completed(Integer integer, Object o) {
                               byteBuffer.flip();
                               System.out.println("接收到数据:");
                               System.out.println(Charset.defaultCharset().decode(byteBuffer));
                               System.out.println("发送给其他客户端");
                               for (var ch:channelList
                                    ) {
                                   try {
                                       ch.write(byteBuffer).get();
                                   } catch (InterruptedException e) {
                                       e.printStackTrace();
                                   } catch (ExecutionException e) {
                                       e.printStackTrace();
                                   }
    
                               }
                               byteBuffer.clear();
                               sc.read(byteBuffer,null,this);
                           }
    
                           @Override
                           public void failed(Throwable throwable, Object o) {
                               System.out.println("读取数据失败:"+throwable);
                               channelList.remove(sc);
    
                           }
                       });
                               
                    }
    
                    @Override
                    public void failed(Throwable throwable, Object o) {
                        System.out.println("连接失败:"+throwable+"!");
    
                    }
                });
                Scanner scanner=new Scanner(System.in);
                scanner.nextInt();
            } catch (Exception e) {
                e.printStackTrace();
            }
          
        }
    

      

  • 相关阅读:
    AGC037F Counting of Subarrays
    AGC025F Addition and Andition
    CF506C Mr. Kitayuta vs. Bamboos
    AGC032D Rotation Sort
    ARC101F Robots and Exits
    AGC032E Modulo Pairing
    CF559E Gerald and Path
    CF685C Optimal Point
    聊聊Mysql索引和redis跳表
    什么是线程安全
  • 原文地址:https://www.cnblogs.com/tomato190/p/12678450.html
Copyright © 2011-2022 走看看