  • Enterprise Search Engine Development: The Connector (Part 22)

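    Next we look at the class that executes work on threads: the thread pool class, ThreadPool.

    Following it requires some familiarity with Java's thread pools (java.util.concurrent).

    The class holds a reference to an instance of an inner class: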

    /**
       * The lazily constructed LazyThreadPool instance.
       */
      private LazyThreadPool lazyThreadPool;

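    This member is initialized lazily: it is created on first use, and each ThreadPool holds at most one LazyThreadPool. This resembles the lazy form of the singleton pattern, although the instance is per ThreadPool rather than global. Both creating the member and shutting it down go through synchronized methods: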

    /**
       * Shut down the {@link ThreadPool}. After this returns
       * {@link ThreadPool#submit(TimedCancelable)} will return null.
       *
       * @param interrupt {@code true} if the threads executing tasks should
       *        be interrupted; otherwise, in-progress tasks are allowed to complete
       *        normally.
       * @param waitMillis maximum amount of time to wait for tasks to complete.
       * @return {@code true} if all the running tasks terminated and
       *         {@code false} if some running task did not terminate.
       * @throws InterruptedException if interrupted while waiting.
       */
      synchronized boolean shutdown(boolean interrupt, long waitMillis)
          throws InterruptedException {
        isShutdown = true;
        if (lazyThreadPool == null) {
          return true;
        } else {
          return lazyThreadPool.shutdown(interrupt, waitMillis);
        }
      }
    
      /**
       * Return a LazyThreadPool.
       */
      private synchronized LazyThreadPool getInstance() {
        if (lazyThreadPool == null) {
          lazyThreadPool = new LazyThreadPool();
        }
        return lazyThreadPool;
      }
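
    The lazy, synchronized initialization used by getInstance() can be sketched in isolation. This is a minimal illustration, not the connector's code: ExpensiveResource and LazyHolder are hypothetical names invented for the example.

```java
// Hypothetical stand-in for an object that is costly to build.
class ExpensiveResource {
}

// Hypothetical holder that mirrors the shape of ThreadPool.getInstance().
class LazyHolder {
  private ExpensiveResource resource;  // stays null until first requested

  // synchronized: two threads racing here cannot both observe null
  // and construct two separate instances.
  synchronized ExpensiveResource getInstance() {
    if (resource == null) {
      resource = new ExpensiveResource();
    }
    return resource;
  }

  public static void main(String[] args) {
    LazyHolder holder = new LazyHolder();
    // The same holder hands back the same object on every call.
    System.out.println(holder.getInstance() == holder.getInstance());
  }
}
```

    Note that the instance is unique per holder, not per JVM: two LazyHolder objects each create their own ExpensiveResource.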

    The task submission method is as follows:

    /**
       * Submit a {@link Cancelable} for execution and return a
       * {@link TaskHandle} for the running task or null if the task has not been
       * accepted. After {@link ThreadPool#shutdown(boolean, long)} returns this
       * will always return null.
       */
      public TaskHandle submit(Cancelable cancelable) {
        if (isShutdown) {
          return null;
        }
        if (cancelable instanceof TimedCancelable && maximumTaskLifeMillis != 0L) {
          return getInstance().submit((TimedCancelable) cancelable);
        } else {
          return getInstance().submit(cancelable);
        }
      }

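    Here Cancelable and TimedCancelable objects are routed to two different overloads of submit. Because Java picks an overload at compile time from the declared type of the argument, the explicit instanceof check and cast are needed to reach the TimedCancelable version. The timed version is also skipped when maximumTaskLifeMillis is 0.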
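
    Why the cast matters can be shown with a small self-contained sketch. Job, TimedJob, and OverloadDemo are hypothetical names standing in for Cancelable, TimedCancelable, and the pool; the returned strings only label which overload ran.

```java
class OverloadDemo {
  // Hypothetical stand-ins for Cancelable and TimedCancelable.
  interface Job extends Runnable {}
  interface TimedJob extends Job { void timeout(); }

  static String submit(Job job) { return "plain"; }
  static String submit(TimedJob job) { return "timed"; }

  // Mirrors the shape of ThreadPool.submit: check the runtime type,
  // then cast so that the compiler selects the TimedJob overload.
  static String dispatch(Job job, long maxLifeMillis) {
    if (job instanceof TimedJob && maxLifeMillis != 0L) {
      return submit((TimedJob) job);
    }
    return submit(job);
  }

  static Job newTimedJobAsJob() {
    return new TimedJob() {
      public void run() {}
      public void timeout() {}
    };
  }

  public static void main(String[] args) {
    Job job = newTimedJobAsJob();            // declared type is Job
    System.out.println(submit(job));         // overload chosen by declared type
    System.out.println(dispatch(job, 1000L)); // cast selects the other overload
  }
}
```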

    Submitting a plain Cancelable is straightforward: the task is submitted and a handle for operating on it is returned.

    /**
         * Submit task (2): obtain a task handle.
         * Submit a {@link Cancelable} for execution and return a
         * {@link TaskHandle} for the running task or null if the task has not been
         * accepted. After {@link LazyThreadPool#shutdown(boolean, long)} returns
         * this will always return null.
         */
        TaskHandle submit(Cancelable cancelable) {
          try {
            // taskFuture is used to cancel 'cancelable' and to determine if
            // 'cancelable' is done.
            Future<?> taskFuture = completionService.submit(cancelable, null);
            return new TaskHandle(cancelable, taskFuture, clock.getTimeMillis());
          } catch (RejectedExecutionException re) {
            if (!executor.isShutdown()) {
              LOGGER.log(Level.SEVERE, "Unable to execute task", re);
            }
            return null;
          }
        }

    Submitting a TimedCancelable is somewhat more involved.

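    The basic idea is to first schedule a delayed task that, once the configured delay has elapsed, calls the TimedCancelable's timeout() method and thereby cancels the other task. This is the timeout watchdog.

    A second task is then submitted that calls the TimedCancelable's run() method and, as soon as run() finishes, cancels the watchdog. If run() does not finish within the allowed time, the watchdog fires and cancels it instead.

    As a loose analogy only, the relationship resembles that of a daemon thread and a user thread: the watchdog plays the daemon's role and the worker the user thread's, and once the worker has finished there is no reason for the watchdog to stay around. (The watchdog is a scheduled task, not an actual daemon thread.)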
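
    The watchdog arrangement just described can be sketched with the standard java.util.concurrent executors. This is an illustration under simplifying assumptions, not the connector's implementation: TimeoutGuardDemo and runWithTimeout are hypothetical names, and the watchdog cancels the task's Future directly instead of calling a timeout() method on the task.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class TimeoutGuardDemo {
  /** Returns true if the task completed, false if the watchdog cancelled it. */
  static boolean runWithTimeout(Runnable task, long maxLifeMillis) {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    // Filled in after submission, like timeoutTask.setTaskHandle(handle).
    AtomicReference<Future<?>> taskRef = new AtomicReference<>();
    try {
      // 1. Schedule the watchdog: after the delay it cancels the task.
      Future<?> timeoutFuture = timer.schedule(() -> {
        Future<?> running = taskRef.get();
        if (running != null) {
          running.cancel(true);
        }
      }, maxLifeMillis, TimeUnit.MILLISECONDS);

      // 2. Submit a wrapper that runs the task, then cancels the watchdog.
      Future<?> taskFuture = worker.submit(() -> {
        try {
          task.run();
        } finally {
          timeoutFuture.cancel(true);
        }
      });
      taskRef.set(taskFuture);

      taskFuture.get();  // wait for completion or cancellation
      return true;
    } catch (CancellationException e) {
      return false;      // the watchdog fired first
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      worker.shutdownNow();
      timer.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(runWithTimeout(() -> {}, 1000L));
  }
}
```

    As in the original, there is a window between scheduling the watchdog and publishing the task's Future; with a very short timeout the watchdog could fire while the reference is still null and do nothing.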

    The method that submits a TimedCancelable is as follows:

     /**
         * Submit task (1).
         * Submit a {@link TimedCancelable} for execution and return a
         * {@link TaskHandle} for the running task or null if the task has not been
         * accepted. After {@link LazyThreadPool#shutdown(boolean, long)} returns
         * this will always return null.
         */
        TaskHandle submit(TimedCancelable cancelable) {
          try {
            // When timeoutTask is run it will cancel 'cancelable'.
            TimeoutTask timeoutTask = new TimeoutTask(cancelable);
    
            // Schedule timeoutTask to run when 'cancelable's maximum run interval
            // has expired.
            // timeoutFuture will be used to cancel timeoutTask when 'cancelable'
            // completes.
            // Delayed execution.
            Future<?> timeoutFuture = timeoutService.schedule(timeoutTask,
                maximumTaskLifeMillis, TimeUnit.MILLISECONDS);
            
            // Once 'cancelable' has completed, the timeout task must not run.
            // cancelTimeoutRunnable runs 'cancelable'. When 'cancelable' completes
            // cancelTimeoutRunnable cancels 'timeoutTask'. This saves system
            // resources. In addition it prevents timeout task from running and
            // calling cancel after 'cancelable' completes successfully.
            CancelTimeoutRunnable cancelTimeoutRunnable =
                new CancelTimeoutRunnable(cancelable, timeoutFuture);
    
            // taskFuture is used to cancel 'cancelable' and to determine if
            // 'cancelable' is done.
            Future<?> taskFuture =
                completionService.submit(cancelTimeoutRunnable, null);
            TaskHandle handle =
                new TaskHandle(cancelable, taskFuture, clock.getTimeMillis());
            
            
    
            // TODO(strellis): test/handle timer pop/cancel before submit. In
            // production with a 30 minute timeout this should never happen.
            timeoutTask.setTaskHandle(handle);
            return handle;
          } catch (RejectedExecutionException re) {
            if (!executor.isShutdown()) {
              LOGGER.log(Level.SEVERE, "Unable to execute task", re);
            }
            return null;
          }
        }

    First the timeout watchdog task is constructed. Its class is a static nested class:

    /**
       * Static nested class that detects task timeouts.
       * A task that cancels another task that is running a {@link TimedCancelable}.
       * The {@link TimeoutTask} should be scheduled to run when the interval for
       * the {@link TimedCancelable} to run expires.
       */
      private static class TimeoutTask implements Runnable {
        final TimedCancelable timedCancelable;
        private volatile TaskHandle taskHandle;
    
        TimeoutTask(TimedCancelable timedCancelable) {
          this.timedCancelable = timedCancelable;
        }
    
        public void run() {
          if (taskHandle != null) {
            timedCancelable.timeout(taskHandle);
          }
        }
    
        void setTaskHandle(TaskHandle taskHandle) {
          this.taskHandle = taskHandle;
        }
      }

    The watchdog is then scheduled for delayed execution, which yields the Future<?> timeoutFuture handle:

    // Schedule timeoutTask to run when 'cancelable's maximum run interval
            // has expired.
            // timeoutFuture will be used to cancel timeoutTask when 'cancelable'
            // completes.
            // Delayed execution.
            Future<?> timeoutFuture = timeoutService.schedule(timeoutTask,
                maximumTaskLifeMillis, TimeUnit.MILLISECONDS);

    Next a CancelTimeoutRunnable is constructed from the TimedCancelable and the Future<?> timeoutFuture handle:

    // Once 'cancelable' has completed, the timeout task must not run.
            // cancelTimeoutRunnable runs 'cancelable'. When 'cancelable' completes
            // cancelTimeoutRunnable cancels 'timeoutTask'. This saves system
            // resources. In addition it prevents timeout task from running and
            // calling cancel after 'cancelable' completes successfully.
            CancelTimeoutRunnable cancelTimeoutRunnable =
                new CancelTimeoutRunnable(cancelable, timeoutFuture);

    CancelTimeoutRunnable is an inner class of the inner class LazyThreadPool:

    /**
        * Inner class (1) of the inner class LazyThreadPool.
        * Calls the run method of the TimedCancelable,
        * then cancels the timeout task once it has finished.
        * A {@link Runnable} for running {@link TimedCancelable} that has been
        * guarded by a timeout task. This will cancel the timeout task when the
        * {@link TimedCancelable} completes. If the timeout task has already run,
        * then canceling it has no effect.
        */
       private class CancelTimeoutRunnable implements Runnable {
         private final Future<?> timeoutFuture;
         private final TimedCancelable cancelable;
    
         /**
          * Constructs a {@link CancelTimeoutRunnable}.
          *
          * @param cancelable the {@link TimedCancelable} this runs.
          * @param timeoutFuture the {@link Future} for canceling the timeout task.
          */
         CancelTimeoutRunnable(TimedCancelable cancelable, Future<?> timeoutFuture) {
           this.timeoutFuture = timeoutFuture;
           this.cancelable = cancelable;
         }
    
         public void run() {
           try {
             cancelable.run();
           } finally {
             timeoutFuture.cancel(true);
             timeoutService.purge();
           }
         }
       }

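    The run method above calls the cancelable's run method and, once it returns (normally or with an exception), cancels the timeout task and purges cancelled tasks from the timer's queue.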
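
    The purge() call matters because, with the default settings of java.util.concurrent.ScheduledThreadPoolExecutor, a cancelled delayed task stays in the work queue until its delay elapses. The sketch below illustrates this, assuming timeoutService is such an executor; PurgeDemo and cancelThenPurge are hypothetical names.

```java
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PurgeDemo {
  /** Returns {queue size after cancel, queue size after purge}. */
  static int[] cancelThenPurge() {
    ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
    try {
      // A task far in the future, like a 30 minute timeout.
      Future<?> timeout = timer.schedule(() -> {}, 1, TimeUnit.HOURS);

      timeout.cancel(true);
      int afterCancel = timer.getQueue().size();  // cancelled task still queued

      timer.purge();                              // drop cancelled tasks
      int afterPurge = timer.getQueue().size();

      return new int[] {afterCancel, afterPurge};
    } finally {
      timer.shutdownNow();
    }
  }

  public static void main(String[] args) {
    int[] sizes = cancelThenPurge();
    System.out.println(sizes[0] + " -> " + sizes[1]);
  }
}
```

    An alternative on Java 7 and later is setRemoveOnCancelPolicy(true) on the executor, which removes a task from the queue at the moment it is cancelled.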

    The remaining part submits the task and obtains the handle used to operate on it:

    // taskFuture is used to cancel 'cancelable' and to determine if
            // 'cancelable' is done.
            Future<?> taskFuture =
                completionService.submit(cancelTimeoutRunnable, null);
            TaskHandle handle =
                new TaskHandle(cancelable, taskFuture, clock.getTimeMillis());
            
            
    
            // TODO(strellis): test/handle timer pop/cancel before submit. In
            // production with a 30 minute timeout this should never happen.
            timeoutTask.setTaskHandle(handle);

    The shutdown method shuts the thread pool down:

    /**
         * Shuts down the thread pool and its tasks.
         * Shut down the LazyThreadPool.
         * @param interrupt {@code true} if the threads executing tasks should
         *        be interrupted; otherwise, in-progress tasks are allowed to
         *        complete normally.
         * @param waitMillis maximum amount of time to wait for tasks to complete.
         * @return {@code true} if all the running tasks terminated, or
         *         {@code false} if some running task did not terminate.
         * @throws InterruptedException if interrupted while waiting.
         */
        boolean shutdown(boolean interrupt, long waitMillis)
          throws InterruptedException {
          if (interrupt) {
            executor.shutdownNow();
          } else {
            executor.shutdown();
          }
          if (timeoutService != null) {
            timeoutService.shutdown();
          }
          try {
            return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
          } finally {
            completionExecutor.shutdownNow();
            if (timeoutService != null) {
              timeoutService.shutdownNow();
            }
          }
        }
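
    The two shutdown modes and the meaning of the boolean result can be tried out on a plain ExecutorService. This is a reduced sketch, not the connector's method: it omits the timeout and completion executors and swallows the interrupt instead of rethrowing. ShutdownDemo and its methods are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
  // Same decision as LazyThreadPool.shutdown: interrupt running tasks or
  // let them finish, then wait a bounded time for termination.
  static boolean shutdown(ExecutorService executor, boolean interrupt,
      long waitMillis) {
    if (interrupt) {
      executor.shutdownNow();  // interrupts running tasks, drops queued ones
    } else {
      executor.shutdown();     // no new tasks; running tasks continue
    }
    try {
      return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Starts one task that sleeps for sleepMillis, then shuts the pool down.
  static boolean demo(boolean interrupt, long sleepMillis, long waitMillis) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(() -> {
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // end promptly when interrupted
      }
    });
    try {
      return shutdown(executor, interrupt, waitMillis);
    } finally {
      executor.shutdownNow();  // clean up whatever is still running
    }
  }

  public static void main(String[] args) {
    System.out.println(demo(true, 3000L, 2000L));
    System.out.println(demo(false, 3000L, 100L));
  }
}
```

    With interrupt set, the sleeping task is interrupted and the pool terminates well inside the wait; without it, a task that outlasts waitMillis makes the method return false.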

    CompletionTask, another inner class of LazyThreadPool, collects the results of finished tasks:

    /**
        * Inner class (2) of the inner class LazyThreadPool.
        * Task that collects results.
        * A task that gets completion information from all the tasks that run in a
        * {@link CompletionService} and logs uncaught exceptions that cause the
        * tasks to fail.
        */
       private class CompletionTask implements Runnable {
         private void completeTask() throws InterruptedException {
           Future<?> future = completionService.take();
           try {
             future.get();
           } catch (CancellationException e) {
             LOGGER.info("Batch terminated due to cancellation.");
           } catch (ExecutionException e) {
             Throwable cause = e.getCause();
             // TODO(strellis): Should we call cancelable.cancel() if we get an
             // exception?
             if (cause instanceof InterruptedException) {
               LOGGER.log(Level.INFO, "Batch terminated due to an interrupt.",
                          cause);
             } else {
               LOGGER.log(Level.SEVERE, "Batch failed with unhandled exception: ",
                          cause);
             }
           }
         }
    
         public void run() {
           try {
             while (!Thread.currentThread().isInterrupted()) {
               completeTask();
             }
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
           }
           LOGGER.info("Completion task shutdown.");
         }
       }
      }
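
    The take-then-get loop in CompletionTask relies on how java.util.concurrent.CompletionService reports results: take() blocks until some task has finished, and get() on the returned Future rethrows a task failure wrapped in an ExecutionException. A minimal sketch, with CompletionDemo and drain as hypothetical names:

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CompletionDemo {
  /** Runs one succeeding and one failing task and summarizes the outcome. */
  static String drain() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    CompletionService<Void> completionService =
        new ExecutorCompletionService<>(executor);

    // Same submit(Runnable, result) form the thread pool uses.
    completionService.submit(() -> {}, null);
    completionService.submit(() -> {
      throw new IllegalStateException("boom");
    }, null);

    int ok = 0;
    int failed = 0;
    String cause = "";
    try {
      for (int i = 0; i < 2; i++) {
        Future<Void> done = completionService.take();  // next finished task
        try {
          done.get();
          ok++;
        } catch (ExecutionException e) {
          failed++;
          cause = e.getCause().getMessage();  // the task's own exception
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      executor.shutdownNow();
    }
    return "ok=" + ok + " failed=" + failed + " cause=" + cause;
  }

  public static void main(String[] args) {
    System.out.println(drain());
  }
}
```

    Without such a consumer, an exception thrown inside a submitted task would sit unobserved in its Future, which is why the pool dedicates a task to draining and logging.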

    ThreadNamingThreadFactory is a static nested class used to create the pool's threads:

    /**
       * A {@link ThreadFactory} that adds a prefix to thread names assigned
       * by {@link Executors#defaultThreadFactory()} to provide diagnostic
       * context in stack traces.
       */
      private static class ThreadNamingThreadFactory implements ThreadFactory {
        private final ThreadFactory delegate = Executors.defaultThreadFactory();
        private final String namePrefix;
    
        ThreadNamingThreadFactory(String namePrefix) {
          this.namePrefix = namePrefix + "-";
        }
    
        public Thread newThread(Runnable r) {
          Thread t = delegate.newThread(r);
          t.setName(namePrefix + t.getName());
          return t;
        }
      }
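
    The effect of such a factory is easiest to see by asking a worker thread for its own name. The sketch below re-creates the prefixing logic as a lambda rather than reusing the private class; NamedThreadDemo, workerThreadName, and the prefix passed in are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class NamedThreadDemo {
  /** Returns the name of a worker thread created with the given prefix. */
  static String workerThreadName(String prefix) {
    ThreadFactory delegate = Executors.defaultThreadFactory();
    // Same idea as ThreadNamingThreadFactory: prepend "<prefix>-".
    ThreadFactory naming = runnable -> {
      Thread t = delegate.newThread(runnable);
      t.setName(prefix + "-" + t.getName());
      return t;
    };
    ExecutorService executor = Executors.newSingleThreadExecutor(naming);
    try {
      Callable<String> whoAmI = () -> Thread.currentThread().getName();
      return executor.submit(whoAmI).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(workerThreadName("demo"));
  }
}
```

    The default factory names threads pool-N-thread-M, so the result has the form demo-pool-N-thread-1, which makes the pool's threads easy to pick out in a stack trace.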

    ---------------------------------------------------------------------------

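    This series, Enterprise Search Engine Development: The Connector, is my original work.

    Please credit the source when reposting: 博客园 (cnblogs), 刺猬的温驯

    Email: chenying998179@163#com (replace # with .)

    Link to this article: http://www.cnblogs.com/chenying99/p/3775701.html 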
