zoukankan      html  css  js  c++  java
  • 001 服务端Channel的创建

    一 .入口

    在我们的服务器启动的代码之中,存在如下的代码:

    ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)

    我们调用channel方法传入来一个NioServerSocketChannel的字节码对象.

    服务端的channel正是使用这个字节码对象通过反射的方式进行创建的.

    我们回到主程序的入口bind()方法.

    二 .初始化Channel对象

    ChannelFuture regFuture = initAndRegister();
    

      在bind()方法之中的第一行代码如上.

    在该方法之中包含核心的逻辑分成三个:

    创建Channel对象

    初始化Channel对象

    注册Channel对象.

    我们下面分析Channel的创建.

    channel = channelFactory.newChannel();
    

      通过ChannelFactory对象调用newChannel()方法创建得到Channel对象.

    public T newChannel() {
            try {
                return clazz.getConstructor().newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }

    我们发现是通过反射创建的对象.

    下面我们需要分析一下Channel的默认构造函数.

        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }

    我们首先看看newSocket()方法的实现.

        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }

    实际上直接调用jdk的方法创建一个ServerSocketChannel对象.

    public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            // 创建NioServerSocketChannelConfig 与当前的对象进行绑定,后面都使用这个配置对象进行Channel的配置
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }

    在该构造函数之中包含两个逻辑,一个调用父类的构造函数完成构造,另外一个就是创建conifg对象,保存我们对服务端channel的配置信息.

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            // 保存jdk的Channel
            this.ch = ch;
            // 保存感兴趣的事件
            this.readInterestOp = readInterestOp;
            try {
                // 配置Channel为非阻塞的模式
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }

    我们需要看看父类的构造函数.

    protected AbstractChannel(Channel parent) {
            this.parent = parent;
            // 创建Channel的id
            id = newId();
            // 创建unsafe()对象,这个对象在后面会介绍
            unsafe = newUnsafe();
            // 创建ChannelPipeline对象,也就是说一个Channel对象会和一个ChannelPipeline进行绑定
            pipeline = newChannelPipeline();
        }

    构造了一个Channel最为核心的组件,unsafe对象和pipeline对象.

    好了现在我们完成了Channel的创建过程,下面分析Channel的初始化工作.

    init(channel);

    这个方法实际上是一个抽象方法,由ServerBootStrap进行实现.

    void init(Channel channel) throws Exception {
            // 保存属性
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            // 这个也就是我们说的workerGroup对象
            final EventLoopGroup currentChildGroup = childGroup;
            // 这个就是我们保存的childChannel,也就是初始化的ChannelHandler.
            final ChannelHandler currentChildHandler = childHandler;
    
            // 保存childChannel的属性,也就是那些tcp属性
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            // 向管道之中增加一个Channel的初始化器
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    // 我们通过handler()方法,存储的Handler对象
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    // 会向pipeline对象之中增加一个ServerBootstrapAcceptor 对象
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    // channel , 当前的处理器 子channel的配置属性
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }

    代码比较多,但是核心的代码吗主要分成下面的几个部分.

    [1] 保存服务端channel的配置信息

    [2]保存客户端连接的配置信息.

    [3]如果服务器端拥有handle,那么配置进去.

    [4] 向服务端channel之中添加一个ServerBootstrapAcceptor对象,携带的参数都是客户端连接信息.

    在这个方法之中我们调用了execute()方法,在后面的内容之中我们会分析事件循环组的内容.

    服务端channel的注册:

    doBind0(regFuture, channel, localAddress, promise);

    下面分析注册的过程.

    channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        // 这个操作是在boss线程完成的
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });

    实际上是向事件循环组之中放入了一个任务,我们现在不管事件循环组的功能,就当做一个线程池就好了,就向向线程组之中提交任务一样.

    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    pipeline.bind(localAddress, promise);
    tail.bind(localAddress, promise);
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            if (isNotValidPromise(promise, false)) {
                // cancelled
                return promise;
            }
    
            // 找寻outbound的处理器
            final AbstractChannelHandlerContext next = findContextOutbound();
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeBind(localAddress, promise);
            } else {
                safeExecute(executor, new Runnable() {
                    @Override
                    public void run() {
                        next.invokeBind(localAddress, promise);
                    }
                }, promise, null);
            }
            return promise;
        }

    找到一个ountbind()处理器,然后会调用next.invokeBind()方法.

    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
            if (invokeHandler()) {
                try {
                    ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
                } catch (Throwable t) {
                    notifyOutboundHandlerException(t, promise);
                }
            } else {
                bind(localAddress, promise);
            }
        }

    现在pipeline之中仅仅只有tail,head,he之前增加的一个ServerbootAcceptor.

    三个之中唯一的ountbind()处理器就是Head.

            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                    throws Exception {
                unsafe.bind(localAddress, promise);
            }

    我们发现现在调用了unsafe()对象进行绑定.

     protected void doBind(SocketAddress localAddress) throws Exception {
            if (PlatformDependent.javaVersion() >= 7) {
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }

    现在我们终于找到实际调用jdk的底层帮助实现了channel的注册.

    以上就是我们通过bind()方法分析得出的结论,下文我们分析事件循环组的功能.

  • 相关阅读:
    现代算法(一) 基因算法
    01-02周 学习总结
    Linux命令之touch详解
    Linux命令之umask详解
    Linux命令之wc详解
    Linux命令之stat详解
    Linux命令之tail详解
    Linux命令之head详解
    Linux命令之less详解
    Linux命令
  • 原文地址:https://www.cnblogs.com/trekxu/p/13586811.html
Copyright © 2011-2022 走看看