0%

第3章:Kotlin协程的基础设施

3.1 协程的构造

3.1.1 协程的创建

Kotlin 提供了一个 createCoroutine 函数用来创建协程:

1
2
3
fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit>

其中 suspend() -> T 是 createCoroutin 函数的 Receiver,对 Kotlin 函数不了解的话,这个还是有点费解的。我们依次剖析 createCoroutine 的参数和返回值:

  • Receiver 是一个被 suspend 修饰的挂起函数,这也是协程协程的执行体(协程体)

  • 参数是 completion(是 Continuation 类型)会在协程体执行完后调用,实际上就是协程完成的回调

  • 返回值也是一个 Continuation 对象,由于现在协程仅仅被创建出来,因此需要通过这个值在之后触发协程的启动

以一个例子来说明 createCoroutine 的用法:

1
2
3
4
5
6
7
8
9
10
val continuation = suspend {
println("In Coroutine.")
5
}.createCoroutine(object: Continuation<Int> {
override val context = EmptyCoroutineContext

override fun resumeWith(result: Result<Int>) {
println("Coroutine End: $result")
}
})

目前协程被创建出来了,但是它还未启动。

3.1.2 协程的启动

在上述的例子中,我们已经创建了协程,之后只需要调用 continuation.resume(Unit) 之后,协程就会立即开始。为什么这样就可以触发协程体执行呢?

其实,我们创建协程得到的 continuation 是 SafeContinuation 的实例,不过这也是个“马甲”,它有个 delegate 属性,里面将 suspend() -> T 封装成了一个 Continuation 对象

也就是说,我们创建协程得到的 continuation 其实就是套了几层“马甲”的协程体,故调用这个 continuation.resume() 可以触发协程体的执行

一般来讲,我们创建协程之后就会启动它,所以标准库还提供了另一个一步到位的API——startCoroutin :

1
fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>)

我们知道,作为参数传入的 completion 就如同回调一样,**协程体的返回值会作为 resumeWith 的参数传入,例如,下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
suspend fun main() {
suspend {
println("我跑在协程里")
3
}.startCoroutine(object: Continuation<Int> {
override val context: CoroutineContext
get() = EmptyCoroutineContext

override fun resumeWith(result: Result<Int>) {
println("协程执行完成,result = $result")
}
})
}

如果协程体执行正常,则 result 为 Success 结果;否则返回 Failure 结果。上述代码结果如下:

我跑在协程里
//协程执行完成,result = Failure(java.lang.IllegalStateException: 抛出异常)
协程执行完成,result = Success(3)

3.1.3 协程体的 Receiver

与协程创建和启动相关的 API 还有一组:

1
2
3
4
5
6
fun <R, T> (suspend R.() -> T).create(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>

// start 方法略

上述的 R 可以为协程体提供一个作用域,在协程体内我们可以直接使用,如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//先把上述方法封装下,如果不封装怎么调用?
fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
block.startCoroutine(receiver, object : Continuation<T> {
override fun resumeWith(result: Result<T>) {
println("Coroutine End: $result")
}

override val context = EmptyCoroutineContext
})
}


//创建额外的域
class ProducerScope<T> {
suspend fun produce(value: T) {
println("我在额外的 Receiver 中了: $value")
}
}



//使用
suspend fun main() {
launchCoroutine(ProducerScope<Int>()) {
println("In Coroutine.")
produce (1024)
//delay (1000) //我自己测试的时候,如果放开这个注释,就不会打到 2048,coroutine end 也不会执行到
produce (2048)
}
}

这里要注意的一点是,如果额外的域 ProducerScope 添加了 @RestrictsSuspension 注解,则无法使用外部的函数,因此例子中的 delay 也不会调用

3.1.4 可挂起的 main 函数

前面我们一直这样写:

1
2
3
suspend fun main() {
....
}

这样就能在程序入口就获得一个协程了,这到底是怎么实现的呢?首先,我们能确认2点:

  • JVM 压根就不知道什么挂起函数,kotlin协程

  • JVM 肯定有一个 main 函数的

为了搞清楚原理,我们可以对 Kotlin 反编译成 Java 代码(IDEA的 Kotlin byteCode 功能):

1
2
3
public static void main(String[] var0) {
RunSuspendKt.runSuspend(new KotlinMainKt$$$main(var0));
}

