zoukankan      html  css  js  c++  java
  • NIO Java API

    Blocking IO

    public class BIOServer {
        public static void main(String[] args) throws IOException {
            ServerSocket server = new ServerSocket();
            server.bind(new InetSocketAddress(9090));
    
            while (true) {
                Socket client = server.accept();
                System.out.println("client	" + client.getPort() + " connection  " );
                new Thread(() -> {
                    try {
                        InputStream is = client.getInputStream();
                        byte[] bytes = new byte[1024];
                        int read = is.read(bytes);
                        System.out.println("client	" + client.getPort() + "   " + new String(bytes, 0, read));
    //                    is.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
    
        }
    }

    NoBlocking-IO

    public class NIOServer {
        public static void main(String[] args) throws IOException {
            LinkedList<SocketChannel> clients = new LinkedList<>();
            ServerSocketChannel ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(9090));
            ss.configureBlocking(false); //重点  OS  NONBLOCKING!!!
            while (true) {
                SocketChannel client = ss.accept(); //不会阻塞?  -1NULL
                if (client == null) {
                    System.out.println("null.....");
                } else {
                    client.configureBlocking(false);
                    int port = client.socket().getPort();
                    System.out.println("client...port: " + port);
                    clients.add(client);
                }
                ByteBuffer buffer = ByteBuffer.allocateDirect(4096);  //可以在堆里   堆外
                for (SocketChannel c : clients) {   //串行化!!!!  多线程!!
                    int num = c.read(buffer);  // >0  -1  0   //不会阻塞
                    if (num > 0) {
                        buffer.flip();
                        byte[] aaa = new byte[buffer.limit()];
                        buffer.get(aaa);
    
                        String b = new String(aaa);
                        System.out.println(c.socket().getPort() + " : " + b);
                        buffer.clear();
                    }
    
    
                }
            }
        }
    }

    New-IO

     

    public class SocketMultiplexingSingleThreadv1 {
    
        //马老师的坦克 一 二期
        private ServerSocketChannel server = null;
        private Selector selector = null;   //linux 多路复用器(select poll    epoll kqueue) nginx  event{}
        int port = 9090;
    
        public void initServer() {
            try {
                server = ServerSocketChannel.open();
                server.configureBlocking(false);
                server.bind(new InetSocketAddress(port));
    
    
                //如果在epoll模型下,open--》  epoll_create -> fd3
                selector = Selector.open();  //  select  poll  *epoll  优先选择:epoll  但是可以 -D修正
    
                //server 约等于 listen状态的 fd4
                /*
                register
                如果:
                select,poll:jvm里开辟一个数组 fd4 放进去
                epoll:  epoll_ctl(fd3,ADD,fd4,EPOLLIN
                 */
                server.register(selector, SelectionKey.OP_ACCEPT);
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void start() {
            initServer();
            System.out.println("服务器启动了。。。。。");
            try {
                while (true) {  //死循环
    
                    Set<SelectionKey> keys = selector.keys();
                    System.out.println(keys.size() + "   size");
    
    
                    //1,调用多路复用器(select,poll  or  epoll  (epoll_wait))
                    /*
                    select()是啥意思:
                    1,select,poll  其实  内核的select(fd4)  poll(fd4)
                    2,epoll:  其实 内核的 epoll_wait()
                    *, 参数可以带时间:没有时间,0  :  阻塞,有时间设置一个超时
                    selector.wakeup()  结果返回0
    
                    懒加载:
                    其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用
    
                     */
                    while (selector.select() > 0) {
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();  //返回的有状态的fd集合
                        Iterator<SelectionKey> iter = selectionKeys.iterator();
                        //so,管你啥多路复用器,你呀只能给我状态,我还得一个一个的去处理他们的R/W。同步好辛苦!!!!!!!!
                        //  NIO  自己对着每一个fd调用系统调用,浪费资源,那么你看,这里是不是调用了一次select方法,知道具体的那些可以R/W了?
                        //幕兰,是不是很省力?
                        //我前边可以强调过,socket:  listen   通信 R/W
                        while (iter.hasNext()) {
                            SelectionKey key = iter.next();
                            iter.remove(); //set  不移除会重复循环处理,select,会在jvm中开辟一个set 用来存放有事件发生的fd,每次select.select()会将从内核中获取的fd会合并到set中,一起返回,所以处理完成要移除,不然下次还会在获取到,该remove()不涉及内核系统调用
                            if (key.isAcceptable()) {
                                //看代码的时候,这里是重点,如果要去接受一个新的连接
                                //语义上,accept接受连接且返回新连接的FD对吧?
                                //那新的FD怎么办?
                                //select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起
                                //epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间
                                acceptHandler(key);
                            } else if (key.isReadable()) {
                                readHandler(key);  //连read 还有 write都处理了
                                //在当前线程,这个方法可能会阻塞  ,如果阻塞了十年,其他的IO早就没电了。。。
                                //所以,为什么提出了 IO THREADS
                                //redis  是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的
                                //tomcat 8,9  异步的处理方式  IO  和   处理上  解耦
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void acceptHandler(SelectionKey key) {
            try {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端  fd7
                client.configureBlocking(false);
    
                ByteBuffer buffer = ByteBuffer.allocate(8192);  //前边讲过了
    
                // 0.0  我类个去
                //你看,调用了register
                /*
                select,poll:jvm里开辟一个数组 fd7 放进去
                epoll:  epoll_ctl(fd3,ADD,fd7,EPOLLIN
                 */
                client.register(selector, SelectionKey.OP_READ, buffer);
                System.out.println("-------------------------------------------");
                System.out.println("新客户端:" + client.getRemoteAddress());
                System.out.println("-------------------------------------------");
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void readHandler(SelectionKey key) {
            // key.cance(),将fd从内核的监视列表中移除(会有内核的调用,可以理解为epoll_ctl((fd6,del,fd7))同时也会将其放入selector的canceled set中,在一次调用select()方法的时候,会将其从选择器的所有键集合(即 iter.remove() 所对应set)中移除,后续在有数据发送select.select()不会在获取到,(但是不知道为啥不允许再将该fd注册到监视列表中),涉及内核的系统调用
        //此处不可以开启多线程处理,如果开启多线程处理,select.select()会再次获取该client,如果想要用多线程处理,此处需要将当前线程阻塞,等多线程执行完之后再唤醒当前线程,是不是感觉很奇怪?这就是为啥netty要有EventLoopGroup的原因
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            buffer.clear();
            int read = 0;
            try {
                while (true) {
                    read = client.read(buffer);
                    if (read > 0) {
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            client.write(buffer);
                        }
                        buffer.clear();
                    } else if (read == 0) {
                        break;
                    } else {
                        try {
                            Thread.sleep(5*1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
                        client.close();
                        System.out.println( "...close");
                        break;
                    }
                }
            } catch (IOException e) {
                //e.printStackTrace();
    
            }
        }
    
        public static void main(String[] args) {
            SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
            service.start();
        }
    }

      

  • 相关阅读:
    Caused by: com.mysql.cj.core.exceptions.InvalidConnectionAttributeException: The server time zone value '�й���׼ʱ��' is unrecognized or represents more than one time zone. You must configure either the
    Caused by: java.lang.IllegalArgumentException: @EnableAsync annotation metadata was not injected
    jpa单向一对一关系外键映射
    jpa关联映射
    svn 插件去除已经保存的密码方法
    【前端】less学习
    【CodeForces 520E】Pluses everywhere
    费马小定理证明
    【前端】纯前端的一个‘喜欢我吗?’
    Sublime text3 插件HTML/CSS/JS prettify 格式化代码
  • 原文地址:https://www.cnblogs.com/Tony100/p/14097724.html
Copyright © 2011-2022 走看看