zoukankan      html  css  js  c++  java
  • netty(五) channel

    问题

    • channel 是如何处理发送一半中断后继续重发的
    • channel 具体作用是什么

    概述

    这一节我们将介绍 Channel 和内部接口 Unsafe .其中Unsafe 是内部接口,聚合在Channel 中协助网络读写操作相关的操作,设计初衷就是 Channel 的内部辅助类,不应该被用户使用。 
    

    继承类分析

    继承关系链 : 
    

    AbstractChannel -> AbstractNioChannel -> AbstractNioByteChannel -> NioSocketChannel 如下图 NioSocketChannel.PNG 从以上的类结构我们也要学习一下类的构建,各个类实现应该实现的功能,最后生成的具体类具有不同的功能。 AbstractChannel ,保存以下重要的字段 ,主要 - EventLoop - localAddress - remoteAddress - unsafe - DefaultChannelPipleline - Future类 和 Promise类 等

    AbstractNioChannel,从类名可以看出和nio 中 Channel 相关,注册,监听

        private final SelectableChannel ch;
        protected final int readInterestOp;
        private volatile SelectionKey selectionKey;
        private volatile boolean inputShutdown;
    
        /**
         * The future of the current connection attempt.  If not null, subsequent
         * connection attempts will fail.
         */
        private ChannelPromise connectPromise;
        private ScheduledFuture<?> connectTimeoutFuture;
        private SocketAddress requestedRemoteAddress;
    
    
     AbstractNioByteChannel 这个类是Channel对Byte进行操作,对ByteBuff的读写。
    

    源码分析

    AbstractChannel

    AbstractChannel 的读写方法都是交由 ChannelPiple 来解决的
    
        @Override
        public Channel read() {
            pipeline.read();
            return this;
        }
    
        @Override
        public ChannelFuture write(Object msg) {
            return pipeline.write(msg);
        }
    
    

    eventLoop方法,直接返回持有的 eventloop对象

        @Override
        public EventLoop eventLoop() {
            return eventLoop;
        }
    

    AbstractNioChannel

    public abstract class AbstractNioChannel extends AbstractChannel {
    
        private static final InternalLogger logger =
                InternalLoggerFactory.getInstance(AbstractNioChannel.class);
    
    
        // No.1 注册监听相关的字段
        private final SelectableChannel ch;
        protected final int readInterestOp;
        private volatile SelectionKey selectionKey;
        private volatile boolean inputShutdown;
    
    
    
        // No.2 异步执行的字段,或是回调相关的字段
        /**
         * The future of the current connection attempt.  If not null, subsequent
         * connection attempts will fail.
         */
        private ChannelPromise connectPromise;
        private ScheduledFuture<?> connectTimeoutFuture;
        private SocketAddress requestedRemoteAddress;
    
        ...
    
        //核心方法
        @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                	//拿父类的channel对象(父类的channel对象是java原生channel 对象)
                    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    
        //开始read的操作
        @Override
        protected void doBeginRead() throws Exception {
            if (inputShutdown) {
                return;
            }
    
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            //就是改变监听的事件
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }
    
    
    

    AbstractNioByteChannel

        @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            int writeSpinCount = -1;
    
            //循环
            for (;;) {
                Object msg = in.current(true);
                
                if (msg == null) {
                    // Wrote all messages.写完了(发送完了)所有的消息,清除标志,结束
                    clearOpWrite();
                    break;
                }
    
                if (msg instanceof ByteBuf) {
                    //加入是ByteBuf类型
                    ByteBuf buf = (ByteBuf) msg;
                    int readableBytes = buf.readableBytes();
                    //判断当前的可读字节是否为 0 ,为 0 丢弃掉
                    if (readableBytes == 0) {
                        in.remove();
                        continue;
                    }
    
                    boolean setOpWrite = false;
                    boolean done = false;
                    long flushedAmount = 0;
                    //循环发送次数
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }
                    for (int i = writeSpinCount - 1; i >= 0; i --) {
                        //doWriteBytes 子类实现
                        int localFlushedAmount = doWriteBytes(buf);
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }
    
                        flushedAmount += localFlushedAmount;
                        //一直到不可读
                        if (!buf.isReadable()) {
                            done = true;
                            break;
                        }
                    }
                    //发送完,更新发送的进度(有可能没发完)
                    in.progress(flushedAmount);
    
                    if (done) {
                        in.remove();
                    } else {
                        //没发完,设置写半包标识,启动刷新线程继续发送之前没有发送完成的半包消息
                        incompleteWrite(setOpWrite);
                        break;
                    }
                } else if (msg instanceof FileRegion) {
                    FileRegion region = (FileRegion) msg;
                    boolean setOpWrite = false;
                    boolean done = false;
                    long flushedAmount = 0;
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }
    
                    //循环发送
                    for (int i = writeSpinCount - 1; i >= 0; i --) {
                        long localFlushedAmount = doWriteFileRegion(region);
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }
    
                        flushedAmount += localFlushedAmount;
                        if (region.transfered() >= region.count()) {
                            done = true;
                            break;
                        }
                    }
                    //发送完(有可能发送了一半)更新进度
                    in.progress(flushedAmount);
    
                    if (done) {
                        in.remove();
                    } else {
                        //没法完,创建一个任务扔到EventLoop
                        incompleteWrite(setOpWrite);
                        break;
                    }
                } else {
                    throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
                }
            }
        }
    
        //没写完(没发送完)
        protected final void incompleteWrite(boolean setOpWrite) {
            // Did not write completely.
            if (setOpWrite) {
                setOpWrite();
            } else {
                // Schedule flush again later so other tasks can be picked up in the meantime
                //创建任务扔到 eventLoop执行 
                Runnable flushTask = this.flushTask;
                if (flushTask == null) {
                    flushTask = this.flushTask = new Runnable() {
                        @Override
                        public void run() {
                            flush();
                        }
                    };
                }
                eventLoop().execute(flushTask);
            }
        }
    
    
    循环发送次数是指一次发送没有完成时(写半包),程序就继续尝试循环写操作,此时IO线程是不能处理其他事件的,例如读新的消息或者执行定时任务和 NioTask 等, 如果网络IO阻塞或者对方接收消息太慢,可能会导致线程假死,于是就要循环发送。 
    

    AbstractNioMessageChannel

    我们再来看一下AbstractNioChannel 的另外一个子类 AbstractNioMessageChannel,直接看doWrite方法
    
        @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            final SelectionKey key = selectionKey();
            final int interestOps = key.interestOps();
    
            for (;;) {
                Object msg = in.current();
                if (msg == null) {
                    // Wrote all messages.
                    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                    }
                    break;
                }
    
                boolean done = false;
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    if (doWriteMessage(msg, in)) {
                        done = true;
                        break;
                    }
                }
    
                if (done) {
                    in.remove();
                } else {
                    // Did not write all messages.
                    //没发送完,设置标志,交给 select 多路复用器轮询对应的channel重新发送尚未发送完成的半包信息
                    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                        key.interestOps(interestOps | SelectionKey.OP_WRITE);
                    }
                    break;
                }
            }
        }
    
    
    AbstractNioMessageChannel 和 AbstractNioByteChannel的区别在于
    

    NioServerSocketChannel 和 NioServerChannel 的分析

    NioSocketChannel 和 NioServerSocketChannel 的区别到底是什么?后者是服务端当中负责绑定端口,读取数据功能,连接和断开,写消息都不支持,这些功能都在NioSocketChannel中实现
    
    
    
    AbstractNioMessageServerChannel 的具体子类是 NioServerSocketChannel(该类是服务器端接受处理客户端的channel),它的doReadMessages方法(被对应的unsafe类read方法,这里可能有点饶,具体看代码实现)分析如下
    
    	@Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();
    
            try {
                if (ch != null) {
                	//构建一个NioSocketChannel放进数组中
                    buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
    

    NioServerChannel的源码分析

    public class NioServerSocketChannel extends AbstractNioMessageServerChannel
                                     implements io.netty.channel.socket.ServerSocketChannel {
    
        private static final ChannelMetadata METADATA = new ChannelMetadata(false);
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
    
        private static ServerSocketChannel newSocket() {
            try {
                return ServerSocketChannel.open();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }
    
        private final ServerSocketChannelConfig config;
    
        /**
         * Create a new instance
         */
        public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
            super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
            config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
        }
    
        @Override
        public InetSocketAddress localAddress() {
            return (InetSocketAddress) super.localAddress();
        }
    
        @Override
        public ChannelMetadata metadata() {
            return METADATA;
        }
    
        @Override
        public ServerSocketChannelConfig config() {
            return config;
        }
    
        @Override
        public boolean isActive() {
            return javaChannel().socket().isBound();
        }
    
        @Override
        public InetSocketAddress remoteAddress() {
            return null;
        }
    
        @Override
        protected ServerSocketChannel javaChannel() {
            return (ServerSocketChannel) super.javaChannel();
        }
    
        @Override
        protected SocketAddress localAddress0() {
            return javaChannel().socket().getLocalSocketAddress();
        }
    
        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    
        @Override
        protected void doClose() throws Exception {
            javaChannel().close();
        }
    
        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
        // Unnecessary stuff
        @Override
        protected boolean doConnect(
                SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
            throw new UnsupportedOperationException();
        }
    
        @Override
        protected void doFinishConnect() throws Exception {
            throw new UnsupportedOperationException();
        }
    
        @Override
        protected SocketAddress remoteAddress0() {
            return null;
        }
    
        @Override
        protected void doDisconnect() throws Exception {
            throw new UnsupportedOperationException();
        }
    
        @Override
        protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
            throw new UnsupportedOperationException();
        }
    }
    
    
    可以看到 NioServerChannel 的主要都是 override 父类的方法,即是说大部分的逻辑都在父类 Abstract中进行了一层层的封装,给我们一个启发,好的类结构在
    在一开始就已经设计好,最终的具体实现交由尾端实现。
    

    总结

    本文介绍了channel的主要功能作用。
    

    参考资料

    • 《Netty权威指南》
  • 相关阅读:
    Bit Manipulation
    218. The Skyline Problem
    Template : Two Pointers & Hash -> String process
    239. Sliding Window Maximum
    159. Longest Substring with At Most Two Distinct Characters
    3. Longest Substring Without Repeating Characters
    137. Single Number II
    142. Linked List Cycle II
    41. First Missing Positive
    260. Single Number III
  • 原文地址:https://www.cnblogs.com/Benjious/p/11634871.html
Copyright © 2011-2022 走看看