Kotlin 协程

Kotlin 协程已经成为了谷歌官方推荐的异步任务处理方式。协程的英文单词是 Coroutines,Coroutines 这个单词实际上是一个组合单词,它是由 Co + routines 组合而成的。Co 在这里指的是 cooperation (协作),routines 在英文当中表达的意思是叫例行日程。利用协作的方式去帮助我们完成例行日程,就是协程的含义。
很多编程语言上都会有协程,在不同的编程语言上,协程的实现都有所差异。

协程不是为了替代线程而存在的,他是为了封装线程而存在的,是一种高效且方便的用于线程管理的框架。

Kotlin 协程官方描述如下:

One can think of a coroutine as a light-weight thread. Like threads, coroutines can run in parallel, wait for each other and communicate. The biggest difference is that coroutines are very cheap, almost free: we can create thousands of them, and pay very little in terms of performance. True threads, on the other hand, are expensive to start and keep around. A thousand threads can be a serious challenge for a modern machine.

特点:

  1. 轻量高效
  2. 用同步的方式编写异步代码

源码基于 org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.4.1

suspend 关键字

一个函数如果被声明为 suspend,这个函数就变成了挂起函数。一个挂起函数,它只能在另外一个挂起函数或者是在一个协程作用域当中才能调用。
suspend 是指函数可以在一个特定的时间点被挂起,然后它会被存储到某一个状态,暂停它的运行,挂起的不是线程而是协程。resume 指的是可以恢复之前挂起函数的状态,让它从当时被挂起的地方,继续向下执行。

suspend 关键字只起到了标志这个函数是一个耗时操作,放在协程中才有意义。

作用域

协程作用域就是调用挂起函数的入口。

1
2
3
4
5
6
7
8
9
10
public interface CoroutineScope {
/**
* The context of this scope.
* Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
* Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
*
* By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
*/
public val coroutineContext: CoroutineContext
}

CoroutineScope 函数的参数列表中有一个叫 CoroutineContext 的参数,可以将 CoroutineContext 简单理解成是一种 Set 集合,又因为 CoroutineContext 里面重载了加号运算符,所以多个 CoroutineContext 元素之间可以使用加号来连接。

例如:

1
val scope = CoroutineScope(Dispatchers.Main + Job())

上面的示例代码中,Dispatchers.Main 和 Job 其实都是 CoroutineContext 对象。

coroutineScope

1
2
3
4
5
6
7
8
9
10
// CoroutineScope.kt
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}

使用 coroutineScope 构建器声明自己的作用域。它会创建一个协程作用域并且在所有已启动子协程执行完毕之前不会结束。

1
2
3
4
5
6
7
8
9
10
// CoroutineScope.kt
coroutineScope { // 创建一个协程作用域
launch {
delay(500L)
println("Task from nested launch")
}

delay(100L)
println("Task from coroutine scope")
}

GlobalScope

不受父协程的控制。

1
2
3
4
5
6
7
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}

MainScope

UI 作用域的协程。

1
2
// CoroutineScope.kt
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

supervisorScope

1
2
3
4
5
6
7
8
9
10
// Supervisor.kt
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = SupervisorCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}

此作用域的协程失败后不会对父协程和子协程产生影响。

CoroutineContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Persistent context for the coroutine. It is an indexed set of [Element] instances.
* An indexed set is a mix between a set and a map.
* Every element in this set has a unique [Key].
*/
@SinceKotlin("1.3")
public interface CoroutineContext {
//操作符[]重载,可以通过CoroutineContext[Key]这种形式来获取与Key关联的Element
public operator fun <E : Element> get(key: Key<E>): E?

//它是一个聚集函数,提供了从left到right遍历CoroutineContext中每一个Element的能力,并对每一个Element做operation操作
public fun <R> fold(initial: R, operation: (R, Element) -> R): R

//操作符+重载,可以CoroutineContext + CoroutineContext这种形式把两个CoroutineContext合并成一个
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}

//返回一个新的CoroutineContext,这个CoroutineContext删除了Key对应的Element
public fun minusKey(key: Key<*>): CoroutineContext

//Key定义,空实现,仅仅做一个标识
public interface Key<E : Element>

/**
* An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
*/
public interface Element : CoroutineContext {}
}

CoroutineContext 是一个特殊的集合,这个集合它既有 Map 的特点,也有 Set 的特点,集合的每一个元素都是 Element,每个 Element 都有一个 Key 与之对应。

