EndPoint提供基础的网络IO服务,用来实现网络连接和控制,它是服务器对外I/O操作的接入点。主要任务是管理对外的socket连接,同时将建立好的socket连接交到合适的工作线程中去。
里面两个主要的属性类是Acceptor和Poller、SocketProcessor
Acceptor
Acceptor类实现了Runnable接口,主要用于接收网络请求,建立连接,连接建立之后,将一个SocketChannel对象包装成一个NioChannel,并注册到Poller中。由Poller来负责执行数据的读取和业务执行。
我们看一下Acceptor的run方法:
public void run() { SocketChannel socket = serverSock.accept();//从监听的serversocket中获取新的连接 setSocketOptions(socket);//设置通道的属性 …… } protected boolean setSocketOptions(SocketChannel socket) { NioChannel = channel = new NioChannel(socket, bufhandler);//将通道包装成NioChannel getPoller0().register(channel);//从poller数组中选择一个poller,将channel注册到poller中 …… }
Poller
Poller实现了Runnable接口,在NioEndpoint的时候,会初始化pollers数组,同时启动pollers数组中的线程,让pollers开始工作。
封装后socketchannel的放入Poller线程内部维护的一个PollerEvent队列中,然后在Poller线程运行时处理队列,将socketchannel注册到这个Poller的Selector上。
当事件到来的时候,Selector发现要处理的事件,通过selector.select系列方法来获取数据,然后经由processKey到processSocket方法,封装成一个SocketProcessor对象后,放在EndPoint的线程池中执行。
SocketChannel是如何注册到Poller中的?
protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();//内部维护的事件队列 public void register(final NioChannel socket) { socket.setPoller(this); KeyAttachment key = keyCache.poll(); final KeyAttachment ka = key!=null?key:new KeyAttachment(); ka.reset(this,socket,getSocketProperties().getSoTimeout()); PollerEvent r = eventCache.poll(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r);//socketchannel、key一同加入到了events队列 }
SocketChannel是如何注册到每个Poller的selector中的?答案在event()方法中,在该方法中遍历events队列,依次执行run方法
public boolean events() { while ( (r = (Runnable)events.poll()) != null ) { result = true; r.run(); } …… } public void run() { if ( interestOps == OP_REGISTER ) { socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);//注册到selector中 } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); int ops = key.interestOps() | interestOps; att.interestOps(ops); key.interestOps(ops); att.setCometOps(ops);//另外一种注册方法? }}
……
Poller的执行在其run方法中,主要是将请求封装成SocketProcessor对象,交给线程池处理。
public void run() { if ( keyCount == 0 ) hasEvents = (hasEvents | events());//通过事件机制监控感兴趣的网络事件,见上面的events()分析 //遍历渠道上到来的key,交给processor去处理 Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; for() { SelectionKey sk = (SelectionKey) iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); processKey(sk, attachment); } }
//processKey(sk, attachment)调用processSocket(channel, SocketStatus.OPEN),最终使用Endpoint的线程池执行请求
protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false); attachment.setCometNotify(false); //will get reset upon next reg sc = new SocketProcessor(socket,status); if ( dispatch ) executor.execute(sc);=====>任务被封装成SocketProcessor对象,在成功获取线程池后,则通过线程池来进行socket数据数据的读写操作。 else sc.run(); …… }
SocketProcessor
请求到达socketProcess之后,首先执行其run方法,请求被转移到handler.process,根据上下文,我们知道这了的hander指的是Http11ConnectionHandler
public void run() { (status==null)?(handler.process(socket)==Handler.SocketState.CLOSED) : (handler.event(socket,status)==Handler.SocketState.CLOSED); }
参考文献:
http://blog.csdn.net/yanlinwang/