zoukankan      html  css  js  c++  java
  • Netty源码—四、事件处理

    前面经过channel初始化、注册,所需要的数据结构(epoll_event)基本上准备好了,serverSocket也处于监听状态,可以接收来自客户端的请求了。NioServerSocketChannel注册在了NioEventLoop#selector,在注册过程中启动了NioEventLoop,run方法会循环执行,每次循环都会执行select和执行所有的task。如果select有事件,则会处理收到的事件。

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            // 是否使用优化过的selectionKey
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    

    前面在NioEventLoop初始化的时候说过关于selectionKey优化的问题,这里不再赘述。两种方式主要是遍历selectionKey的方式不同,具体处理事件的调用是一样的。这里以processSelectedKeysOptimized为例。

    accept

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // channel是NioServerSocketChannel
        // unsafe是NioMessageUnsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // 省略中间代码...
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 调用NioMessageUnsafe.read
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
    
    public void read() {
        // 省略中间代码...
                    // 由于是ServerSocket,只负责accept,如果有IO事件说明就是有新的客户端连接,所以这里就是创建NioSocketChannel
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
    
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
    
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                // 注册刚刚创建的NioSocketChannel
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            // 省略中间代码...
        }
    }
    
    protected int doReadMessages(List<Object> buf) throws Exception {
        // 调用java.nio.channels.ServerSocketChannel#accept来创建SocketChannel
        SocketChannel ch = SocketUtils.accept(javaChannel());
    
        try {
            if (ch != null) {
                // 创建NioSocketChannel
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            // 省略中间代码...
        }
    
        return 0;
    }
    

    上面创建了NioSocketChannel之后,接下来注册所有客户端连接的NioSocketChannel,调用的是DefaultChannelPipeline#fireChannelRead方法,接下来是执行pipeline中的handler,在初始化的时候添加了LoggingHandler (如果启动的时候配置了的话),那么目前pipeline中的handler有

    • io.netty.channel.DefaultChannelPipeline$HeadContext:pipeline创建的时候默认的第一个handler
    • io.netty.handler.logging.LoggingHandler:启动的时候用户配置的handler
    • io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
    • io.netty.channel.DefaultChannelPipeline$TailContext:pipeline创建的时候默认的最后一个handler

    下面看下ServerBootstrap$ServerBootstrapAcceptor是什么时候添加到handler的

    // io.netty.bootstrap.ServerBootstrap#init
    // 这个方法是NioServerSocketChannel初始化的时候调用的
    void init(Channel channel) throws Exception {
        // 省略中间代码...
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
    
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 在pipeline中添加ServerBootstrapAcceptor
                        pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    

    之所以说ServerBootstrapAcceptor,是因为NioSocketChannel的register过程是这个handler的channelRead方法开始的

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
    
        child.pipeline().addLast(childHandler);
    
        setChannelOptions(child, childOptions, logger);
    
        // 配置NioSocketChannel
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    
        try {
            // 这里childGroup就是一开始我们配置的workerGroup
            // 所以调用的是io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    

    接下来的注册过程和NioServerSocketChannel的注册过程是类似的,创建socket,创建SelectionKeyImpl等。只不过NioSocketChannel不监听accept事件。

    read

    上面在接收到来自客户端的连接请求后,将NioSocketChannel注册到selector上,这个selector也是在NioEventLoop里面的,后面和这个客户端的通信都会通过这个channel进行,如果客户端发送来数据,也是selector收到读事件通知,然后调用processSelectedKey来处理read事件。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // channel是NioSocketChannel
        // unsafe是NioSocketChannelUnsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // 省略中间代码...
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 调用NioByteUnsafe.read
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
    
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        // PooledByteBufAllocator,默认的内存申请管理器
        final ByteBufAllocator allocator = config.getAllocator();
        // AdaptiveRecvByteBufAllocator$HandleImpl
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
    
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                // 申请内存
                byteBuf = allocHandle.allocate(allocator);
                // 读取数据
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
    
                allocHandle.incMessagesRead(1);
                readPending = false;
                // 执行pipeline中的handler
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());
    		// 省略中间代码
        }
    }
    

    一般来说NioSocketChannel中的handler包括

    • io.netty.channel.DefaultChannelPipeline$HeadContext
    • org.lep.test.netty.protocol.custom.codec.NettyMessageDecoder:自定义的解码器
    • org.lep.test.netty.protocol.custom.codec.NettyMessageEncoder:自定义的编码器
    • org.lep.test.netty.protocol.custom.server.LoginAuthRespHandler:自定义的handler
    • org.lep.test.netty.protocol.custom.server.HeartBeatRespHandler:自定义的handler
    • io.netty.channel.DefaultChannelPipeline$TailContext

    netty提供了一些基本的编解码功能,自己可以根据实际需要扩展使用,然后自定义自己的逻辑处理handler。

    上面还涉及到内存的分配部分留在下一节介绍。

    总结

    read事件处理过程:

    1. 接收到read事件
    2. 分配内存,初始化buffer
    3. 调用channel.read将数据读取到buffer中
    4. 执行pipeline中的handler,包括了编解码的handler,自定义的handler来处理数据
  • 相关阅读:
    千百撸
    迅捷PDF转换器(在线)
    request不能接受前端传来的参数的问题
    新买的orico蓝牙usb连接器使用方法与驱动
    win8.1下右下角出现大小写切换状态显示框解决方案
    idea tomcat 热部署方法
    set方法在set传入值时报空指针异常,直接设置定值即可
    eclipse项目转移至IDEA与IDEA tomcat报错(idea自带tomcat版本太高)与war包部署到win服务器与idea提交git的总结
    GNS3 jungle newsfeed 隐藏
    解决 sr 的端口占用问题
  • 原文地址:https://www.cnblogs.com/sunshine-2015/p/9369282.html
Copyright © 2011-2022 走看看