zoukankan      html  css  js  c++  java
  • netty源码分析之二:accept请求

    我在前面说过了server的启动,差不多可以看到netty nio主要的东西包括了:nioEventLoop,nioMessageUnsafe,channelPipeline,channelHandler等。比较绕的就是handler的顺序:head---->tail,这个也是netty链式管理复杂的地方。这里再说下accept如何接受客户端的connect请求(connect请求跟server启动分析差不多,我就不说了)。

    入口就在nioEventLoop中:

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    NioUnsafe unsafe = ch.unsafe();
    if(!k.isValid()) {
    unsafe.close(unsafe.voidPromise());
    } else {
    try {
    int ignored = k.readyOps();
    if((ignored & 17) != 0 || ignored == 0) {
    unsafe.read();
    if(!ch.isOpen()) {
    return;
    }
    }

    if((ignored & 4) != 0) {
    ch.unsafe().forceFlush();
    }

    if((ignored & 8) != 0) {
    int ops = k.interestOps();
    ops &= -9;
    k.interestOps(ops);
    unsafe.finishConnect();
    }
    } catch (CancelledKeyException var5) {
    unsafe.close(unsafe.voidPromise());
    }

    }
    }
    当OP_ACCEPT事件到达时,就会执行nioMessageUnsafe.read()方法。
    一直走下去,最终会执行NioServerSocketChannel中的doReadMessages()方法。
    protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = this.javaChannel().accept();

    try {
    if(ch != null) {
           // 这里就创建了nioScoketChannel,创建过程类似NioServerSocketChannel,并最终添加到buf list中。
    buf.add(new NioSocketChannel(this, ch));
    return 1;
    }
    } catch (Throwable var6) {
    logger.warn("Failed to create a new channel from an accepted socket.", var6);

    try {
    ch.close();
    } catch (Throwable var5) {
    logger.warn("Failed to close a socket.", var5);
    }
    }

    return 0;
    }

    我截取一段代码看看添加socketChannel到list集合中发生了什么?
    try {
    int size;
    try {
    do {
    size = AbstractNioMessageChannel.this.doReadMessages(this.readBuf);
    if(size == 0) {
    break;
    }

    if(size < 0) {
    closed = true;
    break;
    }
    } while(config.isAutoRead() && this.readBuf.size() < maxMessagesPerRead);
    } catch (Throwable var11) {
    exception = var11;
    }

    AbstractNioMessageChannel.this.setReadPending(false);
    size = this.readBuf.size();
    int i = 0;

    while(true) {
    if(i >= size) {
    this.readBuf.clear();
    pipeline.fireChannelReadComplete();
    if(exception != null) {
    if(exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
    closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
    }

    pipeline.fireExceptionCaught(exception);
    }

    if(closed && AbstractNioMessageChannel.this.isOpen()) {
    this.close(this.voidPromise());
    }
    break;
    }

         // 看到这个方法了吧,这是一个inbound事件,当前pipeline的执行顺序是:head---->serverBootstrapAcceptor---->tail
         // 先看看serverBootstrapAcceptor跳转到1
    pipeline.fireChannelRead(this.readBuf.get(i));
    ++i;
    }
    } finally {
    if(!config.isAutoRead() && !AbstractNioMessageChannel.this.isReadPending()) {
    this.removeReadOp();
    }

    }

    1.这个channelRead是serverBootstrapAcceptor执行的channelRead()
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel)msg;
    // 这个地方添加了childHandler,还记得server启动时指定的childHannel吧?
    child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
    Entry[] t = this.childOptions;
    int len$ = t.length;

    int i$;
    Entry e;
    for(i$ = 0; i$ < len$; ++i$) {
    e = t[i$];

    try {
    if(!child.config().setOption((ChannelOption)e.getKey(), e.getValue())) {
    ServerBootstrap.logger.warn("Unknown channel option: " + e);
    }
    } catch (Throwable var10) {
    ServerBootstrap.logger.warn("Failed to set a channel option: " + child, var10);
    }
    }

    t = this.childAttrs;
    len$ = t.length;

    for(i$ = 0; i$ < len$; ++i$) {
    e = t[i$];
    child.attr((AttributeKey)e.getKey()).set(e.getValue());
    }

    try {
    // 进行注册,注意下这里注册的是childGroup,终于worker线程池开始启动了,且正在执行register任务,跳转到2
    this.childGroup.register(child).addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
    if(!future.isSuccess()) {
    ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause());
    }

    }
    });
    } catch (Throwable var9) {
    forceClose(child, var9);
    }

    }

    2.AbstractChannel中的register0()方法
    这个方法跟之前分析server的register方法一致,我说下大概思路:
    2.1 将socketChannel注册到selector中;
    2.2 通过触发pipeline.fireChannelRegistered();方法将之前main函数中设置的处理器(还记得上一篇netty源码分析之一中唯一的图片吧,就是那个childHandler,本例中就是MessageRecvChannelInitializer这个handler添加到管道中,并将channelInitializer移除)
    2.3 通过调用channelActive,ops设置为READ事件。
    2.4 worker线程开始监听我们的read事件了。
    2.5 boss线程继续执行OP_ACCEPT事件
    private void register0(ChannelPromise promise) {
    try {
    if(!promise.setUncancellable() || !this.ensureOpen(promise)) {
    return;
    }

    boolean t = this.neverRegistered;
    AbstractChannel.this.doRegister();
    this.neverRegistered = false;
    AbstractChannel.this.registered = true;
    this.safeSetSuccess(promise);
    AbstractChannel.this.pipeline.fireChannelRegistered();
    if(t && AbstractChannel.this.isActive()) {
    AbstractChannel.this.pipeline.fireChannelActive();
    }
    } catch (Throwable var3) {
    this.closeForcibly();
    AbstractChannel.this.closeFuture.setClosed();
    this.safeSetFailure(promise, var3);
    }

    }

    一个完整的ACCEPT事件执行完毕!
  • 相关阅读:
    HDU2149-Public Sale
    分页和多条件查询功能
    hdu 4691 最长的共同前缀 后缀数组 +lcp+rmq
    BZOJ 2588 Count on a tree (COT) 是持久的段树
    windows 设置脚本IP
    hdu 4912 Paths on the tree(树链拆分+贪婪)
    分散式-ubuntu12.04安装hadoop1.2.1
    struts详细解释拦截器
    Codeforces 459E Pashmak and Graph(dp+贪婪)
    C#中的数据格式转换 (未完待更新)
  • 原文地址:https://www.cnblogs.com/cr1719/p/6360201.html
Copyright © 2011-2022 走看看