CoroutineContext 主要由以下 4 个 Element 组成:

  1. Job: 协程的唯一标识,用来控制协程的生命周期;
  2. CoroutineDispatcher:指定协程运行的线程(IO、Default、Main、Unconfined);
  3. CoroutineName:指定协程的名称,默认为 coroutine;
  4. CoroutineExceptionHandler:指定协程的异常处理器,用来处理未捕获的异常。

Job

Job 是协程的唯一标识,包含了这个协程任务的一系列状态,如下:
Kotlin The states of job's life cycle

由于协程是结构化的,取消父协程同时会取消子协程。通过 ensureActive() 方法检查协程是否还处于运行状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface Job : CoroutineContext.Element {
public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean

public fun start(): Boolean
public fun cancel(cause: CancellationException? = null)
public fun cancel(): Unit = cancel(null)

public suspend fun join()
public suspend fun Job.cancelAndJoin() {
cancel()
return join()
}

public fun Job.ensureActive(): Unit {
if (!isActive) throw getCancellationException()
}
}
状态 说明
isActive 活动状态。协程已经启动,但是没有完成也没有取消。
isCompleted 完成状态。协程执行完毕、取消或异常都视为完成。
isCancelled 取消状态。协程主动调用 cancel() 方法、执行失败或父/子协程取消。

SupervisorJob 不会影响其他协程。

1
2
3
4
5
6
// SupervisorKt.kt
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
override fun childCancelled(cause: Throwable): Boolean = false
}

Deferred

1
2
3
4
5
6
7
8
9
public interface Deferred<out T> : Job {
public suspend fun await(): T

public val onAwait: SelectClause1<T>

public fun getCompleted(): T

public fun getCompletionExceptionOrNull(): Throwable?
}

Deferred 也是一个接口,并继承了 Job。具有 Job 的特性,同时通过 await() 方法可以获取延时的结果。也是 async 构建器返回的类型。

CoroutineDispatcher

CoroutineDispatcher 可以指定协程的运行线程,dispatch() 方法用于把协程任务分派到特定线程运行。

1
2
3
4
5
6
7
8
9
/**
* Base class to be extended by all coroutine dispatcher implementations.
*/
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
}

Dispatcher 用于告知协程在哪个线程中运行,kotlin 内置了 4 个 CoroutineDispatcher 实现:

1
2
3
4
5
6
7
8
9
public actual object Dispatchers {
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
  • Main:把协程运行在平台相关的只能操作 UI 对象的线程,它根据不同的平台有不同的实现,kotlin 支持下面三种平台:
    1. js: 提供对 JavaScript 的支持
    2. native: 一种将 kotlin 代码编译为无需虚拟机就可运行的原生二进制文件的技术
    3. JVM: 需要虚拟机才能编译的平台,例如 Android 需要引入 kotlinx-coroutines-android 库
  • Default:运行在低并发的线程池中,去执行一些计算密集型的操作;
  • IO:运行在高并发的线程池中,执行一些阻塞密集型操作;
  • Unconfined:不指定协程运行的线程;

DefaultScheduler 使用的是 kotlin 自己实现的线程池,也是默认使用的线程池;CommonPool 使用的是 java 类库中的 Executor。

DefaultScheduler#createScheduler() 方法返回 CoroutineScheduler 类型的实例。CoroutineScheduler 使用工作窃取算法(Work Stealing)重新实现了一套线程池的任务调度逻辑,它的性能、扩展性对协程的任务调度更友好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {

override fun execute(command: Runnable) = dispatch(command)

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
val task = createTask(block, taskContext)
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
}

withContext() 函数可以切换代码块运行的线程,代码块最后一行作为返回值。

CoroutineName

1
2
3
4
5
6
7
/**
* User-specified name of coroutine.
*/
public data class CoroutineName( val name: String) : AbstractCoroutineContextElement(CoroutineName) {
public companion object Key : CoroutineContext.Key<CoroutineName>
override fun toString(): String = "CoroutineName($name)"
}

CoroutineExceptionHandler

CoroutineExceptionHandler 用来处理协程运行中未捕获的异常,每一个创建的协程默认都会有一个异常处理器。

1
2
3
4
public interface CoroutineExceptionHandler : CoroutineContext.Element {
public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
public fun handleException(context: CoroutineContext, exception: Throwable)
}

