zoukankan      html  css  js  c++  java
  • Reactor 模式在Netty中的应用

    Reactor 模式在Netty中的应用

    典型的Rector模式

    reactor

    mainReactor

    服务端创建成功后,会监听Accept操作,其中ServerSocketchannel中的PipeLine中现在包含3个handler
    handler
    服务端监听到连接事件后,会创建代表客户端的socketChannel,并向workgroup注册channel,监听后续该channel读写事件。一言以蔽之,bossGroup只负责接收连接请求,并创建socketChannel,分发到workgroup中执行耗时的读写操作。

    接收连接请求

    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
    
    try {
        do {
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            if (localRead < 0) {
                closed = true;
                break;
            }
    
            allocHandle.incMessagesRead(localRead);
        } while (allocHandle.continueReading());
    

    注意ServerSocketchannel的allocHandle默认最大为1。

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            try {
                ch.close();
            } catch (Throwable t2) {
            }
        }
    
        return 0;
    }
    

    在接收到连接请求后,会触发PipeLine的firChannelRead事件。pipline中包含ServerBootstrap,read方法如下:

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
    
        child.pipeline().addLast(childHandler);
    
        setChannelOptions(child, childOptions, logger);
    
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    
        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    

    至此,将NioSocketChannel注册到workgroup中执行。

  • 相关阅读:
    Pwn-warmup_csaw_2016 writeup
    操作系统习题总结
    操作系统-存储器管理部分(待更新)
    树与二叉树之间的互相转换
    黑客攻防技术宝典-反病毒篇笔记(三)
    jaegeropentracing的Java-client完整分布式追踪链
    jaegeropentracing的Java-client
    IDEA2018.2版本注册
    Spring整合CXF webservice restful 实例
    带有WS-Security验证的webservice
  • 原文地址:https://www.cnblogs.com/dragonfei/p/9345737.html
Copyright © 2011-2022 走看看