zoukankan      html  css  js  c++  java
  • Java并发容器和线程池

    并发容器

    说明

    1. java提供的线程池使用的都是并发容器,所以想要更深入了解线程池就必须了解同步容器
    2. 并发容器主要是解决数据的脏读问题

    图片源自网络

    基本说明:

    哈希:

    Hashtable:jdk1.0提供,方法均被synchronized修饰,不建议使用
    Collections.sychronizedMap(Map):使用Collections工具返回对应的加锁容器
    ConcurrentHashMap:内部加锁队列map,java8后使用cas算法实现无锁同步;无序
    ConcurrentSkipListMap:跳表算法;有序

    列表:

    Vector:jdk1.0提供,方法均被synchronized修饰,不建议使用

    Collections.synchronizedList(List):使用Collections工具返回对应的加锁容器

    CopyOnWriteList:写时复制,无序加锁(复制已经解决了脏读问题),并发时写入操作慢,读取快,但读取数据并不能保持实时性

    队列(先进先出):

    CocurrentLinkedQueue:非阻塞同步队列
    BlockingQueue:阻塞式队列(wait/notify)
    LinkedBlockingQueue:无界队列,即对容量不加限制(实质容量为Integer.MAX_VALUE)直到内存耗尽
    ArrayBlockingQueue:有界队列,有容量限制
    TransferQueue(LinkedTransferQueue,无界阻塞队列):传递队列,可保证数据的实时传递(transfer方法)
    SynchronusQueue:无界阻塞队列,底层用得是TransferQueue,并且容量为0
    DelayQueue:延时队列,执行定时任务

    双端队列(两端都可以操作):

    ConcurrentLinkedDeque:内部加锁双端队列

    LinkedBlockingDeque:阻塞时双端队列

    部分使用案例

    ConcurrentQueue

    public static void main(String[] args) {
    
        Queue<String> queue = new ConcurrentLinkedQueue<>();
    
        for(int i=0; i<10; i++) {
            boolean offer = queue.offer("a" + i);
            System.out.println(i + "-是否插入成功-" + offer);
        }
    
        System.out.println(queue);
        System.out.println("容器元素数量:" + queue.size());
    
        queue.poll(); //取出并删除
        System.out.println("poll()后 - 容器元素数量:" + queue.size());
    
        queue.peek(); //取出不删除
        System.out.println("peek()后 - 容器元素数量:" + queue.size());
    
        //双端队列Deque,两端都可以操作
    }
    

    LinkedBlockingQueue

    // 有界
    static BlockingQueue<String> strs = new LinkedBlockingQueue<>(10);
    
    // 无界,容量为(Integer.MAX_VALUE)
    //	static BlockingQueue<String> strs = new LinkedBlockingQueue<>(); 
    
    public static void main(String[] args) throws Exception {
    
        new Thread(() -> {
            //向容器添加100个元素
            for (int i = 0; i < 10000; i++) {
                try {
                    // 阻塞方法,如果容器设置了容量,且put满了就会阻塞,等待消费者消费
                    strs.put("a" + i); 
                    TimeUnit.MILLISECONDS.sleep(500); 
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    
        strs.take();
    
        // 开启5条线程从容器中获取数据
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " - take -" + strs.take()); //如果空了,就会等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "消费者" + i).start();
        }
    
    }
    

    ArrayBlockingQueue

    static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);
    
    static Random r = new Random();
    
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            strs.put("a" + i);
        }
    
        strs.put("aaa"); //满了就会阻塞
        //strs.add("aaa"); //满了就抛异常 Queue full
        //strs.offer("aaa"); //满了就不插入
        //strs.offer("aaa", 1, TimeUnit.SECONDS);//满了先等待一段时间尝试插入,超过时间不插入
    
        System.out.println(strs);
    }
    

    DelayQueue

    static BlockingQueue<MyTask> tasks = new DelayQueue<>();
    
    /**
    *	DelayQueue存储的元素必须实现 Delayed接口
    */
    static class MyTask implements Delayed {
        String name;
        long nowTime;
    
        MyTask(String name, long nowTime) {
            this.name = name;
            this.nowTime = nowTime;
        }
    
        /**
    	* 元素顺序对比,左为前右为后
    	* @param o
    	* @return
    	*/
        @Override
        public int compareTo(Delayed o) {
    
            if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                return -1;	//排在后面
            else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) 
                return 1; //排在前面
            else 
                return 0;
        }
    
        /**
    	* 	任务执行时间
    	* 	@param unit
     	* 	@return long 返回值越小越先执行
    	 */
        @Override
        public long getDelay(TimeUnit unit) {
            // 传入时间 - 系统时间 , 即从系统时间过后xxx毫秒后执行任务
            return unit.convert(nowTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
    
        @Override
        public String toString() {
            return name;
        }
    }
    
    // 测试
    public static void main(String[] args) throws InterruptedException {
        //系统当前时间
        long now = System.currentTimeMillis();
    
        MyTask t1 = new MyTask("任务1 - 1000", now + 1000);// 1000毫秒后执行
        MyTask t2 = new MyTask("任务2 - 2000", now + 2000);// 2000毫秒后执行
        MyTask t3 = new MyTask("任务3 - 1500", now + 1500);// 1500毫秒后执行
        MyTask t4 = new MyTask("任务4 - 2500", now + 2500);// 2500毫秒后执行
        MyTask t5 = new MyTask("任务5 - 500", now + 500);  // 500毫秒后执行
    
        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);
    
        System.out.println(tasks);
    
        for(int i=0; i<5; i++) {
            //执行任务
            System.out.println(tasks.take());
        }
    }
    

    LinkedTransferQueue

    /**
     * 	注意:
     * 		1、transfer()、take()均为阻塞式方法
     * 		2、transfer()要求在被调用之前必须先调用take(),否者transfer()会一直阻塞不执行
     * 		3、transfer()的功能就是将打算存入容器中的元素直接传递给消费者
     * 		4、若不想阻塞和保证数据的实时传递则可以将transfer()改用add()、put()等方法往容器传入元素
     *	
     *	作用:可以使用transfer()和take()来完成实时数据处理
     */
    public class MyTransferQueue {
    	public static void main(String[] args) throws Exception {
    		
    		TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    		
    		condition1(transferQueue); // 现有消费者后有生产者
    //		condition2(transferQueue); // 现有生产者后有消费者
    //		getElement(transferQueue); // 获取元素
    	}
    	
    	/**
    	 * 	情况1:先有消费者后有生产者
    	 */	
    	public static void condition1(TransferQueue<String> transferQueue) throws Exception {
    
    		// 消费者线程
    		new Thread(()->{
    			try {
    				System.out.println(transferQueue.take());
    				System.out.println(transferQueue.take());
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}).start();
    		
    		// 生产者
    		transferQueue.transfer("xxx1");
    		transferQueue.transfer("xxx2");
    	}
    	
    	/**
    	 * 	情况2:先有生产者后有消费者
    	 * 	结果:一直阻塞,若不阻塞又想往容器传入数据可以使用put、add等方法传入元素
    	 */
    	public static void condition2(TransferQueue<String> transferQueue) throws Exception {
    		
    		// 生产者
    		transferQueue.transfer("xxx");
    		
    		// 消费者线程
    		new Thread(()->{
    			try {
    				System.out.println(transferQueue.take());
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}).start();
    		
    	}
    	
    	
    	/**
    	 * 	获取元素
    	 * @param transferQueue
    	 * @throws Exception
    	 */
    	public static void getElement(TransferQueue<String> transferQueue) throws Exception {
    		
    		transferQueue.add("111");
    		transferQueue.add("222");
    		transferQueue.add("333");
    		
    		System.out.println("容器中的元素数量:" + transferQueue.size());
    		
    		// 遍历去除容器中的元素,不会删除队列中的元素
    //		for (String string : transferQueue) {
    //			System.out.println(string);
    //		}
    		
    		// take()作用相当于poll(),去除并删除队列中的元素
    		System.out.println(transferQueue.take());
    		System.out.println(transferQueue.take());
    		System.out.println(transferQueue.take());
    		System.out.println("容器中的元素数量:" + transferQueue.size());
    	}
    	
    }
    
    

    SynchronusQueue

    public static void main(String[] args) throws InterruptedException {
    
        // 容量为0,底层用TransferQueue
        SynchronousQueue<String> strs = new SynchronousQueue<>();
    
        new Thread(()->{
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    
        strs.put("aaa"); //阻塞等待消费者消费,底层使用的是transfer()
        //strs.add("aaa");
        System.out.println(strs.size());
    }
    

    线程池

    说明

    线程池的基本概念:

    预先创建特定数量的线程,存放在池中;
    
    有任务则从池中获取线程来执行任务;
    
    任务完成线程自动存放回池中;
    
    注意:所谓的线程池实质就是一个Thread数组(java.lang.ThreadGroup)
    

    Java提供的线程种类(jdk1.8)

    • Executors.newFixedThreadPool(int nThreads):指定线程数的线程池
    • Executors.newSingleThreadExecutor():单线程的线程池,相当于Executors.newFixedThreadPool(1)
    • Executors.newCachedThreadPool():缓存线程池,按照任务数创建线程,线程的生命周期时间是60s
    • Executors.newScheduledThreadPool(int corePoolSize):延时操作线程池
    • ForkJoinPool():任务拆分/合并线程池,将一个任务拆分成多个子任务,多个子任务完成后再合并
    • Executors.newWorkStealingPool():工作窃取线程池,底层用的是ForkJoinPool()

    线程池中重要接口

    Executor接口

    // 任务执行接口
    public interface Executor {
        void execute(Runnable command); // 只接受Runnable
    }
    

    ExecutorService接口

    // Executor服务接口,继承自Executor,并提供了更多的抽象方法
    public interface ExecutorService extends Executor {
    
        void shutdown(); // 关闭线程池,正常关闭,等待任务执行完后关闭
    
        List<Runnable> shutdownNow(); // 强制关闭,不等待任务执行完
    
        boolean isShutdown(); // 线程池是否已经shutdown
    
        boolean isTerminated(); // 线程池是否已经终止(线程池彻底关闭)
    
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        // ExecutorService和Executor区别是ExecutorService还支持Callable来处理任务
        <T> Future<T> submit(Callable<T> task);
    
        <T> Future<T> submit(Runnable task, T result);
    
        Future<?> submit(Runnable task);
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    线程池使用

    FixedThreadPool

    说明

    指定线程数的线程池

    获取线程池方法

    public static ExecutorService newFixedThreadPool(int nThreads) {
        /* 【参数】
    corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut 
    maximumPoolSize - 池中允许的最大线程数 
    keepAliveTime - 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间 
    unit - keepAliveTime参数的时间单位 
    workQueue - 在执行任务之前用于保存任务的队列。 该队列将仅保存execute方法提交的Runnable任务
    */ 
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()); // 无界阻塞队列
    }
    

    使用案例

    public static void main(String[] args) throws InterruptedException {
    		
    		ExecutorService service = Executors.newFixedThreadPool(5); 
    		
    		for (int i = 0; i < 6; i++) {
    			// execute(Runnable)/submit(Callable)
    			service.execute(() -> {
    				try {
    					TimeUnit.MILLISECONDS.sleep(500);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    				System.out.println(Thread.currentThread().getName());
    			});
    		}
    		System.out.println(service);
    		/*
    		 * [
    		 * 	Running, 运行状态 
    		 * 	pool size = 5, 线程池线程数量
    		 * 	active threads = 5, 正在执行任务的线程数
    		 * 	queued tasks = 1, 在队列中等待处理的任务数两
    		 * 	completed tasks = 0,已经完成的任务
    		 * ]
    		 */
    
    		
    		service.shutdown(); // 正常关闭线程池
    		System.out.println(service.isTerminated()); // false
    		System.out.println(service.isShutdown()); // true
    		System.out.println(service);
    		//[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    		
    		TimeUnit.SECONDS.sleep(5);
    		System.out.println(service.isTerminated());// true
    		System.out.println(service.isShutdown());// true
    		System.out.println(service);
    		//[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    
    	}
    

    SingleThreadExecutor

    说明

    等同newFixedThreadPool(1)

    获取线程池方法

        /* 【参数】
    corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut 
    maximumPoolSize - 池中允许的最大线程数 
    keepAliveTime - 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间 
    unit - keepAliveTime参数的时间单位 
    workQueue - 在执行任务之前用于保存任务的队列。 该队列将仅保存execute方法提交的Runnable任务
    */ 
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, // 线程数为1,等同newFixedThreadPool(1)
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>())); // 无界阻塞队列
    }
    

    CachedThreadPool

    说明

    缓存线程池,按照任务数创建线程,线程的生命周期时间是60s

    获取线程池方法

        /* 【参数】
    corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut 
    maximumPoolSize - 池中允许的最大线程数 
    keepAliveTime - 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间 
    unit - keepAliveTime参数的时间单位 
    workQueue - 在执行任务之前用于保存任务的队列。 该队列将仅保存execute方法提交的Runnable任务
    */ 
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 
                                      Integer.MAX_VALUE, // 相当于不限制线程数量
                                      60L, // 线程空闲时间超过60s自动销毁
                                      TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>()); 
        							// 同步队列,无界阻塞(实质TransferQueue),实时数据传递
    }
    

    使用案例

    public static void main(String[] args) throws InterruptedException {
    		ExecutorService service = Executors.newCachedThreadPool();
    
    		// 执行2个任务
    		for (int i = 0; i < 2; i++) {
    			service.execute(() -> {
    				try {
    					TimeUnit.MILLISECONDS.sleep(500);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			});
    		}
    		
    		System.out.println(service);
    		//[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
    		
    		TimeUnit.SECONDS.sleep(80); // 空闲80秒
    		
    		System.out.println(service);
    		//[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
    		
    }
    

    ScheduledThreadPool

    说明

    延时操作线程池

    获取线程池方法

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize,  // 线程数
              Integer.MAX_VALUE, // 相当于不限制线程数量
              0, // 线程超过corePoolSize后,空闲的线程不能存活
              NANOSECONDS, // 单位是纳秒
              new DelayedWorkQueue()); // 延时队列,执行定时任务
    }
    

    使用案例

    public static void main(String[] args) {
    
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        /**
    	* 	参数1:任务
    	* 	参数2:延时多少时间后执行该任务
    	* 	参数3:间隔多少时间执行任务(到时间后会再通过延迟时间来决定执行时间)
    	* 	参数4:时间的单位
    	*/
        service.scheduleAtFixedRate(()->{
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 500, TimeUnit.MILLISECONDS);
    
    }
    

    ForkJoinPool

    说明

    任务拆分/合并线程池,将一个任务拆分成多个子任务(利用精灵线程来完成),多个子任务完成后再合并

    获取线程池方法

    // 默认构造器,注意:ForkJoinPool创建的是精灵线程
    public ForkJoinPool() {
        //MAX_CAP:32767 [0x7fff]
        //Runtime.getRuntime().availableProcessors():当前计算机的cpu核数
        this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, 
             null,
             false);
    }
    
    public ForkJoinPool(int parallelism, // 并行数
                        ForkJoinWorkerThreadFactory factory, // 线程工厂,注意:ForkJoinPool创建的是精灵线程
                        UncaughtExceptionHandler handler, // 异常捕获处理器
                        boolean asyncMode) { // 是否异步
    	//........................
    }
    

    使用案例

    /**
     * 计算数组nums的元素总和
     */
    public class MyForkJoinPool {
    
    	static int[] nums = new int[1000000];
    	static final int MAX_NUM = 50000; // 拆分临界值,大于该数则需要拆分任务
    	static Random r = new Random();
    
    	static {
    		// 随机往数组中存数据
    		for (int i = 0; i < nums.length; i++) {
    			nums[i] = r.nextInt(100);
    		}
    
    		System.out.println("正确答案:" + Arrays.stream(nums).sum()); // stream api
    	}
    
    	// 任务:ForkJoinPool中的任务必须实现RecursiveTask<任务处理完后返回值的类型>接口
    	static class AddTask extends RecursiveTask<Long> {
    
    		int start, end;
    
    		AddTask(int s, int e) {
    			start = s;
    			end = e;
    		}
    
    		@Override
    		protected Long compute() {
    
    			// 当end-start小于MAX_NUM就直接计算返回无需进行任务拆分
    			if (end - start <= MAX_NUM) {
    				long sum = 0L;
    				// 结算结果
    				for (int i = start; i < end; i++)
    					sum += nums[i];
    				return sum;
    			}
    
    			// 一分为二,取中间值
    			int middle = start + (end - start) / 2;
    
    			AddTask subTask1 = new AddTask(start, middle);
    			AddTask subTask2 = new AddTask(middle, end);
    			// 拆分执行
    			subTask1.fork(); 
    			subTask2.fork();
    
    			// 合并结果
    			return subTask1.join() + subTask2.join();
    		}
    
    	}
    
    	public static void main(String[] args) throws IOException {
    		ForkJoinPool forkJoinPool = new ForkJoinPool();
            // 唯一的任务task
    		AddTask task = new AddTask(0, nums.length);
    		forkJoinPool.execute(task); // 执行任务task,自动拆分合并处理
    		long result = task.join(); // 返回执行结果
    		System.out.println("拆分计算结果" + result);
    
    	}
    }
    
    

    WorkStealingPool

    说明

    工作窃取线程池,底层用的是ForkJoinPool()

    获取线程池方法

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(), // 当前计算机cpu核数
             ForkJoinPool.defaultForkJoinWorkerThreadFactory, // 默认ForkJoinWorker线程工厂
             null, // 异常捕获处理器 
             true); // 是否异步处理
    }
    

    使用案例

    public class MyWorkStealingPool {
    	public static void main(String[] args) throws IOException {
    
    		// newWorkStealingPool()会以当前计算机cpu核数来创建精灵线程
    		ExecutorService service = Executors.newWorkStealingPool();
    		System.out.println("当前计算机cpu核数:" + Runtime.getRuntime().availableProcessors());
    
    		// 精灵线程
    		// 当前计算机cpu核数是4,所以只有4个精灵线程来完成任务
    		// 以下有5个任务,也就是4个精灵线程当中完成任务最快的那条会去获取第5个任务然后来完成
    		service.execute(new R(1));
    		service.execute(new R(2));
    		service.execute(new R(3));
    		service.execute(new R(4));
    		service.execute(new R(5));
    
    		// 由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
    		System.in.read();
    	}
    
    	static class R implements Runnable {
    
    		// 睡眠时间
    		int sleepSeconds;
    
    		R(int sleepSeconds) {
    			this.sleepSeconds = sleepSeconds;
    		}
    
    		@Override
    		public void run() {
    			try {
    				System.out.println(sleepSeconds + " " + Thread.currentThread().getName());
    				TimeUnit.SECONDS.sleep(sleepSeconds);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    
    	}
    }
    

    ThreadPoolExecutor类

    说明

    1. 该类用于构建线程池,java提供的8种线程池(jdk1.8)都用该类构建
    2. 可用ThreadPoolExecutor类自定义线程池

    源码

    ThreadPoolExecutor

    public class ThreadPoolExecutor extends AbstractExecutorService {
    //public abstract class AbstractExecutorService implements ExecutorService{//....}
        
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) { 
            					// 注意:这都是BlockingQueue
            
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(),  // 使用默认的线程工厂创建线程
                 defaultHandler);
        }
        
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            
            // 会导致异常的操作
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            
            this.acc = System.getSecurityManager() == null ?null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
        
        // ...........
    }
    

    DefaultThreadFactory

    /**
     * The default thread factory 默认的线程工厂(Executors的内部类)
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group; // 线程组:存放线程的地方,是一个线程数组
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
            Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                poolNumber.getAndIncrement() +
                "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    

    自定义线程池

    public class T4 {
    
    	public static void main(String[] args) {
    
    		/* 
    		 * public static ExecutorService newSingleThreadExecutor() {
    		    return new FinalizableDelegatedExecutorService
    		        (new ThreadPoolExecutor(1, 1, // 线程数为1,等同newFixedThreadPool(1)
    		                                0L, TimeUnit.MILLISECONDS,
    		                                new LinkedBlockingQueue<Runnable>()));
    		}*/
    		
    		/**
    		 * 	参考SingleThreadExecutor构建一个自定义线程池
    		 * 	使用LinkedTransferQueue来存储任务(先进先出)
    		 */
    //		ExecutorService threadPool = Executors.newSingleThreadExecutor();
    		ExecutorService threadPool = 
    				new ThreadPoolExecutor(1, 1, 0L, TimeUnit.NANOSECONDS, new LinkedTransferQueue<Runnable>()); 
    		
    		// 5个任务
    		for(int i=0; i<5; i++) {
    			threadPool.execute(()->{
    				try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    				System.out.println(Thread.currentThread().getName());
    			});		
    		}
    		
    		System.out.println(threadPool);
    		//[Running, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]
    		threadPool.shutdown();
    		
    		try { TimeUnit.SECONDS.sleep(15); } catch (InterruptedException e) { e.printStackTrace(); }
    		System.out.println(threadPool);
    		//[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 5]
    	}
    
    }
    
  • 相关阅读:
    Web Ajax入门一讲
    Delphi – 我的代码之简单五子棋
    闲话 纪念我的4520G
    Delphi 我的代码之窗体移动
    破文 黑客游戏
    破文 OD常用断点
    Web 简单的开始 – Ajax + XML +DOM
    工具 – XMLSPY 和 UModel 商业版 2010v12.0有注册机
    API InterlockedCompareExchange用法
    软件工程 设计模式学习之策略模式Strategy
  • 原文地址:https://www.cnblogs.com/tandi19960505/p/9715179.html
Copyright © 2011-2022 走看看