zoukankan      html  css  js  c++  java
  • netty(五) channel

    问题

    • channel 是如何处理发送一半中断后继续重发的
    • channel 具体作用是什么

    概述

    这一节我们将介绍 Channel 和内部接口 Unsafe .其中Unsafe 是内部接口,聚合在Channel 中协助网络读写操作相关的操作,设计初衷就是 Channel 的内部辅助类,不应该被用户使用。 
    

    继承类分析

    继承关系链 : 
    

    AbstractChannel -> AbstractNioChannel -> AbstractNioByteChannel -> NioSocketChannel 如下图 NioSocketChannel.PNG 从以上的类结构我们也要学习一下类的构建,各个类实现应该实现的功能,最后生成的具体类具有不同的功能。 AbstractChannel ,保存以下重要的字段 ,主要 - EventLoop - localAddress - remoteAddress - unsafe - DefaultChannelPipleline - Future类 和 Promise类 等

    AbstractNioChannel,从类名可以看出和nio 中 Channel 相关,注册,监听

        private final SelectableChannel ch;
        protected final int readInterestOp;
        private volatile SelectionKey selectionKey;
        private volatile boolean inputShutdown;
    
        /**
         * The future of the current connection attempt.  If not null, subsequent
         * connection attempts will fail.
         */
        private ChannelPromise connectPromise;
        private ScheduledFuture<?> connectTimeoutFuture;
        private SocketAddress requestedRemoteAddress;
    
    
     AbstractNioByteChannel 这个类是Channel对Byte进行操作,对ByteBuff的读写。
    

    源码分析

    AbstractChannel

    AbstractChannel 的读写方法都是交由 ChannelPiple 来解决的
    
        @Override
        public Channel read() {
            pipeline.read();
            return this;
        }
    
        @Override
        public ChannelFuture write(Object msg) {
            return pipeline.write(msg);
        }
    
    

    eventLoop方法,直接返回持有的 eventloop对象

        @Override
        public EventLoop eventLoop() {
            return eventLoop;
        }
    

    AbstractNioChannel

    public abstract class AbstractNioChannel extends AbstractChannel {
    
        private static final InternalLogger logger =
                InternalLoggerFactory.getInstance(AbstractNioChannel.class);
    
    
        // No.1 注册监听相关的字段
        private final SelectableChannel ch;
        protected final int readInterestOp;
        private volatile SelectionKey selectionKey;
        private volatile boolean inputShutdown;
    
    
    
        // No.2 异步执行的字段,或是回调相关的字段
        /**
         * The future of the current connection attempt.  If not null, subsequent
         * connection attempts will fail.
         */
        private ChannelPromise connectPromise;
        private ScheduledFuture<?> connectTimeoutFuture;
        private SocketAddress requestedRemoteAddress;
    
        ...
    
        //核心方法
        @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                	//拿父类的channel对象(父类的channel对象是java原生channel 对象)
                    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    
        //开始read的操作
        @Override
        protected void doBeginRead() throws Exception {
            if (inputShutdown) {
                return;
            }
    
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            //就是改变监听的事件
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }
    
    
    

    AbstractNioByteChannel

        @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            int writeSpinCount = -1;
    
            //循环
            for (;;) {
                Object msg = in.current(true);
                
                if (msg == null) {
                    // Wrote all messages.写完了(发送完了)所有的消息,清除标志,结束
                    clearOpWrite();
                    break;
                }
    
                if (msg instanceof ByteBuf) {
                    //加入是ByteBuf类型
                    ByteBuf buf = (ByteBuf) msg;
                    int readableBytes = buf.readableBytes();
                    //判断当前的可读字节是否为 0 ,为 0 丢弃掉
                    if (readableBytes == 0) {
                        in.remove();
                        continue;
                    }
    
                    boolean setOpWrite = false;
                    boolean done = false;
                    long flushedAmount = 0;
                    //循环发送次数
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }
                    for (int i = writeSpinCount - 1; i >= 0; i --) {
                        //doWriteBytes 子类实现
                        int localFlushedAmount = doWriteBytes(buf);
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }
    
                        flushedAmount += localFlushedAmount;
                        //一直到不可读
                        if (!buf.isReadable()) {
                            done = true;
                            break;
                        }
                    }
                    //发送完,更新发送的进度(有可能没发完)
                    in.progress(flushedAmount);
    
                    if (done) {
                        in.remove();
                    } else {
                        //没发完,设置写半包标识,启动刷新线程继续发送之前没有发送完成的半包消息
                        incompleteWrite(setOpWrite);
                        break;
                    }
                } else if (msg instanceof FileRegion) {
                    FileRegion region = (FileRegion) msg;
                    boolean setOpWrite = false;
                    boolean done = false;
                    long flushedAmount = 0;
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }
    
                    //循环发送
                    for (int i = writeSpinCount - 1; i >= 0; i --) {
                        long localFlushedAmount = doWriteFileRegion(region);
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }
    
                        flushedAmount += localFlushedAmount;
                        if (region.transfered() >= region.count()) {
                            done = true;
                            break;
                        }
                    }
                    //发送完(有可能发送了一半)更新进度
                    in.progress(flushedAmount);
    
                    if (done) {
                        in.remove();
                    } else {
                        //没法完,创建一个任务扔到EventLoop
                        incompleteWrite(setOpWrite);
                        break;
                    }
                } else {
                    throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
                }
            }
        }
    
        //没写完(没发送完)
        protected final void incompleteWrite(boolean setOpWrite) {
            // Did not write completely.
            if (setOpWrite) {
                setOpWrite();
            } else {
                // Schedule flush again later so other tasks can be picked up in the meantime
                //创建任务扔到 eventLoop执行 
                Runnable flushTask = this.flushTask;
                if (flushTask == null) {
                    flushTask = this.flushTask = new Runnable() {
                        @Override
                        public void run() {
                            flush();
                        }
                    };
                }
                eventLoop().execute(flushTask);
            }
        }
    
    
    循环发送次数是指一次发送没有完成时(写半包),程序就继续尝试循环写操作,此时IO线程是不能处理其他事件的,例如读新的消息或者执行定时任务和 NioTask 等, 如果网络IO阻塞或者对方接收消息太慢,可能会导致线程假死,于是就要循环发送。 
    

    AbstractNioMessageChannel

    我们再来看一下AbstractNioChannel 的另外一个子类 AbstractNioMessageChannel,直接看doWrite方法
    
        @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            final SelectionKey key = selectionKey();
            final int interestOps = key.interestOps();
    
            for (;;) {
                Object msg = in.current();
                if (msg == null) {
                    // Wrote all messages.
                    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                    }
                    break;
                }
    
                boolean done = false;
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    if (doWriteMessage(msg, in)) {
                        done = true;
                        break;
                    }
                }
    
                if (done) {
                    in.remove();
                } else {
                    // Did not write all messages.
                    //没发送完,设置标志,交给 select 多路复用器轮询对应的channel重新发送尚未发送完成的半包信息
                    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                        key.interestOps(interestOps | SelectionKey.OP_WRITE);
                    }
                    break;
                }
            }
        }
    
    
    AbstractNioMessageChannel 和 AbstractNioByteChannel的区别在于
    

    NioServerSocketChannel 和 NioServerChannel 的分析

    NioSocketChannel 和 NioServerSocketChannel 的区别到底是什么?后者是服务端当中负责绑定端口,读取数据功能,连接和断开,写消息都不支持,这些功能都在NioSocketChannel中实现
    
    
    
    AbstractNioMessageServerChannel 的具体子类是 NioServerSocketChannel(该类是服务器端接受处理客户端的channel),它的doReadMessages方法(被对应的unsafe类read方法,这里可能有点饶,具体看代码实现)分析如下
    
    	@Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();
    
            try {
                if (ch != null) {
                	//构建一个NioSocketChannel放进数组中
                    buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
    

    NioServerChannel的源码分析

    public class NioServerSocketChannel extends AbstractNioMessageServerChannel
                                     implements io.netty.channel.socket.ServerSocketChannel {
    
        private static final ChannelMetadata METADATA = new ChannelMetadata(false);
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
    
        private static ServerSocketChannel newSocket() {
            try {
                return ServerSocketChannel.open();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }
    
        private final ServerSocketChannelConfig config;
    
        /**
         * Create a new instance
         */
        public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
            super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
            config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
        }
    
        @Override
        public InetSocketAddress localAddress() {
            return (InetSocketAddress) super.localAddress();
        }
    
        @Override
        public ChannelMetadata metadata() {
            return METADATA;
        }
    
        @Override
        public ServerSocketChannelConfig config() {
            return config;
        }
    
        @Override
        public boolean isActive() {
            return javaChannel().socket().isBound();
        }
    
        @Override
        public InetSocketAddress remoteAddress() {
            return null;
        }
    
        @Override
        protected ServerSocketChannel javaChannel() {
            return (ServerSocketChannel) super.javaChannel();
        }
    
        @Override
        protected SocketAddress localAddress0() {
            return javaChannel().socket().getLocalSocketAddress();
        }
    
        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    
        @Override
        protected void doClose() throws Exception {
            javaChannel().close();
        }
    
        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
        // Unnecessary stuff
        @Override
        protected boolean doConnect(
                SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
            throw new UnsupportedOperationException();
        }
    
        @Override
        protected void doFinishConnect() throws Exception {
            throw new UnsupportedOperationException();
        }
    
        @Override
        protected SocketAddress remoteAddress0() {
            return null;
        }
    
        @Override
        protected void doDisconnect() throws Exception {
            throw new UnsupportedOperationException();
        }
    
        @Override
        protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
            throw new UnsupportedOperationException();
        }
    }
    
    
    可以看到 NioServerChannel 的主要都是 override 父类的方法,即是说大部分的逻辑都在父类 Abstract中进行了一层层的封装,给我们一个启发,好的类结构在
    在一开始就已经设计好,最终的具体实现交由尾端实现。
    

    总结

    本文介绍了channel的主要功能作用。
    

    参考资料

    • 《Netty权威指南》
  • 相关阅读:
    【大数据】HDFS高可用
    【Redis】常用命令、问题排查、内存优化
    【OOM】记一次线上OOM解决全流程
    【Git】Github如何弥补提交记录contributions
    Hash算法与Hash碰撞
    【计算机基础】存储单位换算
    【大数据】技术选型对比
    【MQ】Kafka架构与原理
    【Git】Git常用命令合集
    【maven】基本知识点
  • 原文地址:https://www.cnblogs.com/Benjious/p/11634871.html
Copyright © 2011-2022 走看看