zoukankan      html  css  js  c++  java
  • Executor框架

    服务器应用中,串行处理机制通常无法提供高吞吐率或快速响应性。通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。

    public class ThreadPerTaskWebServer {
    
        public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                final Socket connection = socket.accept();
                Runnable task = new Runnable() {
                    public void run() {
                        handleRequest(connection);
                    }
                };
                new Thread(task).start();
            }
        }
    
        private static void handleRequest(Socket connection) {
            //do something
        }
    }

    对于每个连接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。

    在生产环境中,“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是当需要创建大量线程时:

    1. 线程生命周期的开销非常高。线程的创建和销毁不是没有代价的。
    2. 资源消耗。活跃的线程会消耗系统资源,尤其是内存。
    3. 稳定性。在可创建线程的数量上存在一个限制,这个限制值将随着平台的不同而不同,并且受多个因素制约。

    Executor接口

    public interface Executor{
        void executor(Runnable command);
    }

    线程池

    Executors 静态工厂方法创建

    • newFixedThreadPool 创建一个固定长度的线程池
    • newCachedThreadPool 创建一个可缓存的线程池 规模不存在限制
    • newSingleThreadPool 创建单个工作者线程来执行任务 异常后会替代
    • newScheduledThreadPool 创建一个固定长度的线程池 以延时或定时方式来执行

    以上实际都是通过ThreadPoolExecutor来创建的

    public class Executors {
        public static ExecutorService newFixedThreadPool(int var0) {
            return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        }
    
        public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
            return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
        }
    
        public static ExecutorService newSingleThreadExecutor() {
            return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
        }
    
        public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
            return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
        }
        //...
    }

    ThreadPoolExecutor 核心线程池的内部实现

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
    1. corePoolSize 指定了线程池中线程数量
    2. maximumPoolSize 指定了线程池中最大线程数量
    3. keepAliveTime 当线程池数量超过corePoolSize时,多余的空闲线程的存活时间
    4. unit keepAliveTime 的单位
    5. workQueue 任务队列 被提交但尚未被执行的任务
    6. threadFactory 线程工厂 用于创建线程,一般用默认的即可
    7. handler 拒绝策略。当任务太多来不及处理,如何拒绝任务

    workQueue 可以是以下几种:
    直接提交的队列:SynchronousQueue
    有界的任务队列:ArrayBlockingQueue
    无界的任务队列:LinkedBlockingQueue
    优先任务队列:PriorityBlockingQueue

    扩展线程池(extends java.util.concurrent.ThreadPoolExecutor)

    ThreadFactory 线程工厂

    每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。

    public interface ThreadFactory{
        Thread newThread(Runnable r);
    }

    Executors中提供一个默认线程工厂的实现DefaultThreadFactory

      static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
       }

    Executor的生命周期

    为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用户生命周期管理的方法(还有一些用于任务提交的便利方法)。

    public interface ExecutorService extends Executor{
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        <T> Future<T> submit(Callable<T> task);
        // .....
    }

    ExecutorService的生命周期有三种状态:运行,关闭和已终止

    平缓的关闭方式–shutdown:不再接受新的任务,同时等待已经提交的任务执行完成( 包括那些还未开始执行的任务 )。
    粗暴的关闭方式–shutdownNow:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

    JVM只有在所有非守护线程全部终止后才会退出,如果无法正确地关闭Executor,那么JVM将无法结束。

    ================================== 赵客缦胡缨,吴钩霜雪明。 银鞍照白马,飒沓如流星。 ==================================
  • 相关阅读:
    Golang flag包使用详解(一)
    string rune byte 的关系
    int在64位操作系统中占多少位?
    32位和64位系统区别及int字节数
    /etc/fstab修改及mkfs(e2label)相关应用与疑问
    nginx + fastcgi + c/c++
    MYSQL优化
    mysqlhighavailability
    woodmann--逆向工程
    jdaaaaaavid --github
  • 原文地址:https://www.cnblogs.com/lucare/p/9312662.html
Copyright © 2011-2022 走看看