在上一章中,我们了解 suspend 函数的原理,在里面我们看到了很多 kotlin 协程标准库里面的类 Continuation CoroutineContext 等等,这一章中我们就来详细介绍它们。
一. Continuation 接口
/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
从上一章内容知道,这个 Continuation 实例非常重要,它代表着调用方协程体,通过 resumeWith 方法让调用方从挂起点恢复,继续执行下面的代码。
二. CoroutineContext 接口
在 Continuation 中有一个重要成员 context ,是 CoroutineContext 实例,即协程上下文对象。
/**
* Persistent context for the coroutine. It is an indexed set of [Element] instances.
* An indexed set is a mix between a set and a map.
* Every element in this set has a unique [Key].
*/
@SinceKotlin("1.3")
public interface CoroutineContext {
/**
* Returns the element with the given [key] from this context or `null`.
*/
public operator fun <E : Element> get(key: Key<E>): E?
/**
* Accumulates entries of this context starting with [initial] value and applying [operation]
* from left to right to current accumulator value and each element of this context.
*/
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
/**
* Returns a context containing elements from this context and elements from other [context].
* The elements from this context with the same key as in the other one are dropped.
*/
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
/**
* Returns a context containing elements from this context, but without an element with
* the specified [key].
*/
public fun minusKey(key: Key<*>): CoroutineContext
/**
* Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
*/
public interface Key<E : Element>
/**
* An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
*/
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
}
乍一看 CoroutineContext 接口,给人的感觉非常奇怪, 就好像 String 实现了 List<String> 。
其实 CoroutineContext 本身就相当于 list 或者 map 的混合体,
2.1 Key 接口
/**
* Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
*/
public interface Key<E : Element>
CoroutineContext 储存着 Element 集合,而 Element 有一个 key 成员属性,它的类型就是 Key 。 CoroutineContext 的 get(key: Key<E>): E? 方法通过 Element 的 key 获取对应的 Element 实例。
2.2 Element 接口
/**
* An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
*/
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
协程上下文 CoroutineContext 对象中存储的就是 Element 索引集合,索引就是 Element 的 key 。
而 Element 是 CoroutineContext 的子接口,表示 Element 本身也是一个协程上下文 CoroutineContext 对象,也有一个 Element 索引集合,但是这个集合中只有一个值,就是 Element 本身,是一个单元素索引集合。
-
val key: Key<*>:Element有一个重要属性就是key,它是Key类型。 -
get方法
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
因为 Element 是一个单元素集合,所以当 key 相同的时候,返回 Element 本身,否则就是返回 null
-
fold方法
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
因为是单元素集合,进行累加操作时,只需要将 Element 本身传递给 operation 方法就行了。
-
minusKey方法
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
因为是单元素集合,当 key 相同的时候,将集合中唯一的元素删除了,返回一个空集合对象 EmptyCoroutineContext ,否则就什么都不做,返回原集合。
2.3 CoroutineContext 的重要方法
2.3.1 get 方法
/**
* Returns the element with the given [key] from this context or `null`.
*/
public operator fun <E : Element> get(key: Key<E>): E?
根据 key 从协程上下文元素集合中,获取对应的元素 Element, 如果元素不存在,就返回空。
注: 这个
key值本身就是元素Element中的成员属性,这就是协程上下文元素集合即像List又像Map的原因。
2.3.2 fold 方法
/**
* Accumulates entries of this context starting with [initial] value and applying [operation]
* from left to right to current accumulator value and each element of this context.
*/
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
相当于一个累加器的作用。
- 给定一个初始值
initial, 与上下文元素集合第一个元素进行operation的操作,得到和初始值相同类型R累加值sum - 让这个
sum与上下文元素集合第二个元素继续上述操作,直到上下文元素集合所有元素都如此操作,得到最后的累加值,并返回它。
从这里来看,我们知道CoroutineContext是一个有序集合,能够从头到尾地遍历集合元素。
2.3.3 minusKey 方法
/**
* Returns a context containing elements from this context, but without an element with
* the specified [key].
*/
public fun minusKey(key: Key<*>): CoroutineContext
返回删除特定 key 元素后上下文 CoroutineContext
2.3.4 plus 方法
/**
* Returns a context containing elements from this context and elements from other [context].
* The elements from this context with the same key as in the other one are dropped.
*/
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
两个协程上下文相加,合并两个上下文中的 Element 集合,如果两个集合中有相同元素,即 Element 的 key 相等,那么后一个上下文集合的 Element 元素会覆盖前一个上下文集合中对应的 Element 元素。
方法流程分析:
- 当
context是EmptyCoroutineContext, 即协程上下文集合中元素为空,那么就直接返回自身。 - 调用
context的fold方法,即将遍历后一个上下文集合元素,与前一个上下文集合进行操作。 - 通过
val removed = acc.minusKey(element.key)方法,移除上下文集合中相同元素。 - 如果
removed === EmptyCoroutineContext表示累加集合为空了,就直接返回element, 否则就是返回CombinedContext(removed, element)合并集合。
CombinedContext相当于Element的递归列表,具有不变性,在多线程环境下没有并发问题。
- 如果集合中有
ContinuationInterceptor元素,一般我们会将它移动到集合尾部。
这个方法是 CoroutineContext 最重要的方法,从两个协程上下文相加,得到一个新的协程上下文。具体有三个特性:
- 两个上下文集合元素合并,如果有相同元素,则前一个上下文集合对应元素删除,后一个上下文集合添加进去。
客观上实现了后一个集合元素覆盖前一个集合元素,并且改变了元素的在集合汇总的顺序。这个是非常重要的,因为上下文集合是一个有序集合,很注重元素在集合中的位置。
- 协程拦截器
ContinuationInterceptor类型元素毕竟特殊,如果集合中有这个类型元素,我们会将它移动到集合末尾。
2.4 示例
data class FirstCoroutineContext(val name: String)
: AbstractCoroutineContextElement(FirstCoroutineContext) {
companion object Key : CoroutineContext.Key<FirstCoroutineContext>
override fun toString() = "$name"
}
data class SecondCoroutineContext(val name: String)
: AbstractCoroutineContextElement(SecondCoroutineContext) {
companion object Key : CoroutineContext.Key<SecondCoroutineContext>
override fun toString() = "$name"
}
data class MyContinuationInterceptor(val name: String)
: ContinuationInterceptor {
override val key: CoroutineContext.Key<*> = MyContinuationInterceptor
companion object Key : CoroutineContext.Key<MyContinuationInterceptor>
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = continuation
override fun toString() = "$name"
}
fun main() {
val firstContext = FirstCoroutineContext("firstContext")
val secondContext = SecondCoroutineContext("secondContext")
var context = firstContext + secondContext
println("$context")
val firstContext1 = FirstCoroutineContext("firstContext1")
context += firstContext1
println("$context")
val interceptor = MyContinuationInterceptor("interceptor")
context += interceptor
println("$context")
context += secondContext
println("$context")
}
执行结果:
[firstContext, secondContext]
[secondContext, firstContext1]
[secondContext, firstContext1, interceptor]
[firstContext1, secondContext, interceptor]
结果分析:
-
正常相加,且没有相同
KEY的元素,所以集合中元素顺序就是[firstContext, secondContext] -
因为新增的
firstContext1的类型是FirstCoroutineContext,会覆盖集合中相同类型元素,并改变顺序;集合中元素顺序就是[secondContext, firstContext1] -
添加一个
ContinuationInterceptor元素interceptor,集合顺序就是[secondContext, firstContext1, interceptor] -
再次添加元素
secondContext, 会删除之前集合中存在的secondContext元素,又因为interceptor必须放在集合末尾,所以集合顺序就是[firstContext1, secondContext, interceptor]。
三. SafeContinuation 类
在上一章中,经常看到将协程体 Continuation 通过 SafeContinuation 对象包装一下,我们来看看 SafeContinuation 类的作用。
3.1 SafeContinuation
@PublishedApi
@SinceKotlin("1.3")
internal expect class SafeContinuation<in T> : Continuation<T> {
internal constructor(delegate: Continuation<T>, initialResult: Any?)
@PublishedApi
internal constructor(delegate: Continuation<T>)
@PublishedApi
internal fun getOrThrow(): Any?
override val context: CoroutineContext
override fun resumeWith(result: Result<T>): Unit
}
我们发现 SafeContinuation 有三个成员属性:
-
context: 从Continuation中继承而来的,协程上下文对象 -
delegate: 被代理的协程体对象 -
initialResult: 初始值
有两个方法:
-
resumeWith方法: 从Continuation继承,恢复协程体的方法。 -
getOrThrow方法: 获取结果值的方法。
3.2 SafeContinuationJvm
SafeContinuationJvm 是 SafeContinuation jvm 平台实现类
@PublishedApi
@SinceKotlin("1.3")
internal actual class SafeContinuation<in T>
internal actual constructor(
private val delegate: Continuation<T>,
initialResult: Any?
) : Continuation<T>, CoroutineStackFrame {
@PublishedApi
internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)
public actual override val context: CoroutineContext
get() = delegate.context
@Volatile
private var result: Any? = initialResult
private companion object {
@Suppress("UNCHECKED_CAST")
@JvmStatic
private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(
SafeContinuation::class.java, Any::class.java as Class<Any?>, "result"
)
}
public actual override fun resumeWith(result: Result<T>) {
while (true) { // lock-free loop
val cur = this.result // atomic read
when {
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
@PublishedApi
internal actual fun getOrThrow(): Any? {
var result = this.result // atomic read
if (result === UNDECIDED) {
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result // reread volatile var
}
return when {
result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
result is Result.Failure -> throw result.exception
else -> result // either COROUTINE_SUSPENDED or data
}
}
// --- CoroutineStackFrame implementation
public override val callerFrame: CoroutineStackFrame?
get() = delegate as? CoroutineStackFrame
override fun getStackTraceElement(): StackTraceElement? =
null
override fun toString(): String =
"SafeContinuation for $delegate"
}
-
context: 直接获取的是代理协程体delegate.context的上下文对象 -
result:结果值,一开始是初始化值initialResult,默认就是UNDECIDED
有三个特殊值:
UNDECIDED表示不确定的值;COROUTINE_SUSPENDED表示挂起值;RESUMED表示恢复。
-
RESULT:通过它实现对result原子性更新操作
3.2.1 resumeWith 方法
public actual override fun resumeWith(result: Result<T>) {
while (true) { // lock-free loop
val cur = this.result // atomic read
when {
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
- 当
result = UNDECIDED, 表示SafeContinuation在初始状态下调用了resumeWith, 那么将result值设置成resumeWith的参数值就行了。 - 当
result = COROUTINE_SUSPENDED, 表示被代理的协程delegate已经挂起,将result原子性更新成RESUMED,更新成功,就调用协程delegate的resumeWith方法恢复被代理协程;更新失败,说明有其他线程更新成功。
3.2.2 getOrThrow 方法
@PublishedApi
internal actual fun getOrThrow(): Any? {
var result = this.result // atomic read
if (result === UNDECIDED) {
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result // reread volatile var
}
return when {
result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
result is Result.Failure -> throw result.exception
else -> result // either COROUTINE_SUSPENDED or data
}
}
- 当
result === UNDECIDED时,表示没有调用SafeContinuation的resumeWith方法,即协程需要挂起。将result的值设置成COROUTINE_SUSPENDED,并返回COROUTINE_SUSPENDED值,让被代理协程挂起。 - 当
result === RESUMED, 表示被代理协程已经恢复了,让它上游的协程继续挂起。 - 当
result is Result.Failure, 结果值失败,就抛出异常 - 最后返回成功的结果值。
四. ContinuationImpl 类
@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation // just in case
}
}
它是 BaseContinuationImpl 的子类,所有 suspend 挂起函数的状态机都是继承这个类。
有三个成员属性:
-
completion:当本协程完成之后,会调用completion的resumeWith方法唤醒对应协程。 -
_context和context: 协程上下文对象 -
intercepted: 协程拦截器对象
有两个方法:
-
intercepted():获取设置的协程拦截器对象intercepted,或者从协程上下文context获取协程拦截器对象,并调用interceptContinuation方法,来封装本ContinuationImpl对象。如果没有协程拦截器对象,那么不做任何操作,直接返回ContinuationImpl对象。 -
releaseIntercepted(): 调用协程拦截器对象的releaseInterceptedContinuation方法,进行释放操作。
五. BaseContinuationImpl 类
@SinceKotlin("1.3")
internal abstract class BaseContinuationImpl(
// This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
// it has a public getter (since even untrusted code is allowed to inspect its call stack).
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
public open fun create(completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}
public override fun toString(): String =
"Continuation at ${getStackTraceElement() ?: this::class.java.name}"
// --- CoroutineStackFrame implementation
public override val callerFrame: CoroutineStackFrame?
get() = completion as? CoroutineStackFrame
public override fun getStackTraceElement(): StackTraceElement? =
getStackTraceElementImpl()
}
它是 Continuation 的子类,我们只需要分析 resumeWith 方法;
在 resumeWith 方法中主要是调用 invokeSuspend(param) 方法
- 如果返回值是
COROUTINE_SUSPENDED, 表示本协程被挂起了,直接返回。如果返回值不是COROUTINE_SUSPENDED,就得到结果值outcome。 - 如果
completion是BaseContinuationImpl的子类,那么就循环向上调用,直到completion不是BaseContinuationImpl的子类, 那么就代表最顶层,直接调用resumeWith方法返回结果值。













网友评论