接着上一篇分析,Acceptor阻塞在accept函数中
SocketChannel channel = serverChannel.accept();//等待新的连接 // create IoSocketHandler IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();//获取Dispatcher IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel,sslContext, sslOn); // notify call back callback.onConnectionAccepted(ioHandler); acceptedConnections++;
有新的连接时进入到下面三行函数,首先从DispatcherPool中获取一个Dispatcher,采用轮询的方式,然后构造一个IoChainableHandler(可以成链)对象,返回的是IoSocketHandler类的一个实例(非常重要),IoSocketHandler类继承自IoChainableHandler,封装了channel和Dispatcher以及一些当前连接的属性。然后回调callback的onConnectionAccepted,这个callback就是前面的那个LifeCycleHandler,该回调函数中有两行非常重要的代码
NonBlockingConnection connection = new NonBlockingConnection(connectionManager, handlerAdapter.getConnectionInstance())); init(connection, ioHandler);
这里出现了一个connectionManager,对连接进行管理,先不管,然后是一个handlerAdapter,它封装了一个handlerInfo,就是对我们传入的EchoHandler进行解析适配,根据这两个参数构造出一个NonBlockingConnection。这个类是对连接的抽象(非常重要),后面会看到它就是传给我们的Handler的参数。NonBlockingConnection有几个很重要的成员变量如IoHandlerCallback,SerializedTaskQueue等后面会讲。
然后是init函数,参数ioHandler就是前面构造的IoChainableHandler,在init函数中将NonBlockingConnection的IoHandlerCallback(后面讲)赋值给了handlerAdapter的IIoHandlerCallback,然后调用了Dispatcher的register方法,将channel注册到Dispatcher的Selector中。
public boolean register(IoSocketHandler socketHandler, int ops) throws IOException { socketHandler.setMemoryManager(memoryManager); if (isDispatcherInstanceThread()) { registerHandlerNow(socketHandler, ops);//判断是否在dispatcher线程中,在则直接注册 } else { } registerQueue.add(new RegisterTask(socketHandler, ops));//不在加入到队列中,同时将IoSocketHandler传给了RegisterTask wakeUp(); } return true; }
注意这里并不是直接注册,而是将注册任务加入到了Dispatcher的registerQueue中,因为线程安全的问题。同时将IoSocketHandler对象传给了RegisterTask的构造函数,RegisterTask实现了Runnable,在run方法中调用了
socketHandler.getChannel().register(selector, ops, socketHandler);
socketHandler.onRegisteredEvent();
可以看到socketHandler作为了key attachment,ops为读操作。
之后在Dispatcher的run方法中从registerQueue中取出registerTask执行run方法完成注册,可想而知这里的registerQueue是线程安全的ConcurrentLinkedQueue。
这样就完成了新连接的注册以及读事件的监听,这里已经可以看出IoSocketHandler和NonBlockingConnection的重要性。
从客户端发出一条字符串信息,Dispatcher run方法里的selector.select就会检测到读事件,调用handleReadWriteKeys()
private void handleReadWriteKeys() { Set<SelectionKey> selectedEventKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedEventKeys.iterator(); // handle read & write while (it.hasNext()) { try { SelectionKey eventKey = it.next(); it.remove();//需要移除这个key IoSocketHandler socketHandler = (IoSocketHandler) eventKey.attachment();//取出IoSocketHandler try { // read data if (eventKey.isValid() && eventKey.isReadable()) { onReadableEvent(socketHandler);//读 } // write data if (eventKey.isValid() && eventKey.isWritable()) { onWriteableEvent(socketHandler);//写 } } catch (Exception e) { socketHandler.close(e); } } catch (Exception e) { // eat and log exception if (LOG.isLoggable(Level.FINE)) { LOG.fine("error occured by handling selection keys + " + e.toString()); } } } }
Dispatcher的onReadableEvent()调用了IoSocketHandler的onReadableEvent
long onReadableEvent() throws IOException {long read = 0; // read data from socket ByteBuffer[] received = readSocket();//读取数据,里面涉及到ByteBuffer的内存管理后面再讲 // handle the data if (received != null) { int size = 0; for (ByteBuffer byteBuffer : received) { size += byteBuffer.remaining(); } read += size;//作为统计 getPreviousCallback().onData(received, size);//调用回调函数 getPreviousCallback().onPostData();//回调 } // increase preallocated read memory if not sufficient checkPreallocatedReadMemory(); return read; }
这里的getPreviousCallback()即返回了之前NonBlockingConnection传的IoHandlerCallback,它是NonBlockingConnection的一个内部类,它的方法直接调用了NonBlockingConnection相应的方法,这个内部类可以看做只是提供了闭包的作用,甚至可以将NonBlockingConnection直接传给IoSocketHandler的。
首先是onData方法,将received 对象加入到了NonBlockingConnection的readQueue中(重要,后面会从此取数据)。然后是onPostData,它调用了
private void onPostData() { /** * This method will be called (by IoSocketHandler) after the onData method * is called */
//这里获取的就是之前的handlerAdapter,同时将NonBlockingConnection本身以及 NonBlockingConnection的taskQueue,workerpool传给了handlerAdapter
handlerAdapterRef.get().onData(NonBlockingConnection.this, taskQueue, workerpool, false, false); }
HandlerAdapter的onData函数调用了NonblockingConnection的taskQueue的如下方法
taskQueue.performMultiThreaded(new PerformOnDataTask(connection, taskQueue, ignoreException, (IDataHandler) handler), workerpool);
这里的taskQueue就是之前提到的SerializedTaskQueue,它封装了一个Runnable 链表保存任务,一个执行线程执行链表中的任务。上面的方法将新创建的PerFormOnDataTask加入到了链表,并使用workpool启动了执行线程,这样就执行了PerformOnData的run方法。之前的操作都是在Dispatcher线程中,到此Dispatcher线程重新回到循环中等待新的事件。也就是每次我们的逻辑代码都是用的独立的线程执行的,PerformOnDataTask的run方法调用了performOnData函数
private static void performOnData(INonBlockingConnection connection, SerializedTaskQueue taskQueue, boolean ignoreException, IDataHandler handler) try { // loop until readQueue is empty or nor processing has been done (readQueue has not been modified) while ((connection.available() != 0) && !connection.isReceivingSuspended()) { if (connection.getHandler() != handler) { if (LOG.isLoggable(Level.FINE)) { LOG.fine("[" + connection.getId() + "] handler " + " replaced by " + connection.getHandler() + ". stop handling data for old handler"); } return; } handler.onData(connection); }...省略了部分代码
调用了handler的onData方法,这个handler就是我们的上一篇文章中的EchoHandler,在onData函数中我们调用了
String data = nbc.readStringByDelimiter(" "); //这里的nbc就是上面的connection
这里的readStringByDelimiter调用了readByteBufferByDelimiter
ByteBuffer[] buffers = readQueue.readByteBufferByDelimiter(delimiter.getBytes(encoding), maxLength);
这里可以看到从Connection的readQueue中取数据,到此接收读取数据就完成了。(这里有tcp分包的处理,后续再讲)
再看写数据,在EchoHandler里读完数据后我们调用了下面的代码写数据
nbc.write(data + " ");
其调用了如下方法
public int write(String message, String encoding) throws IOException, BufferOverflowException, ClosedChannelException { ensureStreamIsOpenAndWritable(); ByteBuffer buffer = DataConverter.toByteBuffer(message, encoding);//将字符串转换为ByteBuffer,encoding为默认的utf-8 int written = buffer.remaining();//数据字节数 writeQueue.append(buffer);//加入到写队列中 onWriteDataInserted();//调用了synchronWriter.syncWrite(recoveryBuffer)进行同步写,当数据写完才会返回
return written;
}
onWriteDataInserted()函数将数据从wirteQueue取出加入到IoSocketHandler的sendQueue中,同时注册写事件,写方法被阻塞等待写完成,通过synchronWrite.this.wait(remainingTime)实现,当然也可能配置为非阻塞写。
dispatcher.addKeyUpdateTask(setWriteSelectionKeyTask);//这里也是将task加入到了Dispatcher的并发队列keyUpdateQueue中
之后Dispatcher更新键集,触发写事件
onWriteableEvent(socketHandler);
最终调用
private void writeSocket() throws IOException { if (isOpen()) { IWriteTask writeTask = null; // does former task exists? if (pendingWriteTask != null) { writeTask = pendingWriteTask; pendingWriteTask = null;// no, create a new one } else { try { writeTask = TaskFactory.newTask(sendQueue, soSendBufferSize);//创建写任务 } catch (Throwable t) { throw ConnectionUtils.toIOException(t); } } // perform write task IWriteResult result = writeTask.write(this); //从sendQueue中取出数据写到channel里,这里并没有新建线程,客户端socket这步就接收到了消息 // is write task not complete? if (result.isAllWritten()) { sendQueue.removeLeased(); writeTask.release(); result.notifyWriteCallback();//唤醒前面等待写完的线程,也即工作线程返回 } else { pendingWriteTask = writeTask; } }
这样就完成了写,之后取消掉写操作
boolean isKeyUpdated = dispatcher.unsetWriteSelectionKeyNow(this);//this即当前IoSocketHandler
这样就完成了一次读写操作。