破解 Kotlin 协程(6) - 协程挂起篇-程序员宅基地

技术标签: ViewUI  java  移动开发  javascript  

关键词:Kotlin 协程 协程挂起 任务挂起 suspend 非阻塞

协程的挂起最初是一个很神秘的东西,因为我们总是用线程的概念去思考,所以我们只能想到阻塞。不阻塞的挂起到底是怎么回事呢?说出来你也许会笑~~(哭?。。抱歉这篇文章我实在是没办法写的更通俗易懂了,大家一定要亲手实践!)

1. 先看看 delay

我们刚刚学线程的时候,最常见的模拟各种延时用的就是 Thread.sleep 了,而在协程里面,对应的就是 delaysleep 让线程进入休眠状态,直到指定时间之后某种信号或者条件到达,线程就尝试恢复执行,而 delay 会让协程挂起,这个过程并不会阻塞 CPU,甚至可以说从硬件使用效率上来讲是“什么都不耽误”,从这个意义上讲 delay 也可以是让协程休眠的一种很好的手段。

delay 的源码其实很简单:


public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

复制代码

cont.context.delay.scheduleResumeAfterDelay 这个操作,你可以类比 JavaScript 的 setTimeout,Android 的 handler.postDelay,本质上就是设置了一个延时回调,时间一到就调用 cont 的 resume 系列方法让协程继续执行。

剩下的最关键的就是 suspendCancellableCoroutine 了,这可是我们的老朋友了,前面我们用它实现了回调到协程的各种转换 —— 原来 delay 也是基于它实现的,如果我们再多看一些源码,你就会发现类似的还有 joinawait 等等。

2. 再来说说 suspendCancellableCoroutine

既然大家对于 suspendCancellableCoroutine 已经很熟悉了,那么我们干脆直接召唤一个老朋友给大家:


private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
    cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
}

复制代码

Job.join() 这个方法会首先检查调用者 Job 的状态是否已经完成,如果是,就直接返回并继续执行后面的代码而不再挂起,否则就会走到这个 joinSuspend 的分支当中。我们看到这里只是注册了一个完成时的回调,那么传说中的 suspendCancellableCoroutine 内部究竟做了什么呢?


public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        block(cancellable)
        cancellable.getResult() // 这里的类型是 Any?
    }
    
复制代码

suspendCoroutineUninterceptedOrReturn 这个方法调用的源码是看不到的,因为它根本没有源码:P 它的逻辑就是帮大家拿到 Continuation 实例,真的就只有这样。不过这样说起来还是很抽象,因为有一处非常的可疑:suspendCoroutineUninterceptedOrReturn 的返回值类型是 T,而传入的 lambda 的返回值类型是 Any?, 也就是我们看到的 cancellable.getResult() 的类型是 Any?,这是为什么?

我记得在协程系列文章的开篇,我就提到过 suspend 函数的签名,当时是以 await 为例的,这个方法大致相当于:

fun await(continuation: Continuation<User>): Any {
    ...
}
复制代码

suspend 一方面为这个方法添加了一个 Continuation 的参数,另一方面,原先的返回值类型 User 成了 Continuation 的泛型实参,而真正的返回值类型竟然是 Any。当然,这里因为定义的逻辑返回值类型 User 是不可空的,因此真实的返回值类型也用了 Any 来示意,如果泛型实参是个可空的类型,那么真实的返回值类型也就是 Any? 了,这正与前面提到的 cancellable.getResult() 返回的这个 Any? 相对应。

如果大家去查 await 的源码,你同样会看到这个 getResult() 的调用。

简单来说就是,对于 suspend 函数,不是一定要挂起的,可以在需要的时候挂起,也就是要等待的协程还没有执行完的时候,等待协程执行完再继续执行;而如果在开始 join 或者 await 或者其他 suspend 函数,如果目标协程已经完成,那么就没必要等了,直接拿着结果走人即可。那么这个神奇的逻辑就在于 cancellable.getResult() 究竟返回什么了,且看:


