在Android中,Handler和AsyncTask都是与异步执行相关的类,Handler主要负责各线程间接收、传递消息,而AsyncTask则主要负责异步操作的处理,其中,doInBackGround()方法负责异步任务的处理、onProgressUpdate()方法则在异步执行过程中调用publishProgress()时回调,以随时更新UI。onPostExecute()方法则是当异步执行完毕后对UI的更新操作。现从源码的角度分析一下AsyncTask中线程池的调度过程。
AsyncTask用到的几个Java线程类(接口):
Future接口:Future多用于耗时线程的计算,主线程可以在完成自己的任务后,再去查询该Future是否执行完毕并获取结果。他有一个回调函数protected void done(),当任务结束时,该回调函数会被触发。因此,只需重载该函数,即可在线程刚结束时做一些事情。(调用onPostExecute()方法)
public interface Future<V> { /** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed, has already been cancelled, * or could not be cancelled for some other reason. If successful, * and this task has not started when <tt>cancel</tt> is called, * this task should never run. If the task has already started, * then the <tt>mayInterruptIfRunning</tt> parameter determines * whether the thread executing this task should be interrupted in * an attempt to stop the task. * * <p>After this method returns, subsequent calls to {@link #isDone} will * always return <tt>true</tt>. Subsequent calls to {@link #isCancelled} * will always return <tt>true</tt> if this method returned <tt>true</tt>. * * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete * @return <tt>false</tt> if the task could not be cancelled, * typically because it has already completed normally; * <tt>true</tt> otherwise */ boolean cancel(boolean mayInterruptIfRunning); /** * Returns <tt>true</tt> if this task was cancelled before it completed * normally. * * @return <tt>true</tt> if this task was cancelled before it completed */ boolean isCancelled(); /** * Returns <tt>true</tt> if this task completed. * * Completion may be due to normal termination, an exception, or * cancellation -- in all of these cases, this method will return * <tt>true</tt>. * * @return <tt>true</tt> if this task completed */ boolean isDone(); /** * Waits if necessary for the computation to complete, and then * retrieves its result. * * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting */ V get() throws InterruptedException, ExecutionException; /** * Waits if necessary for at most the given time for the computation * to complete, and then retrieves its result, if available. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the wait timed out */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future接口的主要方法:
cancel:取消一个任务,不过当一个任务已经完成时,取消就会失败,这个方法有一个参数mayInterruptIfRunning,如果传入的是true,那么即使你的任务已经开始,那么也会试图中断任务所在线程
isDone:如果一个任务执行完毕,那么这里返回true
get:该方法有两个重载方法,首先看没有参数的,这个方法时一个同步方法,只到任务成功执行完毕才返回,如果任务取消或者中断,都会抛出异常,有参数的get方法就是限定了等待时间,如果在指定时间内没有返回,那么就不再等待。
FutureTask类: FutureTask继承了RunnableFuture接口,RunnableFuture接口又继承了Runnable、Future接口,可以当做一个Runnable直接调用,也可通过重写done()方法定义当线程执行完毕后的具体的操作。
/** * Protected method invoked when this task transitions to state * <tt>isDone</tt> (whether normally or via cancellation). The * default implementation does nothing. Subclasses may override * this method to invoke completion callbacks or perform * bookkeeping. Note that you can query status inside the * implementation of this method to determine whether this task * has been cancelled. */ protected void done() { }
通过注释可知,done方法是任务状态变为isdone时调用,该任务可能正常执行,也可能取消了。
下面是线程池的一些接口:
Executor接口:仅提供了一个execute方法,execute方法负责将任务保存起来,可能在未来某一个时刻执行(可能在一个新的线程,或者一个线程池中执行)。至于具体是在新的线程中执行还是在线程池中执行就是有实现该接口的类决定的。
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the <tt>Executor</tt> implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution.
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
ExecutorService :继承自Executor,线程池的主要接口,我的其它博客里有讲。
public interface ExecutorService extends Executor {
/**
*/
void shutdown();
List<Runnable> shutdownNow();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
}
ThreadPoolExecuter:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
参数说明:
corePoolSize
:池中所保存的线程数,包括空闲线程
maximumPoolSize:池中允许的最大线程数
keepAliveTime:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
unit:keepAliveTime 参数的时间单位
workQueue:执行前用于保持任务的队列
这里我们要重点理解corePoolSize,maximumPoolSize,workQueue这三个参数的意义。
当我们向线程池中添加一个任务时
1、如果线程数<corePoolSize,那么创建一个线程,不管此时是否有线程空闲
2、如果线程数=corePoolSize,如果有空闲线程,那么空闲线程处理加入的任务,如果没有空闲线程,那么加入workQueue中。只到workQueue线程已经满了,才会创建新的线程来处理新加入的任务,如果此时创建的线程数超过了maximumPoolSize,那么就会抛RejectedExecutionException异常。
3、如果线程数>corePoolSize时,那么说明workQueue已经满了。
通过以上的描述,说明如果workQueue是一个无界队列,那么maximumPoolSize就没有意义了。
AsyncTask源码分析:
private static final String LOG_TAG = "AsyncTask"; private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1; private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128); /** * An {@link Executor} that can be used to execute tasks in parallel. */ public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
结合上面介绍的线程池相关知识,可以知道如下几点:
1、AsyncTask中的线程池,保存的线程数量是CPU核数+1个,大于CPU_COUNT的线程如果空闲10s中就会销毁掉。
2、最大的线程数为CPU_COUNT*2 +1 个,任务队列的容量是128,按照前面的分析,最多可以添加CPU_COUNT*2+129个任务,再添加任务时,程序就会崩溃
3、线程池是 static的,也就是说所有的AsyncTask公用一个线程池(一个应用之类的AsyncTask)。
下面看看AsyncTask的构造方法
public AsyncTask() { mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked return postResult(doInBackground(mParams)); } }; mFuture = new FutureTask<Result>(mWorker) { @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occured while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; }
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> { Params[] mParams; }
Callable类似于Runnable,只不过带有自己的返回值,在AsyncTask的构造函数中,定义了一个匿名内部类,并改写了call方法(可以理解改写了run方法),在call方法里,就是调用了doInBackground方法,所以doInBackgound是在后台线程执行。
private void postResultIfNotInvoked(Result result) { final boolean wasTaskInvoked = mTaskInvoked.get(); if (!wasTaskInvoked) { postResult(result); } } private Result postResult(Result result) { @SuppressWarnings("unchecked") Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT, new AsyncTaskResult<Result>(this, result)); message.sendToTarget(); return result; }
当mWorker加入线程池后,call方法会被适时调用,即doInBackground方法被,当任务状态变为isdone时,就会向Handler发送消息,这个Handler是InternalHandler类型。
private static class InternalHandler extends Handler { public InternalHandler() { super(Looper.getMainLooper()); } @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"}) @Override public void handleMessage(Message msg) { AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj; switch (msg.what) { case MESSAGE_POST_RESULT: // There is only one result result.mTask.finish(result.mData[0]); break; case MESSAGE_POST_PROGRESS: result.mTask.onProgressUpdate(result.mData); break; } } }
从源码中可见,InternalHandler首先调用主线程的Looper,即操作都是在UI线程中完成。InternalHandler处理了两种消息类型,一种是消息已处理完成,此时会调用finish()方法,finish()方法则又调用onPostExecute()方法。另一种则是在执行过程中调用onProgressUpdate(),可见,onProgressUpdate()和onPostExecute()方法都是在UI线程中执行。
接下来使用mWorker为参数创建了一个FutureTask对象,并重写了done方法,前面我们已经介绍了done方法是在任务状态变为isdone时调用的,在done方法里面,我们会通过Future的get方法拿到结果。然后通过postResultIfNotInvoke()将结果发送到UI线程处理,这里Handler处理的消息可能是MESSAGE_POST_CANCEL(取消),也可能是MESSAGE_POST_RESULT(正常完毕)。其中Message的obj就是一个Result类型,这个类型就是包装了当前AsyncTask和返回结果的。
AsyncTask通过execute方法进行启动,我们来看看execute方法都做了哪些事情。
默认的execute方法会调用AsyncTask内部的线程池。并调用 executeOnExecutor(Executor exec ,Params... params)方法来实现。
public final AsyncTask<Params, Progress, Result> execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); }
自定义线程池的execute方法。mStatus默认值就是Status.PENDING,如果mStatus不等于这个值,那么就会抛出异常,证明已经execute,如果等于这个值,那么就会将mStatus变为Status.RUNNING。然后调用onPreExecute方法,由于execute通常是在主线程执行,所以onPreExecute就是在UI线程中调用的。onPreExecute执行完毕后将mFuture加入到了线程池。
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) { if (mStatus != Status.PENDING) { switch (mStatus) { case RUNNING: throw new IllegalStateException("Cannot execute task:" + " the task is already running."); case FINISHED: throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)"); } } mStatus = Status.RUNNING; onPreExecute(); mWorker.mParams = params; exec.execute(mFuture); return this; }
AsyncTask内部的线程池SerialExecutor实现了Executor接口,内部封装了一个线程队列,实现了按顺序向全局线程池execute的功能。在AsyncTask类中被实例成static final的(整个类只有一份静态化实例)。即AsyncTask的所有任务都会通过该类,向THREAD_POOL_EXECUTOR顺序输送消息并执行。
private static class SerialExecutor implements Executor { final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>(); Runnable mActive; public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((mActive = mTasks.poll()) != null) { THREAD_POOL_EXECUTOR.execute(mActive); } } }