zoukankan      html  css  js  c++  java
  • 线程池的使用

    1-  线程池参数的配置

    executor:
      taskTimeSeconds: 0.1
      tasksParSecond: 100
      taskWaitTimeSeconds: 0.01
      uCPU: 0.8
      taskResponseTimeSeconds: 0.02
      keepAliveTime: 60
      rejectedType: 0
      queueType: 0
      unitType: 0

    2- 注入线程池

    package com.test.domi.config;
    
    import com.test.domi.executor.ThreadPoolUtil;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.util.concurrent.ThreadPoolExecutor;
    
    @Configuration
    public class executorConfig {
    
        @Value("${executor.taskResponseTimeSeconds}")
        private double taskResponseTimeSeconds;
        @Value("${executor.taskWaitTimeSeconds}")
        private double taskWaitTimeSeconds;
    
        @Value("${executor.taskTimeSeconds}")
        private double taskTimeSeconds;
        @Value("${executor.tasksParSecond}")
        private int tasksParSecond;
        @Value("${executor.uCPU}")
        private double uCPU;
    
        @Value("${executor.keepAliveTime}")
        private long keepAliveTime;
        @Value("${executor.rejectedType}")
        private int rejectedType;
        @Value("${executor.queueType}")
        private int queueType;
        @Value("${executor.unitType}")
        private int unitType;
    
        @Bean("executor")
        public ThreadPoolExecutor getThreadPoolExecutor(){
            ThreadPoolUtil threadPoolUtil = new ThreadPoolUtil(taskTimeSeconds, tasksParSecond, uCPU, taskResponseTimeSeconds,
                    taskWaitTimeSeconds,  keepAliveTime,  unitType, queueType, rejectedType);
            return threadPoolUtil.getThreadPoolExecutorForCPU();
        }
    }

    3- 自定义线程工厂

    package com.test.domi.executor;
    
    import org.apache.commons.lang.StringUtils;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class WarmThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        WarmThreadFactory(String poolName) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            poolName = StringUtils.isBlank(poolName) ? "-" : (poolName + "-");
            namePrefix = poolName + "pool-" +  poolNumber.getAndIncrement() +  "-thread-";
        }
    
        @Override
        public Thread newThread(Runnable r) {
            String rName = r.toString();
            String threadName = namePrefix + threadNumber.getAndIncrement() + "-" + rName.substring((rName.indexOf("$") + 1),rName.indexOf("["));
            Thread t = new Thread(group, r,threadName,0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    
    }

    4- 自定义拒绝策略

    package com.test.domi.executor;
    
    import com.alibaba.fastjson.JSON;
    
    import java.util.Random;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ThreadPoolExecutor;
    
    public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
        private static final AbortTaskQueue abortTaskQueue = AbortTaskQueue.newInstance();
    
        public AbortPolicyWithReport() {}
    
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //todo 记录任务至于DB,MQ,等,实现最终一致性.
            abortTaskQueue.add(r);
            String param = JSON.toJSONString(r);
    //        if (new Random().nextInt(6) > 1) {
    //            Runnable poll = abortTaskQueue.poll();
    //            System.out.println("我消费了一个");
    //        }
            System.out.println("线程" + Thread.currentThread().getName() + "丢弃任务,参数为==》" + param);
            throw new RejectedExecutionException("Task-" + r.toString() + " -rejected from- " + e.toString());
        }
    
    
    }

    5- 自定义拒绝队列之外的任务队列(单例,安全)

    package com.test.domi.executor;
    
    import java.io.Serializable;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class AbortTaskQueue implements Serializable {
        private static  AbortTaskQueue aborttaskqueue = null;
        private static final ConcurrentLinkedQueue<Runnable> concurrentLinkedQueue = new ConcurrentLinkedQueue();
        private static final int MAX_LENGTH = 10000;
        private final AtomicInteger count = new AtomicInteger(0);
    
        private AbortTaskQueue() {}
        public Runnable poll() {
            Runnable poll = concurrentLinkedQueue.poll();
            if (poll == null) {
                count.set(0);
            }
            count.decrementAndGet();
            return poll;
        }
    
        public void add(Runnable r) {
            int total = count.get();
            if (total < MAX_LENGTH) {
                concurrentLinkedQueue.add(r);
                count.getAndIncrement();
            }
        }
    
        public static AbortTaskQueue newInstance(){
            if (aborttaskqueue == null) {
                synchronized (AbortTaskQueue.class) {
                    if (aborttaskqueue == null) {
                        aborttaskqueue = new AbortTaskQueue();
                    }
                }
            }
            return aborttaskqueue;
        }
    
        public int size(){
            return count.get();
        }
    }

    6- 自定线程池

    package com.test.domi.executor;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class WarmThreadPoolExecutor extends ThreadPoolExecutor {
        private final Logger log = LoggerFactory.getLogger(this.getClass());
        private final ThreadLocal<Long> startTime = new ThreadLocal<>();
        private final AtomicLong numTasks = new AtomicLong();
        private final AtomicLong totalTime = new AtomicLong();
    
        public WarmThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                  TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        public WarmThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                  TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory);
        }
        public WarmThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                      BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
        public WarmThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
        }
    
    
        protected void beforeExecute(Thread t,Runnable r) {
            super.beforeExecute(t,r);
            log.info(String.format("Thread %s: start======> %s",t,r));
            startTime.set(System.nanoTime());
        }
    
        protected void afterExecute(Runnable r,Throwable t) {
            try {
                long endTime = System.nanoTime();
                long taskTime = endTime - startTime.get();
                numTasks.incrementAndGet();
                totalTime.addAndGet(taskTime);
                log.info(String.format("Thread %s: end=====> %s, time = %dns", t, r, taskTime));
            }finally {
                super.afterExecute(r,t);
            }
        }
    
        protected void terminated() {
            try {
                log.info(String.format("Terminated: avg time ======> %dns", totalTime.get() / numTasks.get()));
            }finally {
                super.terminated();
            }
        }
    }

    7- 注入线程池的工具类

    package com.test.domi.executor;
    
    import java.math.BigDecimal;
    import java.util.concurrent.*;
    
    public class ThreadPoolUtil {
        private static final int PROCESSORS = Runtime.getRuntime().availableProcessors();
        private static final String POOL_NAME_PRE = "cpu-task";
    
        private double taskResponseTimeSeconds;
        private double taskWaitTimeSeconds;
        private double taskTimeSeconds;
        private int tasksParSecond;
        private double uCPU;
        private long keepAliveTime;
        private int rejectedType;
        private int queueType;
        private int unitType;
    
    
        public ThreadPoolUtil(double taskTimeSeconds,int tasksParSecond,double uCPU,double taskResponseTimeSeconds,
                              double taskWaitTimeSeconds, long keepAliveTime, int unitType,int queueType,int rejectedType){
          this.taskTimeSeconds = taskTimeSeconds;
          this.tasksParSecond = tasksParSecond;
          this.uCPU = uCPU;
    
          this.taskResponseTimeSeconds = taskResponseTimeSeconds;
          this.taskWaitTimeSeconds = taskWaitTimeSeconds;
    
          this.keepAliveTime = keepAliveTime;
          this.unitType = unitType;
          this.queueType = queueType;
          this.rejectedType = rejectedType;
        }
    
    
        public ThreadPoolExecutor getThreadPoolExecutorForIO() {
            return getThreadPoolExecutor(2);
        }
    
        public ThreadPoolExecutor getThreadPoolExecutorForCPU() {
            return getThreadPoolExecutor(1);
    
        }
    
        private ThreadPoolExecutor getThreadPoolExecutor(int executorForType) {
            int corePoolSize = getCorePoolSize();
            int maxPoolSize = getMaxPoolSize(corePoolSize,1);
            TimeUnit timeUnit = getTimeUnit(unitType);
            BlockingQueue queue = getBlockingQueue(queueType,corePoolSize);
            System.out.println("创建线程池的参数为corePoolSize=" + corePoolSize + "maxPoolSize = " + maxPoolSize + "queueSize=" + getQueueSize(corePoolSize));
            ThreadPoolExecutor warmThreadPoolExecutor = new WarmThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
                    queue, new WarmThreadFactory(POOL_NAME_PRE), new AbortPolicyWithReport());
            return warmThreadPoolExecutor;
        }
    
        private int getMaxPoolSize(int corePoolSize,int executorForType) {
            int maxPoolSize;
            if (executorForType == 1) {
                maxPoolSize = getMaxCorePoolSizeForCPUtask();
            } else {
                maxPoolSize = getMaxCorePoolSizeForIOtask();
            }
            return maxPoolSize < corePoolSize ? corePoolSize : maxPoolSize;
        }
    
    
        private TimeUnit getTimeUnit(int timeUnit){
            TimeUnit resultTimeUnit = TimeUnit.MILLISECONDS;
            switch (timeUnit){
                case 1:
                    resultTimeUnit =  TimeUnit.SECONDS;
                    break;
                case 2:
                    resultTimeUnit =  TimeUnit.MICROSECONDS;
                    break;
                case 3:
                    resultTimeUnit =  TimeUnit.NANOSECONDS;
                    break;
                case 4:
                    resultTimeUnit =  TimeUnit.MINUTES;
                    break;
                case 5:
                    resultTimeUnit =  TimeUnit.HOURS;
                    break;
                case 6:
                    resultTimeUnit =  TimeUnit.DAYS;
                    break;
                default:
                    break;
            }
            return resultTimeUnit;
        }
    
        private BlockingQueue getBlockingQueue(int queueType, int corePoolSize){
            BlockingQueue queue = new ArrayBlockingQueue(getQueueSize(corePoolSize));
            switch (queueType){
                case 1:
                    queue =  new LinkedBlockingQueue();
                    break;
                case 2:
                    queue =  new SynchronousQueue();
                    break;
                default:
                    break;
            }
            return queue;
        }
    
        private RejectedExecutionHandler getRejectedExecutionHandler(int rejectedType){
            RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicyWithReport();
            switch (rejectedType) {
                case 1:
                    rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
                    break;
                case 2:
                    rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
                    break;
                case 3:
                    rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
                    break;
                default:
                    break;
            }
            return rejectedExecutionHandler;
        }
    
        private int getCorePoolSize(){
            double size = tasksParSecond * taskTimeSeconds;
            return getIntFromDouble(size);
        }
    
        private int getMaxCorePoolSizeForCPUtask(){
            return PROCESSORS + 1;
        }
    
        private int getMaxCorePoolSizeForIOtask(){
            if (uCPU <= 0 || taskTimeSeconds <= 0 || taskWaitTimeSeconds <= 0) {
                return 2 * PROCESSORS;
            }
            double size = PROCESSORS * uCPU * (taskWaitTimeSeconds / taskTimeSeconds);
            return getIntFromDouble(size);
        }
    
        private int getQueueSize(int corePoolSize){
            double size = corePoolSize / taskTimeSeconds * taskResponseTimeSeconds;
            return getIntFromDouble(size);
        }
    
        private int getIntFromDouble(double size) {
            BigDecimal bsize = new BigDecimal(size).setScale(0, BigDecimal.ROUND_HALF_UP);
            return Integer.parseInt(bsize.toString());
        }
    
    
    
    }

    8- 封装cpu密集型的Task

    package com.test.domi.task;
    
    import java.util.concurrent.Callable;
    
    /**
     * 如何设计一个任务:
     * 1- 是否具备可取消操作
     * 2- 是否能响应中断,中断是实现取消的最佳实践
     * 3- call方法中要允许线程的退出
     */
    public class TestFutureTask implements Callable {
        private static final String TASK_NAME = "TEST_TASK";
        //取消标志位
        private volatile boolean cancelled;
        private String param1;
        private String param2;
    
        public TestFutureTask(String param1,String param2) {
            this.param1 = param1;
            this.param2 = param2;
        }
    
        @Override
        public String call() throws Exception {
            Thread.sleep(2000);
            //TODO 设置允许线程退出的方法
            return param2 + param1;
        }
    
        public void cancel() {
            cancelled = true;
        }
    
        public String getParam() {
            return param1 + param2;
        }
    
    }

    9- 封装IO密集型的阻塞,可相应中断的task:  https://blog.csdn.net/xrr0520blog/article/details/8912139

    10- 测试

    package com.test.domi;
    
    import com.test.domi.executor.AbortTaskQueue;
    import com.test.domi.task.TestFutureTask;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest(classes = TestModulServiceApplication.class)
    public class TestDemo {
    
        @Autowired
        @Qualifier("executor")
        private ThreadPoolExecutor threadPoolExecutor;
    
    //    private CountDownLatch  count = new CountDownLatch(1);
    
        @Test
        public void redisTest() throws Exception{
    
            List<Future<String> > list = new ArrayList<>(32);
            for (int i = 0; i < 60; i++) {
                Future<String> submit = threadPoolExecutor.submit(new TestFutureTask("nihao","Task"));
                try {
                    String s = submit.get(3000, TimeUnit.MILLISECONDS);
                    //点进去看该方法会抛出3种异常
                    list.add(submit);
                } catch (TimeoutException e) {
                    System.out.println("任务执行失败msg=" + e.getMessage());
                    e.printStackTrace();
                }catch (ExecutionException e) {
                   throw new Exception("");
                }catch (InterruptedException e) {
                }finally {
                    submit.cancel(true);
                }
            }
    //        count.countDown();
            for (Future<String> stringFuture : list) {
                try {
                    System.out.println("result=" + stringFuture.get());
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("FUTURN返回失败");
                }
            }
            System.out.println("任务执行完毕queueSize = " + threadPoolExecutor.getQueue().size());
            threadPoolExecutor.shutdown();
            AbortTaskQueue abortTaskQueue = AbortTaskQueue.newInstance();
            int size = abortTaskQueue.size();
            System.out.println("任务执行完毕queueSize = ");
    
    //        if (threadPoolExecutor != null) {
    //            int activeCount = threadPoolExecutor.getActiveCount();
    //            int corePoolSize = threadPoolExecutor.getCorePoolSize();
    //            String s = threadPoolExecutor.toString();
    //            long keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS);
    //            int poolSize = threadPoolExecutor.getPoolSize();
    //            int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
    //            BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
    //            RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
    //        }
        }
    }
  • 相关阅读:
    Learning NFS/NIS 2nd 读书笔记-Chapter3 NIS Operation
    Linux Enterprise Cluster Notes Ch11 LVS Introduction Theory
    Linux Enterprise Cluster NOtes Ch7 A Sample HA config
    Linux Enterprise Cluster Notes Ch10 build a Linux cluster
    Linux Enterprise Cluster NOtes Ch8 Heartbeat配置和维护
    当被监控的应用发生问题时,heartbeat会failover么?
    Linux Enterprise Cluster NOtes Ch9 Stonith and IPFail
    Linux Enterprise Cluster NOtes Ch6 Heartbeat介绍和原理
    客户端不支持javascript怎么办
    js 返回对象|js返回多个值的方法|js如何返回多个值
  • 原文地址:https://www.cnblogs.com/domi22/p/10017682.html
Copyright © 2011-2022 走看看