Android 源码解析AsyncTask(二) - Go语言中文社区

Android 源码解析AsyncTask(二)


      上篇文章简单说明了AsyncTask的使用,如果你还未了解,请先看这篇文章,Android 源码解析AsyncTask(一)。今天这篇文章详细的从源码的角度解析AsyncTask。

下面通过源码来看看AsyncTask,本文是基于android-24,因为不同平台的AsyncTask源码可能会有区别。

1.首先看AsyncTask的构造函数,

 /**
     * Creates a new asynchronous task. This constructor must be invoked on the UI thread.
     */
    public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                Result result = doInBackground(mParams);
                Binder.flushPendingCommands();
                return postResult(result);
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }
 Creates a new asynchronous task. This constructor must be invoked on the UI thread.
由此看见,AsyncTask的创建必须在UI线程中。

构造函数中,初始化了两个对象,mWorker和mFuture对象,

    private final WorkerRunnable<Params, Result> mWorker;
    private final FutureTask<Result> mFuture;

其中mWorker是WorkerRunnable类,WorkerRunnable类实现了Callable接口,

    private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }

该类有一个mParams属性,mParams是一个数组,用于保存传递的参数,Callable接口只有一个call方法,

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
再回到初始化的地方,

 public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                Result result = doInBackground(mParams);
                Binder.flushPendingCommands();
                return postResult(result);
            }
实现了call方法,该方法中设置mTaskInvoked为true,表示当前任务已经被执行过,设置线程的优先权,然后调用doInBackground(mParams),将返回的result传递给postResult(result),最终将执行结果返回。可以看到在call方法中执行了doInBackground()方法。下面看看postResult()方法,

    private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }
很明显,这个地方使用了消息机制,通过Handler向外发送了一个Message,Message的what等于MESSAGE_POST_RESULT,Message的obj为AsyncTaskResult对象,

    private static class AsyncTaskResult<Data> {
        final AsyncTask mTask;
        final Data[] mData;

        AsyncTaskResult(AsyncTask task, Data... data) {
            mTask = task;
            mData = data;
        }
    }
AsyncTaskResult类很简单,只是将参数封装给自己。

那么我们就需要找找接受消息的Handler,

    private static Handler getHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler();
            }
            return sHandler;
        }
    }

  private static class InternalHandler extends Handler {
        public InternalHandler() {
            super(Looper.getMainLooper());
        }

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }
可以看出InternalHandler是运行在主线程中的,再来看看它的handleMessage()方法,如果msg.what等于MESSAGE_POST_RESULT,则调用该AsyncTask的finish()方法;如果msg.what等于MESSAGE_POST_PROGRESS,则调用AsyncTask的onProgressUpdate()方法。下面看看AsyncTask的finish()方法,

    private void finish(Result result) {
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }
可以看到,如果调用了isCancelled()方法,则执行onCancelled(result)方法,否则调用onPostExecute(result)方法。可以看到这儿执行了onPostExecute(result)方法
至此,有关mWorker的功能基本就完成了。下面再看看mFuture对象,

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };

重写了done()方法,

    private void postResultIfNotInvoked(Result result) {
        final boolean wasTaskInvoked = mTaskInvoked.get();
        if (!wasTaskInvoked) {
            postResult(result);
        }
    }
如果wasTaskInvoked等于false,则执行postResult(result)。

至此,有关mWorker和mFuture对象的初始化就介绍完了。

2.下面看AsyncTask如何执行的,我们都知道执行AsyncTask,需要调用execute()方法,

    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }
    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;

        onPreExecute();

        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }

    executeOnExecutor(sDefaultExecutor, params)方法传递了两个参数,第3-13行,是判断该AsyncTask的状态,如果状态不等于Status.PENDING,则会抛异常,这就是为什么一个AsyncTask只能被执行一次的原因。

首先看sDefaultExecutor,

	public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
	private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

可以得知SERIAL_EXECUTOR是一个常量类,所以一个应用程序都会共用同一个SerialExecutor对象,

   private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
sDefaultExecutor是SerialExecutor的一个实例,SerialExecutor内部有一个任务队列。

