zoukankan      html  css  js  c++  java
  • xsocket分析二,接收连接、收发数据

    接着上一篇分析,Acceptor阻塞在accept函数中

    SocketChannel channel = serverChannel.accept();//等待新的连接
    
    // create IoSocketHandler
    IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();//获取Dispatcher
    IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel,sslContext, sslOn);
    
    // notify call back
    callback.onConnectionAccepted(ioHandler);
    acceptedConnections++;  

    有新的连接时进入到下面三行函数,首先从DispatcherPool中获取一个Dispatcher,采用轮询的方式,然后构造一个IoChainableHandler(可以成链)对象,返回的是IoSocketHandler类的一个实例(非常重要),IoSocketHandler类继承自IoChainableHandler,封装了channel和Dispatcher以及一些当前连接的属性。然后回调callback的onConnectionAccepted,这个callback就是前面的那个LifeCycleHandler,该回调函数中有两行非常重要的代码

    NonBlockingConnection connection = new NonBlockingConnection(connectionManager, handlerAdapter.getConnectionInstance()));
    init(connection, ioHandler);

    这里出现了一个connectionManager,对连接进行管理,先不管,然后是一个handlerAdapter,它封装了一个handlerInfo,就是对我们传入的EchoHandler进行解析适配,根据这两个参数构造出一个NonBlockingConnection。这个类是对连接的抽象(非常重要),后面会看到它就是传给我们的Handler的参数。NonBlockingConnection有几个很重要的成员变量如IoHandlerCallback,SerializedTaskQueue等后面会讲。

    然后是init函数,参数ioHandler就是前面构造的IoChainableHandler,在init函数中将NonBlockingConnection的IoHandlerCallback(后面讲)赋值给了handlerAdapter的IIoHandlerCallback,然后调用了Dispatcher的register方法,将channel注册到Dispatcher的Selector中。

    public boolean register(IoSocketHandler socketHandler, int ops) throws IOException {
        socketHandler.setMemoryManager(memoryManager);
    
        if (isDispatcherInstanceThread()) {
            registerHandlerNow(socketHandler, ops);//判断是否在dispatcher线程中,在则直接注册
        } else {
        }
        registerQueue.add(new RegisterTask(socketHandler, ops));//不在加入到队列中,同时将IoSocketHandler传给了RegisterTask
        wakeUp();
        }
        return true;
    }

     注意这里并不是直接注册,而是将注册任务加入到了Dispatcher的registerQueue中,因为线程安全的问题。同时将IoSocketHandler对象传给了RegisterTask的构造函数,RegisterTask实现了Runnable,在run方法中调用了

    socketHandler.getChannel().register(selector, ops, socketHandler);                
    socketHandler.onRegisteredEvent();

     可以看到socketHandler作为了key attachment,ops为读操作。

    之后在Dispatcher的run方法中从registerQueue中取出registerTask执行run方法完成注册,可想而知这里的registerQueue是线程安全的ConcurrentLinkedQueue。

    这样就完成了新连接的注册以及读事件的监听,这里已经可以看出IoSocketHandler和NonBlockingConnection的重要性。

    从客户端发出一条字符串信息,Dispatcher run方法里的selector.select就会检测到读事件,调用handleReadWriteKeys()

        private void handleReadWriteKeys() {
            Set<SelectionKey> selectedEventKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectedEventKeys.iterator();
            // handle read & write
            while (it.hasNext()) {
                try {
                    SelectionKey eventKey = it.next();
                    it.remove();//需要移除这个key
                    IoSocketHandler socketHandler = (IoSocketHandler) eventKey.attachment();//取出IoSocketHandler
                    try {
                        // read data
                        if (eventKey.isValid() && eventKey.isReadable()) {
                            onReadableEvent(socketHandler);//读
                        }
                        // write data
                        if (eventKey.isValid() && eventKey.isWritable()) {
                            onWriteableEvent(socketHandler);//写
                        }
                    } catch (Exception e) {
                        socketHandler.close(e);
                    }
                } catch (Exception e) {
                    // eat and log exception
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("error occured by handling selection keys + " + e.toString());
                    }
                }
            }
        }

     Dispatcher的onReadableEvent()调用了IoSocketHandler的onReadableEvent

        long onReadableEvent() throws IOException {long read = 0;
            // read data from socket
            ByteBuffer[] received  = readSocket();//读取数据,里面涉及到ByteBuffer的内存管理后面再讲
            // handle the data
            if (received != null) {
                int size = 0;
                for (ByteBuffer byteBuffer : received) {
                    size += byteBuffer.remaining();
                }
                read += size;//作为统计
                getPreviousCallback().onData(received, size);//调用回调函数
                getPreviousCallback().onPostData();//回调
            }
            // increase preallocated read memory if not sufficient
            checkPreallocatedReadMemory();
            return read;
        }

     这里的getPreviousCallback()即返回了之前NonBlockingConnection传的IoHandlerCallback,它是NonBlockingConnection的一个内部类,它的方法直接调用了NonBlockingConnection相应的方法,这个内部类可以看做只是提供了闭包的作用,甚至可以将NonBlockingConnection直接传给IoSocketHandler的。

    首先是onData方法,将received 对象加入到了NonBlockingConnection的readQueue中(重要,后面会从此取数据)。然后是onPostData,它调用了

    private void onPostData() {
            /**
             * This method will be called (by IoSocketHandler) after the onData method 
             * is called
             */
            //这里获取的就是之前的handlerAdapter,同时将NonBlockingConnection本身以及 NonBlockingConnection的taskQueue,workerpool传给了handlerAdapter
            handlerAdapterRef.get().onData(NonBlockingConnection.this, taskQueue, workerpool, false, false);
    }

     HandlerAdapter的onData函数调用了NonblockingConnection的taskQueue的如下方法

    taskQueue.performMultiThreaded(new PerformOnDataTask(connection, taskQueue, ignoreException, (IDataHandler) handler), workerpool);

    这里的taskQueue就是之前提到的SerializedTaskQueue,它封装了一个Runnable 链表保存任务,一个执行线程执行链表中的任务。上面的方法将新创建的PerFormOnDataTask加入到了链表,并使用workpool启动了执行线程,这样就执行了PerformOnData的run方法。之前的操作都是在Dispatcher线程中,到此Dispatcher线程重新回到循环中等待新的事件。也就是每次我们的逻辑代码都是用的独立的线程执行的,PerformOnDataTask的run方法调用了performOnData函数

        private static void performOnData(INonBlockingConnection connection, SerializedTaskQueue taskQueue, boolean ignoreException, IDataHandler handler) 
            try { 
                // loop until readQueue is empty or nor processing has been done (readQueue has not been modified)
                while ((connection.available() != 0) && !connection.isReceivingSuspended()) {    
                    if (connection.getHandler() != handler) {
                        if (LOG.isLoggable(Level.FINE)) {
                            LOG.fine("[" + connection.getId() + "] handler " + " replaced by " + connection.getHandler() + ". stop handling data for old handler");
                        }
                        return;
                    }
                    handler.onData(connection);
                }...省略了部分代码

     调用了handler的onData方法,这个handler就是我们的上一篇文章中的EchoHandler,在onData函数中我们调用了

    String data = nbc.readStringByDelimiter("
    "); //这里的nbc就是上面的connection

     这里的readStringByDelimiter调用了readByteBufferByDelimiter

    ByteBuffer[] buffers = readQueue.readByteBufferByDelimiter(delimiter.getBytes(encoding), maxLength);

     这里可以看到从Connection的readQueue中取数据,到此接收读取数据就完成了。(这里有tcp分包的处理,后续再讲)

    再看写数据,在EchoHandler里读完数据后我们调用了下面的代码写数据

    nbc.write(data + "
    ");

    其调用了如下方法

    public int write(String message, String encoding) throws IOException, BufferOverflowException, ClosedChannelException {
            ensureStreamIsOpenAndWritable();
            
            ByteBuffer buffer = DataConverter.toByteBuffer(message, encoding);//将字符串转换为ByteBuffer,encoding为默认的utf-8
            int written = buffer.remaining();//数据字节数
    
            writeQueue.append(buffer);//加入到写队列中
            onWriteDataInserted();//调用了synchronWriter.syncWrite(recoveryBuffer)进行同步写,当数据写完才会返回
    return written;
    }

    onWriteDataInserted()函数将数据从wirteQueue取出加入到IoSocketHandler的sendQueue中,同时注册写事件,写方法被阻塞等待写完成,通过synchronWrite.this.wait(remainingTime)实现,当然也可能配置为非阻塞写。

    dispatcher.addKeyUpdateTask(setWriteSelectionKeyTask);//这里也是将task加入到了Dispatcher的并发队列keyUpdateQueue中

    之后Dispatcher更新键集,触发写事件

    onWriteableEvent(socketHandler);

     最终调用

        private void writeSocket() throws IOException {
            if (isOpen()) {
                IWriteTask writeTask = null;
                // does former task exists?
                if (pendingWriteTask != null) {
                    writeTask = pendingWriteTask;
                    pendingWriteTask = null;// no, create a new one
                } else {
                    try {
                        writeTask = TaskFactory.newTask(sendQueue, soSendBufferSize);//创建写任务
                    } catch (Throwable t) {
                        throw ConnectionUtils.toIOException(t);
                    }
                }
                // perform write task
                IWriteResult result = writeTask.write(this); //从sendQueue中取出数据写到channel里,这里并没有新建线程,客户端socket这步就接收到了消息        
                // is write task not complete?
                if (result.isAllWritten()) {
                    sendQueue.removeLeased();
                    writeTask.release();
                    result.notifyWriteCallback();//唤醒前面等待写完的线程,也即工作线程返回
                } else {
                    pendingWriteTask = writeTask;
                } 
            } 

     这样就完成了写,之后取消掉写操作

    boolean isKeyUpdated = dispatcher.unsetWriteSelectionKeyNow(this);//this即当前IoSocketHandler

     这样就完成了一次读写操作。

  • 相关阅读:
    Linux内核之 I/O多路复用
    Linux内核之 页高速缓存与页回写
    Linux内核之 块I/O层及I/O调度
    Linux内核之 文件I/O
    C++雾中风景15:聊聊让人抓狂的Name Mangling
    音频数据增强及python实现
    深度学习中“过拟合”的产生原因和解决方法
    资料-资源(不定期更新)
    论文翻译:2020_Acoustic Echo Cancellation Challenge Datasets And Testingframework
    语音信号处理概念
  • 原文地址:https://www.cnblogs.com/coderway/p/4220930.html
Copyright © 2011-2022 走看看