zoukankan      html  css  js  c++  java
  • Netty源码解析 客户端启动过程

    上一篇文章分享了Netty服务端启动过程,本文继续分享Netty客户端启动过程。
    源码分析基于Netty 4.1

    Connect

    客户端启动过程比较简单,主要是Connect操作。
    Netty客户端启动引导类是Bootstrap,同样继承了AbstractBootstrap,它只有一个EventLoopGroup,下文称为ConnectGroup。

    Bootstrap#connect -> doResolveAndConnect -> doResolveAndConnect0

    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
    										   final SocketAddress localAddress, final ChannelPromise promise) {
    	try {
    		final EventLoop eventLoop = channel.eventLoop();
    		// #1
    		final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
    		
    		...
    		
    		final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
    
    		if (resolveFuture.isDone()) {
    			final Throwable resolveFailureCause = resolveFuture.cause();
    
    			if (resolveFailureCause != null) {
    				channel.close();
    				promise.setFailure(resolveFailureCause);
    			} else {
    				// #2
    				doConnect(resolveFuture.getNow(), localAddress, promise);
    			}
    			return promise;
    		}
    
    		...
    	} catch (Throwable cause) {
    		promise.tryFailure(cause);
    	}
    	return promise;
    }
    

    #1
    AddressResolver负责解析SocketAddress。它可以做一些地址转换工作。如Netty提供了RoundRobinInetAddressResolver,可以对下游服务集群进行轮询调用。
    Bootstrap#resolver是一个AddressResolverGroup,它负责构造AddressResolver,默认使用DefaultAddressResolverGroup。
    #2 调用doConnect,执行Connect操作。

    doConnect -> AbstractChannel#connect -> DefaultChannelPipeline#connect -> HeadContext#connect -> AbstractNioUnsafe#connect
    (这里涉及DefaultChannelPipeline的内容后续有文章解析)

    public final void connect(
    		final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    	...
    
    	try {
    		...
    
    		boolean wasActive = isActive();
    		// #1
    		if (doConnect(remoteAddress, localAddress)) {
    			fulfillConnectPromise(promise, wasActive);
    		} else {
    			connectPromise = promise;
    			requestedRemoteAddress = remoteAddress;
    
    			// #2
    			int connectTimeoutMillis = config().getConnectTimeoutMillis();
    			if (connectTimeoutMillis > 0) {
    				connectTimeoutFuture = eventLoop().schedule(new Runnable() {
    					public void run() {
    						ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
    						ConnectTimeoutException cause =
    								new ConnectTimeoutException("connection timed out: " + remoteAddress);
    						if (connectPromise != null && connectPromise.tryFailure(cause)) {
    							close(voidPromise());
    						}
    					}
    				}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    			}
    			// #3
    			promise.addListener(new ChannelFutureListener() {
    				public void operationComplete(ChannelFuture future) throws Exception {
    					if (future.isCancelled()) {
    						if (connectTimeoutFuture != null) {
    							connectTimeoutFuture.cancel(false);
    						}
    						connectPromise = null;
    						close(voidPromise());
    					}
    				}
    			});
    		}
    	} catch (Throwable t) {
    		promise.tryFailure(annotateConnectException(t, remoteAddress));
    		closeIfClosed();
    	}
    }
    

    #1 调用SocketChannel#connect,如果是非阻塞Socket调用,该方法返回false。
    #2 给EventLoop添加一个定时任务,如果连接超时则关闭Channel。
    Netty中也提供了ReadTimeoutHandler处理读超时的场景。
    #3 给promise添加一个回调方法,connect操作完成时,如果connect操作被取消了,则关闭Channel。

    NioSocketChannel#doConnect

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        ...
    
        boolean success = false;
        try {
        	// #1
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            // #2
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }
    

    #1 调用(jvm)SocketChannel#connect方法,同样,非阻塞SocketChannel调用该方法,返回false。
    #2 关注OP_CONNECT事件。

    EventLoop中负责处理OP_CONNECT事件(EventLoop后面有文章解析),调用AbstractNioUnsafe#finishConnect完成连接操作。

    public final void finishConnect() {
    	...
    	try {
    		boolean wasActive = isActive();
    		// #1
    		doFinishConnect();
    		// #2
    		fulfillConnectPromise(connectPromise, wasActive);
    	} catch (Throwable t) {
    		fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    	} finally {
    		// #3
    		if (connectTimeoutFuture != null) {
    			connectTimeoutFuture.cancel(false);
    		}
    		connectPromise = null;
    	}
    }
    

    #1 doFinishConnect方法由子类NioSocketChannel实现,就是调用(jvm)SocketChannel#finishConnect()方法
    #2 设置connectPromise处理成功
    #3 取消connectTimeoutFuture延迟任务

    注册关注Read事件
    AbstractNioUnsafe#fulfillConnectPromise -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
    前面解析服务端启动过程时说过,HeadContext#channelActive会调用readIfIsAutoRead方法,判断是否开启autoRead,开启则自动触发read事件处理方法。
    HeadContext#readIfIsAutoRead -> AbstractChannel#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead
    AbstractNioChannel#doBeginRead在解析服务端启动过程时也说过,这里会注册关注Read事件。

    客户端启动完成后,客户端和服务端就可以开始进行Read/Write操作了。

    如果您觉得本文不错,欢迎关注我的微信公众号。您的关注是我坚持的动力!

  • 相关阅读:
    字符、字符串和文本处理
    接口
    泛型
    事件
    Expression表达式树 案例
    栈帧
    属性
    方法
    常量和字段
    Dynamic
  • 原文地址:https://www.cnblogs.com/binecy/p/13909853.html
Copyright © 2011-2022 走看看