美文网首页
Kotlin Coroutine 标准库源码解析

Kotlin Coroutine 标准库源码解析

作者: wo883721 | 来源:发表于2021-07-21 19:51 被阅读0次

上一章中,我们了解 suspend 函数的原理,在里面我们看到了很多 kotlin 协程标准库里面的类 Continuation CoroutineContext 等等,这一章中我们就来详细介绍它们。

一. Continuation 接口

/**
 * Interface representing a continuation after a suspension point that returns a value of type `T`.
 */
@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

从上一章内容知道,这个 Continuation 实例非常重要,它代表着调用方协程体,通过 resumeWith 方法让调用方从挂起点恢复,继续执行下面的代码。

二. CoroutineContext 接口

Continuation 中有一个重要成员 context ,是 CoroutineContext 实例,即协程上下文对象。

/**
 * Persistent context for the coroutine. It is an indexed set of [Element] instances.
 * An indexed set is a mix between a set and a map.
 * Every element in this set has a unique [Key].
 */
@SinceKotlin("1.3")
public interface CoroutineContext {
    /**
     * Returns the element with the given [key] from this context or `null`.
     */
    public operator fun <E : Element> get(key: Key<E>): E?

    /**
     * Accumulates entries of this context starting with [initial] value and applying [operation]
     * from left to right to current accumulator value and each element of this context.
     */
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    /**
     * Returns a context containing elements from this context and elements from  other [context].
     * The elements from this context with the same key as in the other one are dropped.
     */
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    /**
     * Returns a context containing elements from this context, but without an element with
     * the specified [key].
     */
    public fun minusKey(key: Key<*>): CoroutineContext

    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key<E : Element>

    /**
     * An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
     */
    public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

乍一看 CoroutineContext 接口,给人的感觉非常奇怪, 就好像 String 实现了 List<String>
其实 CoroutineContext 本身就相当于 list 或者 map 的混合体,

2.1 Key 接口

    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key<E : Element>

CoroutineContext 储存着 Element 集合,而 Element 有一个 key 成员属性,它的类型就是 KeyCoroutineContextget(key: Key<E>): E? 方法通过 Elementkey 获取对应的 Element 实例。

2.2 Element 接口

    /**
     * An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
     */
    public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }

协程上下文 CoroutineContext 对象中存储的就是 Element 索引集合,索引就是 Elementkey
ElementCoroutineContext 的子接口,表示 Element 本身也是一个协程上下文 CoroutineContext 对象,也有一个 Element 索引集合,但是这个集合中只有一个值,就是 Element 本身,是一个单元素索引集合。

  1. val key: Key<*> : Element 有一个重要属性就是 key,它是 Key 类型。

  2. get 方法

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

因为 Element 是一个单元素集合,所以当 key 相同的时候,返回 Element 本身,否则就是返回 null

  1. fold 方法
        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

因为是单元素集合,进行累加操作时,只需要将 Element 本身传递给 operation 方法就行了。

  1. minusKey 方法
        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this

因为是单元素集合,当 key 相同的时候,将集合中唯一的元素删除了,返回一个空集合对象 EmptyCoroutineContext ,否则就什么都不做,返回原集合。

2.3 CoroutineContext 的重要方法

2.3.1 get 方法

   /**
     * Returns the element with the given [key] from this context or `null`.
     */
    public operator fun <E : Element> get(key: Key<E>): E?

根据 key 从协程上下文元素集合中,获取对应的元素 Element, 如果元素不存在,就返回空。

注: 这个 key 值本身就是元素 Element 中的成员属性,这就是协程上下文元素集合即像 List 又像 Map 的原因。

2.3.2 fold 方法

    /**
     * Accumulates entries of this context starting with [initial] value and applying [operation]
     * from left to right to current accumulator value and each element of this context.
     */
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

相当于一个累加器的作用。

  1. 给定一个初始值 initial, 与上下文元素集合第一个元素进行 operation 的操作,得到和初始值相同类型R 累加值 sum
  2. 让这个 sum 与上下文元素集合第二个元素继续上述操作,直到上下文元素集合所有元素都如此操作,得到最后的累加值,并返回它。
    从这里来看,我们知道 CoroutineContext 是一个有序集合,能够从头到尾地遍历集合元素。

2.3.3 minusKey 方法

    /**
     * Returns a context containing elements from this context, but without an element with
     * the specified [key].
     */
    public fun minusKey(key: Key<*>): CoroutineContext

返回删除特定 key 元素后上下文 CoroutineContext

2.3.4 plus 方法

    /**
     * Returns a context containing elements from this context and elements from  other [context].
     * The elements from this context with the same key as in the other one are dropped.
     */
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

两个协程上下文相加,合并两个上下文中的 Element 集合,如果两个集合中有相同元素,即 Elementkey 相等,那么后一个上下文集合的 Element 元素会覆盖前一个上下文集合中对应的 Element 元素。

