zoukankan      html  css  js  c++  java
  • 线程池的问题

    面试-线程池的成长之路      

     

    背景

    相信大家在面试过程中遇到面试官问线程的很多,线程过后就是线程池了。从易到难,都是这么个过程,还有就是确实很多人在工作中接触线程池比较少,最多的也就是创建一个然后往里面提交线程,对于一些经验很丰富的面试官来说,一下就可以问出很多线程池相关的问题,与其被问的晕头转向,还不如好好学习。此时不努力更待何时。

    什么是线程池?

    线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。

    如果每个请求都创建一个线程去处理,那么服务器的资源很快就会被耗尽,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

    如果用生活中的列子来说明,我们可以把线程池当做一个客服团队,如果同时有1000个人打电话进行咨询,按照正常的逻辑那就是需要1000个客服接听电话,服务客户。现实往往需要考虑到很多层面的东西,比如:资源够不够,招这么多人需要费用比较多。正常的做法就是招100个人成立一个客服中心,当有电话进来后分配没有接听的客服进行服务,如果超出了100个人同时咨询的话,提示客户等待,稍后处理,等有客服空出来就可以继续服务下一个客户,这样才能达到一个资源的合理利用,实现效益的最大化。

    Java中的线程池种类

    1. newSingleThreadExecutor

    创建方式:

    1. ExecutorService pool = Executors.newSingleThreadExecutor();

    一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    使用方式:

    1. import java.util.concurrent.ExecutorService;
    2. import java.util.concurrent.Executors;
    3. public class ThreadPool {
    4. public static void main(String[] args) {
    5. ExecutorService pool = Executors.newSingleThreadExecutor();
    6. for (int i = 0; i < 10; i++) {
    7. pool.execute(() -> {
    8. System.out.println(Thread.currentThread().getName() + " 开始发车啦....");
    9. });
    10. }
    11. }
    12. }

    输出结果如下:

    1. pool-1-thread-1 开始发车啦....
    2. pool-1-thread-1 开始发车啦....
    3. pool-1-thread-1 开始发车啦....
    4. pool-1-thread-1 开始发车啦....
    5. pool-1-thread-1 开始发车啦....
    6. pool-1-thread-1 开始发车啦....
    7. pool-1-thread-1 开始发车啦....
    8. pool-1-thread-1 开始发车啦....
    9. pool-1-thread-1 开始发车啦....
    10. pool-1-thread-1 开始发车啦....

    从输出的结果我们可以看出,一直只有一个线程在运行。

    2.newFixedThreadPool

    创建方式:

    1. ExecutorService pool = Executors.newFixedThreadPool(10);

    创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    使用方式:

    1. import java.util.concurrent.ExecutorService;
    2. import java.util.concurrent.Executors;
    3. public class ThreadPool {
    4. public static void main(String[] args) {
    5. ExecutorService pool = Executors.newFixedThreadPool(10);
    6. for (int i = 0; i < 10; i++) {
    7. pool.execute(() -> {
    8. System.out.println(Thread.currentThread().getName() + " 开始发车啦....");
    9. });
    10. }
    11. }
    12. }

    输出结果如下:

    1. pool-1-thread-1 开始发车啦....
    2. pool-1-thread-4 开始发车啦....
    3. pool-1-thread-3 开始发车啦....
    4. pool-1-thread-2 开始发车啦....
    5. pool-1-thread-6 开始发车啦....
    6. pool-1-thread-7 开始发车啦....
    7. pool-1-thread-5 开始发车啦....
    8. pool-1-thread-8 开始发车啦....
    9. pool-1-thread-9 开始发车啦....
    10. pool-1-thread-10 开始发车啦....

    3. newCachedThreadPool

    创建方式:

    1. ExecutorService pool = Executors.newCachedThreadPool();

    创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添加新线程来处理任务。

    使用方式如上2所示。

    4.newScheduledThreadPool

    创建方式:

    1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

    此线程池支持定时以及周期性执行任务的需求。

    使用方式:

    1. import java.util.concurrent.Executors;
    2. import java.util.concurrent.ScheduledExecutorService;
    3. import java.util.concurrent.TimeUnit;
    4. public class ThreadPool {
    5. public static void main(String[] args) {
    6. ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
    7. for (int i = 0; i < 10; i++) {
    8. pool.schedule(() -> {
    9. System.out.println(Thread.currentThread().getName() + " 开始发车啦....");
    10. }, 10, TimeUnit.SECONDS);
    11. }
    12. }
    13. }

    上面演示的是延迟10秒执行任务,如果想要执行周期性的任务可以用下面的方式,每秒执行一次

    1. //pool.scheduleWithFixedDelay也可以
    2. pool.scheduleAtFixedRate(() -> {
    3. System.out.println(Thread.currentThread().getName() + " 开始发车啦....");
    4. }, 1, 1, TimeUnit.SECONDS);

    5.newWorkStealingPool
    newWorkStealingPool是jdk1.8才有的,会根据所需的并行层次来动态创建和关闭线程,通过使用多个队列减少竞争,底层用的ForkJoinPool来实现的。ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

    说说线程池的拒绝策略

    当请求任务不断的过来,而系统此时又处理不过来的时候,我们需要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。

    • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
    1. public static class AbortPolicy implements RejectedExecutionHandler {
    2. public AbortPolicy() { }
    3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    4. throw new RejectedExecutionException("Task " + r.toString() +
    5. " rejected from " +
    6. e.toString());
    7. }
    8. }
    • CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。
    1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
    2. public CallerRunsPolicy() { }
    3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    4. if (!e.isShutdown()) {
    5. r.run();
    6. }
    7. }
    8. }
    • DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
    1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    2. public DiscardOldestPolicy() { }
    3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    4. if (!e.isShutdown()) {
    5. e.getQueue().poll();
    6. e.execute(r);
    7. }
    8. }
    9. }
    • DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。
    1. public static class DiscardPolicy implements RejectedExecutionHandler {
    2. public DiscardPolicy() { }
    3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    4. }
    5. }

    除了JDK默认为什么提供的四种拒绝策略,我们可以根据自己的业务需求去自定义拒绝策略,自定义的方式很简单,直接实现RejectedExecutionHandler接口即可

    比如Spring integration中就有一个自定义的拒绝策略CallerBlocksPolicy,将任务插入到队列中,直到队列中有空闲并插入成功的时候,否则将根据最大等待时间一直阻塞,直到超时

    1. package org.springframework.integration.util;
    2. import java.util.concurrent.BlockingQueue;
    3. import java.util.concurrent.RejectedExecutionException;
    4. import java.util.concurrent.RejectedExecutionHandler;
    5. import java.util.concurrent.ThreadPoolExecutor;
    6. import java.util.concurrent.TimeUnit;
    7. import org.apache.commons.logging.Log;
    8. import org.apache.commons.logging.LogFactory;
    9. public class CallerBlocksPolicy implements RejectedExecutionHandler {
    10. private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);
    11. private final long maxWait;
    12. /**
    13. * @param maxWait The maximum time to wait for a queue slot to be
    14. * available, in milliseconds.
    15. */
    16. public CallerBlocksPolicy(long maxWait) {
    17. this.maxWait = maxWait;
    18. }
    19. @Override
    20. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    21. if (!executor.isShutdown()) {
    22. try {
    23. BlockingQueue<Runnable> queue = executor.getQueue();
    24. if (logger.isDebugEnabled()) {
    25. logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");
    26. }
    27. if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {
    28. throw new RejectedExecutionException("Max wait time expired to queue task");
    29. }
    30. if (logger.isDebugEnabled()) {
    31. logger.debug("Task execution queued");
    32. }
    33. }
    34. catch (InterruptedException e) {
    35. Thread.currentThread().interrupt();
    36. throw new RejectedExecutionException("Interrupted", e);
    37. }
    38. }
    39. else {
    40. throw new RejectedExecutionException("Executor has been shut down");
    41. }
    42. }
    43. }

    定义好之后如何使用呢?光定义没用的呀,一定要用到线程池中呀,可以通过下面的方式自定义线程池,指定拒绝策略。

    1. BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
    2. ThreadPoolExecutor executor = new ThreadPoolExecutor(
    3. 10, 100, 10, TimeUnit.SECONDS, workQueue, new CallerBlocksPolicy());

    execute和submit的区别?

    在前面的讲解中,我们执行任务是用的execute方法,除了execute方法,还有一个submit方法也可以执行我们提交的任务。

    这两个方法有什么区别呢?分别适用于在什么场景下呢?我们来做一个简单的分析。

    execute适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了

    1. public class ThreadPool {
    2. public static void main(String[] args) {
    3. ExecutorService pool = Executors.newFixedThreadPool(10);
    4. pool.execute(() -> {
    5. System.out.println(Thread.currentThread().getName() + " 开始发车啦....");
    6. });
    7. }
    8. }

    submit方法适用于需要关注返回值的场景,submit方法的定义如下:

    1. public interface ExecutorService extends Executor {
    2.   ...
    3.   <T> Future<T> submit(Callable<T> task);
    4.   <T> Future<T> submit(Runnable task, T result);
    5.   Future<?> submit(Runnable task);
    6.   ...
    7. }

    其子类AbstractExecutorService实现了submit方法,可以看到无论参数是Callable还是Runnable,最终都会被封装成RunnableFuture,然后再调用execute执行。

    1. /**
    2. * @throws RejectedExecutionException {@inheritDoc}
    3. * @throws NullPointerException {@inheritDoc}
    4. */
    5. public Future<?> submit(Runnable task) {
    6. if (task == null) throw new NullPointerException();
    7. RunnableFuture<Void> ftask = newTaskFor(task, null);
    8. execute(ftask);
    9. return ftask;
    10. }
    11. /**
    12. * @throws RejectedExecutionException {@inheritDoc}
    13. * @throws NullPointerException {@inheritDoc}
    14. */
    15. public <T> Future<T> submit(Runnable task, T result) {
    16. if (task == null) throw new NullPointerException();
    17. RunnableFuture<T> ftask = newTaskFor(task, result);
    18. execute(ftask);
    19. return ftask;
    20. }
    21. /**
    22. * @throws RejectedExecutionException {@inheritDoc}
    23. * @throws NullPointerException {@inheritDoc}
    24. */
    25. public <T> Future<T> submit(Callable<T> task) {
    26. if (task == null) throw new NullPointerException();
    27. RunnableFuture<T> ftask = newTaskFor(task);
    28. execute(ftask);
    29. return ftask;
    30. }

    下面我们来看看这三个方法分别如何去使用:

    submit(Callable task);

    1. public class ThreadPool {
    2. public static void main(String[] args) throws Exception {
    3. ExecutorService pool = Executors.newFixedThreadPool(10);
    4. Future<String> future = pool.submit(new Callable<String>() {
    5. @Override
    6. public String call() throws Exception {
    7. return "Hello";
    8. }
    9. });
    10. String result = future.get();
    11. System.out.println(result);
    12. }
    13. }

    submit(Runnable task, T result);

    1. public class ThreadPool {
    2. public static void main(String[] args) throws Exception {
    3. ExecutorService pool = Executors.newFixedThreadPool(10);
    4. Data data = new Data();
    5. Future<Data> future = pool.submit(new MyRunnable(data), data);
    6. String result = future.get().getName();
    7. System.out.println(result);
    8. }
    9. }
    10. class Data {
    11. String name;
    12. public String getName() {
    13. return name;
    14. }
    15. public void setName(String name) {
    16. this.name = name;
    17. }
    18. }
    19. class MyRunnable implements Runnable {
    20. private Data data;
    21. public MyRunnable(Data data) {
    22. this.data = data;
    23. }
    24. @Override
    25. public void run() {
    26. data.setName("yinjihuan");
    27. }
    28. }

    Future<?> submit(Runnable task);
    直接submit一个Runnable是拿不到返回值的,返回值就是null.

    五种线程池的使用场景

    • newSingleThreadExecutor:一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。

    • newFixedThreadPool:一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。

    • newCachedThreadPool:一个可以无限扩大的线程池,比较适合处理执行时间比较小的任务。

    • newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。

    • newWorkStealingPool:一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。

    线程池的关闭

    关闭线程池可以调用shutdownNow和shutdown两个方法来实现

    shutdownNow:对正在执行的任务全部发出interrupt(),停止执行,对还未开始执行的任务全部取消,并且返回还没开始的任务列表

    1. public class ThreadPool {
    2. public static void main(String[] args) throws Exception {
    3. ExecutorService pool = Executors.newFixedThreadPool(1);
    4. for (int i = 0; i < 5; i++) {
    5. System.err.println(i);
    6. pool.execute(() -> {
    7. try {
    8. Thread.sleep(30000);
    9. System.out.println("--");
    10. } catch (Exception e) {
    11. e.printStackTrace();
    12. }
    13. });
    14. }
    15. Thread.sleep(1000);
    16. List<Runnable> runs = pool.shutdownNow();
    17. }
    18. }

    上面的代码模拟了立即取消的场景,往线程池里添加5个线程任务,然后sleep一段时间,线程池只有一个线程,如果此时调用shutdownNow后应该需要中断一个正在执行的任务和返回4个还未执行的任务,控制台输出下面的内容:

    1. 0
    2. 1
    3. 2
    4. 3
    5. 4
    6. [fs.ThreadPool$$Lambda$1/990368553@682a0b20,
    7. fs.ThreadPool$$Lambda$1/990368553@682a0b20,
    8. fs.ThreadPool$$Lambda$1/990368553@682a0b20,
    9. fs.ThreadPool$$Lambda$1/990368553@682a0b20]
    10. java.lang.InterruptedException: sleep interrupted
    11. at java.lang.Thread.sleep(Native Method)
    12. at fs.ThreadPool.lambda$0(ThreadPool.java:15)
    13. at fs.ThreadPool$$Lambda$1/990368553.run(Unknown Source)
    14. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    15. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    16. at java.lang.Thread.run(Thread.java:745)

    shutdown:当我们调用shutdown后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务

    1. public class ThreadPool {
    2. public static void main(String[] args) throws Exception {
    3. ExecutorService pool = Executors.newFixedThreadPool(1);
    4. for (int i = 0; i < 5; i++) {
    5. System.err.println(i);
    6. pool.execute(() -> {
    7. try {
    8. Thread.sleep(30000);
    9. System.out.println("--");
    10. } catch (Exception e) {
    11. e.printStackTrace();
    12. }
    13. });
    14. }
    15. Thread.sleep(1000);
    16. pool.shutdown();
    17. pool.execute(() -> {
    18. try {
    19. Thread.sleep(30000);
    20. System.out.println("--");
    21. } catch (Exception e) {
    22. e.printStackTrace();
    23. }
    24. });
    25. }
    26. }

    上面的代码模拟了正在运行的状态,然后调用shutdown,接着再往里面添加任务,肯定是拒绝添加的,请看输出结果:

    1. 0
    2. 1
    3. 2
    4. 3
    5. 4
    6. Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task fs.ThreadPool$$Lambda$2/1747585824@3d075dc0 rejected from java.util.concurrent.ThreadPoolExecutor@214c265e[Shutting down, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]
    7. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    8. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    9. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    10. at fs.ThreadPool.main(ThreadPool.java:24)

    还有一些业务场景下需要知道线程池中的任务是否全部执行完成,当我们关闭线程池之后,可以用isTerminated来判断所有的线程是否执行完成,千万不要用isShutdown,isShutdown只是返回你是否调用过shutdown的结果。

    1. public class ThreadPool {
    2. public static void main(String[] args) throws Exception {
    3. ExecutorService pool = Executors.newFixedThreadPool(1);
    4. for (int i = 0; i < 5; i++) {
    5. System.err.println(i);
    6. pool.execute(() -> {
    7. try {
    8. Thread.sleep(3000);
    9. System.out.println("--");
    10. } catch (Exception e) {
    11. e.printStackTrace();
    12. }
    13. });
    14. }
    15. Thread.sleep(1000);
    16. pool.shutdown();
    17. while(true){
    18. if(pool.isTerminated()){
    19. System.out.println("所有的子线程都结束了!");
    20. break;
    21. }
    22. Thread.sleep(1000);
    23. }
    24. }
    25. }

    自定义线程池

    在实际的使用过程中,大部分我们都是用Executors去创建线程池直接使用,如果有一些其他的需求,比如指定线程池的拒绝策略,阻塞队列的类型,线程名称的前缀等等,我们可以采用自定义线程池的方式来解决。

    如果只是简单的想要改变线程名称的前缀的话可以自定义ThreadFactory来实现,在Executors.new…中有一个ThreadFactory的参数,如果没有指定则用的是DefaultThreadFactory。

    自定义线程池核心在于创建一个ThreadPoolExecutor对象,指定参数

    下面我们看下ThreadPoolExecutor构造函数的定义:

    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue,
    6. ThreadFactory threadFactory,
    7. RejectedExecutionHandler handler) ;
    • corePoolSize
      线程池大小,决定着新提交的任务是新开线程去执行还是放到任务队列中,也是线程池的最最核心的参数。一般线程池开始时是没有线程的,只有当任务来了并且线程数量小于corePoolSize才会创建线程。
    • maximumPoolSize
      最大线程数,线程池能创建的最大线程数量。
    • keepAliveTime
      在线程数量超过corePoolSize后,多余空闲线程的最大存活时间。
    • unit
      时间单位
    • workQueue
      存放来不及处理的任务的队列,是一个BlockingQueue。
    • threadFactory
      生产线程的工厂类,可以定义线程名,优先级等。
    • handler
      拒绝策略,当任务来不及处理的时候,如何处理, 前面有讲解。

    了解上面的参数信息后我们就可以定义自己的线程池了,我这边用ArrayBlockingQueue替换了LinkedBlockingQueue,指定了队列的大小,当任务超出队列大小之后使用CallerRunsPolicy拒绝策略处理。

    这样做的好处是严格控制了队列的大小,不会出现一直往里面添加任务的情况,有的时候任务处理的比较慢,任务数量过多会占用大量内存,导致内存溢出。

    当然你也可以在提交到线程池的入口进行控制,比如用CountDownLatch, Semaphore等。

    1. /**
    2. * 自定义线程池<br>
    3. * 默认的newFixedThreadPool里的LinkedBlockingQueue是一个无边界队列,如果不断的往里加任务,最终会导致内存的不可控<br>
    4. * 增加了有边界的队列,使用了CallerRunsPolicy拒绝策略
    5. * @author yinjihuan
    6. *
    7. */
    8. public class FangjiaThreadPoolExecutor {
    9. private static ExecutorService executorService = newFixedThreadPool(50);
    10. private static ExecutorService newFixedThreadPool(int nThreads) {
    11. return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
    12. new ArrayBlockingQueue<Runnable>(10000), new DefaultThreadFactory(), new CallerRunsPolicy());
    13. }
    14. public static void execute(Runnable command) {
    15. executorService.execute(command);
    16. }
    17. public static void shutdown() {
    18. executorService.shutdown();
    19. }
    20. static class DefaultThreadFactory implements ThreadFactory {
    21. private static final AtomicInteger poolNumber = new AtomicInteger(1);
    22. private final ThreadGroup group;
    23. private final AtomicInteger threadNumber = new AtomicInteger(1);
    24. private final String namePrefix;
    25. DefaultThreadFactory() {
    26. SecurityManager s = System.getSecurityManager();
    27. group = (s != null) ? s.getThreadGroup() :
    28. Thread.currentThread().getThreadGroup();
    29. namePrefix = "FSH-pool-" +
    30. poolNumber.getAndIncrement() +
    31. "-thread-";
    32. }
    33. public Thread newThread(Runnable r) {
    34. Thread t = new Thread(group, r,
    35. namePrefix + threadNumber.getAndIncrement(),
    36. 0);
    37. if (t.isDaemon())
    38. t.setDaemon(false);
    39. if (t.getPriority() != Thread.NORM_PRIORITY)
    40. t.setPriority(Thread.NORM_PRIORITY);
    41. return t;
    42. }
    43. }
    44. }
  • 相关阅读:
    有点忙啊
    什么是协程
    HDU 1110 Equipment Box (判断一个大矩形里面能不能放小矩形)
    HDU 1155 Bungee Jumping(物理题,动能公式,弹性势能公式,重力势能公式)
    HDU 1210 Eddy's 洗牌问题(找规律,数学)
    HDU1214 圆桌会议(找规律,数学)
    HDU1215 七夕节(模拟 数学)
    HDU 1216 Assistance Required(暴力打表)
    HDU 1220 Cube(数学,找规律)
    HDU 1221 Rectangle and Circle(判断圆和矩形是不是相交)
  • 原文地址:https://www.cnblogs.com/chengjun/p/9035854.html
Copyright © 2011-2022 走看看