zoukankan      html  css  js  c++  java
  • Java并发和多线程3:线程调度和有条件取消调度



    在第1篇中“并发框架基本示例”,提到了Executors和ThreadPool。
    其中,还有个“定时调度”的方法,Executors.newScheduledThreadPool(10)。
    // 可执行调度命令(定时+周期性)的线程池,拥有固定的线程数
    	// 重复执行,无穷尽
    	public static void scheduledThreadPool() {
    		int initialDelay = 10;
    		int period = 10;
    		Executor executor = Executors.newScheduledThreadPool(10);
    		ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
    		scheduler.scheduleAtFixedRate(task, initialDelay, period,
    				TimeUnit.SECONDS);
    	}


    这段代码,会一直重复执行,是一种常见的场景。

    但是,想到一种“只调度N次”的需求,看了下API,没有提供。
    就网上搜索“Java 取消线程调度”,找到了1个问答,就研究了下。
    代码示例的关键是,使用了线程安全的AtomicInteger和通用同步工具CountDownLatch。

    核心逻辑就是,当超过了最大任务数N的时候,取消Future中的任务,countDownLatch中的计数器-1,变为0,“countDownLatch.await()”线程阻塞结束
    countDownLatch.countDown();
    关于CountDownLatch的进一步介绍,请参考第4篇。

    package cn.fansunion.executorframework;
    
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    
    //有条件地,取消调度
    public class ConditionCancelSchedulerDemo {
    
    
    	public static Runnable task = new Runnable() {
    		@Override
    		public void run() {
    			System.out.println("Execute a task");
    		}
    	};
    
    
    	// 可执行调度命令(定时+周期性)的线程池,拥有固定的线程数
    	// 重复执行,无穷尽
    	public static void scheduledThreadPool() {
    		int initialDelay = 10;
    		int period = 10;
    		Executor executor = Executors.newScheduledThreadPool(10);
    		ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
    		scheduler.scheduleAtFixedRate(task, initialDelay, period,
    				TimeUnit.SECONDS);
    	}
    
    
    	public static void main(String[] args) throws Exception {
    		scheduledThreadPool();
    		conditionCancelScheduler();
    	}
    
    
    	private static void conditionCancelScheduler() throws InterruptedException {
    		final String jobID = "my_job_1";
    		final AtomicInteger count = new AtomicInteger(0);
    		final Map<String, Future> futures = new HashMap<>();
    		// 最多执行10个任务
    		final int maxTaskSize = 10;
    		// CountDownLatch的更多用法,请参考CountDownLatchDemo
    		final CountDownLatch countDownLatch = new CountDownLatch(1);
    		ScheduledExecutorService scheduler = Executors
    				.newSingleThreadScheduledExecutor();
    
    
    		Future future = scheduler.scheduleWithFixedDelay(new Runnable() {
    			@Override
    			public void run() {
    				System.out.println(count.getAndIncrement());
    				// 当调度执行,第maxTaskSize+1个任务的时候,取消Future中的任务。第11个任务
    				if (count.get() > maxTaskSize) {
    					System.out.println("a");
    					Future f = futures.get(jobID);
    					if (f != null) {
    						f.cancel(true);
    					}
    					// countDownLatch中的计数器-1,变为0
    					// “countDownLatch.await()”线程阻塞结束
    					countDownLatch.countDown();
    				}
    			}
    		}, 0, 1, TimeUnit.SECONDS);
    
    
    		futures.put(jobID, future);
    		countDownLatch.await();
    
    
    		scheduler.shutdown();
    	}
    }




    更多代码示例:
    http://git.oschina.net/fansunion/Concurrent(逐步更新中)


    参考资料:
    java并发编程-Executor框架
    http://www.iteye.com/topic/366591


    有条件地终止 ScheduledExecutorService 中运行的定时任务
    http://www.oschina.net/question/1158769_119659?sort=time


    JDK API 文档
  • 相关阅读:
    Bot Style Tests VS Page Objects
    Qemu文档
    PlantUML
    include <xxx.h> 和 include "xxxx.h"的区别
    2021.40 喜欢当下
    2021.39 MIUI崩溃
    2021.38 聚焦
    2021.37 心流
    2021.36 负熵
    2021.35 精神熵
  • 原文地址:https://www.cnblogs.com/qitian1/p/6462509.html
Copyright © 2011-2022 走看看