zoukankan      html  css  js  c++  java
  • netty(二)---客户端连接

    概述

    先了解一下 netty 大概框架图 ,可以看到客户端的创建和服务端最大的区别 - 服务端传入两个 EventLoopGroup,客户端传入一个 EventLoopGroup - channel 的类型也不同,服务端传入的是 NioServerSocketChannel ,客户端传入的是 NioSocketChannel - 服务端存在 childHandler 的设置,客户端没有,

    客户端连接过程 : - 和服务端一样先创建 EventLoopGroup (只有一个,内部多个保持多个EventLoop线程,执行处理事务) - connect 方法 ,和服务端一样,使用group里的EventLoop创建一个 channel ,然后注册到 select 中去(这个过程都在channel 中进行) - (异步执行)当连接不上就会交给 EventLoop线程中执行监听的任务。 - 而一旦监听到了就交给 channel 执行。 - selectKey 可以attack一个object,刚好可以用来放channel ,然后在某个线程监听到某个实现的时候再把 channel 拿出来用

    源码分析

    实际上客户端只有一个 Reactor . 那么重点的逻辑就到了 connect 那里

        /**
         * Connect a {@link Channel} to the remote peer.
         */
        public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
            if (remoteAddress == null) {
                throw new NullPointerException("remoteAddress");
            }
            validate();
            return doConnect(remoteAddress, localAddress);
        }
    
        /**
         * @see {@link #connect()}
         */
        private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        	//创建channel ,注册到 select , initAndRegister方法是父类的方法我们在分析服务端
        	//的时候已经分析过了
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            //一开始就连接上了,
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            final ChannelPromise promise = channel.newPromise();
            if (regFuture.isDone()) {
            	//链路成功后,异步连接 TCP 
                doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
            } else {
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                    }
                });
            }
    
            return promise;
        }
    
    
        private static void doConnect0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            // 到了执行连接的操作就转到了Netty 的 NIO线程执行,此刻客户端返回,连接异步执行。
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        if (localAddress == null) {
                        	//没传 localAddress 会传到 TailHandler的connect方法
                            channel.connect(remoteAddress, promise);
                        } else {
                        	//正常情况下到 HeaderHandler的connect 方法
                            channel.connect(remoteAddress, localAddress, promise);
                        }
                        promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    
    
    我们看一下 HeaderHandler的connect 方法。
    
           @Override
           public void connect(
                   ChannelHandlerContext ctx,
                   SocketAddress remoteAddress, SocketAddress localAddress,
                   ChannelPromise promise) throws Exception {
               //执行 HeaderHandler内的unsafe字段的 connect 方法 
               unsafe.connect(remoteAddress, localAddress, promise);
           }
    
    
    

    unsafe会执行AbstractNioChannel(这个类是NioServerSocketChannel和NioSocketChannel的共同父类)的connect 方法

           @Override
           public void connect(
                   final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
               if (!ensureOpen(promise)) {
                   return;
               }
    
               try {
                   if (connectPromise != null) {
                       throw new IllegalStateException("connection attempt already made");
                   }
    
                   boolean wasActive = isActive();
                   //doConnect 方法是个抽象方法
                   if (doConnect(remoteAddress, localAddress)) {
                       fulfillConnectPromise(promise, wasActive);
                   } else {
                       connectPromise = promise;
                       requestedRemoteAddress = remoteAddress;
    
                       // Schedule connect timeout.
                       int connectTimeoutMillis = config().getConnectTimeoutMillis();
                       if (connectTimeoutMillis > 0) {
                           connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                               @Override
                               public void run() {
                                   ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                   ConnectTimeoutException cause =
                                           new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                   if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                       close(voidPromise());
                                   }
                               }
                           }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                       }
    
                       promise.addListener(new ChannelFutureListener() {
                           @Override
                           public void operationComplete(ChannelFuture future) throws Exception {
                               if (future.isCancelled()) {
                                   if (connectTimeoutFuture != null) {
                                       connectTimeoutFuture.cancel(false);
                                   }
                                   connectPromise = null;
                                   close(voidPromise());
                               }
                           }
                       });
                   }
               } catch (Throwable t) {
                   if (t instanceof ConnectException) {
                       Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
                       newT.setStackTrace(t.getStackTrace());
                       t = newT;
                   }
                   promise.tryFailure(t);
                   closeIfClosed();
               }
           }
    

    NioSocketChannel 的 doConnect 方法

        
        @Override
        protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
            if (localAddress != null) {
                javaChannel().socket().bind(localAddress);
            }
    
            boolean success = false;
            try {
                boolean connected = javaChannel().connect(remoteAddress);
                if (!connected) {
                	//如果绑定不成功,注册连接事件
                    selectionKey().interestOps(SelectionKey.OP_CONNECT);
                }
                success = true;
                return connected;
            } finally {
                if (!success) {
                    doClose();
                }
            }
        }
    
    

    后续更新...

  • 相关阅读:
    poj 3624 (背包入门)
    poj 2175(最消费用最大流消圈法判断是否为最小费用)
    poj 2195 (最小费用最大流)
    poj 3659 (树上的最小支配集)
    Codeforces Beta Round #76 (Div. 1 Only)
    poj 2516(最小费用最大流)
    2013 腾讯马拉松初赛 第0场
    批量重命名,把文件名中的(1)去掉。
    ms sql server 添加列,删除列。
    winform 获取当前程序所在目录。
  • 原文地址:https://www.cnblogs.com/Benjious/p/11613265.html
Copyright © 2011-2022 走看看