zoukankan      html  css  js  c++  java
  • 线程池

    为什么要使用线程池呢?因为创建线程和销毁线程的开销很大,如果来一个请求就要创建一个线程,处理完之后又要销毁,这样频繁的创建和销毁会消耗大量的系统资源。
    java线程池的原理:创建一个ThreadPool,核心线程数为m,最大线程数为n,还有一个阻塞队列BlockingQueue。当来一个请求的时候,就创建一个线程,直到达到m。此时如果继续有请求到来,则不会创建线程了,而是把任务放到阻塞队列中。当请求继续到来,直到阻塞队列满了的时候,才会在m的基础上继续创建线程处理请求,直到达到n。如果此时还有请求到来,则采用拒绝策略。如果后面请求越来越少了,那么n-m的请求就会回收掉(在等待了keepAliveTime时间后),而核心线程数m会继续保留在pool中。
    /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    • 阻塞队列
    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于                                        ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
    • 建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。。
       
     
    那么要如何向线程池中提交任务呢?
    可以使用execute提交任务,但是execute方法没有返回值,所以无法判断任务是否被线程池执行成功。
    01 threadsPool.execute(new Runnable() {
    02 @Override
    03  
    04 public void run() {
    05  
    06 // TODO Auto-generated method stub
    07  
    08 }
    09  
    10 });
     
    我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
    try {
    Object s = future.get();
    catch (InterruptedException e) {
    catch (ExecutionException e) {
    finally {
    executor.shutdown();
    }
     
     
    类的层次结构:
    public interface Executor {
     
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         */
        void execute(Runnable command);
    }
     
    public interface ExecutorService extends Executor {
     
       
        void shutdown();
     
        
        boolean isShutdown();
     
        
        boolean isTerminated();
     
       
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
     
       
        <T> Future<T> submit(Callable<T> task);
     
        
        <T> Future<T> submit(Runnable task, T result);
     
        
        Future<?> submit(Runnable task);
     
       
    }
     
    public class ThreadPoolExecutor implements ExecutorService{
     

    Executors工具类:

    创建固定大小的线程池,核心线程数和最大线程数一样大。

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
     
     
     
    如何关闭线程池呢?

    public void shutdown()

    (1)线程池的状态变成SHUTDOWN状态,此时不能再往线程池中添加新的任务,否则会抛出RejectedExecutionException异常。

    (2)线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。 

    注意这个函数不会等待提交的任务执行完成

    public List<Runnable> shutdownNow()

    (1)线程池的状态立刻变成STOP状态,此时不能再往线程池中添加新的任务。

    (2)终止等待执行的线程,并返回它们的列表;

    (3)试图停止所有正在执行的线程,试图终止的方法是调用Thread.interrupt(),但是大家知道,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

    要如何合理的配置线程池呢?

    需要考虑的因素有:

      1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
      2. 任务的优先级:高,中和低。
      3. 任务的执行时间:长,中和短。
      4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。

    CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。

    优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行。

     
  • 相关阅读:
    [转]HD钱包的助记词与密钥生成原理
    [转]简单科普私钥、地址、助记词、Keystore的区别
    [转]Sequelize 中文API文档-4. 查询与原始查询
    [转]Node.JS使用Sequelize操作MySQL
    [转]OmniLayer / omnicore API 中文版
    [转]usdt omnicore testnet 测试网络
    [转]USDT与omniCore钱包
    [转]BTC RPC API GetTransaction
    [转]比特币测试链——Testnet介绍
    [转]BTC手续费计算,如何设置手续费
  • 原文地址:https://www.cnblogs.com/james111/p/6995801.html
Copyright © 2011-2022 走看看