zoukankan      html  css  js  c++  java
  • Java线程池详解及常用方法

    前言

    最近被问到了线程池的相关问题。于是准备开始写一些多线程相关的文章。这篇将介绍一下线程池的基本使用。

    Executors

    Executors是concurrent包下的一个类,为我们提供了创建线程池的简便方法。
    Executors可以创建我们常用的四种线程池:
    (1)newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。不设上限,提交的任务将立即执行。
    (2)newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    (3)newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
    (4)newSingleThreadExecutor 创建一个单线程化的线程池执行任务。

    Executors的坏处

    正常来说,我们不应该使用这种方式创建线程池,应该使用ThreadPoolExecutor来创建线程池。Executors创建的线程池也是调用的ThreadPoolExcutor的构造函数。通过原来可以看出。

    我们也看到了这里面的LinkedBlockingQueue并没有指定队列的大小是一个无界队列,这样可能会造成oom。所以我们要使用ThreadPoolExecutor这种方式。

    ThreadPoolExecutor

    通过源码看到ThreadPoolExecutor比较全的构造函数如下:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    分别解释一下参数的意义
    corePoolSize:线程池长期维持的线程数,即使线程处于Idle状态,也不会回收。
    maximumPoolSize:线程数的上限
    keepAliveTime:空闲的时间,超过这个空闲时间,线程将被回收
    unit:空闲时间的时间单位
    workQueue:任务的排队队列,当线程都运行的时候,有空的线程将从队列汇总进行拿取
    threadFactroy:当核心线程小于满线程的时候,又需要多加线程,则需要从工厂中获取线程
    handler:拒绝策略,当线程过多的时候的策略

    线程池针对于任务的执行顺序

    首先任务过来之后,看看corePoolSize是否有空闲的,有的话就执行。没有的话,放入任务队列里面。然后任务队列会通知线程工厂,赶紧造几个线程,来执行。当任务超过了最大的线程数,就执行拒绝策略,拒绝执行。

    submit方法

    线程池建立完毕之后,我们就需要往线程池提交任务。通过线程池的submit方法即可。
    submit方法接收两种Runable和Callable。
    区别如下:
    Runable是实现该接口的run方法,callable是实现接口的call方法。
    callable允许使用返回值。
    callable允许抛出异常。

    提交任务的方式

    Future submit(Callable task):这种方式可以拿到返回的结果。
    void execute(Runnable command):这种方式拿不到。
    Future<?> submit(Runnable task):这种方式可以get,但是永远是null。

    blockqueue的限制

    我们在创建线程池的时候,如果使用Executors。创建的是无界队列,容易造成oom。所以我们要自己执行queue的大小。

    BlockingQueue queue = new ArrayBlockingQueue<>(512)
    

    拒绝策略

    当任务队列的queue满了的时候,在提交任务,就要触发拒绝策略。队列中默认的拒绝策略是 AbortPolicy。是直接抛出异常的一种策略。
    如果是想实现自定义的策略,可以实现RejectedExecutionHandler 接口。
    线程池提供了如下的几种策略供选择。
    AbortPolicy:默认策略,抛出RejectedExecutionException
    DiscardPolicy:忽略当前提交的任务
    DiscardOldestPolicy:丢弃任务队列中最老的任务,给新任务腾出地方
    CallerRunsPolicy:由提交任务者执行这个任务

    ExecutorService executorService = new ThreadPoolExecutor(2, 2, 
    				0, TimeUnit.SECONDS, 
    				new ArrayBlockingQueue<>(512), 
    				new ThreadPoolExecutor.DiscardPolicy());
    

    捕捉异常

    如之前所说Callable接口的实现,可以获取到结果和异常。通过返回的Future的get方法即可拿到。

    ExecutorService executorService = Executors.newFixedThreadPool(5);
    Future<Object> future = executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                throw new RuntimeException("exception");// 该异常会在调用Future.get()时传递给调用者
            }
        });
    	
    try {
      Object result = future.get();
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
    

    正确构造线程池的方式

    int poolSize = Runtime.getRuntime().availableProcessors() * 2;
    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
    RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
    executorService = new ThreadPoolExecutor(poolSize, poolSize,
        0, TimeUnit.SECONDS,
                queue,
                policy);
    

    获取单个结果

    通过submit提交一个任务后,可以获取到一个future,调用get方法会阻塞并等待执行结果。get(long timeout, TimeUnit unit)可以指定等待的超时时间。

    获取多个结果

    可以使用循环依次调用,也可以使用ExecutorCompletionService。该类的take方式,会阻塞等待某一任务完成。向CompletionService批量提交任务后,只需调用相同次数的CompletionService.take()方法,就能获取所有任务的执行结果,获取顺序是任意的,取决于任务的完成顺序。

    void solve(Executor executor, Collection<Callable<Result>> solvers)
       throws InterruptedException, ExecutionException {
       
       CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 构造器
       
       for (Callable<Result> s : solvers)// 提交所有任务
           ecs.submit(s);
    	   
       int n = solvers.size();
       for (int i = 0; i < n; ++i) {// 获取每一个完成的任务
           Result r = ecs.take().get();
           if (r != null)
               use(r);
       }
    }
    

    这个类是对线程池的一个包装,包装完后,听过他进行submit和take。

    单个任务超时

    Future.get(long timeout, TimeUnit unit)。方法可以指定等待的超时时间,超时未完成会抛出TimeoutException。

    多个任务超时

    等待多个任务完成,并设置最大等待时间,可以通过CountDownLatch完成:

    public void testLatch(ExecutorService executorService, List<Runnable> tasks) 
    	throws InterruptedException{
          
        CountDownLatch latch = new CountDownLatch(tasks.size());
          for(Runnable r : tasks){
              executorService.submit(new Runnable() {
                  @Override
                  public void run() {
                      try{
                          r.run();
                      }finally {
                          latch.countDown();// countDown
                      }
                  }
              });
          }
    	  latch.await(10, TimeUnit.SECONDS); // 指定超时时间
      }
    

    await是总的时间,即使100个任务,需要跑20分钟。我10s超时了 也停止了。

  • 相关阅读:
    DEV GridView显示行号
    winfrom Log4Net 代码(二) 记录格式log_info.txt和log_error.txt,只产生两个文本,里面分别记录提示信息和报错信息
    VB.NET使用Log4Net
    Log4Net使用中loginfo.IsInfoEnabled=false问题解决方法
    VB.NET 根据当前日期获取星期几
    VB.NET 发送outLook邮件body基于Html样式
    VB.NET NPOI快速导入导出Excel
    python使用cx_oracle连接oracle数据库
    物理STANDBY库创建还原点(打开为READ WRITE后再变回STANDBY库)
    oracle RAC和RACOneNode之间的转换
  • 原文地址:https://www.cnblogs.com/jichi/p/12560235.html
Copyright © 2011-2022 走看看