阻塞IO的server结构图:
阻塞IO的server版本代码:
class Server implements Runnable { public void run() { try { ServerSocket ss = new ServerSocket(PORT); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); //创建新线程来handle // or, single-threaded, or a thread pool } catch (IOException ex) { /* ... */ } } static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } public void run() { try { byte[] input = new byte[MAX_INPUT]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { /* ... */ } } private byte[] process(byte[] cmd) { /* ... */ } } }
在上面版本的代码中,有两个地方是阻塞的:
其一是ServerSocket.accept( )方法,该方法的语义是到连接请求队列中,取出一个建立连接,建立好后,返回这个链接的句柄(Socket)。当队列中无连接请求时,该方法就会一直阻塞。也就是,在返回一个成功建立的连接的句柄之前,该方法是一直阻塞的。Server在阻塞期间,是无法处理其他新到达的连接的。
其二是在handler中的Socket.getInputStream( ).read( )方法,ServerSocket.accept( )方法返回的只是一个句柄(Socket),handler通过这个句柄读/写数据时,都是使用阻塞IO的。比如,要完全从连接上读完数据之后,才能开始process数据。而一般从网络上读数据都是比较慢的,这种阻塞,会使整个server创建大量的handler,而大部分的handler都在等待IO。
非阻塞IO的server结构图:
非阻塞IO的server版本:{Reactor模式}
public class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { //Reactor初始化 selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); //非阻塞 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //分步处理,第一步,接收accept事件 sk.attach(new Acceptor()); //attach callback object, Acceptor } public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey)(it.next())); //Reactor负责dispatch收到的事件 selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable)(k.attachment()); //调用之前注册的callback对象,即acceptor对象 if (r != null) r.run(); } class Acceptor implements Runnable { // inner public void run() { try { SocketChannel c = serverSocket.accept(); if (c != null) new Handler(selector, c); } catch(IOException ex) { /* ... */ } } } } public class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(521); ByteBuffer output = ByteBuffer.allocate(1024); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now sk = socket.register(sel, 0); sk.attach(this); //将Handler作为callback对象 sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件 sel.wakeup(); } boolean inputIsComplete() { /* ... */ return false;} boolean outputIsComplete() { /* ... */ return false;} void process() { /* ... */ } public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } void read() throws IOException { socket.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件 } } void send() throws IOException { socket.write(output); if (outputIsComplete()) sk.cancel(); //write完就结束了, 关闭select key } }
在上面的代码中,主要有三个类,即Reactor、Acceptor与Handler。
Reactor:主要负责事件的监听与分发,即主要通过selector,轮询注册到这个selector上的各个channel有没有准备好的事件(如SelectionKey.OP_ACCEPT),有就将SelectionKey分发下去。在这个过程中,Reactor不对事件进行任何处理,只负责事件的监听与分发,因而相对于阻塞的ServerSocket而言,其能处理更多的连接。
Acceptor:负责处理Reactor分发来的请求,获取SocketChannel,新建Handler并将获取到的SocketChannel传给Handler进行处理。Acceptor不负责读数据,也不负责业务处理,只负责接住Reactor分发来的事件,然后把它交给对应的handler进行处理。
Handler:在上面的代码中,Handler同样将自己注册到Reactor中的selector上,监听并处理READ/WRITE事件,同时负责业务逻辑的处理。
在server中,只有一个Reactor,也只有一个Acceptor,Acceptor每accept到一个SocketChannel后,就新建一个Handler来处理。
改进版本的server结构图:
Selector[] selectors; //subReactors集合, 一个selector代表一个subReactor int next = 0; class Acceptor { public synchronized void run() { Socket connection = serverSocket.accept(); //主selector负责accept if (connection != null) new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection if (++next == selectors.length) next = 0; } }