参考
https://www.bennyhuo.com/2019/05/07/coroutine-suspend/
深入协程挂起
从下边的函数说起;
public suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE) /* * For non-atomic cancellation we setup parent-child relationship immediately * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but * properly supports cancellation. */ cancellable.initCancellability() block(cancellable) cancellable.getResult() }
上边函数是delay,join等suspend函数内部都会调用的。
suspendCoroutineUninterceptedOrReturn 这个方法调用的源码是看不到的,因为它根本没有源码:P 它的逻辑就是帮大家拿到 Continuation 实例,而此Continuation 实例就是编译器自动生成的包装了我们的lambda的SuspendLambda对象,真的就只有这样。
不过这样说起来还是很抽象,因为有一处非常的可疑:suspendCoroutineUninterceptedOrReturn 的返回值类型是 T,而传入的 lambda 的返回值类型是 Any?, 也就是我们看到的 cancellable.getResult() 的类型是 Any?,而这个结果值就是挂起的关键所在。
简单来说就是,对于 suspend 函数,不是一定要挂起的,可以在需要的时候挂起,也就是要等待的协程还没有执行完的时候,等待协程执行完再继续执行;而如果在开始 join 或者 await 或者其他 suspend 函数,如果目标协程已经完成,那么就没必要等了,直接拿着结果走人即可。那么这个神奇的逻辑就在于 cancellable.getResult() 究竟返回什么了,且看:
internal fun getResult(): Any? { ... if (trySuspend()) return COROUTINE_SUSPENDED // ① 触发挂起逻辑 ... if (state is CompletedExceptionally) // ② 异常立即抛出 throw recoverStackTrace(state.cause, this) return getSuccessfulResult(state) // ③ 正常结果立即返回 }
这段代码 ① 处就是挂起逻辑了,表示这时候目标协程还没有执行完,需要等待结果,②③是协程已经执行完可以直接拿到异常和正常结果的两种情况。②③好理解,关键是 ①,它要挂起,这返回的是个什么东西?
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
这是 1.3 的实现,1.3 以前的实现更有趣,就是一个白板 Any。其实是什么不重要,关键是这个东西是一个单例,任何时候协程见到它就知道自己该挂起了。
delay
public suspend fun delay(timeMillis: Long) { if (timeMillis <= 0) return // don't delay return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> -> cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) } }
cont.context.delay.scheduleResumeAfterDelay 这个操作,你可以类比 JavaScript 的 setTimeout,Android 的 handler.postDelay,本质上就是设置了一个延时回调,时间一到就调用 cont 的 resume 系列方法让协程继续执行。
深入理解协程的状态转移
其实kotlin的协程就是一个状态机回调。
SuspendFunctions.kt文件:
suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String>{ continuation -> thread { Thread.sleep(1000) continuation.resume("Return suspended.") } COROUTINE_SUSPENDED } suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String>{ log(1) "Return immediately." }
kotlin调用
suspend fun main() { log(1) log(returnSuspended()) log(2) delay(1000) log(3) log(returnImmediately()) log(4) }
结果:
08:09:37:090 [main] 1
08:09:38:096 [Thread-0] Return suspended.
08:09:38:096 [Thread-0] 2
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 3
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] Return immediately.
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 4
其实kotlin就是把协程体中每次遇到挂起函数时,作为一个分界点(也就是下边的label),那么每次在执行resumeWith时都会根据当前label的值来重新继续执行之前挂起位置后的代码。
其实看java代码更好理解,这里把kotlin转换成java,并做了一些修改,便于理解:
public class ContinuationImpl implements Continuation<Object> { private int label = 0; private final Continuation<Unit> completion; public ContinuationImpl(Continuation<Unit> completion) { this.completion = completion; } @Override public CoroutineContext getContext() { return EmptyCoroutineContext.INSTANCE; } @Override public void resumeWith(@NotNull Object o) { try { Object result = o; switch (label) { case 0: { LogKt.log(1); result = SuspendFunctionsKt.returnSuspended( this); label++; if (isSuspended(result)) return; } case 1: { LogKt.log(result); LogKt.log(2); result = DelayKt.delay(1000, this); label++; if (isSuspended(result)) return; } case 2: { LogKt.log(3); result = SuspendFunctionsKt.returnImmediately( this); label++; if (isSuspended(result)) return; } case 3:{ LogKt.log(result); LogKt.log(4); } } completion.resumeWith(Unit.INSTANCE); } catch (Exception e) { completion.resumeWith(e); } } private boolean isSuspended(Object result) { return result == IntrinsicsKt.getCOROUTINE_SUSPENDED(); } }
我们定义了一个 Java 类 ContinuationImpl,它就是一个 Continuation 的实现。
实际上如果你愿意,你还真得可以在 Kotlin 的标准库当中找到一个名叫 ContinuationImpl 的类,只不过,它的 resumeWith 最终调用到了 invokeSuspend,而这个 invokeSuspend 实际上就是我们的协程体,通常也就是一个 Lambda 表达式 —— 我们通过 launch启动协程,传入的那个 Lambda 表达式,实际上会被编译成一个 SuspendLambda 的子类,而它又是 ContinuationImpl 的子类。
有了这个类我们还需要准备一个 completion 用来接收结果,这个类仿照标准库的 RunSuspend 类实现,如果你有阅读前面的文章,那么你应该知道 suspend main 的实现就是基于这个类:
public class RunSuspend implements Continuation<Unit> { private Object result; @Override public CoroutineContext getContext() { return EmptyCoroutineContext.INSTANCE; } @Override public void resumeWith(@NotNull Object result) { synchronized (this){ this.result = result; notifyAll(); // 协程已经结束,通知下面的 wait() 方法停止阻塞 } } public void await() throws Throwable { synchronized (this){ while (true){ Object result = this.result; if(result == null) wait(); // 调用了 Object.wait(),阻塞当前线程,在 notify 或者 notifyAll 调用时返回 else if(result instanceof Throwable){ throw (Throwable) result; } else return; } } } }
java的调用:
public static void main(String... args) throws Throwable { RunSuspend runSuspend = new RunSuspend(); ContinuationImpl table = new ContinuationImpl(runSuspend); table.resumeWith(Unit.INSTANCE); runSuspend.await(); }
我们看到,作为 completion 传入的 RunSuspend 实例的 resumeWith 实际上是在 ContinuationImpl 的 resumeWtih 的最后才会被调用,因此它的 await() 一旦进入阻塞态,直到 ContinuationImpl 的整体状态流转完毕才会停止阻塞,此时进程也就运行完毕正常退出了。
我们看到,这段普通的 Java 代码与前面的 Kotlin 协程调用完全一样。那么我这段 Java 代码的编写根据是什么呢?就是 Kotlin 协程编译之后产生的字节码。当然,字节码是比较抽象的,我这样写出来就是为了让大家更容易的理解协程是如何执行的,看到这里,相信大家对于协程的本质有了进一步的认识:
l 协程的挂起函数本质上就是一个回调,回调类型就是 Continuation
l 协程体的执行就是一个状态机,每一次遇到挂起函数,都是一次状态转移,就像我们前面例子中的 label 不断的自增来实现状态流转一样
如果能够把这两点认识清楚,那么相信你在学习协程其他概念的时候就都将不再是问题了。如果想要进行线程调度,就按照我们讲到的调度器的做法,在 resumeWith 处执行线程切换就好了,其实非常容易理解的。官方的协程框架本质上就是在做这么几件事儿,如果你去看源码,可能一时云里雾里,主要是因为框架除了实现核心逻辑外还需要考虑跨平台实现,还需要优化性能,但不管怎么样,这源码横竖看起来就是五个字:状态机回调。
协程的创建与启动
https://www.jianshu.com/p/2979732fb6fb
从最常用的launch来说明协程的创建和启动。
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
接着会调用到AbstractCoroutine.start
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) }
然后再调用start方法,此时发现没法跟下去了,因为没有要找的方法了,解决方法就是用debug模式断点跟踪,最后发现原来进到CoroutineStart.invoke了
@InternalCoroutinesApi public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(receiver, completion) ATOMIC -> block.startCoroutine(receiver, completion) UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) LAZY -> Unit // will start lazily }
原来因为CoroutineStart自定义了“()”操作符,
另外注意completion是上边的StandaloneCoroutine。
接着由于是CoroutineStart.DEFAULT,所以调用到Cancellable.kt中的startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
这其中的三个方法是协程的创建、启动 以及 线程调度的关键。
createCoroutineUnintercepted
先说createCoroutineUnintercepted,这个方法和上边挂起函数的suspendCoroutineUninterceptedOrReturn 类似应该都是委托给编译器来处理一部分逻辑,所以是看不到里边是怎么实现的,不过可以通过注释来知道它是干嘛的:
/** * Creates unintercepted coroutine without receiver and with result type [T]. * This function creates a new, fresh instance of suspendable computation every time it is invoked. * * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. * The [completion] continuation is invoked when coroutine completes with result or exception. ... */ public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { ... }
这个方法会在每次调用时创建一个可挂起的计算逻辑对象(其实可以认为时SuspendLambda的实例),为了启动这个协程,会去调用返回对象(SuspendLambda的实例)的resume(Unit),而completion(可以认为是StandaloneCoroutine)会在协程执行完或出异常时执行(调用其resume)。
resumeCancellableWith
接着intercepted()方法就是进行线程调度的,这个放到下边说,假设这里没有线程调度,那么直接执行最后的方法resumeCancellableWith:
@InternalCoroutinesApi public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) }
因为没有线程调度所以执行else,
那么就会调用到Continuation.resumeWith,以SuspendLambda为例:
public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure // can precisely track what part of suspended callstack was already resumed probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // unrolling recursion via loop current = completion param = outcome } else { // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any?>): Any?
可以看到会调用到invokeSuspend,而这个方法就是编译器把我们的lambda生成的代码,生成的代码就是状态机,在最后用一个例子说明。
协程的线程调度
接着上边的startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
intercepted()
如果有线程调度时,
@SinceKotlin("1.3") public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
createCoroutineUnintercepted方法返回的是SuspendLambda的实例,SuspendLambda实现了ContinuationImpl,
接着看ContinuationImpl.intercepted():
public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it }
标红的就是判断context中是否有调度器,如果有就执行其interceptContinuation(this)方法并返回其包装结果,注意把SuspendLambda传递进去了;没有就返回SuspendLambda。
接着就会调用到CoroutineDispatcher.interceptContinuation()
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation)
这里边创建了一个DispatchedContinuation对象,并把调度器自身和SuspendLambda传递进去包装了一下。
看这个类的名字就知道他也是个Continuation,那么后边肯定会调用它的resumeWith来进行线程调度。
resumeCancellableWith
接着调用最后的方法resumeCancellableWith
@InternalCoroutinesApi public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) }
此时的this就是DispatchedContinuation了,所以会执行第一个判断。
DispatchedContinuation.resumeCancellableWith:
@Suppress("NOTHING_TO_INLINE") inline fun resumeCancellableWith(result: Result<T>) { val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { // 如果不需要调度,直接在当前线程执行协程运算 executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled()) { resumeUndispatchedWith(result) } } } }
此方法会先判断是否需要进行调度,如果需要,就把this传递进去进行调度,dispatch传递的this是Runnable,而DispatchedContinuation是实现了Runnable的。
接下来就不再跟了,猜想一下就知道,肯定是调度器空闲时会把传递进去的拿出来执行,也就会调用回SuspendLambda.resume。
协程的挂起和恢复例子
fun main() { runBlocking { log("1") delay(100) log("2") delay(100) log("3") } log("4") }
生成的java代码:
public static final void main() { BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) { private CoroutineScope p$; Object L$0; int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { label17: { Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); CoroutineScope $this$runBlocking; switch(this.label) { case 0: ResultKt.throwOnFailure($result); $this$runBlocking = this.p$; HahaKt.log("1"); this.L$0 = $this$runBlocking; this.label = 1; if (DelayKt.delay(100L, this) == var3) { return var3; } break; case 1: $this$runBlocking = (CoroutineScope)this.L$0; ResultKt.throwOnFailure($result); break; case 2: $this$runBlocking = (CoroutineScope)this.L$0; ResultKt.throwOnFailure($result); break label17; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } HahaKt.log("2"); this.L$0 = $this$runBlocking; this.label = 2; if (DelayKt.delay(100L, this) == var3) { return var3; } } HahaKt.log("3"); return Unit.INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); var3.p$ = (CoroutineScope)value; return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }), 1, (Object)null); log("4"); }
由上边协程创建与启动可知,我们在协程中的代码会被编译器生成一个SuspendLambda对象,最后会调用到SuspendLambda.resumeWith,而resumeWith会调用到invokeSuspend。其中invokeSuspend就是我们的协程中的代码,只不过按照遇到的挂起函数把代码分成了多段。
上边有行代码是挂起的判断依据:
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); if (DelayKt.delay(100L, this) == var3) { return var3; }
delay方法的返回值如果是var3 ,那么就表示delay还没有完成,就直接return了。
但它return后就不执行后边的代码了,那怎么办?
其实在调用delay时把当前的对象传进去了,而接收的参数类型是Continuation,那么就可以推断,在delay执行完成后会调用这个Continuation.resumeWith来重新进入invokeSuspend,为了能跳到下个case代码段,label在上次delay前就加一了,所以再次进入就直接往后执行了,接着如果在遇到挂起函数,那么再进行类似的判断即可。
总结
通过一步步的分析,慢慢发现协程其实有三层包装。
l 常用的launch和async返回的Job、Deferred,里面封装了协程状态,提供了取消协程接口,而它们的实例都是继承自AbstractCoroutine,它是协程的第一层包装。
l 第二层包装是编译器生成的SuspendLambda的子类,封装了协程的真正运算逻辑,继承自BaseContinuationImpl,其中completion属性就是协程的第一层包装。
l 第三层包装是前面分析协程的线程调度时提到的DispatchedContinuation,封装了线程调度逻辑,包含了协程的第二层包装。
三层包装都实现了Continuation接口,通过代理模式将协程的各层包装组合在一起,每层负责不同的功能。
suspendCoroutineUninterceptedOrReturn
https://www.jianshu.com/p/2857993af646
在异步编程中,回调是非常常见的写法,那么如何将回调 转换为 协程中的挂起函数呢?
可以通过两个挂起函数suspendCoroutine{}或suspendCancellableCoroutine{}。
下面看如何将 OkHttp 的网络请求转换为挂起函数:
suspend fun <T> Call<T>.await(): T = suspendCoroutine { cont -> enqueue(object : Callback<T> { override fun onResponse(call: Call<T>, response: Response<T>) { if (response.isSuccessful) { cont.resume(response.body()!!) } else { cont.resumeWithException(ErrorResponse(response)) } } override fun onFailure(call: Call<T>, t: Throwable) { cont.resumeWithException(t) } }) }
上面的await()的扩展函数调用时,首先会挂起当前协程,然后执行enqueue将网络请求放入队列中,当请求成功时,手动调用cont.resume(response.body()!!)来恢复之前的协程。
再来看下suspendCoroutine{}和suspendCancellableCoroutine{}的定义:
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T = suspendCoroutineUninterceptedOrReturn { c: Continuation<T> -> val safe = SafeContinuation(c.intercepted()) block(safe) safe.getOrThrow() } public suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE) // 和 suspendCoroutine 的区别就在这里,如果协程已经被取消或者已完成,就会抛出 CancellationException 异常 cancellable.initCancellability() block(cancellable) cancellable.getResult() }
suspendCoroutineUninterceptedOrReturn 这个方法调用的源码是看不到的,因为它根本没有源码,它的逻辑就是帮大家拿到 Continuation 实例,而此Continuation 实例就是编译器自动生成的包装了我们的lambda的SuspendLambda对象。会把此Continuation 传递到block中(也就是我们的lambda中),我们在执行完成后手动调用Continuation的resume相关函数来唤醒之前挂起的协程。
在上边的协程调度中分析可知c.intercepted()调用后会去找当前上下文中是否有调度器,如果有就返回一个包装了调度器的DispatchedContinuation对象,这样就能保证在我们手动调用resume来在原线程/线程池中恢复 挂起的协程。
协程中还有两个常见的挂起函数使用到了suspendCoroutineUninterceptedOrReturn()函数,分别是delay()和yield()。
delay 的实现
public suspend fun delay(timeMillis: Long) { if (timeMillis <= 0) return // don't delay return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> -> cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) } } /** Returns [Delay] implementation of the given context */ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay internal actual val DefaultDelay: Delay = DefaultExecutor
yield 的实现
yield()的作用是挂起当前协程,然后将协程分发到 Dispatcher 的队列,这样可以让该协程所在线程或线程池可以运行其他协程逻辑,然后在 Dispatcher 空闲的时候继续执行原来协程。简单的来说就是让出自己的执行权,给其他协程使用,当其他协程执行完成或也让出执行权时,一开始的协程可以恢复继续运行。
yield()需要依赖协程的线程调度器,如果没有调度器那么就不会让协程挂起。
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> val context = uCont.context // 检测协程是否已经取消或者完成,如果是的话抛出 CancellationException context.checkCompletion() // 如果协程没有线程调度器,或者像 Dispatchers.Unconfined 一样没有进行调度,则直接返回 val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit // dispatchYield(Unit) 最终会调用到 dispatcher.dispatch(context, block) 将协程分发到调度器队列中,这样线程可以执行其他协程 cont.dispatchYield(Unit) COROUTINE_SUSPENDED }