zoukankan      html  css  js  c++  java
  • Callable+Future+newFixedThreadPool的应用

      最近在处理很多的数据,数据量比较大,但是处理的相对简单一些,没有什么复杂的业务逻辑,然后就使用了多线程去处理。因为一直停留在Thread和Runnable的知识中,项目中使用Callable,刚好可以学习新的东西,就使用了Callable和Future结合加上Executors.newFixedThreadPool()。

    一、Callable和Future基础知识

      Thread和Runnable这2个很多人都知道并且使用过,可能Callable相对陌生一些,future应该更加陌生,他们2个一个生成结果一个接受结果。Thread和Runnable实现的线程不会返回结果,Callable相对特殊一些,他会返回结果,这个结果可以被Future拿到,也就是说,Future可以拿到异步执行任务的结果。我们先看一下Callable类:

    package java.util.concurrent;
    
    /**
     *一个带有返回结果并可能引发异常的任务.实现定义了一个没有调用参数
     *的方法call。
     *Callable接口类似于{@link java.lang.Runnable},因为它们都是为其
     *实例可能被另一个线程执行的类设计的。然而,Runnable不返回结果,
     *也不能抛出被检查的异常。{@link Executors}类包含从其他常用形式
     *转换为Callable类的实用程序方法。
      */
    public interface Callable<V> {
    
        /**
        * 计算一个结果,如果不能这样做,就会抛出一个异常。.
        *
        * @return 结算结果
        * @throws 如果无法计算结果,则抛出异常
        */
        V call() throws Exception;
    }

      Future表示一个任务的生命周期,并提供了方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。Future接口:

    public interface Future<V> {
        
        /**
        * 尝试取消执行此任务。 如果任务已经完成,已经被取消或由于某种其他原因而无法取消,则此尝试将失败。
        * 如果成功,并且调用<tt> cancel </ tt>时,此任务尚未启动,则此任务不应运行。 如果任务已经开始,
        * 那么<tt> mayInterruptIfRunning </ tt>参数决定了执行该任务的线程是否应该被中断以试图停止该任务。
        */
        boolean cancel( boolean mayInterruptIfRunning );
    
        /**
         * 如果此任务在正常完成之前已被取消,返回true
         * @return <tt>true</tt> 如果此任务在正常完成之前已被取消,返回true
         */
        boolean isCancelled();
    
        /**
         * 如果任务已经完成返回true
         * 完成可能是由于正常终止,异常或异常 - 在所有这些情况下,此方法都将返回<tt> true </ tt>。
         * @return <tt>true</tt> 如果任务已经完成返回true
         */
        boolean isDone();
    
        /**
         * 等待计算完成,然后获得其结果。
         *
         */
        V get() throws InterruptedException, ExecutionException;
    
        /**
         * 如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。
         *
         */
        V get( long timeout, TimeUnit unit )
        throws InterruptedException, ExecutionException, TimeoutException;
    }

    二、线程池之固定线程池newFixedThreadPool

      创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。Executors.newFixedThreadPool(10)的实现如下:

    public static ExecutorService newFixedThreadPool( int nThreads )
    {
        return(new ThreadPoolExecutor( nThreads, nThreads,
                           0L, TimeUnit.MILLISECONDS,
                           new LinkedBlockingQueue<Runnable>() ) );
    }

      nThreads :是固定线程数,在了解newFixedThreadPool之前我们先了解一下ThreadPoolExecutor,ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务;Executors方法提供的线程服务,都是通过参数设置来实现不同的线程池机制。ThreadPoolExecutor的构造方法如下:

    public ThreadPoolExecutor( int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue )
    {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public ThreadPoolExecutor( int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler )
    {
        if ( corePoolSize < 0 ||
             maximumPoolSize <= 0 ||
             maximumPoolSize < corePoolSize ||
             keepAliveTime < 0 )
            throw new IllegalArgumentException();
        if ( workQueue == null || threadFactory == null || handler == null )
            throw new NullPointerException();
        this.corePoolSize    = corePoolSize;
        this.maximumPoolSize    = maximumPoolSize;
        this.workQueue        = workQueue;
        this.keepAliveTime    = unit.toNanos( keepAliveTime );
        this.threadFactory    = threadFactory;
        this.handler        = handler;
    }

    ThreadPoolExecutor构造方法参数讲解: 

    ThreadPoolExecutor构造方法参数
    参数名 作用
    corePoolSize 核心线程池大小
    maximumPoolSize 最大线程池大小
    keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
    TimeUnit keepAliveTime时间单位
    workQueue 阻塞任务队列
    threadFactory 新建线程工厂
    RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

      这样我们在回过头来看newFixedThreadPool的实现,核心线程池大小和最大线程池大小都是传入进去的数字,keepAliveTime为0,时间单位为TimeUnit.MILLISECONDS毫秒,对列为LinkedBlockingQueue,线程池工厂为默认,RejectedExecutionHandler为默认。这样我们就知道newFixedThreadPool的代码实现了。关于ThreadPoolExecutor自己的构建请自行了解。
    三、实际应用

    package com.roc.thread;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class FixedThreadPoolTest {
        public static void main( String[] args )
        {
            FixedThreadPoolTest fixedThreadPoolTest = new FixedThreadPoolTest();
            fixedThreadPoolTest.execut();
        }
    
        private void execut()
        {
            ExecutorService        executorService = Executors.newFixedThreadPool( 10 );/* 创建为10个线程的固定线程池 */
            List<Integer>        datas        = new ArrayList<Integer>( 100 );
            List<Future<Integer> >    results        = new ArrayList<Future<Integer> >();
            int            count        = 0;
            for ( int i = 0; i < 100; i++ )/* 实际项目中数据可以查数据库或者文件,这里仅仅表示模拟 */
            {
                datas.add( i );
            }
            for ( int i = 0; i < datas.size(); i++ )
            {
                results.add( executorService.submit( new executTask( datas.get( i ) ) ) );
            }
            try {
                for ( Future<Integer> future : results )
                {
                    count += future.get();
                }
            } catch ( InterruptedException e ) {
                /* TODO Auto-generated catch block */
                e.printStackTrace();
            } catch ( ExecutionException e ) {
                /* TODO Auto-generated catch block */
                e.printStackTrace();
            }
            System.out.println( Thread.currentThread() + "处理数据总数:" + count );
        }
    
    
        class executTask implements Callable<Integer> {
            private int data;
    
            public executTask( int data )
            {
                this.data = data;
            }
    
    
            @Override
            public Integer call() throws Exception
            {
                try {
                    System.out.println( Thread.currentThread() + "处理完数据:" + data ); /* 实际项目中这里可以处理业务逻辑 */
                } catch ( Exception e ) {
                    return(-1);
                }
                return(1);
            }
        }
    }            

    结果:

    Thread[pool-1-thread-10,5,main]处理完数据:9
    Thread[pool-1-thread-4,5,main]处理完数据:97
    Thread[pool-1-thread-9,5,main]处理完数据:95
    Thread[pool-1-thread-3,5,main]处理完数据:96
    Thread[pool-1-thread-7,5,main]处理完数据:94
    Thread[pool-1-thread-4,5,main]处理完数据:98
    Thread[pool-1-thread-5,5,main]处理完数据:99
    Thread[main,5,main]处理数据总数:100

  • 相关阅读:
    【Linux】Apache服务配置
    【Linux】LAMP环境搭建(简易版)
    【Linux】网络应用
    【Linux】系统管理
    【Linux】Linux(一)Linux常用命令
    【php】PDO
    【php】COOKIE和SESSION
    【php】面向对象(五)
    【php】面向对象(四)
    【php】面向对象(三)
  • 原文地址:https://www.cnblogs.com/liaoweipeng/p/7440581.html
Copyright © 2011-2022 走看看