  • netty

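    Netty uses a boss/worker (main/sub reactor) multithreaded model: the boss thread (a pool containing a single thread) is mainly responsible for accepting client connections, and it hands each connection to a worker thread (a pool of 8 threads) that does the actual processing.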

    MainReactor = NioServerBoss
    SubReactor = NioWorker

     

     

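    In the class hierarchy, NioWorker and NioServerBoss both extend AbstractNioSelector, and most of the core logic is handled there.

    During initialization, both the boss and the worker thread pools are set up, and every thread ends up running the run method of AbstractNioSelector.

    The boss and the worker differ in their processTaskQueue and process steps: one set belongs to the boss, the other to the worker.

    The worker's processTaskQueue invokes the run method of the RegisterTask class inside NioWorker. It does three main things: sets the channel to non-blocking, registers it with the selector, and fires the client-connected event (the upstream connected event in the pipeline).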
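The shape of that shared loop can be sketched as a template method. This is a simplified illustration, not Netty's code: SelectorLoopSketch, BossSketch, WorkerSketch, runOnce and the log field are hypothetical names, and the sketch assumes the task-queue step differs only in which tasks get queued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for AbstractNioSelector: one shared loop, one hook per role.
abstract class SelectorLoopSketch {
    protected final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    protected final List<String> log = new ArrayList<>();

    // One iteration of the loop: drain queued tasks first, then handle ready I/O.
    final void runOnce() {
        processTaskQueue();
        process();
    }

    // Runs every task queued so far (for example, a registration task).
    private void processTaskQueue() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
    }

    // Role-specific I/O handling.
    protected abstract void process();
}

// Assumed boss behaviour: accept new connections.
class BossSketch extends SelectorLoopSketch {
    @Override
    protected void process() {
        log.add("boss: accept new connections");
    }
}

// Assumed worker behaviour: read from and write to ready channels.
class WorkerSketch extends SelectorLoopSketch {
    @Override
    protected void process() {
        log.add("worker: read/write ready channels");
    }
}
```

Queuing a task and calling runOnce on a WorkerSketch runs the task first and the worker's process step second, which is the ordering the run method relies on.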

    public void run() {
                SocketAddress localAddress = channel.getLocalAddress();
                SocketAddress remoteAddress = channel.getRemoteAddress();
    
                if (localAddress == null || remoteAddress == null) {
                    if (future != null) {
                        future.setFailure(new ClosedChannelException());
                    }
                    close(channel, succeededFuture(channel));
                    return;
                }
    
                try {
                    if (server) {
                        channel.channel.configureBlocking(false);
                    }
    
                    channel.channel.register(
                            selector, channel.getInternalInterestOps(), channel);
    
                    if (future != null) {
                        channel.setConnected();
                        future.setSuccess();
                    }
    
                    if (server || !((NioClientSocketChannel) channel).boundManually) {
                        fireChannelBound(channel, localAddress);
                    }
                    fireChannelConnected(channel, remoteAddress);
                } catch (IOException e) {
                    if (future != null) {
                        future.setFailure(e);
                    }
                    close(channel, succeededFuture(channel));
                    if (!(e instanceof ClosedChannelException)) {
                        throw new ChannelException(
                                "Failed to register a socket to the selector.", e);
                    }
                }
            }
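The two core steps of this task, switching the channel to non-blocking mode and registering it with the selector together with an attachment, can be reproduced with plain java.nio. The sketch below is only an illustration under assumptions: WorkerRegisterSketch is a hypothetical class, the interest set is fixed to OP_READ, and a string stands in for the Netty channel object used as the attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical helper mirroring the registration step with plain java.nio.
class WorkerRegisterSketch {

    // A channel must be non-blocking before it can be registered with a selector.
    static SelectionKey register(Selector selector, SocketChannel ch, Object attachment) {
        try {
            ch.configureBlocking(false);
            // The attachment travels with the key and is read back in the process step.
            return ch.register(selector, SelectionKey.OP_READ, attachment);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Registers an unconnected channel and checks the resulting key.
    static boolean demo() {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            SelectionKey key = register(selector, ch, "channel-state");
            return !ch.isBlocking()
                    && key.interestOps() == SelectionKey.OP_READ
                    && "channel-state".equals(key.attachment());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Registering a channel that is still in blocking mode throws IllegalBlockingModeException, which is why configureBlocking(false) has to come first.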

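    The process method here is the one in NioWorker. It iterates over the keys the selector reports as ready, reads the data into a ByteBuffer, copies it into a ChannelBuffer, and fires message handling (fireMessageReceived), which invokes the handlers we added to the pipeline.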

    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                    if (!read(k)) {
                        // Connection already closed - no need to handle write.
                        continue;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    writeFromSelectorLoop(k);
                }
            } catch (CancelledKeyException e) {
                close(k);
            }
    
            if (cleanUpCancelledKeys()) {
                break; // break the loop to avoid ConcurrentModificationException
            }
        }
    }
    protected boolean read(SelectionKey k) {
            final SocketChannel ch = (SocketChannel) k.channel();
            final NioSocketChannel channel = (NioSocketChannel) k.attachment();
    
            final ReceiveBufferSizePredictor predictor =
                channel.getConfig().getReceiveBufferSizePredictor();
            final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
            final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
    
            int ret = 0;
            int readBytes = 0;
            boolean failure = true;
    
            ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
            try {
                while ((ret = ch.read(bb)) > 0) {
                    readBytes += ret;
                    if (!bb.hasRemaining()) {
                        break;
                    }
                }
                failure = false;
            } catch (ClosedChannelException e) {
                // Can happen, and does not need a user attention.
            } catch (Throwable t) {
                fireExceptionCaught(channel, t);
            }
    
            if (readBytes > 0) {
                bb.flip();
    
                final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
                buffer.setBytes(0, bb);
                buffer.writerIndex(readBytes);
    
                // Update the predictor.
                predictor.previousReceiveBufferSize(readBytes);
    
                // Fire the event.
                fireMessageReceived(channel, buffer);
            }
    
            if (ret < 0 || failure) {
                k.cancel(); // Some JDK implementations run into an infinite loop without this.
                close(channel, succeededFuture(channel));
                return false;
            }
    
            return true;
        }
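The read-then-copy pattern in read can be tried out without Netty. The following is a minimal sketch under assumptions: ReadCopySketch is a hypothetical class, a byte array stands in for ChannelBuffer, and a channel over an in-memory stream stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of "read into a scratch buffer, copy out exactly what was read".
class ReadCopySketch {

    static byte[] readOnce(ReadableByteChannel ch, ByteBuffer scratch) {
        scratch.clear();
        int ret;
        int readBytes = 0;
        try {
            // Keep reading until nothing more is available or the scratch buffer is full.
            while ((ret = ch.read(scratch)) > 0) {
                readBytes += ret;
                if (!scratch.hasRemaining()) {
                    break;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Flip to read mode and copy only the bytes that actually arrived.
        scratch.flip();
        byte[] message = new byte[readBytes];
        scratch.get(message);
        return message;
    }

    static String demo() {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(payload));
        // The scratch buffer is larger than the payload, as a predicted size often is.
        return new String(readOnce(ch, ByteBuffer.allocate(16)), StandardCharsets.UTF_8);
    }
}
```

Copying into a right-sized buffer lets the scratch buffer be reused for the next read, which appears to be the purpose of recvBufferPool in the listing above.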

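    Now for the boss side. It certainly accepts client connections, since it is the entry point. The other questions are how an accepted connection is handed over to a NioWorker and how the workers are assigned.

    The boss's processTaskQueue runs the run method of the RegisterTask class in NioServerBoss; when such a task is put on the queue is shown further down: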

    public void run() {
        boolean bound = false;
        boolean registered = false;
        try {
            // Bind the address and port
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound = true;

            future.setSuccess();
            fireChannelBound(channel, channel.getLocalAddress());
            // Register for accept events
            channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
            registered = true;
        } catch (Throwable t) {
            future.setFailure(t);
            fireExceptionCaught(channel, t);
        } finally {
            if (!registered && bound) {
                close(channel, future);
            }
        }
    }
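The same bind-then-register sequence looks like this with plain java.nio. It is only a sketch: BossBindSketch is a hypothetical class, the backlog of 128 and the loopback address with an ephemeral port are arbitrary choices, and a string stands in for the server channel attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical illustration of the boss's bind and OP_ACCEPT registration.
class BossBindSketch {

    // Returns the interest set of the key created for the server channel.
    static int demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the OS for any free port; 128 is an assumed backlog.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
            // From now on the selector reports incoming connections on this key.
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT, "server-channel");
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```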

    The process method of NioServerBoss:

     protected void process(Selector selector) {
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            if (selectedKeys.isEmpty()) {
                return;
            }
            // Iterate over the keys that fired
            for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
                SelectionKey k = i.next();
                i.remove();
                NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();

                try {
                    // accept connections in a for loop until no new connection is ready
                    for (;;) {
                        SocketChannel acceptedSocket = channel.socket.accept();
                        if (acceptedSocket == null) {
                            break;
                        }
                        registerAcceptedChannel(channel, acceptedSocket, thread);
                    }
                } catch (CancelledKeyException e) {
                    // Raised by accept() when the server socket was closed.
                    k.cancel();
                    channel.close();
                } catch (SocketTimeoutException e) {
                    // Thrown every second to get ClosedChannelException
                    // raised.
                } catch (ClosedChannelException e) {
                    // Closed as requested.
                } catch (Throwable t) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to accept a connection.", t);
                    }

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // Ignore
                    }
                }
            }
        }
    private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
                                                Thread currentThread) {
        try {
            ChannelSink sink = parent.getPipeline().getSink();
            ChannelPipeline pipeline =
                    parent.getConfig().getPipelineFactory().getPipeline();
            // Pick the next NioWorker round-robin:
            // workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]
            NioWorker worker = parent.workerPool.nextWorker();
            // This binds the client socket to the chosen NioWorker
            worker.register(new NioAcceptedSocketChannel(
                    parent.getFactory(), pipeline, parent, sink,
                    acceptedSocket,
                    worker, currentThread), null);
        } catch (Exception e) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to initialize an accepted socket.", e);
            }

            try {
                acceptedSocket.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially accepted socket.",
                            e2);
                }
            }
        }
    }
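The round-robin expression quoted in the comment, workers[Math.abs(workerIndex.getAndIncrement() % workers.length)], can be isolated in a few lines. In this sketch RoundRobinSketch is a hypothetical class and workers are represented by plain strings; the configurable start index exists only so the overflow case can be tried.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the worker pool; workers are represented by their names.
class RoundRobinSketch {
    private final String[] workers;
    private final AtomicInteger workerIndex;

    RoundRobinSketch(String[] workers, int startIndex) {
        this.workers = workers.clone();
        this.workerIndex = new AtomicInteger(startIndex);
    }

    // Math.abs keeps the index valid after the counter overflows to a negative value.
    String nextWorker() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }

    // Picks five workers in a row from a pool of three.
    static String demo() {
        RoundRobinSketch pool =
                new RoundRobinSketch(new String[] {"worker-0", "worker-1", "worker-2"}, 0);
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            order.append(pool.nextWorker()).append(' ');
        }
        return order.toString().trim();
    }
}
```

Because the remainder is taken before Math.abs, its magnitude is always smaller than workers.length, so the index stays in range even once the counter wraps past Integer.MAX_VALUE.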

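    The register method of AbstractNioSelector:

    public void register(Channel channel, ChannelFuture future) {
        // register is invoked on a NioWorker here, so createRegisterTask builds the
        // worker's RegisterTask (the run method shown earlier), not the boss's
        Runnable task = createRegisterTask(channel, future);
        registerTask(task);
    }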

    protected final void registerTask(Runnable task) {
        // Add the task to the queue
        taskQueue.add(task);

        Selector selector = this.selector;

        if (selector != null) {
            // CAS on the flag so the selector is woken up only once
            if (wakenUp.compareAndSet(false, true)) {
                selector.wakeup();
            }
        } else {
            if (taskQueue.remove(task)) {
                // the selector was null this means the Worker has already been shutdown.
                throw new RejectedExecutionException("Worker has already been shutdown");
            }
        }
    }
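The queue-then-wake-up handshake can be exercised with a bare java.nio Selector. This is a sketch under assumptions: WakeupSketch and runOnce are hypothetical names, the null-selector shutdown branch is left out, and the loop side is reduced to a single iteration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of handing tasks to a selector thread.
class WakeupSketch {
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final Selector selector;

    WakeupSketch(Selector selector) {
        this.selector = selector;
    }

    // Producer side: enqueue, then wake the selector at most once per loop iteration.
    void registerTask(Runnable task) {
        taskQueue.add(task);
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    // Loop side: reset the flag, select, then drain the queue. Returns the number of tasks run.
    int runOnce(long timeoutMillis) {
        try {
            wakenUp.set(false);
            // A wakeup() issued before this call makes select return immediately.
            selector.select(timeoutMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int ran = 0;
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    static int demo() {
        try (Selector selector = Selector.open()) {
            WakeupSketch loop = new WakeupSketch(selector);
            loop.registerTask(() -> { });
            loop.registerTask(() -> { });
            return loop.runOnce(5000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The compare-and-set means that of several tasks queued back to back, only the first one pays for the selector.wakeup() call.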

    Next, a look at NioAcceptedSocketChannel:

    final class NioAcceptedSocketChannel extends NioSocketChannel {

        final Thread bossThread;

        NioAcceptedSocketChannel(
                ChannelFactory factory, ChannelPipeline pipeline,
                Channel parent, ChannelSink sink,
                SocketChannel socket, NioWorker worker, Thread bossThread) {

            super(parent, factory, pipeline, sink, socket, worker);

            this.bossThread = bossThread;

            setConnected();
            // Fire the channel-open event; the connection is handed to the NioWorker
            fireChannelOpen(this);
        }
    }
     
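    Note that the register call above is made on the NioWorker, so the task it queues is the NioWorker's RegisterTask (the first run method listed in this post), not the RegisterTask of NioServerBoss.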
  • Original article: https://www.cnblogs.com/aishangyizhihu/p/13946149.html