zoukankan      html  css  js  c++  java
  • 线程池:Execution框架

    每问题每线程:在于它没有对已创建线程的数量进行任何限制,除非对客户端能够抛出的请求速率进行限制。

     

    下边 有些图片看不到,清看原地址:http://www.360doc.com/content/10/1027/21/495229_64583490.shtml

     

    无限制创建线程的缺点:

    1.线程生命周期的开销:线程的创建和关闭并不是“免费的”。

    2.资源消耗量:活动线程会消耗系统资源,尤其是内存。

    3.稳定性。

     

    1 线程池(Thread Pool

    在java中,任务执行的首要抽象不是Thread,而是Executor

     

    public interface Executor {

     

        /**

         * Executor只是一个简单的接口,但是他却为一个灵活而且强大的框架创造了基础。

         */

        void execute(Runnable command);

    }

    这个框架可以用于异步任务执行,而且支持很多不同类型的任务执行策略。它还为任务提交和任务执行之间的解耦提供了标准的方法。另外,还提供了对生命周期的支持以及钩子函数,可以添加诸如统计收集、应用程序管理机制和监视器等扩展。

     

    1.1 创建/关闭

    1.newCachedThreadPool

    创建可缓存的线程池,多的话回收,少的话增加,没有限制

    N2.ewFixedThreadPool(int nThreads) 

    创建定长的线程池,每提交一个任务就创建一个线程,直到最大。如果某个以外终止,会补充一个新的

    3.newScheduledThreadPool(int corePoolSize) 

    定长线程池,而且支持定时的以及周期性的任务执行。相当于timer

    4.newSingleThreadExecutor() 

    单线程化的executor,只创建唯一的工作线程来之心吧任务。如果它意外结束,会有另一个取代它。它会保证任务队列所规定的顺序执行。

    5.newSingleThreadScheduledExecutor() 

     创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

     

    private static final Executor 

    exec=Executors.newFixedThreadPool(100);

     

    ServerSocket socket=new ServerSocket(80);

    while(true){

    final Socket connection=socket.accept();

    Runnable task=new Runnable(){

    public void run(){

    handleRequest(connection);

    }

    };

    exec.execute(exec);

    }

     

    1.2 ExecutorService

     

    线程如果无法正常关闭,则会阻止JVM的结束。线程池中的任务,可能已经完成,可能正在运行,其他还有在队列中等待执行。关闭的时候可能平缓的关闭,到唐突的关闭(拔掉电源)。为了解决生命周期的问题,ExecutorService扩展了Executor,并且添加了一些用于生命周期管理的方法。

    public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)

            throws InterruptedException;

    。。。。。

    }

     

    可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorServiceshutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。 

     

    exec.shutdown();

     

    ExecutorService线程池

    java并发编程-Executor框架

    1.3 延迟,并且周期性的任务,newScheduledThreadPool

    Timer的问题:

    1.只能创建唯一的线程来执行所有timer任务。

    2.如果一个timerTask很耗时,会导致其他TimerTask的时效准确性出问题

    3.如果TimerTask抛出未检查的异常,Timer将会产生无法预料的行为。Timer不会重新恢复。另外一个Timer中的Task出现异常以后,后面再给这个Timer的任务也将会无法执行。

    1.4 结合DelayQueue

    如果要自己构建调度服务,那还可以考虑使用DelayQueue,它里面的每个对象低耦合一个延迟时间有关联,只有过期以后,DelayQueue才能让你执行take操作获取元素。那么当它里面的对象是FutureTask的时候,就可以构成一个简单的调度队列。

    2 RunnableFutureCallBack

    2.1 Runnable

    Executor框架让定制一个执行策略变得简单,不过想要使用它,你的任务还必须实现Runnable接口。在许多服务器请求中,都存在一个情况,那就是:单一的客户请求。它能执行一些简单的任务,但是他不能返回一个值或者抛出受检查的异常。

     

    2.2 Callback

    很多任务都会引起计算延迟,包括执行数据库查询、从网络上获取资源、进行复杂的计算。这些任务Callback抽象更好。它也可以被Executor框架执行

     
     

     

    Executors包含很多静态方法,可以吧RunnablePrivilegedAction封装为Callable

     

     

    2.3 FutureTask

    RunnableCallable描述的是抽象的计算性任务,这些任务通常是有限的,他们有开始,而且最终会结束。

     

    一个Executor执行的任务有4个周期,创建、提交、开始、完成。由于任务的执行会花很长时间,我们也希望可以取消任务。

     

    Future描述了任务的生命周期,并提供了相关的方法来获得任务的结果、取消任务以及检验任务是否完成还是取消。对应的isDoneisCancelled() 方法,它不能后退,一旦完成,就永远停在完成状态上。用get(等待)获得结果。

     

     

     

        /**  

         * @param args  

         * @throws InterruptedException   

         * @throws ExecutionException   

         */  

    public static void main(String[] args) throws 

    InterruptedException {

    int threadCounts = 19;// 使用的线程数

    long sum = 0;

    ExecutorService exec = Executors.newFixedThreadPool(threadCounts);

    List<Callable<Long>> callList = new ArrayList<Callable<Long>>();

    // 生成很大的List

    List<Integer> list = new ArrayList<Integer>();

    for (int i = 0; i <= 1000000; i++) {

    list.add(i);

    }

    int len = list.size() / threadCounts;// 平均分割List

    // List中的数量没有线程数多(很少存在)

    if (len == 0) {

    threadCounts = list.size();// 采用一个线程处理List中的一个元素

    len = list.size() / threadCounts;// 重新平均分割List

    }

    for (int i = 0; i < threadCounts; i++) {

    final List<Integer> subList;

    if (i == threadCounts - 1) {

    subList = list.subList(i * len, list.size());

    else {

    subList = list.subList(i * len,

    len * (i + 1) > list.size() ? list.size() : len

    * (i + 1));

    }

    // 采用匿名内部类实现

    callList.add(new Callable<Long>() {

    public Long call() throws Exception {

    long subSum = 0L;

    for (Integer i : subList) {

    subSum += i;

    }

    System.out.println("分配给线程:"

    + Thread.currentThread().getName()

    + "那一部分List的整数和为: SubSum:" + subSum);

    return subSum;

    }

    });

    }

    List<Future<Long>> futureList = exec.invokeAll(callList);

    for (Future<Long> future : futureList) {

    sum += future.get();

    }

    exec.shutdown();

    System.out.println(sum);

    }   

      

     

     

    2.4 和Service的结合

    RunnableCallable类都可以通过Servicesubmit方法提交,并且返回一个Future,它表示这个任务,可以获得该任务的执行结果或者取消它。

     

    另外,可以给为Runnable/Callable显示的实例化一个FutureTask

    Callable pAccount = new PrivateAccount();

    FutureTask futureTask = new FutureTask(pAccount);

    3 CompletionService

    向Executor提交一个批处理任务,并且希望获得结果,那么你将会使用Future,然后不断的调用isDone来检验是否完成,这样太麻烦,还有更好的方法,那就是完成服务,CompletionServicepoll方法不会等待,返回nulltake方法会等待

     

    它整合了ExecutorBlockingQueue的功能,你可以将Callable任务交给他执行,然后使用类似于队列中的takepoll方法,在结果完整时可用时获得这个结果。ExecutorCompletionService是它的实现类

     

    它的实现也比较简单,在构造函数中创建一个BlockingQueue,用它保存结果:

    private final BlockingQueue<Future<V>> completionQueue;

     

    提交的任务被包装成QueueFuture

    private class QueueingFuture extends FutureTask<Void> {

            QueueingFuture(RunnableFuture<V> task) {

                super(task, null);

                this.task = task;

            }

            protected void done() { completionQueue.add(task); }

            private final Future<V> task;

        }

    覆写done方法,将结果置入BlockingQueue

     

    它与上面获得一堆FutureTask,然后遍历的去get等返回还不一样。它只能一个个获取,代表有一个拿一个。FutureTaskget可能后面的FutureTask都已经好了,可是有一个还没好,那就卡在中间了

  • 相关阅读:
    Java 常见异常种类
    Oracle存储过程记录异常
    oracle定时器
    oracle生成单据号
    oracle计算时间秒数差
    oracle分组后取每组第一条数据
    was部分更新
    数据库分区
    JTA事务管理
    docker
  • 原文地址:https://www.cnblogs.com/wzhanke/p/4422027.html
Copyright © 2011-2022 走看看