zoukankan      html  css  js  c++  java
  • RejectedExecutionException记一次job跑批失败

     1 ERROR c.l.ehr.personnel.batch.job.BaseJob - 执行任务出错Exception
     2 java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f8991b8 rejected from java.util.concurrent.ThreadPoolExecutor@1665effd[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 433]
     3 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_111]
     4 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[na:1.8.0_111]
     5 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[na:1.8.0_111]
     6 at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) ~[na:1.8.0_111]
     7 at com.lianjia.ehr.personnel.batch.job.index.EmpIndexForOSJob.run(EmpIndexForOSJob.java:94) ~[batch-1.0-SNAPSHOT.jar:na]
     8 at com.lianjia.ehr.personnel.batch.job.BaseJob.execute(BaseJob.java:58) ~[batch-1.0-SNAPSHOT.jar:na]
     9 at org.quartz.core.JobRunShell.run(JobRunShell.java:202) [quartz-2.2.1.jar:na]
    10 at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [quartz-2.2.1.jar:na]

    一次线上跑批失败看了下代码

     1 @QuartzJob(name = "empIndexForOSJob", cronExp = "0 30 2 * * ? *", description = "为OpenSearch服务写empIndex内容")
     2 public class EmpIndexForOSJob extends BaseJob {
     3 
     4     @Autowired
     5     private RunDataForOpenSearchService runDataForOpenSearchService;
     6 
     7     @Autowired
     8     private EmployeeService employeeService;
     9     @Autowired
    10     private PartTimeEmpService partTimeEmpService;
    11 
    12     private final static int threadCount = 10;
    13 
    14     private Logger logger = LoggerFactory.getLogger(EmpIndexForOSJob.class);
    15 
    16     private ExecutorService executor = Executors
    17             .newFixedThreadPool(threadCount,
    18                     new ThreadFactoryBuilder().setNameFormat("listen-empIndexForOSJob-thread-%d")
    19                             .setUncaughtExceptionHandler((t, e) -> {
    20                                 logger.error("empIndexForOSJob process Throw Exception!", e);
    21                             }).build());
    22 
    23     @Override
    24     protected void run(JobExecutionContext context) throws Exception {
    25 
    26         logger.info("EmpIndexForOSJob start!");
    27         long start = System.nanoTime();
    28         // 遍历 emp
    29         List<Long> empIdList = Lists.newArrayList();
    30         empIdList.addAll(employeeService.queryAllEmpId());    //查出全部人员id集合
    31         logger.info("EmpIndexForOSJob running! [empIdList.size = {}]", empIdList.size());
    32         List<Range<Integer>> rangeList = DivideListUtil.divide(empIdList.size(), 1000);
    33         CountDownLatch countDownLatch = new CountDownLatch(rangeList.size());
    34 
    35         for (Range<Integer> range : rangeList) {
    36             logger.info("EmpIndexForOSJob range[{},{}]!", range.lowerEndpoint(), range.upperEndpoint());
    37             List<Long> subEmpIdList = empIdList.subList(range.lowerEndpoint(), range.upperEndpoint());
    38             executor.submit(new Runnable() {
    39                 @Override
    40                 public void run() {
    41                     try {
    42                         for (Long empId : subEmpIdList) {
    43                             try {
    44                                 runDataForOpenSearchService.execute(empId, "employee", ActionType.UPDATE);
    45                             } catch (Throwable throwable) {
    46                                 logger.error("empIndexForOSJob process Throw Exception! [empId={}]", empId);
    47                             }
    48                         }
    49                     } catch (Throwable e) {
    50                         logger.error("empIndexForOSJob process Throw Exception! [e={}]", e);
    51                     } finally {
    52                         countDownLatch.countDown();
    53                     }
    54                 }
    55             });
    56         }
    57         countDownLatch.await();
    58         executor.shutdown();
    59         // 遍历 parttimeEmp
    60         List<Long> partTimeEmpIdList = Lists.newArrayList();
    61         partTimeEmpIdList.addAll(partTimeEmpService.queryAllPartTimeEmpId());  //查出全部相关人员id集合
    62         logger.info("EmpIndexForOSJob running! [partTimeEmpIdList.size = {}]", partTimeEmpIdList.size());
    63         List<Range<Integer>> rangeParttimeList = DivideListUtil.divide(partTimeEmpIdList.size(), 1000);
    64         CountDownLatch parttimeCountDownLatch = new CountDownLatch(rangeParttimeList.size());
    65         for (Range<Integer> range : rangeParttimeList) {
    66             logger.info("EmpIndexForOSJob range[{},{}]!", range.lowerEndpoint(), range.upperEndpoint());
    67             List<Long> subEmpIdList = partTimeEmpIdList.subList(range.lowerEndpoint(), range.upperEndpoint());
    68             executor.submit(new Runnable() {
    69                 @Override
    70                 public void run() {
    71                     try {
    72                         for (Long empId : subEmpIdList) {
    73                             try {
    74                                 runDataForOpenSearchService.execute(empId, "parttime_emp", ActionType.UPDATE);
    75                             } catch (Throwable throwable) {
    76                                 logger.error("empIndexForOSJob process Throw Exception! [parttimeEmpId={}]", empId);
    77                             }
    78                         }
    79                     } catch (Throwable e) {
    80                         logger.error("empIndexForOSJob process Throw Exception! [e={}]", e);
    81                     } finally {
    82                         parttimeCountDownLatch.countDown();
    83                     }
    84                 }
    85             });
    86         }
    87 
    88         parttimeCountDownLatch.await();
    89         executor.shutdown();
    90         long end = System.nanoTime();
    91         logger.info("EmpIndexForOSJob end! costs={}ms", (end - start) / 1000000);
    92     }
    93 
    94 }

     分析:

    看了下报错log,RejectedExecutionException 怀疑和线程池有关系

    原来问题出现在第58行 executor.shutdown();

    线程池的shutdown()方法:

    当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

    线程池的shutdownNow()方法:
    根据JDK文档描述,大致意思是:执行该方法,线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。
    它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

    最后:因为后面还有任务要添加不要中途shutdown(),把第58行代码删掉即可,等所有任务都执行完毕在执行shutdown()。

     

     

  • 相关阅读:
    验证码处理函数
    Apache2.2下载及安装
    centos6.4、6.5、7.0环境下载及安装
    数据库实务 实务隔离级别
    InnoDB 锁
    索引常见问题处理
    数据库索引 B-Tree索引 hash索引
    JVM学习-(2)
    jvm学习-(1)
    linux杂记
  • 原文地址:https://www.cnblogs.com/wanghongsen/p/8274653.html
Copyright © 2011-2022 走看看