zoukankan      html  css  js  c++  java
  • JUC回顾之-ThreadPoolExecutor的原理和使用

    Spring中的ThreadPoolTaskExecutor是借助于JDK并发包中的java.util.concurrent.ThreadPoolExecutor来实现的。基于ThreadPoolExecutor可以很容易将一个Runnable接口的任务放入线程池中。

    ThreadPoolExecutor的构建参数:

     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }

    1. 参数解释
    corePoolSize:         核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量
    maximumPoolSize: 线程池维护线程的最大数量
    keepAliveTime:      线程池维护线程所允许的空闲时间,当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
    unit: 线程池维护线程所允许的空闲时间的单位、可选参数值为:TimeUnit中的几个静态属性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
    workQueue: 线程池所使用的缓冲队列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue

    解释:

    用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:

    • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
    • LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
    • SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
    • priorityBlockingQuene:具有优先级的无界阻塞队列; 

    handler: 线程池中的数量大于maximumPoolSize,对拒绝任务的处理策略,默认值ThreadPoolExecutor.AbortPolicy()。

    解释:

    线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

    • AbortPolicy:直接抛出异常,默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

    当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

    2.ThreadPoolExecutor内运转机制:

    具体流程如下:当一个任务通过execute(Runnable)方法欲添加到线程池时:

    • 当池子大小小于corePoolSize就新建线程,并且处理请求。
    • 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理。
    • 当workQueue放不下新任务时,新建线程入池,并处理请求,如果池子的大小撑到maximumPoolSize就用RejectedExecutionHandler拒绝处理。
    • 另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。

     

    3.Exectors

    Exectors工厂类提供了线程池的初始化接口,主要有如下几种:

    newFixedThreadPool

     

    初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程。

    注意:由于LinkedBlockingQuene队列的大小是整型的最大值,队列是无界队列,所以不建议直接使用,newFixedThreadPool可能导致内存爆满的问题;

    newCachedThreadPool

     
     

    1、初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
    2、和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;

    注意:使用该线程池时,一定要注意控制并发的任务数,因为最大的线程数可以达到整型的最大值,线程数是无限的,当并发的任务很多时候,会同时创建大量的线程,会导致严重的性能问题。

    newSingleThreadExecutor 

     

    初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。

    注意:由于LinkedBlockingQuene队列的大小是整型的最大值,队列是无界队列,所以不建议直接使用,newSingleThreadExecutor可能导致内存爆满的问题;

     

    newScheduledThreadPool
     
     

    初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。

     

     

     

    3.spingMVC中使用线程池,用做批处理

     

    <?xml version="1.0" encoding="UTF-8"?>
    
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:oscache="http://www.springmodules.org/schema/oscache"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
               http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
               http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
               http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
               http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
               http://www.springmodules.org/schema/oscache http://www.springmodules.org/schema/cache/springmodules-oscache.xsd">
        <!-- spring 加载资源文件的配置 -->
        <context:property-placeholder
            location="classpath:conf/custom/env/config.properties"
            ignore-unresolvable="true" />
                
        <!-- spring 线程池配置 start -->
        <!-- insurance线程池 -->
        <bean id="insuranceTaskExecutor"
            class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="${insurance.taskexecutor.corePoolSize}" />
            <property name="maxPoolSize" value="${insurance.taskexecutor.maxPoolSize}" />
            <property name="keepAliveSeconds" value="${insurance.taskexecutor.keepAliveSeconds}" />
            <property name="queueCapacity" value="${insurance.taskexecutor.queueCapacity}" />
        </bean>
    
    </beans>

     config里面对于线程池的配置:

    #insurance task setting
    insurance.taskexecutor.corePoolSize=2
    insurance.taskexecutor.maxPoolSize=2
    insurance.taskexecutor.keepAliveSeconds=30
    insurance.taskexecutor.queueCapacity=1000

    Controller 层部分代码: 

     

    /**
         * 
         * sendInsuranceBatch
         * 
         * @Title: sendInsuranceBatch
         * @Description: TODO 投保批处理
         * @param pojo
         * @param errors
         * @param request
         * @return
         */
        @RequestMapping("/ihotel_insurance_deliver")
        @ResponseBody
        public String sendInsuranceBatch(
                @ModelAttribute("pojo") IhotelBatchPojo pojo, Errors errors,
                HttpServletRequest request) {
            String remoteIp = WebUtils.getRemoteIpAddress(request);
            String serverIP = IPUtil.getServerIp();
            BaseResultInfo baseResultInfo = null;
            // 验证入参是否正确
            try {
                // 1,校验参数
                pojo.validate(pojo, errors);
                if (errors.hasErrors()) {
                    baseResultInfo = new BaseResultInfo(
                            ResultType.PARAMETER_VERIFY_FAILUE);
                    baseResultInfo.appendRetdesc("#" + getErrMsg(errors));
                    return JsonUtil.BeanToJson(baseResultInfo);
                }
                ihotelInsuranceBatchService.processIhotelInsuredItem(pojo, remoteIp);
                baseResultInfo = new BaseResultInfo();
                baseResultInfo.setRetcode(0);
                baseResultInfo.setRetdesc("success");
                baseResultInfo.setServerIP(serverIP);
                return JSON.toJSONString(baseResultInfo);
            } catch (Exception e) {
                IHotelLoggerUtil.error("[国际酒店保险投保批处理异常]", e);
                throw e;
            }
        }

    Service层部分代码:  

    @Service("ihotelInsuranceBatchService")
    public class IhotelInsuranceBatchService {
        // 保险线程池
        @Resource
        private ThreadPoolTaskExecutor insuranceTaskExecutor;
        
    
    public void processIhotelInsuredItem(IhotelBatchPojo pojo, String remoteIp) {
            // 根据入参条件取出要投保批处理的数据
            List<InsuredHotelItem> isuredHotelList = this.findListByCond(pojo);
            for (InsuredHotelItem ihotelItem : isuredHotelList) {
    //将任务放入线程池 insuranceTaskExecutor.execute(
    new IhotelInsureDeliverTask( ihotelItem, this, remoteIp)); } }
    }

     线程类

    /**
     * @Title: IhotelInsureDeliverTask.java
     * @Package com.elong.ihotel.service.insurance
     * @Description: TODO
     * Copyright: Copyright (c) 2014 
     * Email: songbin0819@163.com
     * 
     * @author user
     * @date 2014年11月7日 下午4:04:02
     * @version V1.0
     */
    
    package com.elong.ihotel.service.insurance;
    
    import com.elong.ihotel.model.insurance.InsuredHotelItem;
    import com.elong.ihotel.util.IHotelLoggerUtil;
    
    /**
     * IhotelInsureDeliverTask
     * 
     * @Title: IhotelInsureDeliverTask
     * @Description: TODO 保险投保批处理线程类
     * @author Peng.Li
     * @date 2014年11月7日 下午4:04:02
     *
     */
    
    public class IhotelInsureDeliverTask implements Runnable {
        /**
         * 需要批处理的保险中间表实体
         */
        private InsuredHotelItem ihotelItem;
    
        private IhotelInsuranceBatchService ihotelInsuranceBatchService;
        /**
         * ip
         */
        private String remoteIp;
    
        /**
         * Constructor for IhotelInsureDeliverTask.
         * <p>
         * Title:
         * </p>
         * <p>
         * Description:
         * </p>
         */
        public IhotelInsureDeliverTask() {
        }
    
        /**
         * Constructor for IhotelInsureDeliverTask.
         * <p>
         * Title:
         * </p>
         * <p>
         * Description:
         * </p>
         * 
         * @param ihotelItem
         * @param ihotelInsuranceBatchService
         * @param remoteIp
         */
        public IhotelInsureDeliverTask(InsuredHotelItem ihotelItem,
            IhotelInsuranceBatchService ihotelInsuranceBatchService, String remoteIp) {
        this.ihotelItem = ihotelItem;
        this.ihotelInsuranceBatchService = ihotelInsuranceBatchService;
        this.remoteIp = remoteIp;
        }
    
        /**
         * 
         * override run
         * <p>
         * Title: run
         * </p>
         * <p>
         * Description:
         * </p>
         */
        @Override
        public void run() {
        try {
            ihotelInsuranceBatchService.sendInsuranceBatch(ihotelItem, remoteIp);
        } catch (Exception e) {
            IHotelLoggerUtil.error("获取保险确认号失败", e);
        }
    
        }
    
    }

     

     
  • 相关阅读:
    配置步骤
    swap区
    Oracle的left join中on和where的区别
    drop与truncate
    关于trace
    oracle执行计划连接方式
    oracle系统结构
    查询存档
    oracle统计信息
    分区索引
  • 原文地址:https://www.cnblogs.com/200911/p/4309512.html
Copyright © 2011-2022 走看看