internal fun getResult(): Any? {
    ...
    if (trySuspend()) return COROUTINE_SUSPENDED // ① 触发挂起逻辑
    ...
    if (state is CompletedExceptionally)  // ② 异常立即抛出
        throw recoverStackTrace(state.cause, this) 
    return getSuccessfulResult(state) // ③ 正常结果立即返回
}

复制代码

这段代码 ① 处就是挂起逻辑了,表示这时候目标协程还没有执行完,需要等待结果,②③是协程已经执行完可以直接拿到异常和正常结果的两种情况。②③好理解,关键是 ①,它要挂起,这返回的是个什么东西?

public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED

internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
复制代码

这是 1.3 的实现,1.3 以前的实现更有趣,就是一个白板 Any。其实是什么不重要,关键是这个东西是一个单例,任何时候协程见到它就知道自己该挂起了。

3. 深入挂起操作

既然说到挂起,大家可能觉得还是一知半解,还是不知道挂起究竟怎么做到的,怎么办?说真的这个挂起是个什么操作其实一直没有拿出来给大家看,不是我们太小气了,只是太早拿出来会比较吓人。。


suspend fun hello() = suspendCoroutineUninterceptedOrReturn<Int>{
    continuation ->
    log(1)
    thread {
        Thread.sleep(1000)
        log(2)
        continuation.resume(1024)
    }
    log(3)
    COROUTINE_SUSPENDED
}

复制代码

我写了这么一个 suspend 函数,在 suspendCoroutineUninterceptedOrReturn 当中直接返回了这个传说中的白板 COROUTINE_SUSPENDED,正常来说我们应该在一个协程当中调用这个方法对吧,可是我偏不,我写一段 Java 代码去调用这个方法,结果会怎样呢?


public class CallCoroutine {
    public static void main(String... args) {
        Object value = SuspendTestKt.hello(new Continuation<Integer>() {
            @NotNull
            @Override
            public CoroutineContext getContext() {
                return EmptyCoroutineContext.INSTANCE;
            }

            @Override
            public void resumeWith(@NotNull Object o) { // ①
                if(o instanceof Integer){
                    handleResult(o);
                } else {
                    Throwable throwable = (Throwable) o;
                    throwable.printStackTrace();
                }
            }
        });

        if(value == IntrinsicsKt.getCOROUTINE_SUSPENDED()){ // ②
            LogKt.log("Suspended.");
        } else {
            handleResult(value);
        }
    }

    public static void handleResult(Object o){
        LogKt.log("The result is " + o);
    }
}

复制代码

这段代码看上去比较奇怪,可能会让人困惑的有两处:

① 处,我们在 Kotlin 当中看到的 resumeWith 的参数类型是 Result,怎么这儿成了 Object 了?因为 Result 是内联类,编译时会用它唯一的成员替换掉它,因此就替换成了 Object (在Kotlin 里面是 Any?

② 处 IntrinsicsKt.getCOROUTINE_SUSPENDED() 就是 Kotlin 的 COROUTINE_SUSPENDED

剩下的其实并不难理解,运行结果自然就是如下所示了:

07:52:55:288 [main] 1
07:52:55:293 [main] 3
07:52:55:296 [main] Suspended.
07:52:56:298 [Thread-0] 2
07:52:56:306 [Thread-0] The result is 1024
复制代码

其实这段 Java 代码的调用方式与 Kotlin 下面的调用已经很接近了:

suspend fun main() {
    log(hello())
}
复制代码

只不过我们在 Kotlin 当中还是不太容易拿到 hello 在挂起时的真正返回值,其他的返回结果完全相同。

12:44:08:290 [main] 1
12:44:08:292 [main] 3
12:44:09:296 [Thread-0] 2
12:44:09:296 [Thread-0] 1024
复制代码

很有可能你看到这里都会觉得晕头转向,没有关系,我现在已经开始尝试揭示一些协程挂起的背后逻辑了,比起简单的使用,概念的理解和接受需要有个小小的过程。

4. 深入理解协程的状态转移

前面我们已经对协程的原理做了一些揭示,显然 Java 的代码让大家能够更容易理解,那么接下来我们再来看一个更复杂的例子:


suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String>{
    continuation ->
    thread {
        Thread.sleep(1000)
        continuation.resume("Return suspended.")
    }
    COROUTINE_SUSPENDED
}

suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String>{
    log(1)
    "Return immediately."
}

复制代码

我们首先定义两个挂起函数,第一个会真正挂起,第二个则会直接返回结果,这类似于我们前面讨论 join 或者 await 的两条路径。我们再用 Kotlin 给出一个调用它们的例子:


suspend fun main() {
    log(1)
    log(returnSuspended())
    log(2)
    delay(1000)
    log(3)
    log(returnImmediately())
    log(4)
}

复制代码

运行结果如下:

08:09:37:090 [main] 1
08:09:38:096 [Thread-0] Return suspended.
08:09:38:096 [Thread-0] 2
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 3
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] Return immediately.
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 4
复制代码