接下来看它的execute(),mTasks.offer(Runnable r)表示将Runnable对象添加到队列的尾部;mTasks.poll()表示检索并移除队列的头部。

当执行execute()方法时,首先将一个Runnable加入到队列的尾部,接着判断mActive是否等于null,如果mActive等于null,然后执行 scheduleNext()方法,该方法从队列的头部获取一个任务,如果该任务不等于null,则执行 THREAD_POOL_EXECUTOR.execute(mActive)。如果是第一次运行,会执行THREAD_POOL_EXECUTOR.execute(mActive)方法。那么THREAD_POOL_EXECUTOR.execute(mActive)是什么东西呢?

    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    // We want at least 2 threads and at most 4 threads in the core pool,
    // preferring to have 1 less than the CPU count to avoid saturating
    // the CPU with background work
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE_SECONDS = 30;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

    /**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
哦!原来THREAD_POOL_EXECUTOR是一个线程池啊!最大线程数量,核心线程数量,核心线程的空闲存活时间(有关线程池,详情请看, Android(线程二) 线程池详解)。

我们再次梳理一下AsyncTask的execute()方法整个流程。

(1).调用execute()方法,实际上调用的是executeOnExecutor(sDefaultExecutor, params);

(2).executeOnExecutor(sDefaultExecutor, params),首先执行onPreExecute()方法,接着执行exec.execute(mFuture)方法即sDefaultExecutor.execute(),而sDefaultExecutor是SerialExecutor对象,那么也就是调用SerialExecutor.execute()方法,SerialExecutor.execute()方法是将Runnable加入到队列的尾部。第一次运行,那么就启动线程池去运行队列头部任务,执行完毕后,再执行 scheduleNext()方法,该方法判断队列头部是否还有要执行的任务,如果有,那么将继续执行执行线程池;如果有新的任务要执行,那么需要等待上一个任务执行完毕后才执行。

由此,我们可以得知AsyncTask虽然内部有线程池,但是其实还是一个线程一个线程的执行,并不是并发执行的。

以上就是有关AsyncTask的execute()方法的整体描述,下面还是通过一张图说明它的执行过程,


线程池执行的Runnable其实mFuture对象,mFuture对象前面我们已经简单介绍了。下面我们看看mFuture的run()方法,

    public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

其中第6行的callable参数是我们传递的参数mWorker对象,第11行调用了mWorker的call(),也就是会调用我们在AsyncTask的构造方法中初始化mWorker时实现的call()方法,又回到我们之前介绍过的call()方法了。第18、19行,如果ran = true,则执行set(V v)方法,

    protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }
最终会执行finishCompletion()方法,

   private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
第21行会执行 done(),也就是会调用我们在AsyncTask的构造方法中初始化mFuture时实现的done()方法,done()方法这里主要是验证postResult()是否被调用了,如果没有被调用,则调用postResult()方法。

通过对FutureTask的说明可以知道,mFuture是mWorker的操作类,它不仅用来实现对AsyncTask任务的操作(cancel,get等),最主要的,实现了mWorker执行结束的操作。

好了,我们基于源码分析AsyncTask就结束了!微笑但是,等等,好像还有一个方法没有分析呢!onProgressUpdate()方法,那么它是如何被调用的呢?首先我们看publishProgress()方法的源码,

    protected final void publishProgress(Progress... values) {
        if (!isCancelled()) {
            getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                    new AsyncTaskResult<Progress>(this, values)).sendToTarget();
        }
    }
可以看到,该方法依旧只是,通过Handler向外发送了一个Message,Message的what等于MESSAGE_POST_PROGRESS,Message的obj为AsyncTaskResult对象,又回到之前我们介绍过的InternalHandler了,如果what等于MESSAGE_POST_PROGRESS,则调用AsyncTask的onProgressUpdate()方法,而onProgressUpdate()方法需要我们自己去实现。

   protected void onProgressUpdate(Progress... values) {
    }

当手动调用publishProgress()方法时,便会发送一个消息,最终会调用AsyncTask的onProgressUpdate()方法。

通过源码分析完AsyncTask,可以知道AsyncTask内部其实就是线程和Handler的封装,并且它的内部虽然有线程池,但是其实也是同一时刻只有一个线程运行。有关Handler的详情介绍,请看 Android 源码解析Handler处理机制(二)
下面依旧通过一张图来说明,AsyncTask整个的运行过程,


3. 实例演示AsyncTask中的execute()和executeOnExecutor()。

(1).执行execute();

同时执行多个AsyncTask。

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "MainActivity";
    TextView tv;
    Button btn;


    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        tv = (TextView) findViewById(R.id.tv);
        btn = (Button) findViewById(R.id.btn);


        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                start();
            }
        });
    }


    class DownloadTask extends AsyncTask<Void, Void, Void> {

        String name;

        public DownloadTask(String name) {
            this.name = name;
        }

        @Override
        protected Void doInBackground(Void... params) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }




        @Override
        protected void onPostExecute(Void s) {
            super.onPostExecute(s);
            SimpleDateFormat simpleDateFormat=new SimpleDateFormat("HH:mm:ss");
            Log.e(TAG, name+"----->执行完毕时间:"+ simpleDateFormat.format(new Date())+"" );
        }
    }

    private void start() {
        for (int i = 0; i < 10; i++) {
            new DownloadTask("AsyncTask"+i).execute();
        }
    }
}
运行后的截图如下,

可以看到当点击按钮时,同时执行了10个AsyncTask,这10个AsyncTask是串行运行的,按顺序一个一个执行。

(2).执行executeOnExecutor()。

    private void start() {
        for (int i = 0; i < 10; i++) {
            new DownloadTask("AsyncTask"+i).executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
        }
    }
执行结果截图,

AsyncTask.THREAD_POOL_EXECUTOR,这个是系统提供的一个线程池,核心为5条,最大128条。

可以看到当点击按钮时,同时执行了10个AsyncTask,首先同时执行了5个AsyncTask(因为核心线程数量是5,),这5个AsyncTask执行完毕后,接着在执行剩下的5个AsyncTask。

可以看到使用executeOnExecutor(),可以达到并发处理的效果。具体的并发处理逻辑是依靠传递的线程池参数。当然,线程池我们可以使用系统的或者自定义。有关线程池,详情请看 Android(线程二) 线程池详解

4.总结。

1.创建AsyncTask,必须在主线程中;

2.执行AsyncTask的 execute()方法,要在主线程中;

3.不要手动调用AsyncTask的几个方法,onPreExecute(),doInBackground(),onProgressUpdate(),onPostExecute();

4.一个任务实例只能执行一次,否则再次执行将抛出异常;

5.在Android1.6之前,AsyncTask是串行执行任务的,Android1.6的时候AsyncTask开始采用线程池理财并行任务,但是从Android3.0开始,为了避免AsyncTask所带来的并发错误,AsyncTask又采用一个线程来串行执行任务。尽管如此,在Android3.0以及后续的版本中,我们仍然可以通过AsyncTask的executeOnExecutor方法并行地执行任务;

6.AsyncTask本质对线程和Handler的封装,它里面虽然有线程池,但是同时只有一个线程在运行,所以AsyncTask并不适合进行特别耗时的后台任务,对于特别耗时的任务来说,建议使用线程池。


PS:异步任务是一个匿名内部类,因此它对当前Activity有一个隐式引用。如果Activity在销毁之前,任务还未完成, 那么将导致Activity的内存资源无法回收,造成内存泄漏。那么该怎么解决呢?代码如下所示,

    static class MyAsyncTask extends AsyncTask<Void,Void,Void>{
        private WeakReference<Context> mReference;

        public MyAsyncTask(Context context) {
            mReference = new WeakReference<Context>(context);
        }
        @Override
        protected Void doInBackground(Void... voids) {
            //执行任务
            return null;
        }

        @Override
        protected void onPostExecute(Void aVoid) {
            super.onPostExecute(aVoid);
            MainActivity activity=(MainActivity) mReference.get();
            if(activity!=null){
                //执行操作
            }
        }
    }



版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/zxw136511485/article/details/53036525
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-08 14:50:19
  • 阅读 ( 776 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