zoukankan      html  css  js  c++  java
  • 重写ThreadFactory方法和拒绝策略

    最近项目中要用到多线程处理任务,自然就用到了ThreadPoolTaskExecutor这个对象,这个是spring对于Java的concurrent包下的ThreadPoolExecutor类的封装,对于超出等待队列大小的任务默认是使用RejectedExecutionHandler去处理拒绝的任务,而这个Handler的默认策略是AbortPolicy,直接抛出RejectedExecutionException异常,这个不符合我们的业务场景,

    业务需求:我希望是对于超出的任务,主线程进行阻塞,直到有可用线程,简单的代码如下

    package com.quant.dev.modules.dev.enetity;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.URL;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @program: dev
     * @description:
     * @author: Mr.EternityZhang
     * @create: 2019-07-08 17:41
     */
    @Slf4j
    public class TestThread {
    
    
        static class ThreadFactoryCustom implements ThreadFactory{
            private final AtomicInteger threadNum=new AtomicInteger(1);
            private final String namePrefix;
    
            private ThreadFactoryCustom(String namePrefix){
                this.namePrefix=namePrefix+"-";
            }
    
            @Override
            public Thread newThread(Runnable r) {
                Thread t=new Thread(r,namePrefix+threadNum.getAndIncrement());
                if(t.isDaemon()){
                    t.setDaemon(true);
                }
                if(t.getPriority()!=Thread.NORM_PRIORITY){
                    t.setPriority(Thread.NORM_PRIORITY);
                }
                return t;
            }
        }
    
        static class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    
            private final String threadName;
    
            private final URL url;
    
            public AbortPolicyWithReport(String threadName, URL url) {
                this.threadName = threadName;
                this.url = url;
            }
    
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                String msg = String.format("Provider端线程池满!" +
                                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)" ,
                        threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                        e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
                log.warn(msg);
                if (!e.isShutdown()) {
                    try {
                        log.info("start get queue");
                        e.getQueue().put(r);
                        log.info("end get queue");
                    } catch (InterruptedException ee) {
                        log.error(ee.toString(), ee);
                        Thread.currentThread().interrupt();
                    }
                }
            }
    
        }
    
        public static ThreadFactory getThreadFactoryCustom(String name){
            return new ThreadFactoryCustom(name);
        }
    
        public static void main(String[] args) {
            String poolName="eternity";
            ThreadFactory factory=getThreadFactoryCustom(poolName);
            log.info("核数={}",Runtime.getRuntime().availableProcessors());
            ThreadPoolExecutor executor=
                    new ThreadPoolExecutor(100,400,5,
                            TimeUnit.SECONDS,new LinkedBlockingDeque<>(400),factory,new AbortPolicyWithReport(poolName,null));
            Long begin=System.currentTimeMillis();
            CountDownLatch count=new CountDownLatch(2000);
            AtomicInteger integer=new AtomicInteger(1);
            for(int i=0;i<2000;i++){
                executor.execute(()->{
                    try {
                        log.info("当前线程为={},数值={}",Thread.currentThread().getName(),integer);
                        integer.getAndIncrement();
                        Thread.sleep(500);
                        count.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            try {
                count.await();
                log.info("阻塞数值={}",count.getCount());
                log.info("活跃数量={}",executor.getActiveCount());
                if(executor.getActiveCount()==0){
                    executor.shutdown();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            log.info("耗时={}------结果={}",System.currentTimeMillis()-begin,integer);
        }
    }
    
    

    阻塞原理:

    之所以能实现阻塞,是基于BlockingQueue的put方法来实现的,当阻塞队列满时,put方法会一直等待

    参考

    https://www.jianshu.com/p/3cfd943996a1

  • 相关阅读:
    白盒测试方法
    单元测试 集成测试 系统测试
    快慢指针原理和应用
    实例方法,类方法,静态方法区别
    查找算法
    排序算法整理
    Oracle sql developer 删表时遇到问题unique/primary keys in table referenced by foreign keys
    剑指 Offer 18. 删除链表的节点(简单)
    剑指 Offer 17. 打印从1到最大的n位数(简单)
    Cyberdebut的补题列表
  • 原文地址:https://www.cnblogs.com/eternityz/p/12238755.html
Copyright © 2011-2022 走看看