zoukankan      html  css  js  c++  java
  • Java并发 线程池

    Java并发 线程池

    关注点

    ThreadPoolExecutor类,创建线程池的核心类。Executors静态工厂方法,通过这个类来创建线程池。

    ExecutorService接口 An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.
    Executor接口 An object that executes submitted Runnable tasks.
    Executors类 为 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable提供工厂和工具方法,
    阻塞队列

    ThreadPoolExecutor类

    java.util.concurrent.TheadPoolExecutor 继承自 java.util.cocurrent.AbstractExecutorService
    implements 接口 Executor ExecutorService

    线程池解决两个问题:解决大量异步工作中,减少每个线程的创建开销,提供了一种执行一组任务集合的管理资源的方法,资源包括
    线程,消耗。

    构造函数,

    **ThreadPoolExecutor**(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
    

    corePoolSize 核心池的大小
    maximumPoolSize 最大线程数
    keepAliveTime 没有任务执行时 最多保持多久会 终止
    TimeUnit unit 七种时间单位 天,小时,分钟,秒
    BlockingQueue workQueue 阻塞队列
    ThreadFactory threadFactory 线程工厂
    RejectedExecutionHandler handler 拒绝处理任务时 策略,

    • 线程池大小
      • 核心线程池大小。
      • 最大线程数。

    在创建了线程池后,默认情况下,线程池中并没有任何线程,等到有任务来才创建线程去执行任务,
    除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法
    当创建的线程数等于 corePoolSize 时,会加入设置的阻塞队列。
    当队列满时,会创建线程执行任务直到线程池中的数量等于maximumPoolSize。

    • 阻塞队列

    ArrayBlockingQueue : 由数组结构组成的有界阻塞队列。
    LinkedBlockingQueue : 由链表结构组成的有界阻塞队列。
    PriorityBlockingQueue :支持优先级排序的无界阻塞队列。
    DelayQueue: 使用优先级队列实现的无界阻塞队列。
    SynchronousQueue: 不存储元素的阻塞队列。
    LinkedTransferQueue: 由链表结构组成的无界阻塞队列。
    LinkedBlockingDeque: 由链表结构组成的双向阻塞队列。

    • 拒绝策略 -

    ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。 (默认)
    ThreadPoolExecutor.DiscardPolicy: 也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy: 由调用线程处理该任务

    Executors

    Executors弊端:

    1)newFixedThreadPool 和 newSingleThreadExecutor:
    主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
    2)newCachedThreadPool 和 newScheduledThreadPool:
    主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。

    Executor接口

    public interface Executor {
    
        void execute(Runnable command);
    }
    

    ExecutorService接口##

    public interface ExecutorService extends Executor {
    
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    
        <T> Future<T> submit(Callable<T> task);
    
        <T> Future<T> submit(Runnable task, T result);
    
        Future<?> submit(Runnable task);
    
        invokeAll(Collection<? extends Callable<T>> tasks)
    
    

    An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

    ExecutorService 可以被停止,会导致拒绝新任务task。
    两种shutdown方法。 shutdown(),允许之前提交的任务继续执行完毕,但不再接受新任务。
    shutdownNow(); 所有任务 立即停止,返回结果。

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             *三步
             * 1
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
    
    
    
    

    任务从提交到执行的步骤:

    public void execute(Runnable command){
        //...
        //三步: 
        1.如果少于corePoolSize在运行,尝试启动一个新线程,并将command给他。
        call addWorker
        addIfUnderCorePoolSize(command)
    =
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
    
    
    }
    

    参考资料:

    从阿里Java开发手册学习线程池的正确创建方法
    线程池的原理以及java的线程池框架

    线程池的原理以及java的线程池框架

  • 相关阅读:
    奔跑的绵羊js
    13.差分
    12.子矩阵的和
    11.前缀和
    10.高精度除法
    9.高精度乘法
    8.高精度减法
    7.高精度加法
    6.数的三次方根
    5.数的范围
  • 原文地址:https://www.cnblogs.com/ironbrady/p/6442203.html
Copyright © 2011-2022 走看看