好,现在我们要揭示这段协程代码的真实面貌,为了做到这一点,我们用 Java 来仿写一下这段逻辑:

注意,下面的代码逻辑上并不能做到十分严谨,不应该出现在生产当中,仅供学习理解协程使用。


public class ContinuationImpl implements Continuation<Object> {

    private int label = 0;
    private final Continuation<Unit> completion;

    public ContinuationImpl(Continuation<Unit> completion) {
        this.completion = completion;
    }

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object o) {
        try {
            Object result = o;
            switch (label) {
                case 0: {
                    LogKt.log(1);
                    result = SuspendFunctionsKt.returnSuspended( this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 1: {
                    LogKt.log(result);
                    LogKt.log(2);
                    result = DelayKt.delay(1000, this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 2: {
                    LogKt.log(3);
                    result = SuspendFunctionsKt.returnImmediately( this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 3:{
                    LogKt.log(result);
                    LogKt.log(4);
                }
            }
            completion.resumeWith(Unit.INSTANCE);
        } catch (Exception e) {
            completion.resumeWith(e);
        }
    }

    private boolean isSuspended(Object result) {
        return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }
}

复制代码

我们定义了一个 Java 类 ContinuationImpl,它就是一个 Continuation 的实现。

实际上如果你愿意,你还真得可以在 Kotlin 的标准库当中找到一个名叫 ContinuationImpl 的类,只不过,它的 resumeWith 最终调用到了 invokeSuspend,而这个 invokeSuspend 实际上就是我们的协程体,通常也就是一个 Lambda 表达式 —— 我们通过 launch启动协程,传入的那个 Lambda 表达式,实际上会被编译成一个 SuspendLambda 的子类,而它又是 ContinuationImpl 的子类。

有了这个类我们还需要准备一个 completion 用来接收结果,这个类仿照标准库的 RunSuspend 类实现,如果你有阅读前面的文章,那么你应该知道 suspend main 的实现就是基于这个类:


public class RunSuspend implements Continuation<Unit> {

    private Object result;

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object result) {
        synchronized (this){
            this.result = result;
            notifyAll(); // 协程已经结束,通知下面的 wait() 方法停止阻塞
        }
    }

    public void await() throws Throwable {
        synchronized (this){
            while (true){
                Object result = this.result;
                if(result == null) wait(); // 调用了 Object.wait(),阻塞当前线程,在 notify 或者 notifyAll 调用时返回
                else if(result instanceof Throwable){
                    throw (Throwable) result;
                } else return;
            }
        }
    }
}

复制代码

这段代码的关键点在于 await() 方法,它在其中起了一个死循环,不过大家不要害怕,这个死循环是个纸老虎,如果 resultnull,那么当前线程会被立即阻塞,直到结果出现。具体的使用方法如下:


...
    public static void main(String... args) throws Throwable {
        RunSuspend runSuspend = new RunSuspend();
        ContinuationImpl table = new ContinuationImpl(runSuspend);
        table.resumeWith(Unit.INSTANCE);
        runSuspend.await();
    }
...

复制代码

这写法简直就是 suspend main 的真实面貌了。

我们看到,作为 completion 传入的 RunSuspend 实例的 resumeWith 实际上是在 ContinuationImplresumeWtih 的最后才会被调用,因此它的 await() 一旦进入阻塞态,直到 ContinuationImpl 的整体状态流转完毕才会停止阻塞,此时进程也就运行完毕正常退出了。

于是这段代码的运行结果如下:

08:36:51:305 [main] 1
08:36:52:315 [Thread-0] Return suspended.
08:36:52:315 [Thread-0] 2
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] 3
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] Return immediately.
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] 4
复制代码

我们看到,这段普通的 Java 代码与前面的 Kotlin 协程调用完全一样。那么我这段 Java 代码的编写根据是什么呢?就是 Kotlin 协程编译之后产生的字节码。当然,字节码是比较抽象的,我这样写出来就是为了让大家更容易的理解协程是如何执行的,看到这里,相信大家对于协程的本质有了进一步的认识:

  • 协程的挂起函数本质上就是一个回调,回调类型就是 Continuation
  • 协程体的执行就是一个状态机,每一次遇到挂起函数,都是一次状态转移,就像我们前面例子中的 label 不断的自增来实现状态流转一样

如果能够把这两点认识清楚,那么相信你在学习协程其他概念的时候就都将不再是问题了。如果想要进行线程调度,就按照我们讲到的调度器的做法,在 resumeWith 处执行线程切换就好了,其实非常容易理解的。官方的协程框架本质上就是在做这么几件事儿,如果你去看源码,可能一时云里雾里,主要是因为框架除了实现核心逻辑外还需要考虑跨平台实现,还需要优化性能,但不管怎么样,这源码横竖看起来就是五个字:状态机回调。

5. 小结

不同以往,我们从这一篇开始毫无保留的为大家尝试揭示协程背后的逻辑,也许一时间可能有些难懂,不过不要紧,你可以使用协程一段时间之后再来阅读这些内容,相信一定会豁然开朗的。

当然,这一篇内容的安排更多是为后面的序列篇开路,Kotlin 的 Sequence 就是基于协程实现的,它的用法很简单,几乎与普通的 Iterable 没什么区别,因此序列篇我们会重点关注它的内部实现原理,欢迎大家关注。


欢迎关注 Kotlin 中文社区!

中文官网:www.kotlincn.net/

中文官方博客:www.kotliner.cn/

公众号:Kotlin

知乎专栏:Kotlin

CSDN:Kotlin中文社区

掘金:Kotlin中文社区

简书:Kotlin中文社区

转载于:https://juejin.im/post/5ceb494851882532b93019e2

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_33856370/article/details/91429056

智能推荐

oracle 12c 集群安装后的检查_12c查看crs状态-程序员宅基地

文章浏览阅读1.6k次。安装配置gi、安装数据库软件、dbca建库见下:http://blog.csdn.net/kadwf123/article/details/784299611、检查集群节点及状态:[root@rac2 ~]# olsnodes -srac1 Activerac2 Activerac3 Activerac4 Active[root@rac2 ~]_12c查看crs状态

解决jupyter notebook无法找到虚拟环境的问题_jupyter没有pytorch环境-程序员宅基地

文章浏览阅读1.3w次,点赞45次,收藏99次。我个人用的是anaconda3的一个python集成环境,自带jupyter notebook,但在我打开jupyter notebook界面后,却找不到对应的虚拟环境,原来是jupyter notebook只是通用于下载anaconda时自带的环境,其他环境要想使用必须手动下载一些库:1.首先进入到自己创建的虚拟环境(pytorch是虚拟环境的名字)activate pytorch2.在该环境下下载这个库conda install ipykernelconda install nb__jupyter没有pytorch环境

国内安装scoop的保姆教程_scoop-cn-程序员宅基地

文章浏览阅读5.2k次,点赞19次,收藏28次。选择scoop纯属意外,也是无奈,因为电脑用户被锁了管理员权限,所有exe安装程序都无法安装,只可以用绿色软件,最后被我发现scoop,省去了到处下载XXX绿色版的烦恼,当然scoop里需要管理员权限的软件也跟我无缘了(譬如everything)。推荐添加dorado这个bucket镜像,里面很多中文软件,但是部分国外的软件下载地址在github,可能无法下载。以上两个是官方bucket的国内镜像,所有软件建议优先从这里下载。上面可以看到很多bucket以及软件数。如果官网登陆不了可以试一下以下方式。_scoop-cn

Element ui colorpicker在Vue中的使用_vue el-color-picker-程序员宅基地

文章浏览阅读4.5k次,点赞2次,收藏3次。首先要有一个color-picker组件 <el-color-picker v-model="headcolor"></el-color-picker>在data里面data() { return {headcolor: ’ #278add ’ //这里可以选择一个默认的颜色} }然后在你想要改变颜色的地方用v-bind绑定就好了,例如:这里的:sty..._vue el-color-picker

迅为iTOP-4412精英版之烧写内核移植后的镜像_exynos 4412 刷机-程序员宅基地

文章浏览阅读640次。基于芯片日益增长的问题,所以内核开发者们引入了新的方法,就是在内核中只保留函数,而数据则不包含,由用户(应用程序员)自己把数据按照规定的格式编写,并放在约定的地方,为了不占用过多的内存,还要求数据以根精简的方式编写。boot启动时,传参给内核,告诉内核设备树文件和kernel的位置,内核启动时根据地址去找到设备树文件,再利用专用的编译器去反编译dtb文件,将dtb还原成数据结构,以供驱动的函数去调用。firmware是三星的一个固件的设备信息,因为找不到固件,所以内核启动不成功。_exynos 4412 刷机

Linux系统配置jdk_linux配置jdk-程序员宅基地

文章浏览阅读2w次,点赞24次,收藏42次。Linux系统配置jdkLinux学习教程,Linux入门教程(超详细)_linux配置jdk

随便推点

matlab(4):特殊符号的输入_matlab微米怎么输入-程序员宅基地

文章浏览阅读3.3k次,点赞5次,收藏19次。xlabel('\delta');ylabel('AUC');具体符号的对照表参照下图:_matlab微米怎么输入

C语言程序设计-文件(打开与关闭、顺序、二进制读写)-程序员宅基地

文章浏览阅读119次。顺序读写指的是按照文件中数据的顺序进行读取或写入。对于文本文件,可以使用fgets、fputs、fscanf、fprintf等函数进行顺序读写。在C语言中,对文件的操作通常涉及文件的打开、读写以及关闭。文件的打开使用fopen函数,而关闭则使用fclose函数。在C语言中,可以使用fread和fwrite函数进行二进制读写。‍ Biaoge 于2024-03-09 23:51发布 阅读量:7 ️文章类型:【 C语言程序设计 】在C语言中,用于打开文件的函数是____,用于关闭文件的函数是____。

Touchdesigner自学笔记之三_touchdesigner怎么让一个模型跟着鼠标移动-程序员宅基地

文章浏览阅读3.4k次,点赞2次,收藏13次。跟随鼠标移动的粒子以grid(SOP)为partical(SOP)的资源模板,调整后连接【Geo组合+point spirit(MAT)】,在连接【feedback组合】适当调整。影响粒子动态的节点【metaball(SOP)+force(SOP)】添加mouse in(CHOP)鼠标位置到metaball的坐标,实现鼠标影响。..._touchdesigner怎么让一个模型跟着鼠标移动

【附源码】基于java的校园停车场管理系统的设计与实现61m0e9计算机毕设SSM_基于java技术的停车场管理系统实现与设计-程序员宅基地

文章浏览阅读178次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:Springboot + mybatis + Maven +mysql5.7或8.0+html+css+js等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。_基于java技术的停车场管理系统实现与设计

Android系统播放器MediaPlayer源码分析_android多媒体播放源码分析 时序图-程序员宅基地

文章浏览阅读3.5k次。前言对于MediaPlayer播放器的源码分析内容相对来说比较多,会从Java-&amp;amp;gt;Jni-&amp;amp;gt;C/C++慢慢分析,后面会慢慢更新。另外,博客只作为自己学习记录的一种方式,对于其他的不过多的评论。MediaPlayerDemopublic class MainActivity extends AppCompatActivity implements SurfaceHolder.Cal..._android多媒体播放源码分析 时序图

java 数据结构与算法 ——快速排序法-程序员宅基地

文章浏览阅读2.4k次,点赞41次,收藏13次。java 数据结构与算法 ——快速排序法_快速排序法