方法流程分析:

  1. contextEmptyCoroutineContext, 即协程上下文集合中元素为空,那么就直接返回自身。
  2. 调用 contextfold 方法,即将遍历后一个上下文集合元素,与前一个上下文集合进行操作。
  3. 通过 val removed = acc.minusKey(element.key) 方法,移除上下文集合中相同元素。
  4. 如果 removed === EmptyCoroutineContext 表示累加集合为空了,就直接返回 element, 否则就是返回 CombinedContext(removed, element) 合并集合。

CombinedContext 相当于 Element 的递归列表,具有不变性,在多线程环境下没有并发问题。

  1. 如果集合中有 ContinuationInterceptor 元素,一般我们会将它移动到集合尾部。

这个方法是 CoroutineContext 最重要的方法,从两个协程上下文相加,得到一个新的协程上下文。具体有三个特性:

  1. 两个上下文集合元素合并,如果有相同元素,则前一个上下文集合对应元素删除,后一个上下文集合添加进去。

客观上实现了后一个集合元素覆盖前一个集合元素,并且改变了元素的在集合汇总的顺序。这个是非常重要的,因为上下文集合是一个有序集合,很注重元素在集合中的位置。

  1. 协程拦截器 ContinuationInterceptor 类型元素毕竟特殊,如果集合中有这个类型元素,我们会将它移动到集合末尾。

2.4 示例

data class FirstCoroutineContext(val name: String)
    : AbstractCoroutineContextElement(FirstCoroutineContext) {
    companion object Key : CoroutineContext.Key<FirstCoroutineContext>

    override fun toString() = "$name"
}

data class SecondCoroutineContext(val name: String)
    : AbstractCoroutineContextElement(SecondCoroutineContext) {
    companion object Key : CoroutineContext.Key<SecondCoroutineContext>

    override fun toString() = "$name"
}

data class MyContinuationInterceptor(val name: String)
    : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = MyContinuationInterceptor
    companion object Key : CoroutineContext.Key<MyContinuationInterceptor>
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = continuation
    override fun toString() = "$name"
}

fun main() {
    val firstContext = FirstCoroutineContext("firstContext")
    val secondContext = SecondCoroutineContext("secondContext")

    var context = firstContext + secondContext
    println("$context")

    val firstContext1 = FirstCoroutineContext("firstContext1")
    context += firstContext1
    println("$context")

    val interceptor = MyContinuationInterceptor("interceptor")
    context += interceptor
    println("$context")

    context += secondContext
    println("$context")
}

执行结果:

[firstContext, secondContext]
[secondContext, firstContext1]
[secondContext, firstContext1, interceptor]
[firstContext1, secondContext, interceptor]

结果分析:

  1. 正常相加,且没有相同 KEY 的元素,所以集合中元素顺序就是 [firstContext, secondContext]

  2. 因为新增的 firstContext1 的类型是 FirstCoroutineContext ,会覆盖集合中相同类型元素,并改变顺序;集合中元素顺序就是 [secondContext, firstContext1]

  3. 添加一个 ContinuationInterceptor 元素 interceptor,集合顺序就是 [secondContext, firstContext1, interceptor]

  4. 再次添加元素 secondContext, 会删除之前集合中存在的 secondContext 元素,又因为 interceptor 必须放在集合末尾,所以集合顺序就是 [firstContext1, secondContext, interceptor]

三. SafeContinuation 类

上一章中,经常看到将协程体 Continuation 通过 SafeContinuation 对象包装一下,我们来看看 SafeContinuation 类的作用。

3.1 SafeContinuation

@PublishedApi
@SinceKotlin("1.3")
internal expect class SafeContinuation<in T> : Continuation<T> {
    internal constructor(delegate: Continuation<T>, initialResult: Any?)

    @PublishedApi
    internal constructor(delegate: Continuation<T>)

    @PublishedApi
    internal fun getOrThrow(): Any?

    override val context: CoroutineContext
    override fun resumeWith(result: Result<T>): Unit
}

我们发现 SafeContinuation 有三个成员属性:

  1. context : 从 Continuation 中继承而来的,协程上下文对象
  2. delegate : 被代理的协程体对象
  3. initialResult : 初始值

有两个方法:

  1. resumeWith 方法: 从 Continuation 继承,恢复协程体的方法。
  2. getOrThrow 方法: 获取结果值的方法。

3.2 SafeContinuationJvm

SafeContinuationJvmSafeContinuation jvm 平台实现类

