Executor
它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。
Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。
依赖与继承:
ThreadPoolExecutor
extends AbstractExecutorService
AbstractExecutorService
implements ExecutorService
interface ExecutorService
extends Executor
newFixThreadPool
创建固定数目线程的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public class Demo2{ public static void main(String[] args) { //10个线程处理大量任务 ExecutorService pool = Executors.newFixedThreadPool(10); while (true){ pool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程
newScheduledThreadPool创建一个定长线程池,支持定时及周期性任务执行
public class Demo2{ public static void main(String[] args) { //10个线程处理大量任务 ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); while (true){ pool.schedule(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }, 5, TimeUnit.SECONDS); //5秒后执行 } } }
newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
ForkJoinPool
Executors.newWorkStealingPool();
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
ForkJoinPool execute()
Runnable()包装成了ForkJoinTask
public void execute(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.RunnableExecuteAction(task); externalPush(job); }
举例:
1 public class Demo2{ 2 3 4 public static void main(String[] args) { 5 ExecutorService pool = Executors.newWorkStealingPool(); 6 7 while (true){ 8 pool.execute(new Runnable() { 9 10 @Override 11 public void run() { 12 System.out.println(Thread.currentThread().getName()); 13 try { 14 Thread.sleep(100); 15 } catch (InterruptedException e) { 16 e.printStackTrace(); 17 } 18 } 19 }); 20 } 21 } 22 23 }
ExecutorService 的 submit()
<T> Future<T> submit(Callable<T> task);
submit(Callable):与submit(Callable)类似,也会返回一个Future对象,但是除此之外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()
方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()
方法是void
的,没有返回值。
1 Future<Integer> f = pool.submit(new Callable<Integer>() { 2 3 4 @Override 5 public Integer call() throws Exception { 6 return null; 7 } 8 });
换成Runnable()
submit(Runnable)方法:返回一个Future对象,通过返回的Future对象,我们可以检查提交的任务是否执行完毕。
1 Future<?> f = pool.submit(new Runnable() { 2 3 @Override 4 public void run() { 5 6 } 7 });
public class ThreadPoolExecutor extends AbstractExecutorService
AbstractExecutorService submit()
runnable() 封装成了 RunnableFuture
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
interface RunnableFuture<V> extends Runnable, Future<V>
Callable同样也是RunnableFuture
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
当使用 ExecutorService 完毕之后,我们应该关闭它,这样才能保证线程不会继续保持运行状态。
完