  • workerGroup registers the NioSocketChannel


    Continuing from the previous post: ServerBootstrapAcceptor's channelRead(ChannelHandlerContext ctx, Object msg) method registers the NioSocketChannel with the workerGroup.

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                setChannelOptions(child, childOptions, logger);
                setAttributes(child, childAttrs);
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }

    The channel registered here is a NioSocketChannel, whereas the bossGroup registered a NioServerSocketChannel.

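    The msg argument is created in NioServerSocketChannel's doReadMessages(List<Object> buf) method, which wraps each accepted SocketChannel in a new NioSocketChannel and adds it to buf.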

    Internally, childGroup.register(child) works the same way as it did for the bossGroup: it picks one NioEventLoop from the workerGroup and registers the channel with it.
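
    The "pick one NioEventLoop from the group" step can be pictured as a round-robin choice over a fixed list of loops. Below is a minimal JDK-only sketch of that idea, assuming the group hands out its loops in turn; RoundRobinChooser and the loop names are hypothetical stand-ins, not Netty's classes.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an event loop chooser; not Netty's implementation.
final class RoundRobinChooser<T> {
    private final List<T> loops;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinChooser(List<T> loops) {
        if (loops.isEmpty()) {
            throw new IllegalArgumentException("at least one loop is required");
        }
        this.loops = List.copyOf(loops);
    }

    // Returns the loops in order and wraps around at the end.
    // floorMod keeps the index non-negative even after the counter overflows.
    T next() {
        return loops.get(Math.floorMod(counter.getAndIncrement(), loops.size()));
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> workerGroup =
                new RoundRobinChooser<>(List.of("loop-1", "loop-2", "loop-3"));
        for (int i = 0; i < 4; i++) {
            // Each newly accepted channel would be registered with the chosen loop.
            System.out.println("channel " + i + " -> " + workerGroup.next());
        }
    }
}
```

    Because a channel stays on the loop it was registered with, all of its later I/O events are handled by that one thread.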

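    In the debugger you can see that the workerGroup thread has been started: the thread whose name ends in 2-1 belongs to the bossGroup, and the one ending in 3-1 belongs to the workerGroup.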

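    Next, the workerGroup thread runs the registration task; the remaining steps mirror those of the bossGroup. It registers the channel with the selector and calls pipeline.invokeHandlerAddedIfNeeded(), which invokes the handlerAdded method of our custom childHandler. That in turn runs ChannelInitializer's initChannel method, which adds our custom handlers to the pipeline. The corresponding handlerAdded, channelRegistered and channelActive methods are then executed. Once the task is finished, the thread goes back to blocking in select.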

    Reading messages

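    As before, when a message arrives, processSelectedKeys and the methods that follow it are called. For a NioSocketChannel the read ends up in NioByteUnsafe.read():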

    public final void read() {
                final ChannelConfig config = config();
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator(); // byte buffer allocator
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf); // propagate the read event through the pipeline
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
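
    The core of read() is a loop: allocate a buffer, read into it, stop on 0 bytes (nothing more available right now) or on a negative count (EOF, which sets close), and otherwise hand the buffer on. The following JDK-only sketch mirrors just that control flow. ReadLoopSketch and readAll are hypothetical names, a List stands in for pipeline.fireChannelRead, and the adaptive buffer sizing done by allocHandle is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of the read loop; not Netty code.
final class ReadLoopSketch {
    // Reads chunks until the channel returns 0 (no data now) or -1 (EOF).
    // Returns true if EOF was seen, which is where read() would close the channel.
    static boolean readAll(ReadableByteChannel ch, int chunkSize, List<byte[]> out) {
        boolean close = false;
        try {
            while (true) {
                ByteBuffer buf = ByteBuffer.allocate(chunkSize); // fresh buffer per iteration
                int n = ch.read(buf);
                if (n <= 0) {
                    close = n < 0; // negative means EOF
                    break;
                }
                buf.flip();
                byte[] chunk = new byte[buf.remaining()];
                buf.get(chunk);
                out.add(chunk); // stands in for pipeline.fireChannelRead(byteBuf)
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return close;
    }

    public static void main(String[] args) {
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(new byte[10]));
        List<byte[]> chunks = new ArrayList<>();
        boolean close = readAll(ch, 4, chunks);
        System.out.println(chunks.size() + " chunks, close=" + close);
    }
}
```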

    doReadBytes(byteBuf)

    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); // get the allocation handle
            allocHandle.attemptedBytesRead(byteBuf.writableBytes()); // record how many bytes we will try to read (the writable space)
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); // read from the channel into the byte buffer
        }

    writeBytes(ScatteringByteChannel in, int length): writes data from the channel into the buffer and advances writerIndex by the number of bytes actually read

    public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
            ensureWritable(length);
            int writtenBytes = setBytes(writerIndex, in, length);
            if (writtenBytes > 0) {
                writerIndex += writtenBytes;
            }
            return writtenBytes;
        }

    PooledByteBuf.setBytes(int index, ScatteringByteChannel in, int length)

    in.read: the channel reads data from the underlying socket buffer into the byte buffer:

    public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
            try {
                return in.read(internalNioBuffer(index, length));
            } catch (ClosedChannelException ignored) {
                return -1;
            }
        }

    PooledByteBuf.internalNioBuffer(int index, int length): returns a ByteBuffer; the PooledByteBuf wraps a ByteBuffer internally

    public final ByteBuffer internalNioBuffer(int index, int length) {
            checkIndex(index, length);
            return _internalNioBuffer(index, length, false);
        }

    _internalNioBuffer(int index, int length, boolean duplicate)

    final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {
            index = idx(index);
            ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer();
            buffer.limit(index + length).position(index);
            return buffer;
        }
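
    The limit(index + length).position(index) call is what restricts the channel read to the region [index, index + length) of the backing memory. Here is a small JDK-only sketch of that windowing trick. WindowedReadSketch and its setBytes are hypothetical; unlike the pooled buffer above, this sketch always calls duplicate() instead of reusing a cached view.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Illustration of reading into a window of a larger buffer; not Netty code.
final class WindowedReadSketch {
    // Reads at most `length` bytes from `in` into backing[index, index + length).
    // Returns the number of bytes read, or -1 on EOF.
    static int setBytes(ByteBuffer backing, int index, ReadableByteChannel in, int length) {
        ByteBuffer window = backing.duplicate(); // shares memory, has its own position/limit
        window.limit(index + length).position(index);
        try {
            return in.read(window);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer backing = ByteBuffer.allocate(8);
        ReadableByteChannel in =
                Channels.newChannel(new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5}));
        int n = setBytes(backing, 2, in, 3);
        System.out.println("read " + n + " bytes, backing[2]=" + backing.get(2));
    }
}
```

    The backing buffer's own position and limit are untouched, so several windows can be carved out of the same memory without interfering with each other.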

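    The buffer here is a direct buffer (DirectByteBuffer). Because its memory lives outside the Java heap, the JDK can read from the socket straight into it, which saves the extra copy through a temporary native buffer that a heap buffer would need.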
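
    For reference, the two kinds of buffer can be told apart with the plain JDK API. This is only a sketch of the heap/direct distinction; DirectBufferSketch and its allocate helper are hypothetical and have nothing to do with Netty's pooled allocator.

```java
import java.nio.ByteBuffer;

// Illustration of heap vs. direct buffers using only the JDK.
final class DirectBufferSketch {
    // Allocates off-heap memory when preferDirect is true, a heap array otherwise.
    static ByteBuffer allocate(int capacity, boolean preferDirect) {
        return preferDirect ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
    }

    public static void main(String[] args) {
        ByteBuffer direct = allocate(16, true);
        ByteBuffer heap = allocate(16, false);
        System.out.println("direct.isDirect() = " + direct.isDirect());
        System.out.println("heap.isDirect()   = " + heap.isDirect());
    }
}
```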

     pipeline.fireChannelRead(byteBuf);

    This passes the buffer into the pipeline for the handlers to process. It eventually reaches my custom handler, which reads the data out:

    Client:

  • Original article: https://www.cnblogs.com/xiaojiesir/p/15387085.html