zoukankan      html  css  js  c++  java
  • Netty源码解析 服务端启动过程

    本文通过阅读Netty源码,解析Netty服务端启动过程。
    源码分析基于Netty 4.1

    Netty是一个高性能的网络通信框架,支持NIO,OIO等多种IO模式。通常,我们都是使用NIO模式,该系列文章也是解析Netty下NIO模式的实现。
    首先,看一个NIO网络通信示意图

    Netty中NIO网络通信过程在此基础上实现,下面来看一下具体实现。

    Channel

    首先,看一下Netty中的通道Channel,它代表了一个能完成IO操作的通道,提供read, write, connect, bind等方法。
    Channel中维护了一个Unsafe对象,用于完成数据传输操作(这类操作通常由IO事件触发,而不是用户触发)。

    SocketChannel代表Socket连接的网络通道,面向流,支持读写操作。
    ServerChannel表示可以监听新连接的通道,ServerSocketChannel代表实现TCP/IP协议的ServerChannel。

    AbstractChannel提供基础逻辑实现,它维护了Unsafe和ChannelPipeline对象,并委托这两个对象完成实际工作。同时,它也提供newUnsafe,newChannelPipeline方法给子类构造他们需要的对象。
    AbstractUnsafe是AbstractChannel的内部类,实现了register,bind,disconnect等方法的基础逻辑。

    ChannelPipeline可以理解为拦截器链表,维护了一个ChannelHandler链表,ChannelHandler即具体拦截器,负责逻辑处理。
    DefaultChannelPipeline是ChannelPipeline接口的默认实现。Netty中Nio相关的Channel都使用它。
    可以这样理解,Unsafe负责数据传输,而ChannelPipeline负责逻辑处理。

    AbstractNioChannel实现了NIO基础逻辑,如维护(jvm)SelectableChannel,(jvm)SelectionKey等对象,还有一个很关键的selectionKey,代表关注的NIO事件。
    AbstractNioUnsafe是AbstractNioChannel内部类,继承于AbstractUnsafe,并实现Unsafe另一个子接口NioUnsafe,添加了SelectableChannel相关的方法,如finishConnect,read。

    AbstractNioChannel的子类可以分成ServerChannel实现类和SocketChannel实现类。

    ServerChannel实现类是AbstractNioMessageChannel,newUnsafe方法返回的NioMessageUnsafe。
    NioServerSocketChannel是AbstractNioMessageChannel子类,实现TCP/IP协议。

    SocketChannel实现类是AbstractNioByteChannel,newUnsafe方法返回的NioByteUnsafe。
    NioSocketChannel是AbstractNioByteChannel子类,实现TCP/IP协议。

    Channel各实现类关系如下

    Netty中将接口划分得很细微,最好大家可以按功能层次理解各接口代表含义以及实现类的​逻辑。​
    以免后续看源码时混淆各接口功能。

    服务端启动

    首先简单了解一下EventLoop,可以理解为它负责处理网络事件和异步任务,后面有对应文章详细解析。
    EventLoopGroup则是一组EventLoop集合,它会将操作委托给其中一个EventLoop处理。

    Netty的服务端启动引导类ServerBootstrap中维护了两个EventLoopGroup,EventLoopGroup#childGroup和AbstractBootstrap#group。
    AbstractBootstrap#group负责管理注册于其上的ServerChannel,处理这些Channel上发生的Accept事件,并将生成的SocketChannel注册到EventLoopGroup#childGroup。
    EventLoopGroup#childGroup处理这些SocketChannel上发生的Read,Write事件。
    为了方便,下文我将AbstractBootstrap#group称为AcceptGroup,ServerBootstrap#childGroup称为ReadGroup。

    这些设计来自Reactor模式,详细可以见java.util.concurrent包的作者Doug Lea的《Scalable IO in Java》

    AbstractBootstrap#bind -> AbstractBootstrap#doBind

    private ChannelFuture doBind(final SocketAddress localAddress) {
    	// #1
    	final ChannelFuture regFuture = initAndRegister();	
    	final Channel channel = regFuture.channel();
    	if (regFuture.cause() != null) {
    		return regFuture;
    	}
    
    	if (regFuture.isDone()) {
    		ChannelPromise promise = channel.newPromise();
    		// #2
    		doBind0(regFuture, channel, localAddress, promise);
    		return promise;
    	} else {
    		...
    	}
    }
    

    #1 初始化及注册ServerChannel。
    initAndRegister方法返回ChannelFuture,ChannelFuture继承了(jvm)Future,代表IO异步处理结果,并且可以绑定回调函数,异步IO处理完成Netty后会触发这些回调函数。
    我们要有这个意识,Netty是一个异步框架,所有的IO操作都是异步的(充分利用cpu),IO方法不会等待实际IO操作完成,而是返回ChannelFuture。
    待实际IO完成后,Netty再触发ChannelFuture中的回调函数处理后续逻辑。
    ChannelPromise是一种特殊的ChannelFuture,提供更新操作结果的方法(setSuccess,setFailure方法),一般提供给IO方法作为参数(Unsafe中很多方法都有该参数),IO操作完成后,会调用这些方法更新操作结果。
    #2 注册完成后,绑定ServerChannel监听端口。

    AbstractBootstrap#initAndRegister

    final ChannelFuture initAndRegister() {
    	Channel channel = null;
    	try {
    		// #1
    		channel = channelFactory.newChannel();
    		// #2
    		init(channel);
    	} catch (Throwable t) {
    		...
    	}
    	// #3
    	ChannelFuture regFuture = config().group().register(channel);
    	
    	// #4
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
    	return regFuture;
    }
    

    #1 构造ServerChannel
    AbstractBootstrap#channelFactory是一个ReflectiveChannelFactory对象,他通过反射生成Channel。
    ServerBootstrap#channel方法负责构造ReflectiveChannelFactory,并指定具体的ServerChannel类。
    (所以我们要通过该方法指定NioServerSocketChannel.class -- new ServerBootstrap().channel(NioServerSocketChannel.class)
    #2 初始化ServerChannel,该方法由子类实现
    #3 注册Channel到AcceptGroup,注意,config().group()返回AcceptGroup。
    #4 如果IO操作发生了异常,需要关闭Channel。

    NioServerSocketChannel#构造方法 -> NioServerSocketChannel#newSocket方法

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a server socket.", e);
        }
    }
    

    使用(jvm)SelectorProvider,构造一个(jvm)ServerSocketChannel。
    这里完成了NIO网络通信第一步。

    ServerBootstrap#init

    void init(Channel channel) throws Exception {
    	// #1
    	...
    
    	ChannelPipeline p = channel.pipeline();
    
    	final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    
    	// #2
    	p.addLast(new ChannelInitializer<Channel>() {
    		public void initChannel(final Channel ch) throws Exception {
    			final ChannelPipeline pipeline = ch.pipeline();
    			ChannelHandler handler = config.handler();
    			if (handler != null) {
    				pipeline.addLast(handler);
    			}
    
    			ch.eventLoop().execute(new Runnable() {
    				public void run() {
    					// #3
    					pipeline.addLast(new ServerBootstrapAcceptor(
    							ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));	
    				}
    			});
    		}
    	});
    }
    

    #1 设置ServerChannel的Option和Attribute属性。
    #2 给ServerChannel的ChannelPipeline添加一个ChannelInitializer。
    ChannelInitializer是一种特殊的ChannelHandler,initChannel方法负责完成一些Channel初始化工作,该方法的触发可以参考下文的延迟任务。
    #3 上一步骤的ChannelInitializer负责给ServerChannel的ChannelPipeline添加一个ServerBootstrapAcceptor,并将SocketChannel的相关配置(childHandler,currentChildHandler,currentChildOptions,currentChildAttrs)交给它,ServerBootstrapAcceptor用于处理Accept事件,文章后面会解析。

    AbstractBootstrap#initAndRegister方法#3步骤 -> SingleThreadEventLoop#register ->(通过Channel调用Unsafe)AbstractUnsafe#register

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ...
    
        AbstractChannel.this.eventLoop = eventLoop;
    	// #1
        if (eventLoop.inEventLoop()) {
            register0(promise);	
        } else {
            try {
                eventLoop.execute(new Runnable() {	
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                ...
            }
        }
    }
    

    eventLoop.inEventLoop()判断当前线程是否为EventLoop执行线程。
    如果是,直接执行操作 -- 调用register0方法处理。
    否则,提交一个任务给EventLoop。
    这是Netty中提交异步任务的通用格式,Netty中有大量类似代码。
    注意,这里是异步的关键,将当前操作作为一个异步任务,提交给EventLoop处理,而不需要阻塞当前线程。
    EventLoop实际上是一个(jvm)EventExecutor,通过execute方法可以给它任务。

    AbstractUnsafe#register0

    private void register0(ChannelPromise promise) {
    	try {
    		if (!promise.setUncancellable() || !ensureOpen(promise)) {
    			return;
    		}
    		boolean firstRegistration = neverRegistered;
    		// #1
    		doRegister();
    		neverRegistered = false;
    		registered = true;
    
    		// #2
    		pipeline.invokeHandlerAddedIfNeeded();
    		// #3
    		safeSetSuccess(promise);
    		// #4
    		pipeline.fireChannelRegistered();
    		// #5
    		if (isActive()) {
    			if (firstRegistration) {
    				pipeline.fireChannelActive();
    			} else if (config().isAutoRead()) {
    				beginRead();
    			}
    		}
    	} catch (Throwable t) {
    		// #6
    		closeForcibly();
    		closeFuture.setClosed();
    		safeSetFailure(promise, t);
    	}
    }
    

    #1 由子类实现具体注册操作
    #2 执行DefaultChannelPipeline中的延迟任务
    #3 设置promise状态为Success
    #4 触发ChannelPipeline#fireChannelRegistered
    #5 如果是首次注册,触发ChannelPipeline#fireChannelActive
    isActive()方法判断当前Channel是否活跃
    NioSocketChannel中调用SocketChannel#isOpen和SocketChannel#isConnected判断
    NioServerSocketChannel中调用SelectableChannel#isOpen和ServerSocket#isBound方法判断
    #6 异常处理,关闭Channel,设置promise状态为Failure。

    AbstractUnsafe#doRegister -> AbstractNioChannel#doRegister

    protected void doRegister() throws Exception {
    	boolean selected = false;
    	for (;;) {
    		try {
    			// #1
    			selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    			return;
    		} catch (CancelledKeyException e) {
    			...
    		}
    	}
    }
    

    #1 javaChannel()获取(jvm)SelectableChannel,
    eventLoop().unwrappedSelector()获取AcceptGroup维护的Selector(jvm)
    这里将(jvm)ServerSocketChannel注册到(jvm)Selector,但还没有注册关注事件Key。
    从Netty层面看,将Channel注册到EventLoop中。
    注意,这里将当前NioServerSocketChannel作为channle#attachment,后面使用它来判断是否为IO事件。

    AbstractUnsafe#register0方法#5步骤 -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
    这里涉及ChannelPipeline的事件传播,后面解析ChannelPipeline时详细说明。

    HeadContext#channelActive会调用readIfIsAutoRead方法,判断是否开启autoRead,开启则自动触发read事件处理方法。
    HeadContext#readIfIsAutoRead -> DefaultChannelPipeline#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead

    protected void doBeginRead() throws Exception {
    	// #1
    	final SelectionKey selectionKey = this.selectionKey;
    	if (!selectionKey.isValid()) {
    		return;
    	}
    
    	readPending = true;
    
    	final int interestOps = selectionKey.interestOps();
    	// #2
    	if ((interestOps & readInterestOp) == 0) {
    		selectionKey.interestOps(interestOps | readInterestOp);
    	}
    }
    

    #1 selectionKey是Selector中关注事件集合(由AbstractNioChannel#doRegister方法中生成)
    #2 这里注册了关注事件readInterestOp。
    那么readInterestOp的值是什么呢? 它在AbstractNioChannel#构造方法中赋值,真正的值来自NioServerSocketChannel构造方法,可以看到,它在ServerChannel中固定为SelectionKey.OP_ACCEPT。
    到这里,注册ServerChannel的关注事件OP_ACCEPT。
    这里完成NIO网络通信第二步,注册关注事件。

    AbstractBootstrap.doBind0 -> AbstractChannel#bind -> DefaultChannelPipeline#bind -> HeadContext#bind -> AbstractUnsafe#bind -> NioServerSocketChannel#doBind

    protected void doBind(SocketAddress localAddress) throws Exception {
    	if (PlatformDependent.javaVersion() >= 7) {
    		javaChannel().bind(localAddress, config.getBacklog());
    	} else {
    		javaChannel().socket().bind(localAddress, config.getBacklog());
    	}
    }
    

    根据不同JDK版本,调用不同的bind方法。
    这里完成了NIO网络通信第三步,分配套接字地址,开始socket监听。

    Accept事件处理

    下面我们来看一下AcceptGroup中如何处理ServerChannel上监听到的accept事件。

    这里涉及EventLoop的相关内容,后面有对应解析文章。
    现在直接看Accept事件的处理方法NioMessageUnsafe#read

    public void read() {
    	...
    	try {
    		try {
    			do {
    				// #1
    				int localRead = doReadMessages(readBuf);
    				if (localRead == 0) {
    					break;
    				}
    				if (localRead < 0) {
    					closed = true;
    					break;
    				}
    
    				allocHandle.incMessagesRead(localRead);
    				
    			} while (allocHandle.continueReading());
    		} catch (Throwable t) {
    			exception = t;
    		}
    
    		int size = readBuf.size();
    		for (int i = 0; i < size; i ++) {
    			readPending = false;
    			// #2
    			pipeline.fireChannelRead(readBuf.get(i));
    		}
    		readBuf.clear();
    		allocHandle.readComplete();
    		// #3
    		pipeline.fireChannelReadComplete();
    
    		...
    	} ...
    }
    

    #1 调用NioServerSocketChannel#doReadMessages,处理Accept事件。
    注意,readBuf是一个List<Object>,用于接收处理结果。

    allocHandle.continueReading(),判断是否需要继续执行,这里都是返回false
    #2 触发DefaultChannelPipeline#fireChannelRead
    #3 触发DefaultChannelPipeline#fireChannelReadComplete

    NioServerSocketChannel#doReadMessages

    protected int doReadMessages(List<Object> buf) throws Exception {
    	// #1
    	SocketChannel ch = SocketUtils.accept(javaChannel());
    
    	try {
    		if (ch != null) {
    			// #2
    			buf.add(new NioSocketChannel(this, ch));
    			return 1;
    		}
    	} catch (Throwable t) {
    		...
    	}
    
    	return 0;
    }
    

    #1 调用(jvm)ServerSocketChannel#accept方法,生成的(jvm)SocketChannel
    #2 使用(jvm)SocketChannel构造NioSocketChannel

    前面说过,ServerChannel注册到AcceptGroup时,会给ServerChannel的ChannelPipeline添加一个ServerBootstrapAcceptor,用于处理accept事件。
    NioMessageUnsafe#read方法#2步骤 -> DefaultChannelPipeline#fireChannelRead -> ServerBootstrapAcceptor#channelRead

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    	// #1
    	final Channel child = (Channel) msg;
    
    	child.pipeline().addLast(childHandler);
    
        setChannelOptions(child, childOptions, logger);
        setAttributes(child, childAttrs);
    
    	try {
    		// #2
    		childGroup.register(child).addListener(new ChannelFutureListener() {
    			public void operationComplete(ChannelFuture future) throws Exception {
    				if (!future.isSuccess()) {
    					forceClose(child, future.cause());
    				}
    			}
    		});
    	} catch (Throwable t) {
    		forceClose(child, t);
    	}
    }
    

    #1 注意msg参数,就是NioServerSocketChannel#doReadMessages方法中生成的NioSocketChannel。
    上面说了,ServerBootstrap#init方法中会将ServerBootstrap中SocketChannel相关配置交给ServerBootstrapAcceptor。
    这里配置NioSocketChannel的Options,Attribute,并将childHandler添加给pipeline。
    #2 将NioSocketChannel注册到ReadGroup中,注册过程类似于NioServerSocketChannel注册到AcceptGroup,调用AbstractUnsafe#register方法实现。
    但有一点不同,调用AbstractNioChannel#doBeginRead方法注册关注事件时,关注事件(即AbstractNioChannel#readInterestOp),是来自子类AbstractNioByteChannel#构造方法,固定为SelectionKey.OP_READ。
    到这里,(jvm)SocketChannel已经注册到ReadGroupo维护中(jvm)Selector,关注的事件Key为read。

    延迟任务

    前面说了,ServerBootstrap#init方法#2步骤中ChannelInitializer#initChannel方法由延迟任务触发。现在看一下延迟任务的实现。

    添加延迟任务
    DefaultChannelPipeline#addFirst

    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    	final AbstractChannelHandlerContext newCtx;
    	synchronized (this) {
    		checkMultiplicity(handler);
    		name = filterName(name, handler);
    		// #1
    		newCtx = newContext(group, name, handler);
    		addFirst0(newCtx);
    		// #2
    		if (!registered) {
    			newCtx.setAddPending();
    			callHandlerCallbackLater(newCtx, true);
    			return this;
    		}
    		// #3
    		EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
    	}
    	callHandlerAdded0(newCtx);
    	return this;
    }
    

    #1 构造一个ChannelHandlerContext并添加到拦截链表首部位置
    #2 当前Channel未注册,调用DefaultChannelPipeline#callHandlerCallbackLater,添加一个延迟任务
    #3 当前Channel已注册,调用DefaultChannelPipeline#callHandlerAdded0,完成ChannelHandler添加扩展操作。

    DefaultChannelPipeline#callHandlerCallbackLater方法,将当前ChannelHandlerContext转化为一个延迟任务PendingHandlerAddedTask或者PendingHandlerRemovedTask,加到DefaultChannelPipeline#pendingHandlerCallbackHead列表中。
    DefaultChannelPipeline#addLast/removeFirst/removeLast同样有类似处理延迟任务的逻辑。

    执行延迟任务
    AbstractUnsafe#register0方法#2步骤 -> DefaultChannelPipeline#callHandlerAddedForAllHandlers,该方法会执行pendingHandlerCallbackHead列表所有任务,调用其execute方法。
    PendingHandlerAddedTask#execute会调用ChannelHandler#handlerAdded,完成ChannelHandler添加扩展工作。
    PendingHandlerRemovedTask#execute则调用ChannelHandler#handlerRemoved,完成ChannelHandler移除善后工作。

    ServerBootstrap#init方法#2步骤给ServerChannel的ChannelPipeline添加一个ChannelInitializer,它是Netty提供的工具类,实现了ChannelHandler#handlerAdded方法,实现逻辑是如果当前Channel已注册,则调用initChannel方法,否则不处理(所以我们常常利用该接口在注册完成后添加新的ChannelHandler给ChannelHandler)。

    回到ServerBootstrap#init方法,由于该方法执行时Channel未注册,所以会生成延迟任务,由AbstractUnsafe#register0方法#2步骤触发完成实际操作,将ServerBootstrapAcceptor添加到ServerChannel的ChannelPipeline中。

    最后说一下本文提到的netty组件。
    Channel,通信通道,是Netty通信的基础组件。
    EventLoop,ChannelPipeline是Netty中比较重要的组件,后面有对应的文章解析。
    Unsafe负责实际数据传输工作,在解析Netty流程时会注解介绍它。
    ChannelFuture,ChannelPromise代表Netty异步IO结果,通过回调函数执行后续操作。

    如果您觉得本文不错,欢迎关注我的微信公众号。您的关注是我坚持的动力!

  • 相关阅读:
    MySQL 分区
    InnoDB 锁
    【神经网络】自编码聚类算法--DEC (Deep Embedded Clustering)
    【神经网络】变分自编码大杂烩
    【异常检测】Isolation forest 的spark 分布式实现
    【推荐系统】评估指标总结
    【推荐系统】neural_collaborative_filtering(源码解析)
    hadoop之计数器和管道的mrunit测试
    thrift0.5入门操作
    awk之close函数
  • 原文地址:https://www.cnblogs.com/binecy/p/13908698.html
Copyright © 2011-2022 走看看