zoukankan      html  css  js  c++  java
  • [编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现

    Unsafe 是channel的内部接口, 负责跟socket底层打交道。从书写跟命名上看是不公开给开发者使用的,直到最后实现NioSocketChannelUnsafe也没有公开出去

    public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
        interface Unsafe {
            RecvByteBufAllocator.Handle recvBufAllocHandle();
            SocketAddress localAddress();
            SocketAddress remoteAddress();
            void register(EventLoop eventLoop, ChannelPromise promise);
            void bind(SocketAddress localAddress, ChannelPromise promise);
            void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
            void disconnect(ChannelPromise promise);
            void close(ChannelPromise promise);
            void closeForcibly();
            void deregister(ChannelPromise promise);
            void beginRead();
            void write(Object msg, ChannelPromise promise);
            void flush();
            ChannelPromise voidPromise();
            ChannelOutboundBuffer outboundBuffer();
        }
        public interface NioUnsafe extends Unsafe {
            SelectableChannel ch();
            void finishConnect();
            void read();
            void forceFlush();
        }
    }

    NioSocketChannelUnsafe 继承关系为: NioSocketChannelUnsafe -> NioByteUnsafe -> AbstractNioUnsafe -> AbstractUnsafe

    AbstractUnsafe:负责socket 链路绑定、接受、关闭,数据fush操作

    每个操作大概分四个阶段处理

            @Override
            public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                assertEventLoop();
                //执行前检查
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
    
                boolean wasActive = isActive();
                //调用实现
                try {
                    doBind(localAddress);
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    closeIfClosed();
                    return;
                }
    
                //调用业务,通知pipeline
                if (!wasActive && isActive()) {
                    invokeLater(()-> pipeline.fireChannelActive(););
                }
                //完成阶段处理
                safeSetSuccess(promise);
            }
     @Override
            public final void flush() {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    return;
                }
    
                outboundBuffer.addFlush();
                flush0();
            }
    
            @SuppressWarnings("deprecation")
            protected void flush0() {
                //刚完成Flush操作
                if (inFlush0) {
                     return;
                }
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                    return;
                }
    
                inFlush0 = true;
    
                //发送数据前链路检查
                if (!isActive()) {
                    try {
                        if (isOpen()) {
                            //true 通知 handler channelWritabilityChanged方法
                            outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                        } else {
                            outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                        }
                    } finally {
                        inFlush0 = false;
                    }
                    return;
                }
    
                try {
                    //调用channel实现
                    doWrite(outboundBuffer);
                } catch (Throwable t) {
                    if (t instanceof IOException && config().isAutoClose()) {
                        close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    } else {
                        outboundBuffer.failFlushed(t, true);
                    }
                } finally {
                    inFlush0 = false;
                }
            }

    AbstractNioUnsafe:是NioUnsafe接口模板类,简单的包装

    NioByteUnsafe:主要对NioUnsafe接口 read操作实现

    NioSocketChannelUnsafe:只是简单的包装,最终公开给内部使用

    NioByteUnsafe read方法

          public final void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        //填充byteBuf 调用channel实现
                        int size = doReadBytes(byteBuf);
                        //记录最后读取长度
                        allocHandle.lastBytesRead(size);
                        //链路关闭,释放byteBuf
                        if (allocHandle.lastBytesRead() <= 0) {
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            break;
                        }
                        //自增消息读取处理次数
                        allocHandle.incMessagesRead(1);
                        //已完成填充byteBuf 调用业务pipeline
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    //如果不是主动read 要完成后要清理read op
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }

    小结:可以看出没有任何的计算代码,Unsafe只实现边界检查、流程控制,具体实现交给上层处理

  • 相关阅读:
    Socket
    UIView的layoutSubviews和drawRect方法何时调用
    断点续传
    IOS后台执行
    应用程序的状态
    ASIHTTPRequest的使用
    XCode5 使用AutoLayout情况下改变控件的 方法
    从指定的view中截图 返回UIImage
    找到当前视图的父视图控制器
    07.网络总结(面试方面)
  • 原文地址:https://www.cnblogs.com/solq111/p/7059788.html
Copyright © 2011-2022 走看看