CoroutineExceptionHandler 只对 launch() 方法启动的根协程有效,而对 async() 启动的根协程无效,因为 async 启动的根协程默认会捕获所有未捕获异常并把它放在 Deferred 中,等到用户调用 Deferred 的 await 方法才抛出。

CancellationException 会被所有 CoroutineExceptionHandler 省略,但可以 try-catch 它。而且协程抛出 CancellationException 时,并不会终止当前父协程的运行。
协程内部的异常通过传统的 try-catch 方式捕获没有问题,但是不要跨协程捕获异常。

全局捕获异常的方式:CoroutineExceptionHandler。CorountineExceptionHandler 只能放到顶层协程当中,子协程当中不要使用它。

1
2
3
4
5
6
7
val handler = CoroutineExceptionHandler { coroutineContext, throwable ->
// caught exception
}

val scope = CoroutineScope(Dispatchers.Main + Job() + handler)

viewModelScope.launch(handler) {}

协程失败时会将事件冒泡到上一层,也就是 Parent 的这层协程当中,然后 Parent 层会先将自己的子协程全部 cancel 掉,接着再将自己 cancel 掉,最后再将这个事件继续冒泡到上一层。可以简单理解:假如一个协程失败的话,它的整个协程栈,所有的协程全都会被取消。

协程的构建

在 Kotlin 当中创建协程主要有两种方式,分别是 launch()async() 函数。它们必须要在协程作用域当中才能调用。

runBlocking()

通常适用于单元测试的场景,而业务开发中不会用到这个函数,因为它是线程阻塞的。调用了 runBlocking 的主线程会一直阻塞直到 runBlocking 内部的协程执行完毕。

1
2
3
4
5
6
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
// ...
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}

launch()

1
2
3
4
5
6
7
8
9
10
11
12
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
  • context: 协程上下文,可以指定协程运行的线程。默认与指定的 CoroutineScope 中的 coroutineContext 保持一致,比如 GlobalScope 默认运行在一个后台工作线程内,也可以通过显示指定参数来更改协程运行的线程,比如 Dispatchers.IO。
  • start:协程的启动模式。
    1. CoroutineStart.DEFAULT:协程的默认启动模式,表示立即执行协程。
    2. CoroutineStart.LAZY:需要时才执行协程。
    3. CoroutineStart.ATOMIC:类似 DEFAULT,但是不能取消。
    4. CoroutineStart.UNDISPATCHED:类似 ATOMIC,立即执行协程,直到它在当前线程中的第一个挂起点。
  • block:协程主体。也就是要在协程内部运行的代码,可以通过 lamda 表达式的方式方便的编写协程内运行的代码。

GlobalScope.launch {} 用于创建一个生命周期和应用程序一致协程,不阻塞调用者线程。

async()

async 并发函数会返回一个 Deferred 类型的值。调用 async 函数代码会执行,之后可以调用 deferred.await 函数来获取函数执行的返回结果。async 会开启协程去执行代码块里的代码,同时代码块最后一行代码会作为返回值返回,可以调用 await 函数来去获取返回的返回值。如果调用 await 函数的时候,协程还没有运行完,调用 await 函数的协程就会被挂起,一直等到 async 函数执行结束之后, await 函数才会重新被恢复。

1
2
3
4
5
6
7
8
9
10
11
12
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

取消协程

绝大部分情况下,协程的取消是自动的。所有 kotlin.coroutines 的 suspend 方法都是可取消的。

1
2
3
4
5
6
7
GlobalScope.launch(Dispatchers.IO) {
while (true) {
if (isActive) {
// todo
}
}
}

isActive 是 CoroutineScope 中的扩展属性。

1
2
3
while (true) {
delay(500)
}

在上面这个无限循环里,每一个 delay 都会检查协程是否处于有效状态,一旦发现协程被取消,循环的操作也会被取消。

协程的取消是协作的。一段协程代码必须协作才能被取消。

Job 是作为协程身份唯一标识的存在,每一个协程内部都会有一个唯一的标识。通过 Job 可以控制协程的生命周期,比如:

  • 判断协程是否正在运行
  • 判断协程是否已经被取消
  • 判断协程是否运行结束

参考

[1] Kotlin 协程指南
[2] GDG 上海实录回顾,带你快速上手 Kotlin 协程
[3] Kotlin 协程的挂起 - 扔物线(朱凯)
[4] Kotlin Coroutines - Github
[5] Coroutines Guide
[6] 硬核万字解读——Kotlin 协程原理解析
[7] kotlinx.coroutines