zoukankan      html  css  js  c++  java
  • 四、多线程基础-线程池的创建和使用

    1 、ThreadPoolExecutor理解
      Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因此java提供了线程池。在jdk1.5以前的版本中,线程池的使用是及其简陋的,但是在JDK1.5后,有了很大的改善。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入给予开发人员开发并发程序以及解决并发问题很大的帮助。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然作为一个非常旧的接口(JDK1.5 2004年发布)
      Executor框架的最顶层实现是ThreadPoolExecutor类,Executors工厂类中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool方法其实也只是ThreadPoolExecutor的构造函数参数不同而已。通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor线程池的运行过程。
    corePoolSize: 核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
    maximumPoolSize: 线程池最大线程数,它表示在线程池中最多能创建多少个线程;
    keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止;
    unit: 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性;

    2、线程池的创建和使用(示列4个)
     Java5 中并发库中,线程池创建线程大致可以分为下面四种:
    1)创建固定大小的线程池 ExecutorService(接口):表示一个异步执行机制,使我们能够在后台执行任务

    ExecutorService fPool = Executors.newFixedThreadPool(2);ExecutorService 很类似于一个线程池表示使用 newFixedThreadPool() 工厂方法创建一个 ExecutorService, 这里创建了一个2个线程执行任务的线程池从下面代码的运行看:Thread 类都是在线程池中运行的,线程池再执行 execute 方法来执行 Thread 类;
    中的 run 方法。不管 execute 执行几次,线程池始终都会使用 2 个线程来处理。不会再去创建出其他线程来处理run 方法执行。这就是固定大小线程池
    2)创建缓存大小的线程池(即创建可变任务线程池)
    ExecutorService cPool = Executors.newCachedThreadPool();
    从下面代码的运行看:可变任务线程池在执行 execute 方法来执行 Thread 类中的 run 方法。这里 execute 执行多次,线程池就会创建出多个线程来处理 Thread 类中 run 方法。所有我们看到连接池会根据执行的情况,在程序运行时创建多个线程来处理,这里就是可变连接池的特点。
    3)创建单一的线程池
    ExecutorService sPool = Executors.newSingleThreadExecutor();
    下面代码的运行看:单任务线程池在执行 execute 方法来执行 Thread 类中的 run 方法。不管 execute 执行几次,线程池始终都会使用单个线程来处理。
    备注:在 java 的多线程中,一但线程关闭,就会成为死线程。关闭后死线程就没有办法在启动了。再次启动就会出现异常信息:Exception in thread "main" java.lang.IllegalThreadStateException。那么如何解决这个问题呢?我们这里就可以使用 Executors.newSingleThreadExecutor()来再次启动一个线程
    4)newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行

    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    public class ThreadPoolTest {
        public static void main(String[] args) {
            /*
            //1)创建一个可重用固定线程数的线程池
            ExecutorService pool = Executors.newFixedThreadPool(2);
            */
            /*
            //2)创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
            ExecutorService pool = Executors.newSingleThreadExecutor();
            */
            //3)创建可变连接池来运行该线程
            ExecutorService pool = Executors.newCachedThreadPool();
            
            // 创建实现了 Runnable 接口对象,Thread 对象当然也实现了 Runnable 接口
            Thread t1 = new MyThread();
            Thread t2 = new MyThread();
            Thread t3 = new MyThread();
            Thread t4 = new MyThread();
            Thread t5 = new MyThread();
            //将线程放入池中进行执行
            pool.execute(t1);
            pool.execute(t2);
            pool.execute(t3);
            pool.execute(t4);
            pool.execute(t5);
            // 关闭线程池
            pool.shutdown();
        }
    }
    class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "正在执行。。。");
        }
    }
    View Code

    3、ExecutorService 使用

    有几种不同的方式来将任务委托给 ExecutorService 去执行:
    1)execute(Runnable):方法要求一个 java.lang.Runnable 对象,然后对它进行异步执行。

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    public class ThreadPoolSchedule {
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            //创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。ScheduledExecutorService(接口)
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
            //创建实现了 Runnable 接口对象,Thread 对象当然也实现了 Runnable 接口
            Thread t1 = new MyThread();
            Thread t2 = new MyThread();
            Thread t3 = new MyThread();
            Thread t4 = new MyThread();
            Thread t5 = new MyThread();
            //将线程放入池中进行执行
            t1.setName("t1");
            t2.setName("t2");
            t3.setName("t3");
            t4.setName("t4");
            t5.setName("t5");
            pool.execute(t1);
            pool.execute(t2);
            pool.execute(t3);
            //使用定时执行风格的方法
            pool.schedule(t4, 10, TimeUnit.SECONDS); //t4 和 t5 在 10 秒后执行
            pool.schedule(t5, 10, TimeUnit.SECONDS);
            //关闭线程池
            pool.shutdown();
            }
        }
    class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "正在执行。。。");
        }
    }
    View Code

    2)submit(Runnable):方法也要求一个 Runnable 实现类,但它返回一个 Future 对象。这个 Future 对象可以用来检查 Runnable 是否已经执行完毕。
    3)submit(Callable):Callable 实例除了它的 call() 方法能够返回一个结果之外和一个 Runnable 很相像。Runnable.run() 不能够返回一个结果。Callable 的结果可以通过 submit(Callable) 方法返回的 Future 对象进行获取。

    import java.util.Date;
    import java.util.concurrent.Callable;
    
    public class MyCallable implements Callable<Object> {
        private String taskNum;
    
        MyCallable(String taskNum) {
         this.taskNum = taskNum;
        }
    
        public Object call() throws Exception {
         System.out.println(">>>" + taskNum + "任务启动");
         Date dateTmp1 = new Date();
         Thread.sleep(1000);
         Date dateTmp2 = new Date();
         long time = dateTmp2.getTime() - dateTmp1.getTime();
         System.out.println(">>>" + taskNum + "任务终止");
         return taskNum + "----任务返回运行结果,当前任务时间【" + time + "毫秒】";
        }
    }
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class TestMyCallable {
        public static void main(String[] args) throws ExecutionException,
                InterruptedException {
            System.out.println("----程序开始运行----");
            Date date1 = new Date();
            int taskSize = 5;
            /*
             * ExecutorService 提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。
             */
            // 1、创建一个线程池
            ExecutorService pool = Executors.newFixedThreadPool(taskSize);//创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
            // 创建多个有返回值的任务
            List<Future> list = new ArrayList<Future>();
            for (int i = 0; i < taskSize; i++) {
                Callable c = new MyCallable(i + " ");
                // 执行任务并获取 Future 对象
                Future f = pool.submit(c);//提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
                list.add(f);
            }
            // 关闭线程池
            pool.shutdown();
            // 获取所有并发任务的运行结果
            for (Future f : list) {
                // 从 Future 对象上获取任务的返回值,并输出到控制台
                System.out.println(">>>" + f.get().toString());//f.get() 如有必要,等待计算完成,然后获取其结果。
            }
            Date date2 = new Date();
            System.out.println("----程序结束运行----,程序运行时间【"
                    + (date2.getTime() - date1.getTime()) + "毫秒】");
        }
    }
    View Code

    4)invokeAny(…):方法要求一系列的 Callable 或者其子接口的实例对象。调用这个方法并不会返回一个 Future,但它返回其中一个 Callable 对象的 结果。无法保证返回的是哪个 Callable 的结果 – 只能表明其中一个已执行结束。

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    public class ThreadPoolShiYong {
        public static void main(String[] args) throws InterruptedException,
                ExecutionException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Set<Callable<String>> callables = new HashSet<Callable<String>>();
            callables.add(new Callable<String>() {
                public String call() throws Exception {
                    return "Task 1";
                }
            });
            callables.add(new Callable<String>() {
                public String call() throws Exception {
                    return "Task 2";
                }
            });
            callables.add(new Callable<String>() {
                public String call() throws Exception {
                    return "Task 3";
                }
            });
            /* submit(Callable)的使用:
             * String result = executorService.invokeAny(callables);
             * System.out.println("result = " + result); executorService.shutdown();
             */
            
            List<Future<String>> futures = executorService.invokeAll(callables);
            for (Future<String> future : futures) {
                System.out.println("future.get = " + future.get());
            }
            executorService.shutdown();
            
        }
    
    }
    View Code

    5)invokeAll(…) :方法将调用你在集合中传给 ExecutorService 的所有 Callable 对象。invokeAll() 返回一系列的 Future 对象,通过它们你可以 获取每个 Callable 的执行结果。记住,一个任务可能会由于一个异常而结束,因此它可能没有 "成功"。无法通过一个 Future 对象来告知我们是两种结束中的哪一种

    备注:shutdown 和 shutdownNow 可以关闭线程池:

    shutdown 只是将空闲的线程 interrupt() 了,shutdown()之前提交的任务可以继续执行直到结束;
    shutdownNow 是 interrupt 所有线程, 因此大部分线程将立刻被中断。之所以是大部分,而不是全部 ,是因为 interrupt()方法能力有限;

    细水长流,打磨濡染,渐趋极致,才是一个人最好的状态。
  • 相关阅读:
    Python入门-函数进阶
    Python入门-初始函数
    Leetcode300. Longest Increasing Subsequence最长上升子序列
    Leetcode139. Word Break单词拆分
    Leetcode279. Perfect Squares完全平方数
    Leetcode319. Bulb Switcher灯泡开关
    Leetcode322. Coin Change零钱兑换
    二叉树三种遍历两种方法(递归和迭代)
    Leetcode145. Binary Tree Postorder Traversal二叉树的后序遍历
    Leetcode515. Find Largest Value in Each Tree Row在每个树行中找最大值
  • 原文地址:https://www.cnblogs.com/jiarui-zjb/p/9600755.html
Copyright © 2011-2022 走看看