zoukankan      html  css  js  c++  java
  • 详细讲讲netty的pipiline!

    前言

    提到 Netty 首当其冲被提起的肯定是支持它承受高并发的线程模型,说到线程模型就不得不提到 NioEventLoopGroup 这个线程池,接下来进入正题。

    线程模型

    首先来看一段 Netty 的使用示例

    package com.coding.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public final class SimpleServer {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new SimpleServerHandler())
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                            }
                        });
    
                ChannelFuture f = b.bind(8888).sync();
    
                f.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("channelActive");
            }
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                System.out.println("channelRegistered");
            }
    
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                System.out.println("handlerAdded");
            }
        }
    }
    
    

    下面将分析第一、二行代码,看下 NioEventLoopGroup 类的构造函数干了些什么。其余的部分将在其他博文中分析。

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    

    从代码中可以看到这里使用了两个线程池 bossGroup 和 workerGroup,那么为什么需要定义两个线程池呢?这就要说到 Netty 的线程模型了。

    Netty 的线程模型被称为 Reactor 模型,具体如图所示,图上的 mainReactor 指的就是 bossGroup,这个线程池处理客户端的连接请求,并将 accept 的连接注册到 subReactor 的其中一个线程上;图上的 subReactor 当然指的就是 workerGroup,负责处理已建立的客户端通道上的数据读写;图上还有一块 ThreadPool 是具体的处理业务逻辑的线程池,一般情况下可以复用 subReactor,比我的项目中就是这种用法,但官方建议处理一些较为耗时的业务时还是要使用单独的 ThreadPool。

    NioEventLoopGroup 构造函数

    public NioEventLoopGroup() {
        this(0);
    }
    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, null);
    }
    
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }
    
    public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
        super(nThreads, threadFactory, selectorProvider);
    }
    

    NioEventLoopGroup 类中的构造函数最终都是调用的父类 MultithreadEventLoopGroup 如下的构造函数:

    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }
    

    从上面的构造函数可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()来创建对象,即不指定线程个数,则 netty 给我们使用默认的线程个数,如果指定则用我们指定的线程个数。

    默认线程个数相关的代码如下:

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    

    而 SystemPropertyUtil.getInt 函数的功能为:得到系统属性中指定 key(这里:key =”io.netty.eventLoopThreads”)所对应的 value,如果获取不到获取失败则返回默认值,这里的默认值为:cpu 的核数的2倍。

    结论:如果没有设置程序启动参数(或者说没有指定 key=”io.netty.eventLoopThreads”的属性值),那么默认情况下线程的个数为 cpu 的核数乘以 2。

    继续看,由于 MultithreadEventLoopGroup 的构造函数是调用的是其父类 MultithreadEventExecutorGroup 的构造函数,因此,看下此类的构造函数

    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
    
        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }
    
        children = new SingleThreadEventExecutor[nThreads];
        //根据线程个数是否为2的幂次方,采用不同策略初始化chooser
        if (isPowerOfTwo(children.length)) {
            chooser = new PowerOfTwoEventExecutorChooser();
        } else {
            chooser = new GenericEventExecutorChooser();
        }
            //产生nTreads个NioEventLoop对象保存在children数组中
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(threadFactory, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                    //如果newChild方法执行失败,则对前面执行new成功的几个NioEventLoop进行shutdown处理
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
    
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    }
    

    该构造函数干了如下三件事:

    1. 产生了一个线程工场:threadFactory = newDefaultThreadFactory();

    MultithreadEventExecutorGroup类

    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());//getClass()为:NioEventLoopGroup.class
    }
    
    

    DefaultThreadFactory类

    public DefaultThreadFactory(Class<?> poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);
    }
    
    1. 根据线程个数是否为 2 的幂次方,采用不同策略初始化 chooser
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }
    
    1. 产生 nTreads 个 NioEventLoop 对象保存在 children 数组中 ,线程都是通过调用 newChild 方法来产生的。
    @Override
    protected EventExecutor newChild(
            ThreadFactory threadFactory, Object... args) throws Exception {
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
    }
    

    这里传给 NioEventLoop 构造函数的参数为:NioEventLoopGroup、DefaultThreadFactory、SelectorProvider。

    NioEventLoop 构造函数分析

    既然上面提到来 new 一个 NioEventLoop 对象,下面我们就看下这个类以及其父类。

    NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
        super(parent, threadFactory, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }
    

    继续看父类 SingleThreadEventLoop 的构造函数

    protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
        super(parent, threadFactory, addTaskWakesUp);
    }
    

    又是直接调用来父类 SingleThreadEventExecutor 的构造函数,继续看

    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
    
        this.parent = parent;
        this.addTaskWakesUp = addTaskWakesUp;//false
    
        thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                //调用NioEventLoop类的run方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error(
                                "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }
    
                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                        "non-empty task queue (" + taskQueue.size() + ')');
                            }
    
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    
        taskQueue = newTaskQueue();
    }
    protected Queue<Runnable> newTaskQueue() {
        return new LinkedBlockingQueue<Runnable>();
    }
    

    主要干如下两件事:

    1. 利用 ThreadFactory 创建来一个 Thread,传入了一个 Runnable 对象,该 Runnable 重写的 run 代码比较长,不过重点仅仅是调用 NioEventLoop 类的 run 方法。

    2. 使用 LinkedBlockingQueue 类初始化 taskQueue 。

    其中newThread 方法的代码如下:

    DefaultThreadFactory类

    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
    
        try {
        //判断是否是守护线程,并进行设置
            if (t.isDaemon()) {
                if (!daemon) {
                    t.setDaemon(false);
                }
            } else {
                if (daemon) {
                    t.setDaemon(true);
                }
            }
                //设置其优先级
            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
    
    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(r, name);
    }
    

    FastThreadLocalThread类

    public FastThreadLocalThread(Runnable target, String name) {
        super(target, name);// FastThreadLocalThread extends Thread
    }
    

    到这里,可以看到底层还是借助于类似于Thread thread = new Thread(r)这种方式来创建线程。

    关于NioEventLoop对象可以得到的点有,初始化了如下4个属性。

    1. NioEventLoopGroup (在父类SingleThreadEventExecutor中)

    2. selector

    3. provider

    4. thread (在父类SingleThreadEventExecutor中)

    结束

    识别下方二维码!回复: 入群 ,扫码加入我们交流群!

    点赞是认可,在看是支持

    欢迎关注我的公众号!里面可以加入微信技术交流群!
  • 相关阅读:
    Promise.all和Promise.race区别,和使用场景
    使用Promise解决多层异步调用的简单学习【转】
    前端性能优化-缓存
    Node.js机制及原理理解初步【转】
    微信小程序 canvas 字体自动换行(支持换行符)
    百度地图-鼠标悬停样式
    文件I/O相关函数
    获取系统限制信息
    标准C头文件
    数据库系统小结:(不包括详细知识点,更像一个大纲)
  • 原文地址:https://www.cnblogs.com/1ssqq1lxr/p/15153386.html
Copyright © 2011-2022 走看看