1.线程池执行器
Executor.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Runs {@link Callable} tasks on a fixed-size thread pool.
 *
 * <p>The pool is static, so every {@code Executor} instance submits to the same
 * worker threads. Each instance owns its own completion service, so
 * {@link #takeResult()} only returns results of tasks submitted through that
 * same instance's {@link #submitTask(Callable)}.
 *
 * <p>Call {@link #shutdown()} once the application has no more work to submit;
 * the pool is created with {@link Executors#newFixedThreadPool(int)} and is not
 * stopped anywhere else in this class.
 *
 * @author zhang
 *
 * @param <T> result type produced by the tasks
 */
public class Executor<T> {

    /**
     * Number of worker threads in the shared pool.
     */
    private static final int THREAD_NUM = 10;

    /**
     * Fixed-size pool shared by all instances of this class.
     */
    private static final ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);

    /**
     * Hands back finished tasks of this instance in completion order.
     */
    private final CompletionService<T> service = new ExecutorCompletionService<T>(executor);

    /**
     * Submits a single task for execution.
     *
     * @param task the task to run
     * @return a future for the result of the submitted task
     */
    public Future<T> submitTask(Callable<T> task) {
        return service.submit(task);
    }

    /**
     * Waits for the next task submitted through {@link #submitTask(Callable)}
     * to finish and returns its result. Blocks until such a task completes.
     *
     * @return the result of the next completed task
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the completed task threw an exception
     */
    public T takeResult() throws InterruptedException, ExecutionException {
        return service.take().get();
    }

    /**
     * Runs all given tasks on the shared pool and waits for every one of them.
     *
     * @param tasks the tasks to run
     * @return the task results, in the same order as {@code tasks}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if any task threw an exception
     */
    public List<T> invokeAll(List<Callable<T>> tasks) throws InterruptedException, ExecutionException {
        List<Future<T>> futures = executor.invokeAll(tasks);
        List<T> values = new ArrayList<T>(futures.size());
        for (Future<T> future : futures) {
            values.add(future.get());
        }
        return values;
    }

    /**
     * Starts an orderly shutdown of the shared pool: tasks already submitted
     * still run, new submissions are rejected. Without this call the pool's
     * worker threads stay alive after all work is done. Calling it more than
     * once has no additional effect.
     */
    public static void shutdown() {
        executor.shutdown();
    }
}
2.任务类
Task.java
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * Base class for tasks that receive their input as a parameter map.
 *
 * <p>{@link #call()} forwards the map given at construction time to
 * {@link #invoke(Map)}; subclasses implement the actual work there.
 *
 * @param <T> result type of the task
 */
public abstract class Task<T> implements Callable<T> {

    /**
     * Arguments handed to {@link #invoke(Map)}; {@code null} when the task was
     * created with the no-argument constructor.
     */
    private final Map<String, Object> param;

    /**
     * Creates a task without parameters.
     */
    public Task() {
        this(null);
    }

    /**
     * Creates a task with the given parameters.
     *
     * @param param arguments later passed to {@link #invoke(Map)}
     */
    public Task(Map<String, Object> param) {
        this.param = param;
    }

    /**
     * Runs the task by delegating to {@link #invoke(Map)}.
     *
     * @return whatever {@link #invoke(Map)} returns
     */
    @Override
    public T call() throws Exception {
        return invoke(param);
    }

    /**
     * Performs the task's work.
     *
     * @param param the arguments supplied at construction time
     * @return the task result
     */
    protected abstract T invoke(Map<String, Object> param);
}
3.样例任务类
DemoTask.java
import java.util.Map;

/**
 * Sample task that sleeps for a configured time and then reports it.
 *
 * <p>Reads two entries from the parameter map: {@code "name"} (cast to
 * {@code String}) and {@code "millis"} (cast to {@code Integer}).
 */
public class DemoTask extends Task<String> {

    /**
     * Creates the sample task.
     *
     * @param param map holding the {@code "name"} and {@code "millis"} entries
     */
    public DemoTask(Map<String, Object> param) {
        super(param);
    }

    /**
     * Sleeps for {@code "millis"} milliseconds and returns a label.
     *
     * @param param map holding the {@code "name"} and {@code "millis"} entries
     * @return the string {@code name + ": " + millis}
     */
    @Override
    protected String invoke(Map<String, Object> param) {
        String name = (String) param.get("name");
        int millis = (Integer) param.get("millis");
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the thread's interrupt flag;
            // set it again so code further up the stack can still see the
            // interruption. The sleep is cut short and the result is returned.
            Thread.currentThread().interrupt();
        }
        return name + ": " + millis;
    }
}
4.线程池使用示例
Demo.java
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; public class Demo { public static void main(String[] args) { testTake(); // testInvoke(); } public static void testTake() { Executor<String> executor = new Executor<String>(); Map<String, Object> param = new HashMap<String, Object>(); param.put("name", "1"); param.put("millis", 5000); executor.submitTask(new DemoTask(param)); param = new HashMap<String, Object>(); param.put("name", "2"); param.put("millis", 3000); executor.submitTask(new DemoTask(param)); param = new HashMap<String, Object>(); param.put("name", "3"); param.put("millis", 8000); executor.submitTask(new DemoTask(param)); param = new HashMap<String, Object>(); param.put("name", "4"); param.put("millis", 4000); executor.submitTask(new DemoTask(param)); try { for (int i = 0; i < 4; i++) { String s = executor.takeResult(); System.out.println(s); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } public static void testInvoke() { Executor<String> executor = new Executor<String>(); List<Callable<String>> tasks = new ArrayList<Callable<String>>(); Map<String, Object> param = new HashMap<String, Object>(); param.put("name", "1"); param.put("millis", 5000); tasks.add(new DemoTask(param)); param = new HashMap<String, Object>(); param.put("name", "2"); param.put("millis", 3000); tasks.add(new DemoTask(param)); param = new HashMap<String, Object>(); param.put("name", "3"); param.put("millis", 8000); tasks.add(new DemoTask(param)); param = new HashMap<String, Object>(); param.put("name", "4"); param.put("millis", 4000); tasks.add(new DemoTask(param)); try { List<String> results = executor.invokeAll(tasks); for (String s : results) { System.out.println(s); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } 
}