Blocking IO
public class BIOServer { public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress(9090)); while (true) { Socket client = server.accept(); System.out.println("client " + client.getPort() + " connection " ); new Thread(() -> { try { InputStream is = client.getInputStream(); byte[] bytes = new byte[1024]; int read = is.read(bytes); System.out.println("client " + client.getPort() + " " + new String(bytes, 0, read)); // is.close(); } catch (IOException e) { e.printStackTrace(); } }).start(); } } }
NoBlocking-IO
public class NIOServer { public static void main(String[] args) throws IOException { LinkedList<SocketChannel> clients = new LinkedList<>(); ServerSocketChannel ss = ServerSocketChannel.open(); ss.bind(new InetSocketAddress(9090)); ss.configureBlocking(false); //重点 OS NONBLOCKING!!! while (true) { SocketChannel client = ss.accept(); //不会阻塞? -1NULL if (client == null) { System.out.println("null....."); } else { client.configureBlocking(false); int port = client.socket().getPort(); System.out.println("client...port: " + port); clients.add(client); } ByteBuffer buffer = ByteBuffer.allocateDirect(4096); //可以在堆里 堆外 for (SocketChannel c : clients) { //串行化!!!! 多线程!! int num = c.read(buffer); // >0 -1 0 //不会阻塞 if (num > 0) { buffer.flip(); byte[] aaa = new byte[buffer.limit()]; buffer.get(aaa); String b = new String(aaa); System.out.println(c.socket().getPort() + " : " + b); buffer.clear(); } } } } }
New-IO
public class SocketMultiplexingSingleThreadv1 { //马老师的坦克 一 二期 private ServerSocketChannel server = null; private Selector selector = null; //linux 多路复用器(select poll epoll kqueue) nginx event{} int port = 9090; public void initServer() { try { server = ServerSocketChannel.open(); server.configureBlocking(false); server.bind(new InetSocketAddress(port)); //如果在epoll模型下,open--》 epoll_create -> fd3 selector = Selector.open(); // select poll *epoll 优先选择:epoll 但是可以 -D修正 //server 约等于 listen状态的 fd4 /* register 如果: select,poll:jvm里开辟一个数组 fd4 放进去 epoll: epoll_ctl(fd3,ADD,fd4,EPOLLIN */ server.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } public void start() { initServer(); System.out.println("服务器启动了。。。。。"); try { while (true) { //死循环 Set<SelectionKey> keys = selector.keys(); System.out.println(keys.size() + " size"); //1,调用多路复用器(select,poll or epoll (epoll_wait)) /* select()是啥意思: 1,select,poll 其实 内核的select(fd4) poll(fd4) 2,epoll: 其实 内核的 epoll_wait() *, 参数可以带时间:没有时间,0 : 阻塞,有时间设置一个超时 selector.wakeup() 结果返回0 懒加载: 其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用 */ while (selector.select() > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); //返回的有状态的fd集合 Iterator<SelectionKey> iter = selectionKeys.iterator(); //so,管你啥多路复用器,你呀只能给我状态,我还得一个一个的去处理他们的R/W。同步好辛苦!!!!!!!! // NIO 自己对着每一个fd调用系统调用,浪费资源,那么你看,这里是不是调用了一次select方法,知道具体的那些可以R/W了? //幕兰,是不是很省力? //我前边可以强调过,socket: listen 通信 R/W while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); //set 不移除会重复循环处理,select,会在jvm中开辟一个set 用来存放有事件发生的fd,每次select.select()会将从内核中获取的fd会合并到set中,一起返回,所以处理完成要移除,不然下次还会在获取到,该remove()不涉及内核系统调用 if (key.isAcceptable()) { //看代码的时候,这里是重点,如果要去接受一个新的连接 //语义上,accept接受连接且返回新连接的FD对吧? //那新的FD怎么办? //select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起 //epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间 acceptHandler(key); } else if (key.isReadable()) { readHandler(key); //连read 还有 write都处理了 //在当前线程,这个方法可能会阻塞 ,如果阻塞了十年,其他的IO早就没电了。。。 //所以,为什么提出了 IO THREADS //redis 是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的 //tomcat 8,9 异步的处理方式 IO 和 处理上 解耦 } } } } } catch (IOException e) { e.printStackTrace(); } } public void acceptHandler(SelectionKey key) { try { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端 fd7 client.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(8192); //前边讲过了 // 0.0 我类个去 //你看,调用了register /* select,poll:jvm里开辟一个数组 fd7 放进去 epoll: epoll_ctl(fd3,ADD,fd7,EPOLLIN */ client.register(selector, SelectionKey.OP_READ, buffer); System.out.println("-------------------------------------------"); System.out.println("新客户端:" + client.getRemoteAddress()); System.out.println("-------------------------------------------"); } catch (IOException e) { e.printStackTrace(); } } public void readHandler(SelectionKey key) { // key.cance(),将fd从内核的监视列表中移除(会有内核的调用,可以理解为epoll_ctl((fd6,del,fd7))同时也会将其放入selector的canceled set中,在一次调用select()方法的时候,会将其从选择器的所有键集合(即 iter.remove() 所对应set)中移除,后续在有数据发送select.select()不会在获取到,(但是不知道为啥不允许再将该fd注册到监视列表中),涉及内核的系统调用 //此处不可以开启多线程处理,如果开启多线程处理,select.select()会再次获取该client,如果想要用多线程处理,此处需要将当前线程阻塞,等多线程执行完之后再唤醒当前线程,是不是感觉很奇怪?这就是为啥netty要有EventLoopGroup的原因 SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.clear(); int read = 0; try { while (true) { read = client.read(buffer); if (read > 0) { buffer.flip(); while (buffer.hasRemaining()) { client.write(buffer); } buffer.clear(); } else if (read == 0) { break; } else { try { Thread.sleep(5*1000); } catch (InterruptedException e) { e.printStackTrace(); } client.close(); System.out.println( "...close"); break; } } } catch (IOException e) { //e.printStackTrace(); } } public static void main(String[] args) { SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1(); service.start(); } }