zoukankan      html  css  js  c++  java
  • Netty源码解析—客户端启动

    Netty源码解析—客户端启动

    Bootstrap示例

    public final class EchoClient {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
        static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.git
            // 配置 SSL
            final SslContext sslCtx;
            if (SSL) {
                sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }
    
            // Configure the client.
            // 创建一个 EventLoopGroup 对象
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                // 创建 Bootstrap 对象
                Bootstrap b = new Bootstrap();
                b.group(group) // 设置使用的 EventLoopGroup
                 .channel(NioSocketChannel.class) // 设置要被实例化的为 NioSocketChannel 类
                 .option(ChannelOption.TCP_NODELAY, true) // 设置 NioSocketChannel 的可选项
                 .handler(new ChannelInitializer<SocketChannel>() { // 设置 NioSocketChannel 的处理器
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(new EchoClientHandler());
                     }
                 });
    
                // Start the client.
                // 连接服务器,并同步等待成功,即启动客户端
                ChannelFuture f = b.connect(HOST, PORT).sync();
    
                // Wait until the connection is closed.
                // 监听客户端关闭,并阻塞等待
                f.channel().closeFuture().sync();
            } finally {
                // Shut down the event loop to terminate all threads.
                // 优雅关闭一个 EventLoopGroup 对象
                group.shutdownGracefully();
            }
        }
    }
    

    我们进入到启动客户端的地方connect(...)

    	public ChannelFuture connect(SocketAddress remoteAddress) {
            if (remoteAddress == null) {
                throw new NullPointerException("remoteAddress");
            }
    		// 校验必要参数
            validate();
             // 解析远程地址,并进行连接
            return doResolveAndConnect(remoteAddress, config.localAddress());
        }
    

    我们进入到doResolveAndConnect()方法

        /**
         * @see #connect()
         */
        private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            // 初始化并注册一个 Channel 对象,因为注册是异步的过程,所以返回一个 ChannelFuture 对象。
            final ChannelFuture regFuture = initAndRegister();
    		//...
            if (regFuture.isDone()) {
                //...
                // 解析远程地址,并进行连接
                return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
            } else {
                //...
                //如果异步注册对应的 ChanelFuture 未完成,则调用下面方法,添加监听器
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        //...
                        // 解析远程地址,并进行连接
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                        //...
                    }
                });
                return promise;
            }
            //...
        }
    

    省略掉部分代码,我们可以知道这个方法主要做了两件事

    1. 注册一个Channel对象
    2. 解析地址并连接

    我们直接开始撸initAndRegister()

        final ChannelFuture initAndRegister() {
            //...
            //反射初始化一个NioSocketChannel
            channel = channelFactory.newChannel();
            //设置参数
            init(channel);
    		//...
            //注册channel
            ChannelFuture regFuture = config().group().register(channel);
            //...
            return regFuture;
        }
    

    我们到NioSocketChannel

        public NioSocketChannel(SelectorProvider provider) {
            this(newSocket(provider));
        }
        
        private static SocketChannel newSocket(SelectorProvider provider) {
            try {
                //相当于java NIO的SocketChannel.open();
                return provider.openSocketChannel();
            } catch (IOException e) {
                throw new ChannelException("Failed to open a socket.", e);
            }
        }
        
        public NioSocketChannel(Channel parent, SocketChannel socket) {
            //调用父类构造方法
            super(parent, socket);
            //初始化config属性,创建NioSocketChannelConfig对象
            config = new NioSocketChannelConfig(this, socket.socket());
        }
    

    我们再回到init(Channel channel)方法,初始化Channel配置

    @Override
    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
    
        // 添加处理器到 pipeline 中
        p.addLast(config.handler());
    
        // 初始化 Channel 的可选项集合
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
    
        // 初始化 Channel 的属性集合
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }
    

    至此initAndRegister() 方法已经走完,回到我们的doResolveAndConnect()方法中,继续调用doResolveAndConnect0(),解析远程地址,并进行连接

        private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                                   final SocketAddress localAddress, final ChannelPromise promise) {
            try {
                final EventLoop eventLoop = channel.eventLoop();
                final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
    
                if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                    // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                    doConnect(remoteAddress, localAddress, promise);
                    return promise;
                }
    			// 解析远程地址
                final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
    
                if (resolveFuture.isDone()) {
                	// 解析远程地址失败,关闭 Channel ,并回调通知 promise 异常
                    final Throwable resolveFailureCause = resolveFuture.cause();
    
                    if (resolveFailureCause != null) {
                        // Failed to resolve immediately
                        channel.close();
                        promise.setFailure(resolveFailureCause);
                    } else {
                        // Succeeded to resolve immediately; cached? (or did a blocking lookup)				
                        // 连接远程地址
                        doConnect(resolveFuture.getNow(), localAddress, promise);
                    }
                    return promise;
                }
    
                // Wait until the name resolution is finished.
                resolveFuture.addListener(new FutureListener<SocketAddress>() {
                    @Override
                    public void operationComplete(Future<SocketAddress> future) throws Exception {
                        // 解析远程地址失败,关闭 Channel ,并回调通知 promise 异常
                        if (future.cause() != null) {
                            channel.close();
                            promise.setFailure(future.cause());
                         // 解析远程地址成功,连接远程地址
                        } else {
                            doConnect(future.getNow(), localAddress, promise);
                        }
                    }
                });
            } catch (Throwable cause) {
                 // 发生异常,并回调通知 promise 异常
                promise.tryFailure(cause);
            }
            return promise;
        }
    

    我们走到doConnect()方法,执行Channel连接远程地址的逻辑.

        private static void doConnect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            final Channel channel = connectPromise.channel();
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, connectPromise);
                    } else {
                        channel.connect(remoteAddress, localAddress, connectPromise);
                    }
                    connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            });
        }
    

    然后调用connect()方法一直断点到AbstractNioUnsafe#connect

           AbstractNioUnsafe.java
    		@Override
            public final void connect(
                    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
    
                try {
                    // 目前有正在连接远程地址的 ChannelPromise ,则直接抛出异常,禁止同时发起多个连接。
                    if (connectPromise != null) {
                        // Already a connect in process.
                        throw new ConnectionPendingException();
                    }
    				// 记录 Channel 是否激活
                    boolean wasActive = isActive();
                    // 执行连接远程地址
                    if (doConnect(remoteAddress, localAddress)) {
                        fulfillConnectPromise(promise, wasActive);
                    } else {
                        // 记录 connectPromise
                        connectPromise = promise;
                        // 记录 requestedRemoteAddress
                        requestedRemoteAddress = remoteAddress;
    
                        // 使用 EventLoop 发起定时任务,监听连接远程地址超时。若连接超时,则回调通知 connectPromise 超时异常。
                        // Schedule connect timeout.
                        int connectTimeoutMillis = config().getConnectTimeoutMillis();
                        if (connectTimeoutMillis > 0) {
                            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                                @Override
                                public void run() {
                                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                    ConnectTimeoutException cause =
                                            new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                        close(voidPromise());
                                    }
                                }
                            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                        }
    
                        // 添加监听器,监听连接远程地址取消。
                        promise.addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (future.isCancelled()) {
                                    // 取消定时任务
                                    if (connectTimeoutFuture != null) {
                                        connectTimeoutFuture.cancel(false);
                                    }
                                    // 置空 connectPromise
                                    connectPromise = null;
                                    close(voidPromise());
                                }
                            }
                        });
                    }
                } catch (Throwable t) {
                    // 回调通知 promise 发生异常
                    promise.tryFailure(annotateConnectException(t, remoteAddress));
                    closeIfClosed();
                }
            }
    

    调用isActive()方法,获得Channel是否激活,判断SocketChannel是否处于打开,并且连接的状态,此时,一般返回false

        @Override
        public boolean isActive() {
            SocketChannel ch = javaChannel();
            return ch.isOpen() && ch.isConnected();
        }
    

    调用doConnect()方法

       NioSocketChannel.java
        @Override
        protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        	//绑定本地地址
            if (localAddress != null) {
                doBind0(localAddress);
            }
            boolean success = false;
            try {
            	//连接远程地址
                boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
                // 若未连接完成,则关注连接( OP_CONNECT )事件
                if (!connected) {
                    selectionKey().interestOps(SelectionKey.OP_CONNECT);
                }
                // 标记执行是否成功
                success = true;
                return connected;
            } finally {
             	// 执行失败,则关闭 Channel
                if (!success) {
                    doClose();
                }
            }
        }
    

    一般情况下NIO client是不需要绑定本地地址的. 所以我们跟踪调用链AbstractChannelHandlerContext可知,localAddress传进来的是null

        AbstractChannelHandlerContext.java
    	@Override
        public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
            return connect(remoteAddress, null, promise);
        }
    

    所以走到boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);

    若连接没有完成时,我们调用selectionKey().interestOps(SelectionKey.OP_CONNECT);添加事件SelectionKey.OP_CONNECT,也就是说连接远程地址成功时,Channel对应的Selector将会轮训到该事件,可以进一步处理

    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        // See https://github.com/netty/netty/issues/924
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
    
        unsafe.finishConnect();
    }
    

    我们进入到AbstractNioChannel#finishConnect方法中

            @Override
            public final void finishConnect() {
                // Note this method is invoked by the event loop only if the connection attempt was
                // neither cancelled nor timed out.
    			//判断是否在EventLoop的线程中
                assert eventLoop().inEventLoop();
    
                try {
                  	//获得Channel是否激活
                    boolean wasActive = isActive();
                  	//执行完成连接
                    doFinishConnect();
                  	//通知connectPromice连接完成
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                  	//通知connectPromice连接异常
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
                  	// 取消 connectTimeoutFuture 任务
                    // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                    // See https://github.com/netty/netty/issues/1770
                    if (connectTimeoutFuture != null) {
                        connectTimeoutFuture.cancel(false);
                    }
                  	// 置空 connectPromise
                    connectPromise = null;
                }
            }
    

    在调试的过程中我们可以知道isActive();返回false,说明连接还没完成,然后调用doFinishConnect();

        @Override
        protected void doFinishConnect() throws Exception {
            if (!javaChannel().finishConnect()) {
                throw new Error();
            }
        }
    

    在这里调用java NIO的方法进行连接,然后调用fulfillConnectPromise(ChannelPromise promise, boolean wasActive)

            private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
                if (promise == null) {
                    // Closed via cancellation and the promise has been notified already.
                    return;
                }
    			// 获得 Channel 是否激活
                // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
                // We still need to ensure we call fireChannelActive() in this case.
                boolean active = isActive();
    			// 回调通知 promise 执行成功
                // trySuccess() will return false if a user cancelled the connection attempt.
                boolean promiseSet = promise.trySuccess();
    			// 若 Channel 是新激活的,触发通知 Channel 已激活的事件。
                // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
                // because what happened is what happened.
                if (!wasActive && active) {
                    pipeline().fireChannelActive();
                }
    
                // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
                if (!promiseSet) {
                    close(voidPromise());
                }
            }
    

    在这里isActive();已返回true,说明已成功连接.

    然后回调通知promise执行成功,如果你在connect(...)方法返回的ChannelFuture 的 ChannelFutureListener 的监听器,那么会执行里面的通知.

    最后调用pipeline().fireChannelActive();触发Channel激活的事件.调用channelActive()doBeginRead()走server端一样的代码,设置的 readInterestOp = SelectionKey.OP_READ 添加为感兴趣的事件

    走完doConnect(remoteAddress, localAddress)我们断点可知返回了false,所以是不会走fulfillConnectPromise(promise, wasActive);,而是执行else分支.

    int connectTimeoutMillis = config().getConnectTimeoutMillis();这里可以知道,设置了一个30s的时间

        @Override
        public int getConnectTimeoutMillis() {
            return connectTimeoutMillis;
        }
    	private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
    
    	private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
    

    调用 EventLoop#schedule(Runnable command, long delay, TimeUnit unit) 方法,发起定时任务connectTimeoutFuture,监听连接远程地址是否超时,并回调connectPromise超时异常

    if (connectTimeoutMillis > 0) {
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                ConnectTimeoutException cause =
                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    

    调用ChannelPromise#addListener(ChannelFutureListener)方法,添加监听器,监听连接远程地址是否取消.若取消,则取消 connectTimeoutFuture 任务,并置空 connectPromise 。这样,客户端 Channel 可以发起下一次连接。

    promise.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isCancelled()) {
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
                close(voidPromise());
            }
        }
    });
    

    至此为止,doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress)已经全部分析完毕.

  • 相关阅读:
    Data Base Oracle 常用命令
    ASP.NET Core ASP.NET Core+MVC搭建及部署
    Hadoop 之 MapReduce 框架演变详解
    计算机网络: IP地址,子网掩码,默认网关,DNS服务器详解
    Linux系统基本网络配置之ifconfig命令
    Linux-eth0 eth0:1 和eth0.1关系、ifconfig以及虚拟IP实现介绍
    Linux 中将用户添加到组的指令
    几种常见的Shell
    常见的Shell
    Linux(CentOS6.5)下创建新用户和组,并制定用户和组ID
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/netty.html
Copyright © 2011-2022 走看看