zoukankan      html  css  js  c++  java
  • Netty源码解析—客户端启动

    Netty源码解析—客户端启动

    Bootstrap示例

    public final class EchoClient {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
        static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.git
            // 配置 SSL
            final SslContext sslCtx;
            if (SSL) {
                sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }
    
            // Configure the client.
            // 创建一个 EventLoopGroup 对象
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                // 创建 Bootstrap 对象
                Bootstrap b = new Bootstrap();
                b.group(group) // 设置使用的 EventLoopGroup
                 .channel(NioSocketChannel.class) // 设置要被实例化的为 NioSocketChannel 类
                 .option(ChannelOption.TCP_NODELAY, true) // 设置 NioSocketChannel 的可选项
                 .handler(new ChannelInitializer<SocketChannel>() { // 设置 NioSocketChannel 的处理器
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(new EchoClientHandler());
                     }
                 });
    
                // Start the client.
                // 连接服务器,并同步等待成功,即启动客户端
                ChannelFuture f = b.connect(HOST, PORT).sync();
    
                // Wait until the connection is closed.
                // 监听客户端关闭,并阻塞等待
                f.channel().closeFuture().sync();
            } finally {
                // Shut down the event loop to terminate all threads.
                // 优雅关闭一个 EventLoopGroup 对象
                group.shutdownGracefully();
            }
        }
    }
    

    我们进入到启动客户端的地方connect(...)

    	public ChannelFuture connect(SocketAddress remoteAddress) {
            if (remoteAddress == null) {
                throw new NullPointerException("remoteAddress");
            }
    		// 校验必要参数
            validate();
             // 解析远程地址,并进行连接
            return doResolveAndConnect(remoteAddress, config.localAddress());
        }
    

    我们进入到doResolveAndConnect()方法

        /**
         * @see #connect()
         */
        private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            // 初始化并注册一个 Channel 对象,因为注册是异步的过程,所以返回一个 ChannelFuture 对象。
            final ChannelFuture regFuture = initAndRegister();
    		//...
            if (regFuture.isDone()) {
                //...
                // 解析远程地址,并进行连接
                return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
            } else {
                //...
                //如果异步注册对应的 ChanelFuture 未完成,则调用下面方法,添加监听器
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        //...
                        // 解析远程地址,并进行连接
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                        //...
                    }
                });
                return promise;
            }
            //...
        }
    

    省略掉部分代码,我们可以知道这个方法主要做了两件事

    1. 注册一个Channel对象
    2. 解析地址并连接

    我们直接开始撸initAndRegister()

        final ChannelFuture initAndRegister() {
            //...
            //反射初始化一个NioSocketChannel
            channel = channelFactory.newChannel();
            //设置参数
            init(channel);
    		//...
            //注册channel
            ChannelFuture regFuture = config().group().register(channel);
            //...
            return regFuture;
        }
    

    我们到NioSocketChannel

        public NioSocketChannel(SelectorProvider provider) {
            this(newSocket(provider));
        }
        
        private static SocketChannel newSocket(SelectorProvider provider) {
            try {
                //相当于java NIO的SocketChannel.open();
                return provider.openSocketChannel();
            } catch (IOException e) {
                throw new ChannelException("Failed to open a socket.", e);
            }
        }
        
        public NioSocketChannel(Channel parent, SocketChannel socket) {
            //调用父类构造方法
            super(parent, socket);
            //初始化config属性,创建NioSocketChannelConfig对象
            config = new NioSocketChannelConfig(this, socket.socket());
        }
    

    我们再回到init(Channel channel)方法,初始化Channel配置

    @Override
    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
    
        // 添加处理器到 pipeline 中
        p.addLast(config.handler());
    
        // 初始化 Channel 的可选项集合
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
    
        // 初始化 Channel 的属性集合
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }
    

    至此initAndRegister() 方法已经走完,回到我们的doResolveAndConnect()方法中,继续调用doResolveAndConnect0(),解析远程地址,并进行连接

        private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                                   final SocketAddress localAddress, final ChannelPromise promise) {
            try {
                final EventLoop eventLoop = channel.eventLoop();
                final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
    
                if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                    // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                    doConnect(remoteAddress, localAddress, promise);
                    return promise;
                }
    			// 解析远程地址
                final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
    
                if (resolveFuture.isDone()) {
                	// 解析远程地址失败,关闭 Channel ,并回调通知 promise 异常
                    final Throwable resolveFailureCause = resolveFuture.cause();
    
                    if (resolveFailureCause != null) {
                        // Failed to resolve immediately
                        channel.close();
                        promise.setFailure(resolveFailureCause);
                    } else {
                        // Succeeded to resolve immediately; cached? (or did a blocking lookup)				
                        // 连接远程地址
                        doConnect(resolveFuture.getNow(), localAddress, promise);
                    }
                    return promise;
                }
    
                // Wait until the name resolution is finished.
                resolveFuture.addListener(new FutureListener<SocketAddress>() {
                    @Override
                    public void operationComplete(Future<SocketAddress> future) throws Exception {
                        // 解析远程地址失败,关闭 Channel ,并回调通知 promise 异常
                        if (future.cause() != null) {
                            channel.close();
                            promise.setFailure(future.cause());
                         // 解析远程地址成功,连接远程地址
                        } else {
                            doConnect(future.getNow(), localAddress, promise);
                        }
                    }
                });
            } catch (Throwable cause) {
                 // 发生异常,并回调通知 promise 异常
                promise.tryFailure(cause);
            }
            return promise;
        }
    

    我们走到doConnect()方法,执行Channel连接远程地址的逻辑.

        private static void doConnect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            final Channel channel = connectPromise.channel();
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, connectPromise);
                    } else {
                        channel.connect(remoteAddress, localAddress, connectPromise);
                    }
                    connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            });
        }
    

    然后调用connect()方法一直断点到AbstractNioUnsafe#connect

           AbstractNioUnsafe.java
    		@Override
            public final void connect(
                    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
    
                try {
                    // 目前有正在连接远程地址的 ChannelPromise ,则直接抛出异常,禁止同时发起多个连接。
                    if (connectPromise != null) {
                        // Already a connect in process.
                        throw new ConnectionPendingException();
                    }
    				// 记录 Channel 是否激活
                    boolean wasActive = isActive();
                    // 执行连接远程地址
                    if (doConnect(remoteAddress, localAddress)) {
                        fulfillConnectPromise(promise, wasActive);
                    } else {
                        // 记录 connectPromise
                        connectPromise = promise;
                        // 记录 requestedRemoteAddress
                        requestedRemoteAddress = remoteAddress;
    
                        // 使用 EventLoop 发起定时任务,监听连接远程地址超时。若连接超时,则回调通知 connectPromise 超时异常。
                        // Schedule connect timeout.
                        int connectTimeoutMillis = config().getConnectTimeoutMillis();
                        if (connectTimeoutMillis > 0) {
                            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                                @Override
                                public void run() {
                                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                    ConnectTimeoutException cause =
                                            new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                        close(voidPromise());
                                    }
                                }
                            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                        }
    
                        // 添加监听器,监听连接远程地址取消。
                        promise.addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (future.isCancelled()) {
                                    // 取消定时任务
                                    if (connectTimeoutFuture != null) {
                                        connectTimeoutFuture.cancel(false);
                                    }
                                    // 置空 connectPromise
                                    connectPromise = null;
                                    close(voidPromise());
                                }
                            }
                        });
                    }
                } catch (Throwable t) {
                    // 回调通知 promise 发生异常
                    promise.tryFailure(annotateConnectException(t, remoteAddress));
                    closeIfClosed();
                }
            }
    

    调用isActive()方法,获得Channel是否激活,判断SocketChannel是否处于打开,并且连接的状态,此时,一般返回false

        @Override
        public boolean isActive() {
            SocketChannel ch = javaChannel();
            return ch.isOpen() && ch.isConnected();
        }
    

    调用doConnect()方法

       NioSocketChannel.java
        @Override
        protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        	//绑定本地地址
            if (localAddress != null) {
                doBind0(localAddress);
            }
            boolean success = false;
            try {
            	//连接远程地址
                boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
                // 若未连接完成,则关注连接( OP_CONNECT )事件
                if (!connected) {
                    selectionKey().interestOps(SelectionKey.OP_CONNECT);
                }
                // 标记执行是否成功
                success = true;
                return connected;
            } finally {
             	// 执行失败,则关闭 Channel
                if (!success) {
                    doClose();
                }
            }
        }
    

    一般情况下NIO client是不需要绑定本地地址的. 所以我们跟踪调用链AbstractChannelHandlerContext可知,localAddress传进来的是null

        AbstractChannelHandlerContext.java
    	@Override
        public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
            return connect(remoteAddress, null, promise);
        }
    

    所以走到boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);

    若连接没有完成时,我们调用selectionKey().interestOps(SelectionKey.OP_CONNECT);添加事件SelectionKey.OP_CONNECT,也就是说连接远程地址成功时,Channel对应的Selector将会轮训到该事件,可以进一步处理

    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        // See https://github.com/netty/netty/issues/924
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
    
        unsafe.finishConnect();
    }
    

    我们进入到AbstractNioChannel#finishConnect方法中

            @Override
            public final void finishConnect() {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.
    			//判断是否在EventLoop的线程中
                assert eventLoop().inEventLoop();
    
                try {
                  	//获得Channel是否激活
                    boolean wasActive = isActive();
                  	//执行完成连接
                    doFinishConnect();
                  	//通知connectPromice连接完成
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                  	//通知connectPromice连接异常
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                  	// 取消 connectTimeoutFuture 任务
                    // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                    // See https://github.com/netty/netty/issues/1770
                    if (connectTimeoutFuture != null) {
                        connectTimeoutFuture.cancel(false);
                    }
                  	// 置空 connectPromise
                    connectPromise = null;
                }
            }
    

    在调试的过程中我们可以知道isActive();返回false,说明连接还没完成,然后调用doFinishConnect();

        @Override
        protected void doFinishConnect() throws Exception {
            if (!javaChannel().finishConnect()) {
                throw new Error();
            }
        }
    

    在这里调用java NIO的方法进行连接,然后调用fulfillConnectPromise(ChannelPromise promise, boolean wasActive)

            private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
                if (promise == null) {
                    // Closed via cancellation and the promise has been notified already.
                    return;
                }
    			// 获得 Channel 是否激活
                // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
                // We still need to ensure we call fireChannelActive() in this case.
                boolean active = isActive();
    			// 回调通知 promise 执行成功
                // trySuccess() will return false if a user cancelled the connection attempt.
                boolean promiseSet = promise.trySuccess();
    			// 若 Channel 是新激活的,触发通知 Channel 已激活的事件。
                // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
                // because what happened is what happened.
                if (!wasActive && active) {
                    pipeline().fireChannelActive();
                }
    
                // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
                if (!promiseSet) {
                    close(voidPromise());
                }
            }
    

    在这里isActive();已返回true,说明已成功连接.

    然后回调通知promise执行成功,如果你在connect(...)方法返回的ChannelFuture 的 ChannelFutureListener 的监听器,那么会执行里面的通知.

    最后调用pipeline().fireChannelActive();触发Channel激活的事件.调用channelActive()doBeginRead()走server端一样的代码,设置的 readInterestOp = SelectionKey.OP_READ 添加为感兴趣的事件

    走完doConnect(remoteAddress, localAddress)我们断点可知返回了false,所以是不会走fulfillConnectPromise(promise, wasActive);,而是执行else分支.

    int connectTimeoutMillis = config().getConnectTimeoutMillis();这里可以知道,设置了一个30s的时间

        @Override
        public int getConnectTimeoutMillis() {
            return connectTimeoutMillis;
        }
    	private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
    
    	private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
    

    调用 EventLoop#schedule(Runnable command, long delay, TimeUnit unit) 方法,发起定时任务connectTimeoutFuture,监听连接远程地址是否超时,并回调connectPromise超时异常

    if (connectTimeoutMillis > 0) {
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                ConnectTimeoutException cause =
                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    

    调用ChannelPromise#addListener(ChannelFutureListener)方法,添加监听器,监听连接远程地址是否取消.若取消,则取消 connectTimeoutFuture 任务,并置空 connectPromise 。这样,客户端 Channel 可以发起下一次连接。

    promise.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isCancelled()) {
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
                close(voidPromise());
            }
        }
    });
    

    至此为止,doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress)已经全部分析完毕.

  • 相关阅读:
    MFC Windows 程序设计>WinMain 简单Windows程序 命令行编译
    AT3949 [AGC022D] Shopping 题解
    CF643D Bearish Fanpages 题解
    CF643C Levels and Regions 题解
    CF241E Flights 题解
    CF671C Ultimate Weirdness of an Array 题解
    CF1592F Alice and Recoloring 题解
    GYM 102452E 题解
    CF494C Helping People 题解
    P5556 圣剑护符
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/netty.html
Copyright © 2011-2022 走看看