可以看到,其实它是有真正的 main 函数的,里面的协程封装逻辑都被扔到 RunSuspendKt里面了

3.2.1 挂起函数

整个 kotlin 语境下有 2种函数: 普通函数和挂起函数,其中:挂起函数能调用任何函数,但是普通函数不能调用挂起函数。

所谓的协程挂起其实就是程序执行流程发生异步调用时,当前调用流程进入等待状态。注意:挂起函数不一定真的会挂起,只是提供了挂起的条件,那额什么情况才会真正挂起呢?

3.2.2 挂起点

回想下协程的创建过程,我们的协程体本身就是一个 Continuation 实例,正因如此,挂起函数才能在协程体内运行。在协程体内部,挂起函数的调用处称为挂起点,挂起点如果出现异步调用,当前协程就会被挂起,直到对应的 Continuation.resume 函数被调用才会恢复执行。

异步调用如何发生,取决于 resume 函数与对应的挂起函数的调用是否在相同的调用栈上

3.2.3 CPS 变换

我们知道,挂起函数如果需要挂起,则需要通过 suspendCoroutine 来获取 Continuation 实例,我们已经知道它是协程体封装成的 Continuation,但是这个实例是怎么传入的呢?先看下面的suspend 函数:

1
2
3
suspend fun notSuspend() = suspendCoroutine<Int> { continuation ->
continuation.resume(100)
}

看起来这个方法没有接收任何参数,kotlin 中看不出来我们就用 Java 直接调用它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
Object result = KotlinMainKt.notSuspend(new Continuation<Integer>() {
@NotNull
@Override
public CoroutineContext getContext() {
return null;
}

@Override
public void resumeWith(@NotNull Object o) {

}
});
}

我们发现用Java 调用的时候,需要传入一个 Continuation ,也就是说kotlin 中的 suspend() -> Int 类型 在Java 看来实际上是 Continuation -> Object 类型 !

这与我们平时写的异步回调相似,传入callback 等待结果回调。但是为什么会有返回值 Object?这里的 Object 会有 2 种情况:

  • 挂起函数同步返回时:作为参数传入的 Continuation 的 resumeWith 不会被调用,函数实际地返回值 Object 就是挂起函数的返回值

  • 挂起函数挂起,执行异步逻辑。此时函数返回值 Object 是一个挂起标志,通过这个标志外部协程就可以知道该函数需要挂起等到异步逻辑执行。

挂起标志是一个常量,定义在 Intrinsics.kt 当中:

1
2
3
4
5
6
7
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED

internal enum class CoroutineSingletons {
COROUTINE_SUSPENDED,
UNDECIDED,
RESUMED
}

现在大家知道了原来挂起函数就是普通函数参数中多了一个 Continuation 实例,这也难怪普通函数不能调用挂起函数,但是挂起函数可以调用普通函数的原因

还可以仔细想想,为什么Kotlin 语法要求挂起函数一定要运行在协程体内或者挂起函数中呢?答案是:协程体或者挂起函数中都隐含了 Continuation 实例

3.3 协程的上下文

3.3.1 协程上下文的集合特征

协程的 Context 更像是个 List 结构,都有空的表示:

1
2
3
var list: List<Int> = emptyList()

var context: CoroutineContext = EmptyCoroutineContext

接下来,我们往里面添加数据(要记得将 list 和 context 设置为 var 而不是 val ,我就是因为设置 val 所以不能用 “+” 操作):

1
2
3
4
list += 0   //添加一个元素,得到一个新的list
list += listOf<Int>(1,2,3) //将listOf中的元素都添加进去,生成一个新的 list

context += EmptyCoroutineContext

协程 Context 是一个集合,那么它的元素类型是什么呢?看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface CoroutineContext {
/**
* Returns the element with the given [key] from this context or `null`.
*/
public operator fun <E : Element> get(key: Key<E>): E?
}



/**
* An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
*/
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>
//省略其他代码
}

