zoukankan      html  css  js  c++  java
  • Java线程池相关类

    Java线程池相关类

    // 最顶层接口,仅提供execute方法
    public interface Executor {
      void execute(Runnable command);
    }
    
    // ExecutorService,扩展自Executor,提供了对任务的管理能力
    public interface ExecutorService extends Executor {
      // 停止接受新任务
      void shutdown();
      
      // 停止在执行的任务
      List<Runnable> showdownNow();
      
      // 等待所有任务结束(完成、中断或超时)
      boolean awaitTermination(long timeout, TimeUnit unit);
      
      // 以及各种类型的submit方法,返回Future对象
      <T> Future<T> submit(Callable<T> task);
      
      // 以及批量处理任务
      <T> T invokeAny(Collection<> tasks);
      <T> List<Future<T>> invokeAll(Collection<> tasks);
    }
    
    // 其中涉及的Future接口
    public interface Future<V> {
      // 取消执行任务
      boolean cancel(boolean mayInterruptIfRunning);
      //是否完成
      boolean isDone();
      // 等待任务执行结束并返回结果
      V get();
      V get(long timeout, TimeUnit unit);
    }
    
    // ExecutorService的默认抽象实现
    public abstract class AbstractExecutorService implements ExecutorService {
      // 基础方法,用于生成Future对象
      protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
      }
      
    	// submit仅把task封装为Future,再调用一次Executor.execute()方法
      public <T> Future<T> submit(Callable<T> task) {
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
      }
      
      // 还包含invokeAny、invokeAll的实现
    }
    
    // 然后说真正的线程池类
    public class ThreadPoolExecutor extends AbstractExecutorService {}
    
    // Fork/Join框架是JDK1.7开始提供的用于并行执行任务的框架,它的思想是将一个大任务分割成若干小任务,最终汇总
    // 线程池和ForkJoin池都是直接继承自AbstractExecutorService
    public class ForkJoinPool extends AbstractExecutorService {}
    
    
    public class ThreadPoolExecutor {
      // ThreadPoolExecutor构造方法
      public ThreadPoolExecutor (
        int corePoolSize, // 线程池中核心的线程数(即使空闲也保持)
        int maximumPoolSize, // 允许最多的线程数
        long keepAliveTime, // 多出的线程空闲多久回收
        TimeUnit unit, // 回收的时间单位
        BlockingQueue<Runnable> workQueue, // 工作队列,用于存放已提交的任务
        ThreadFactory threadFactory, // 线程工厂,工厂方法:Thread newThread(Runnable r);
        RejectedExecutionHandler handler // 拒绝执行时的处理回调
      ) {
        // 
      }
      
      // 提交任务,分三步走:
      // 1. 如果线程数少于核心线程数,则创建新线程
      // 2. 否则尝试加入工作队列(加入之后会再次检查是否需要创建新线程)
      // 3. 如果不能加入工作队列,则创建新线程,否则拒绝
      public void execute(Runnable command) {
        // 线程数少时直接开启新线程
        if (workerCountOf(c) < corePoolSize) {
          if (addWorker(command, true)) return;
        }
        
        // 加入队列
        if (isRunning(c) && workQueue.offer(command)) {
          // double check
        } else if (!addWorker(command, false)) {
          reject(command);
        }
        
      }
      
      // 线程池的Worker继承自AQS,定义了lock、unlock等方法
      private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        //
      }
      
    }
    
    
    
    public class ForkJoinPool {
      // ForkJoinPool构造方法
      public ForkJoinPool(
        int parallelism, // 并行度,默认取CPU的核心数
        ForkJoinWorkerThreadFactory factory, // worker线程的工厂类
        UncaughtExceptionHandler handler, // 异常处理回调
        boolean asyncMode // 控制工作队列的工作模式,如果为true则使用FIFO(从尾取),否则LIFO(从头取)
      ) {}
      
      public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        // externalPush();
      }
      
      public <T> ForkJoinTask<T> submit(Callable<T> task) {
        // externalPush(new ForkJoinTask.AdaptedCallable<T>(task));
      }
      
      public <T> ForkJoinTask<T> submit(Runnable<T> task) {
        // externalPush(new ForkJoinTask.AdaptedRunnableAction<T>(task));
      }
      
      // ForkJoinTask是一个抽象类
      // 递归的RecursiveTask(有返回值)
      // 递归的RecursiveAction(无返回值)
      public static class XXXTask extends RecursiveTask<Integer> {
        
        // 递归任务需实现compute方法
        // 该方法主要实现任务量的拆分(分治思想)
        @Override
        public Integer compute() {
          // this => XXXTask left, right;
          return left.join() + right.join();
        }
        
      }
      
    }
    
    
    // 最后看看Executors工具类
    public class Executors {
      // 创建拥有固定线程数的线程池
      public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(
          nThreads, // corePoolSize = nThreads
          nThreads, // maximumPoolSize = nThreads
          0L, // 不超时
          MILLISECONDS, 
          new LinkedBlockingQueue<Runnable>() // 无界队列
        );
      }
      
      // 创建ForkJoin线程池
      public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool(
        	parallelism, // 并行度
          ForkJoinPool.defaultForkJoinWorkerThreadFactory, // 默认的线程工厂
          null, // 不设置异常处理
          true // 异步模式
        );
      }
      
      
      // 创建只有一个线程的线程池
    	public static ExecutorService newSingleThreadExecutor() {
        // 
      }
      
      // 创建一个可缓存的线程池(需要时创建,不需要时回收)
      public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(
          0, // 初始线程数
          Integer.MAX_VALUE, // 不限制最大数
          60L, // 空闲60秒回收
          TimeUnit.SECONDS, 
          new SynchronousQueue<Runnable>() // 同步队列(长度为0,因为需要时直接创建线程,不需要排队)
        );
      }
      
      // 创建一个可以调度任务的线程池
      public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
      }
      
      public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(
              corePoolSize, // 
              Integer.MAX_VALUE, // 默认的可调度任务的线程池不限制最大线程数 
              0, // 而且不会过期
              NANOSECONDS,
              new DelayedWorkQueue()
            );
        }
      }
      
      // 补充一下ScheduledExecutorService
      public interface ScheduledExecutorService extends ExecutorService {
        // 延迟执行
        ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); 
        
        // 固定频率执行(按开始时间)
        ScheduledFuture<?> scheduleAtFixedRate(
          Runnable command, 
          long initialDelay, // 第一次执行的延迟时间
          long period, // 每个固定的时间间隔后执行
          TimeUnit unit
        );
        
        // 固定延迟时间执行(按上一次完成时间)
        ScheduledFuture<?> scheduleWithFixedDelay(
          Runnable command, 
          long initialDelay, 
          long delay, 
          TimeUnit unit
        );
      }
      
    }
    

    最后再说一点:

    阿里开发规范为什么不允许直接使用Executors创建线程池?

    是因为Executors还是通过调用ThreadPoolExecutor实现,但是对于最大线程数和队列提供了默认方式,该方式容易被调用者忽略从而导致OOM问题,所以需要开发者根据实际情况选择合适的参数。

  • 相关阅读:
    4 linux上运行crm&uwsgi
    Vue 目录
    vue -(滚动播放-全屏展示)
    Chrome 调试技巧
    Vue 遇到的坑
    9 matplotlib
    Maven
    Dev-C++黑暗主体和代码高亮配置
    Git操作:一次push把代码提交到两个仓库
    windows server 2012 r2 快速启动 部署remoteapp 显示服务器有挂起的重启,怎么解决?
  • 原文地址:https://www.cnblogs.com/jieyuefeng/p/12064703.html
Copyright © 2011-2022 走看看