@PublishedApi
@SinceKotlin("1.3")
internal actual class SafeContinuation<in T>
internal actual constructor(
    private val delegate: Continuation<T>,
    initialResult: Any?
) : Continuation<T>, CoroutineStackFrame {
    @PublishedApi
    internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)

    public actual override val context: CoroutineContext
        get() = delegate.context

    @Volatile
    private var result: Any? = initialResult

    private companion object {
        @Suppress("UNCHECKED_CAST")
        @JvmStatic
        private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(
            SafeContinuation::class.java, Any::class.java as Class<Any?>, "result"
        )
    }

    public actual override fun resumeWith(result: Result<T>) {
        while (true) { // lock-free loop
            val cur = this.result // atomic read
            when {
                cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
                cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                    delegate.resumeWith(result)
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

    @PublishedApi
    internal actual fun getOrThrow(): Any? {
        var result = this.result // atomic read
        if (result === UNDECIDED) {
            if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
            result = this.result // reread volatile var
        }
        return when {
            result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
            result is Result.Failure -> throw result.exception
            else -> result // either COROUTINE_SUSPENDED or data
        }
    }

    // --- CoroutineStackFrame implementation

    public override val callerFrame: CoroutineStackFrame?
        get() = delegate as? CoroutineStackFrame

    override fun getStackTraceElement(): StackTraceElement? =
        null

    override fun toString(): String =
        "SafeContinuation for $delegate"
}
  1. context : 直接获取的是代理协程体 delegate.context 的上下文对象
  2. result :结果值,一开始是初始化值 initialResult ,默认就是 UNDECIDED

有三个特殊值: UNDECIDED 表示不确定的值;COROUTINE_SUSPENDED 表示挂起值;RESUMED 表示恢复。

  1. RESULT :通过它实现对 result 原子性更新操作

3.2.1 resumeWith 方法

public actual override fun resumeWith(result: Result<T>) {
        while (true) { // lock-free loop
            val cur = this.result // atomic read
            when {
                cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
                cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                    delegate.resumeWith(result)
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }
  1. result = UNDECIDED, 表示 SafeContinuation 在初始状态下调用了 resumeWith, 那么将 result 值设置成 resumeWith 的参数值就行了。
  2. result = COROUTINE_SUSPENDED , 表示被代理的协程 delegate 已经挂起,将 result 原子性更新成 RESUMED ,更新成功,就调用协程 delegateresumeWith 方法恢复被代理协程;更新失败,说明有其他线程更新成功。

3.2.2 getOrThrow 方法

    @PublishedApi
    internal actual fun getOrThrow(): Any? {
        var result = this.result // atomic read
        if (result === UNDECIDED) {
            if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
            result = this.result // reread volatile var
        }
        return when {
            result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
            result is Result.Failure -> throw result.exception
            else -> result // either COROUTINE_SUSPENDED or data
        }
    }
  1. result === UNDECIDED 时,表示没有调用 SafeContinuationresumeWith 方法,即协程需要挂起。将 result 的值设置成 COROUTINE_SUSPENDED ,并返回 COROUTINE_SUSPENDED 值,让被代理协程挂起。
  2. result === RESUMED , 表示被代理协程已经恢复了,让它上游的协程继续挂起。
  3. result is Result.Failure , 结果值失败,就抛出异常
  4. 最后返回成功的结果值。

四. ContinuationImpl 类

@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}

它是 BaseContinuationImpl 的子类,所有 suspend 挂起函数的状态机都是继承这个类。
有三个成员属性:

  1. completion :当本协程完成之后,会调用 completionresumeWith 方法唤醒对应协程。
  2. _contextcontext : 协程上下文对象
  3. intercepted : 协程拦截器对象

有两个方法:

  1. intercepted() :获取设置的协程拦截器对象 intercepted,或者从协程上下文 context 获取协程拦截器对象,并调用 interceptContinuation 方法,来封装本 ContinuationImpl 对象。如果没有协程拦截器对象,那么不做任何操作,直接返回 ContinuationImpl 对象。

  2. releaseIntercepted() : 调用协程拦截器对象的 releaseInterceptedContinuation 方法,进行释放操作。

五. BaseContinuationImpl 类

@SinceKotlin("1.3")
internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }

    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }

    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }

    public override fun toString(): String =
        "Continuation at ${getStackTraceElement() ?: this::class.java.name}"

    // --- CoroutineStackFrame implementation

    public override val callerFrame: CoroutineStackFrame?
        get() = completion as? CoroutineStackFrame

    public override fun getStackTraceElement(): StackTraceElement? =
        getStackTraceElementImpl()
}

它是 Continuation 的子类,我们只需要分析 resumeWith 方法;
resumeWith 方法中主要是调用 invokeSuspend(param) 方法

  1. 如果返回值是 COROUTINE_SUSPENDED , 表示本协程被挂起了,直接返回。如果返回值不是 COROUTINE_SUSPENDED,就得到结果值 outcome
  2. 如果 completionBaseContinuationImpl 的子类,那么就循环向上调用,直到 completion 不是 BaseContinuationImpl 的子类, 那么就代表最顶层,直接调用 resumeWith 方法返回结果值。

相关文章

网友评论

      本文标题:Kotlin Coroutine 标准库源码解析

      本文链接:https://www.haomeiwen.com/subject/qyjjmltx.html