Element 是本身也实现了 CoroutineContext 接口,这看上去就好像 Int 实现了 List 接口一样,这就很奇怪。其实这主要是为了 API 设计方便,Element 中是不会存放除了它自己以外的其他数据的(这句话其实不太明白,还需要后续的理解

Element 中有个属性 key ,这个属性很关键,虽然我们往 list中添加元素时没有明确指出,但是我们都知道 list 中的元素都有一个 index 索引,而这里的协程上下文Element 的 key 就是这个集合中元素的索引,不同之处是这个索引“长”在数据里面,意味着上下文的数据在出生时就找到了自己的位置(这句话同样不太理解。。。)

可能有人觉得协程 Context 和 Map 似乎更近,为什么这里要与 List 对比呢?一是 List 的 Key 类型是固定的 Int ,而 Map 的Key 有很多种类型;二是是协程上下文内部实现是一个单链表,这也反映出它与 List 之间的关系。

3.3.2 协程上下文元素的实现

上一节知道协程 Conext 是个接口,实际上还有个抽象类 AbstractCoroutineContextElement,能让我们实现协程上下文更加方便:

1
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element

创建元素并不难,只需要提供对应的 Key 即可,以下是协程名的实现(系统源码):

1
2
3
4
5
6
7
8
9
10
11
public data class CoroutineName(
/**
* User-defined coroutine name.
*/
val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
/**
* Key for [CoroutineName] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineName>
}

以下是协程异常处理器的实现源码:

1
2
3
4
5
6
7
8
class CoroutineExceptionHandler(val onErrorAction: (Throwable) -> Unit) : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

fun onError(error: Throwable) {
error.printStackTrace()
onErrorAction(error)
}
}

3.3.3 协程上下文的使用

前面说了,可以为协程上下文添加多个 Context,添加好之后,我们可以将 Context 绑定到协程上了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
suspend {
println("我在协程里面")
5
}.createCoroutine(object: Continuation<Int> {
var tempContext: CoroutineContext = CoroutineName("name-01") +
CoroutineExceptionHandler(){
println("出错啦")
}

override val context: CoroutineContext = tempContext

override fun resumeWith(result: Result<Int>) {
// todo
}
})

先自己构建出 tempContext 然后再将 tempContext 赋值给 context。绑定了协程的上下文,我们的协程就初步成型了!接下来演示如何使用这个CoroutineExceptionHandler:

1
2
3
4
5
override fun resumeWith(result: Result<Int>) {
result.onFailure {
context[CoroutineExceptionHandler]?.onError(it)
}
}

不管结果如何,这个 resumeWith 是一定会被调用的,如果有异常出现,我们就从协程上下文找到 CoroutineExceptionHandler 实例,调用它的 onError 方法即可,这个上下文在协程内部都是可以直接获取的,比如,在协程内部获取名字:

1
2
3
4
suspend {
println("协程名字:${coroutineContext[CoroutineName]?.name}")
5
}.createCoroutine(object: Continuation<Int>

这样,我们就知道了协程上下文的设置和获取方法了。

3.4 协程的拦截器

之前的内容知道 Kotlin 协程通过调用挂起函数实现挂起,通过Continuation 的恢复调用来实现恢复,还可以通过 Context 的设置来丰富协程能力,那么,如果处理线程的调度?其实标准库还提供了拦截器(Interceptor)的组件,允许我们拦截协程异步回调时的恢复调用,那么线程调度应该也不是什么难事。

3.4.2 拦截器的使用

拦截器 ContinuationInterceptor 继承了 CoroutineContext.Element ,而Element 又继承 CoroutineContext 类型,所以拦截器也是 上下文 的一种实现

1
2
3
4

public interface ContinuationInterceptor : CoroutineContext.Element

public interface Element : CoroutineContext

自己定义拦截器只需要实现拦截器接口 ContinuationInterceptor 即可,比如打印日志(注意:拦截器的 Key 是一个固定值: ContinuationInterceptor,协程执行时会拿到拦截器并拦截):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class LogInterceptor() : ContinuationInterceptor{
override val key: CoroutineContext.Key<*> = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
return LogContinuation(continuation)
}
}

class LogContinuation<T> (private val continuation: Continuation<T>): Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("执行前")
continuation.resumeWith(result)
println("执行后")
}
}

接下来使用的时候,我们就将拦截器作为context就好了,前面说了拦截器本来就是上下文:

1
2
3
4
5
6
7
8
9
10
11
12
fun haha (){
val continuation = suspend {
println("我在协程里面")
5
}.startCoroutine(object : Continuation<Int>{
override val context: CoroutineContext = LogInterceptor()

override fun resumeWith(result: Result<Int>) {
//。。。
}
})
}

3.4.3 拦截器的执行细节

3.5 Kotlin 协程所属类别

谢谢你的鼓励