背景: 使用
Kotlin Coroutine 的使用方法,参照官方文档食用即可。这里只简单给出一些概念。
suspend
使用 suspend 表示函数支持挂起操作,目的在于告诉编译器,该方法可能产生阻塞(因为普通方法也能使用 suspend 标记,但是实际不会起作用)。suspend 方法会被编译为继承 SuspendLambda 的类
创建协程
-
launch
返回 Job,能够获取和控制当前 Scope 下的协程的状态,比如取消(cancel)协程、获取是否运行(isActive)。
-
async
返回 Deferred,Deferred 继承自 Job,它拥有 Job 的功能。另外 Deferred 还类似 Java UTC 中的 Future 接口,通过 Deferred 能够获取协程执行完毕之后的结果。
-
withContext
切换代码块的执行上下文
结构化并发
将多个协程放到一个可以管理的空间里面,这个空间的名字就叫 CoroutineScope。通过 Scope 就能够统一的管理内部的协程,方便对多个协程整体上做取消(cancel)、等待结果(join)等操作。
实现原理
Kotlin 的协程实现属于有限状态机编程, 有限状态机编程是编程范式的一种。是指利用有限状态机来进行编程实现。
Kotlin 在 JVM 上实现的协程机制,本身没有超脱出 JVM 的范畴,也不能够超脱,所以代码本质还是运行在操作系统级别的线程上的。所以 Kotlin Coroutine 的实现就是要在 JVM 上实现一套代码(任务)的挂起和恢复机制,而挂起和恢复刚好能抽取为两种状态,于是要实现代码运行的挂起和恢复的需求,就转变为了实现一种控制状态转移的需求。而有限状态下的编程,使用有限状态机编程范式再合适不过了。
Kotlin 协程本质还是运行在线程上的,所以如果从代码的运行角度来看,并没有太多的魔法,代码的运行机制和传统一样,虚拟机按行读取指令和操作数,然后执行操作。所以 Kotlin 协程的魔法更多是在编译期就开始了。
执行的最小单元 CodeBlock
在 Kotlin 代码编译时,launch/async 和 suspend 方法中的代码会根据挂起点被拆分到多个 Code Block 中。Code Block 会被封装为 Runnable
,被封装的 Runnable
会被 Dispatcher 执行。使用哪种 Dispatcher 则由传递给 launch/async 的 CoroutineContext 参数决定。
例子代码:
|
|
例子代码中,使用的 runBlocking
来创建协程,runBlocking
会为内部的协程提供一个阻塞当前线程的 EventLoop 队列,等队列中的所有协程都执行完之后,runBlocking
才会执行完毕。例子代码中的 launch
没有传递额外的 CoroutineContext 参数,所以它会继承 runBlocking
的 context 去使用。
例子代码的执行过程可以简述为:
运行流 | 说明 |
---|---|
1. 调用 runBlocking ,block 1 入队,然后开始循环从队列中消费任务 2. block 1 出队执行,输出 1 3. launch 被调用,block 2 入队4. 输出 3 5. block 1 执行完成,执行继续从队列中取下一个任务 6. block 2 出队执行,输出 3 ,调用 suspendFunc 方法8. suspendFunc 方法在编译之后,会在调用时先持有它自己的代码执行完成之后将要继续执行的代码块(block 3)的引用(在它自己的代码执行完成之后,就会恢复执行 block 3) 9. 执行 suspendFunc ,输出 4 10. 调用 delay ,block 5 会被封装为一个 delay task 并入队 11. block 2 和 block 4 执行完毕,执行继续从队列中取下一个任务 12. 这时队列中存在的任务是 block 5 ,循环会一直循环等待,直到到满足了 block 5 delay 的时间时就将 block 5 出队 13. block 5 出队执行,输出 5 14. block 5 执行完成就代表 suspendFunc 执行完毕了,就会恢复执行 suspendFunc 在进入时持有的 block 3 15. block 3 执行,输出 6 16. runBlocking 中的所有代码执行完毕,程序执行完毕 |
通过上面这个例子能看出,虽然所有代码都是在同一个线程执行的,但是 Kotlin 协程却实现了非阻塞的运行(println(2)
不会被 println(3)
阻塞),而协程内部又是按照同步的方式执行的(println(5)
在 delay
完成之后才会被执行)。
正是由于编译器将来自不同协程的代码块相互交错的插入到事件循环队列中,才让仅使用一个线程就能实现代码块的挂起和恢复得以实现。
其他的 CoroutineContext 或许使用不同的 Dispatcher 在不同的线程上采用不同的策略去执行协程,但是其过程与这个例子是类似的。
维护执行状态的 Continuation
到目前为止,我们已经了解了使代码块执行的逻辑概念了。但是,在该概念是如何实现上面还存在一些疑惑。
- 如何在已编译的 Java 字节码中实现这个逻辑概念?
- 如何跟踪代码块的执行状态?或者说如何决定一个代码块执行完成之后该执行的哪一个代码块?
我尝试通过 JD-GUI 去反编译 RunBlockingDemo 类,但是 suspendFuncA 方法 JD-GUI 反编译失败了,没有显示出来。于是我先通过 javap RunBlockingDemo
查看了 RunBlockingDemo 类会有哪些方法,发现 suspendFuncA 被编译为了下面的样子:
|
|
然后我再使用 javap -c RunBlockingDemo
反编译得到编译之后的字节码:
*suspendFuncA* 方法的反编译字节码太长, 点击可展开
Code:
0: aload_1 // 加载第二个局部变量到栈顶, 这里的第二个局部变量就是方法的第一个参数 Continuation
1: instanceof #29 // class RunBlockingDemo$suspendFuncA$1
4: ifeq 39 //如果不是 RunBlockingDemo$suspendFuncA$1 的对象,就跳转到 39 行
7: aload_1 //否则加载第二个局部变量到栈顶
8: checkcast #29 // class RunBlockingDemo$suspendFuncA$1
11: astore 5 //栈顶对象存入第5个局部变量,即方法的第一个参数通过了 checkcast 判断之后,存入第5个局部变量
13: aload 5 //加载第5个局部变量到栈顶
15: getfield #33 // Field RunBlockingDemo$suspendFuncA$1.label:I // 获取 label 字段
18: ldc #34 // int -2147483648 // 从常量池中加载第34个整形常量到栈顶,其值是 Integer.MIN_VALUE
20: iand // 从栈中弹出两个整数,进行与运算,即将 label 和 Integer.MIN_VALUE 进行与运算
21: ifeq 39 // 如果结果为0,就跳转到 39 行
24: aload 5 // 否则加载第5个局部变量到栈顶
26: dup // 复制栈顶元素
27: getfield #33 // Field RunBlockingDemo$suspendFuncA$1.label:I // 加载 label 字段到栈顶
30: ldc #34 // int -2147483648 // 从常量池加载数字 Integer.MIN_VALUE
32: isub // label 减去 Integer.MIN_VALUE 的结果存入栈顶
33: putfield #33 // Field RunBlockingDemo$suspendFuncA$1.label:I // 将栈顶元素存入 label
36: goto 50 // 跳转到 50 行
39: new #29 // class RunBlockingDemo$suspendFuncA$1
42: dup
43: aload_0
44: aload_1
45: invokespecial #35 // Method RunBlockingDemo$suspendFuncA$1."<init>":(LRunBlockingDemo;Lkotlin/coroutines/Continuation;)V
48: astore 5
50: aload 5 // 加载第5个局部变量
52: getfield #39 // Field RunBlockingDemo$suspendFuncA$1.result:Ljava/lang/Object; // result 字段加载到栈顶
55: astore 4 // result 存到第4个局部变量中
57: invokestatic #45 // Method kotlin/coroutines/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;
60: astore 6 // 将 getCOUROUTINE_SUSPENDED() 的返回值存入第6个局部变量中
62: aload 5 // 加载第5个局部变量
64: getfield #33 // Field RunBlockingDemo$suspendFuncA$1.label:I // 加载 label 字段到栈顶
67: tableswitch { // 0 to 1
0: 88 // label 为 0 就跳转到 88 行
1: 133 // label 为 1 就跳转到 133 行
default: 165 // 默认跳转到 165 行
}
88: aload 4 // 加载 result 到栈顶
90: invokestatic #51 // Method kotlin/ResultKt.throwOnFailure:(Ljava/lang/Object;)V
93: iconst_4 // 加载常量数字 4 到栈顶
94: istore_2 // 将栈顶数字(这里就是4)存到第二个局部变量中
95: iconst_0
96: istore_3 // 将数字0 存到第3个局部变量中
97: getstatic #57 // Field java/lang/System.out:Ljava/io/PrintStream;
100: iload_2 // 加载第2个局部变量,这里存的是 4
101: invokevirtual #63 // Method java/io/PrintStream.println:(I)V // 调用 println 输出栈顶的数字 4
104: ldc2_w #64 // long 500l // 加载常量池第63个长整形数字到栈顶,这里就是 500L
107: aload 5
109: aload 5
111: aload_0
112: putfield #68 // Field RunBlockingDemo$suspendFuncA$1.L$0:Ljava/lang/Object; // 将 this 对象引用存储到 L$0 字段
115: aload 5
117: iconst_1
118: putfield #33 // Field RunBlockingDemo$suspendFuncA$1.label:I // label 赋值 1
121: invokestatic #74 // Method kotlinx/coroutines/DelayKt.delay:(JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
124: dup
125: aload 6
127: if_acmpne 149 // 如果 delay 的返回值 != getCOROUTINE_SUSPEND() 的返回值,就跳转 149 行
130: aload 6 // 否则返回 getCOROUTINE_SUSPEND() 的返回值
132: areturn
133: aload 5
135: getfield #68 // Field RunBlockingDemo$suspendFuncA$1.L$0:Ljava/lang/Object;
138: checkcast #2 // class RunBlockingDemo
141: astore_0
142: aload 4
144: invokestatic #51 // Method kotlin/ResultKt.throwOnFailure:(Ljava/lang/Object;)V
147: aload 4
149: pop
150: iconst_5
151: istore_2 // 5 存入第2个局部变量
152: iconst_0
153: istore_3 // 0 存入第3个局部变量
154: getstatic #57 // Field java/lang/System.out:Ljava/io/PrintStream;
157: iload_2
158: invokevirtual #63 // Method java/io/PrintStream.println:(I)V // 输出 5
161: getstatic #80 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;
164: areturn // return Unit.INSTANCE
165: new #82 // class java/lang/IllegalStateException
168: dup
169: ldc #84 // String call to 'resume' before 'invoke' with coroutine
171: invokespecial #87 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V
174: athrow // 抛出异常
下面是我根据反编译出来的字节码推测出的 RunBlockingDemo 类的 suspendFuncA 方法经过编译器处理之后的源码:
|
|
从源码中能得出两个有趣的地方:
- 例子中的
supendFuncA
是没有参数的,但是编译之后,编译器为它增加了一个 Continuation 类型的参数 suspendFuncA
内部通过一个 switch-case 去调度执行不同的代码块,而 switch 的参数则是传入的 Continuation 对象的 label 字段
因此不难推测出,编译器为 suspendFuncA
方法增加的 Continuation 参数就是用来跟踪协程运行状态的对象,而且运行状态就保存在一个简单的 int 类型的 label
字段中。
再回头看看 suspendFuncA
方法是如何被调用的。
- 首先编译器会为 suspend 方法生成一个实现了 ContinuationImpl 的包装类,然后在包装类的 invokeSuspend 方法中调用
suspendFuncA
。
下面就是编译器为suspendFuncA
方法生成的包装类(使用 JD-GUI 查看):
|
|