zoukankan      html  css  js  c++  java
  • Java线程池

      数据仓库在任务调度过程中,需要控制和监控数据的下载,加载,清洗等过程,其中用线程池来管理。

     1 public class ThreadPool extends ThreadPoolExecutor {
     2     
     3     private static final Logger logger = LoggerFactory
     4             .getLogger(ThreadPool.class);
     5     private TaskPuller getter;
     6     private AtomicBoolean isStop;
     7     private Thread getterThread;
     8     private int capacity;
     9     Map<Integer,Thread> map;
    10      /** Lock held by put, offer, etc */
    11     private final ReentrantLock putLock = new ReentrantLock();
    12 
    13     /** Wait queue for waiting puts */
    14     private final Condition full = putLock.newCondition();
    15     private AtomicInteger count;
    16 
    17     public ThreadPool(int size,TaskPuller getter) {
    18         super(size, size, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(size));
    19         map=new ConcurrentHashMap<Integer,Thread>();
    20         capacity=2*size;
    21         count=new AtomicInteger(0);
    22         isStop=new AtomicBoolean(true);
    23         this.getter=getter;
    24         run();//启动任务获取线程
    25     }
    26 }

       查看源码,发现ThreadPoolExecutor继承AbstractExecutorService,AbstractExecutorService实现ExecutorService接口,ExecutorService接口继承Executor,Executor是顶层接口,只有一个void execute(Runnable command);的execute方法。

       查看ThreadPoolExecutor的主要实现类,主要有如下几个比较重要的成员变量。

    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile int corePoolSize; private volatile int maximumPoolSize;

      workQueue是存放等待执行的任务队列,mainLock是用来判断线程池主要执行状态的锁,threadFactory是用来创建线程的工厂,handler是用来处理任务超出最大限制时候的策略,keepAliveTime是线程空闲存活时间,corePoolSize是线程池核心数量,maximumPoolSize是线程池最大数量。

       查看ThreadPoolExecutor的主要execute方法。

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }

      如果当前工作线程总数小于线程池核心数量,把当前命令加入到工作线程队列中。通过2次检查AtomicInteger型ctl的状态,来判断是否需要回滚操作,还有0值处理。如果添加到工作队列失败,按照拒绝策略来拒绝当前命令。

    private void run(){
            getterThread=new Thread(new Runnable() {
                
                public void run() {
    
                    while(isStop.get()){
                        List<ETLTask> tasks=getter.getTasks();
                        for (ETLTask task : tasks) {
                            if(!isStop.get())
                                return;
                            try{
                                task.readyToExecute();
                            }
                            catch (Exception e) {
                                logger.error("try to update task status to wait executing failed: "+task.toString());
                                continue;
                            }
                            execute(task);
                            logger.debug("add task to thread pool successed: "+task.toString());
                        }
                    }
                }
            });
            getterThread.start();
        }
    }

      在初始化线程池之后,调用run方法来执行任务。通过AtomicBoolean的原子型变量isStop的值,用while轮询任务,来控制任务执行的开闭。

      最后在execute方法和afterExecute方法中,使用condition实现线程的等待和通知。如果当前线程总数已满,把当前线程加入等待队列中。执行完一个线程后,唤醒一个在等待队列中的线程来执行。

        public void execute(Runnable r){
            if(!isStop.get())
                return;
            ETLTask task=(ETLTask)r;
            try {
                putLock.lockInterruptibly();
                while(count.get()==capacity-1){            
                    full.await();
                }
                count.getAndIncrement();
                super.execute(r);
            } catch (InterruptedException e1) {
                logger.error("get the lock failed during add the task to the threadpool's task queue because of interupting",e1 );
            }catch(RejectedExecutionException e2){
                logger.error("add the task to the threadpool's task queue failed",e2);
            }    
            finally {
                putLock.unlock();
            }
            
        }        
    
        public void afterExecute(Runnable r, Throwable t) {
            int c=-1;
            map.remove(r);
            try {
                putLock.lockInterruptibly();
                c=count.decrementAndGet();
                if(c<capacity)
                    full.signal();
            } catch (InterruptedException e) {
                logger.error("decrement the task count failed when get the lock" ,e);
            }
            finally{
                putLock.unlock();
            }
            
        }
  • 相关阅读:
    H01-Linux系统中搭建Hadoop和Spark集群
    L07-Linux配置ssh免密远程登录
    L06-Ubuntu系统中部署Vagrant和VirtualBox
    P03-Python装饰器
    L05-Linux部署msmtp+mutt发送邮件
    O01-Linux CentOS7中利用RDO部署OpenStack
    L03-Linux RHEL6.5系统中配置本地yum源
    LoggerFactory.getLogger用法
    maven配置本地和远程仓库
    Jmeter下载安装配置及使用(windows)
  • 原文地址:https://www.cnblogs.com/wanli002/p/10164866.html
Copyright © 2011-2022 走看看