zoukankan      html  css  js  c++  java
  • [编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现

    Unsafe 是channel的内部接口, 负责跟socket底层打交道。从书写跟命名上看是不公开给开发者使用的,直到最后实现NioSocketChannelUnsafe也没有公开出去

    public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
        interface Unsafe {
            RecvByteBufAllocator.Handle recvBufAllocHandle();
            SocketAddress localAddress();
            SocketAddress remoteAddress();
            void register(EventLoop eventLoop, ChannelPromise promise);
            void bind(SocketAddress localAddress, ChannelPromise promise);
            void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
            void disconnect(ChannelPromise promise);
            void close(ChannelPromise promise);
            void closeForcibly();
            void deregister(ChannelPromise promise);
            void beginRead();
            void write(Object msg, ChannelPromise promise);
            void flush();
            ChannelPromise voidPromise();
            ChannelOutboundBuffer outboundBuffer();
        }
        public interface NioUnsafe extends Unsafe {
            SelectableChannel ch();
            void finishConnect();
            void read();
            void forceFlush();
        }
    }

    NioSocketChannelUnsafe 继承关系为: NioSocketChannelUnsafe -> NioByteUnsafe -> AbstractNioUnsafe -> AbstractUnsafe

    AbstractUnsafe:负责socket 链路绑定、接受、关闭,数据fush操作

    每个操作大概分四个阶段处理

            @Override
            public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                assertEventLoop();
                //执行前检查
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
    
                boolean wasActive = isActive();
                //调用实现
                try {
                    doBind(localAddress);
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    closeIfClosed();
                    return;
                }
    
                //调用业务,通知pipeline
                if (!wasActive && isActive()) {
                    invokeLater(()-> pipeline.fireChannelActive(););
                }
                //完成阶段处理
                safeSetSuccess(promise);
            }
     @Override
            public final void flush() {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    return;
                }
    
                outboundBuffer.addFlush();
                flush0();
            }
    
            @SuppressWarnings("deprecation")
            protected void flush0() {
                //刚完成Flush操作
                if (inFlush0) {
                     return;
                }
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                    return;
                }
    
                inFlush0 = true;
    
                //发送数据前链路检查
                if (!isActive()) {
                    try {
                        if (isOpen()) {
                            //true 通知 handler channelWritabilityChanged方法
                            outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                        } else {
                            outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                        }
                    } finally {
                        inFlush0 = false;
                    }
                    return;
                }
    
                try {
                    //调用channel实现
                    doWrite(outboundBuffer);
                } catch (Throwable t) {
                    if (t instanceof IOException && config().isAutoClose()) {
                        close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    } else {
                        outboundBuffer.failFlushed(t, true);
                    }
                } finally {
                    inFlush0 = false;
                }
            }

    AbstractNioUnsafe:是NioUnsafe接口模板类,简单的包装

    NioByteUnsafe:主要对NioUnsafe接口 read操作实现

    NioSocketChannelUnsafe:只是简单的包装,最终公开给内部使用

    NioByteUnsafe read方法

          public final void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        //填充byteBuf 调用channel实现
                        int size = doReadBytes(byteBuf);
                        //记录最后读取长度
                        allocHandle.lastBytesRead(size);
                        //链路关闭,释放byteBuf
                        if (allocHandle.lastBytesRead() <= 0) {
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            break;
                        }
                        //自增消息读取处理次数
                        allocHandle.incMessagesRead(1);
                        //已完成填充byteBuf 调用业务pipeline
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    //如果不是主动read 要完成后要清理read op
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }

    小结:可以看出没有任何的计算代码,Unsafe只实现边界检查、流程控制,具体实现交给上层处理

  • 相关阅读:
    yii1.0 yii2.0 ar
    php 函数
    整除理论,1.1数的整除性定理总结
    设M=5^2003+7^2004+9^2005+11^2006,求证8|M。(整除理论,1.1.8)
    已知整数m,n,p,q适合(m-p)|(mn+pq)证明:(m-p)|(mq+np)(整除理论1.1.5)
    证明:一个整数a若不能被6整除,则a2+24必能被24整除。(整除理论,1.1.4)
    ural 1073.Square Country(动态规划)
    10个男孩和n个女孩共买了n2+8n+2本书,已知他们每人买的书本的数量是相同的,且女孩人数多于南海人数,问女孩人数是多少?(整除原理1.1.3)
    设正整数n的十进制表示为n=ak……a1a0(0<=ai<=9,0<=i<=k,ak!=0),n的个位为起始数字的数字的正负交错之和T(n)=a0+a1+……+(-1)kak,证明:11|n的充分必要条件是11|T(n);(整除理论1.1.2))
    设n是奇数,证明:16|(n4+4n2+11)(整除原理1.1.1)
  • 原文地址:https://www.cnblogs.com/solq111/p/7059788.html
Copyright © 2011-2022 走看看