zoukankan      html  css  js  c++  java
  • Kotlin 朱涛16 协程 生命周期 Job 结构化并发

    本文地址


    目录

    16 | Job:协程也有生命周期吗?

    Job 其实就是协程的句柄。从某种程度上讲,当我们用 launch 和 async 创建一个协程以后,同时也会创建一个对应的 Job 对象。另外,Job 也是我们理解协程生命周期、结构化并发的关键知识点。通过 Job 暴露的 API,我们还可以让不同的协程之间互相配合,从而实现更加复杂的功能。

    Job 和 Deferred

    launch、async 的返回值类型分别是 JobDeferred,其中 Deferred 继承自 Job,只是多了一个泛型参数 T 和一个返回类型为 T 的 await() 方法。

    public interface Job : CoroutineContext.Element { ... } // Job 接口
    public interface Deferred<out T> : Job { ... }          // 继承自 Job 接口
    

    通过 Job 对象,我们主要可以做两件事情:

    • 监测协程的生命周期状态
    • 操控协程

    测试案例

    fun main() = runBlocking {
        val job = launch(start = CoroutineStart.LAZY) { // 指定启动模式为懒加载模式
            val name = "Thread:${Thread.currentThread().name}"
            println("----- 协程 start! $name")
            delay(200L)  // 可修改 delay 的时长(例如 800) 后再看下日志
            println("----- 协程 end!   $name")
        }
    
        delay(50L)
        job.log(1)
    
        job.start()  // 使用 LAZY 作为启动模式,调用 start() 后,状态才变成 Active
        job.log(2)
    
        delay(50L)   // 可修改 delay 的时长(0-500) 后再看下日志
        job.cancel() // 可注释后再看下日志
        job.log(3)
    
        delay(50L)  // 等待程序结束,可修改 delay 的时长(0-200) 后再看下日志
        job.log(4)
        println("-------------- Process end!")
    }
    
    fun Job.log(text: Any) { // 扩展函数,打印 Job 的生命周期状态
        val log = """
    ----------------------------------------- $text
    是否活跃: $isActive
    是否取消: $isCancelled
    是否完成: $isCompleted
    当前协程:Thread:${Thread.currentThread().name}
    ----------------------------------------- $text
    """
        println(log.trimIndent())
    }
    

    打印日志:

    ----------------------------------------- 1
    是否活跃: false
    是否取消: false
    是否完成: false
    当前协程:Thread:main @coroutine#1
    ----------------------------------------- 1
    ----------------------------------------- 2
    是否活跃: true
    是否取消: false
    是否完成: false
    当前协程:Thread:main @coroutine#1
    ----------------------------------------- 2
    ----- 协程 start! Thread:main @coroutine#2
    ----------------------------------------- 3
    是否活跃: false
    是否取消: true
    是否完成: false
    当前协程:Thread:main @coroutine#1
    ----------------------------------------- 3
    ----------------------------------------- 4
    是否活跃: false
    是否取消: true
    是否完成: true
    当前协程:Thread:main @coroutine#1
    ----------------------------------------- 4
    -------------- Process end!
    

    Job 的生命周期

    可以看到,对于协程的 Job 来说,它有两种初始状态:

    • 如果 Job 是以懒加载的方式创建的,那么它的初始状态是 New
      • 协程任务被 launch 以后,并不会立即执行
      • 调用 start() 以后,状态才变成 Active 状态
    • 如果一个协程是以非懒加载的方式创建的,那么它的初始状态是 Active

    注意:

    • 在协程任务正常执行完毕之前,调用 cancel() 以后,最终的 isCancelledisCompleted 都是 true
      • 其中,isCancelled 状态会及时更改为 true,isCompleted 状态的更改有一定的延迟
    • 在协程任务正常执行完毕之后,isCompleted 是 true,isCancelled 是 false
      • 此时再调用 cancel() 也不会改变协程的状态
    • 流程图当中的 New、Active、Completing、Cancelling、Completed、Cancelled 这些状态,都是 Job 内部私有的状态
    • Job 内部私有的 Completed、Cancelled 状态,都会认为是对外暴露出的 isCompleted

    等待和监听协程结束

    上面的代码中,如果 Job 内部 delay 时间很长,打印 Process end 之后,程序并不会立即结束,而是等 Job 任务执行完毕以后才真正退出。

    为了更加灵活地等待和监听协程的结束事件,我们可以用 join() 以及 invokeOnCompletion {} 优化上面的代码。

    • invokeOnCompletion {} 的作用是监听协程结束的事件,如果 job 被取消了,这个回调仍然会被调用
    • join() 是一个 挂起函数,它的作用是:挂起当前的程序执行流程,待 job 中的协程任务执行完毕后,再恢复当前的程序执行流程
    fun main() = runBlocking {
        val job = launch(start = CoroutineStart.LAZY) {
            val name = "Thread:${Thread.currentThread().name}"
            println("----- 协程 start! $name")
            delay(200L)
            println("----- 协程 end!   $name")
        }
    
        delay(50L)
        job.log(1)
    
        job.start()
        job.log(2)
        job.invokeOnCompletion { job.log("end") } // 监听协程结束的事件
    
        job.join() // 等待协程执行完毕
        job.log(3)
        println("-------------- Process end!")
    }
    

    Job 的常用 API

    public interface Job : CoroutineContext.Element {
        // ------------ 状态查询 ------------
        public val isActive: Boolean
        public val isCompleted: Boolean
        public val isCancelled: Boolean
        public fun getCancellationException(): CancellationException
    
        // ------------ 操控状态 ------------
        public fun start(): Boolean
        public fun cancel(cause: CancellationException? = null)
        public fun cancel(): Unit = cancel(null)
        public fun cancel(cause: Throwable? = null): Boolean
    
        // ------------ 等待状态 ------------
        public suspend fun join()
        public val onJoin: SelectClause0
    
        // ------------ 完成状态回调API ------------
        public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
        public fun invokeOnCompletion(
            onCancelling: Boolean = false,
            invokeImmediately: Boolean = true,
            handler: CompletionHandler): DisposableHandle
        // ...
    }
    

    通过 Job 的 API 可以发现,Job 和协程的关系,有点像遥控器和空调的关系 :

    • 空调遥控器可以监测空调的运行状态,Job 也可以监测协程的运行状态
    • 空调遥控器可以操控空调的运行状态,Job 也可以简单操控协程的运行状态
    • 所以,从某种程度来讲,遥控器是空调对外暴露的一个 句柄,同样,Job 是协程的句柄

    Deferred

    Deferred 是继承自 Job 的一个接口,它在 Job 的基础上扩展了一个 await() 方法:

    public interface Deferred<out T> : Job { // 带泛型
        public suspend fun await(): T        // 挂起函数,有返回值
    }
    

    await() 是一个挂起函数,如果当前的 Deferred 任务还没执行完毕,那么,await() 就会挂起当前的协程执行流程,等待 Deferred 任务执行完毕,再恢复执行后面剩下的代码。await() 的行为模式和 join() 是类似的,只不过 join() 是没有返回值的。

    fun main() = runBlocking {
        val deferred = async {
            val name = "Thread:${Thread.currentThread().name}"
            println("start $name")
            delay(100L)
            println("end   $name")
            "bqt"                     // 协程任务的返回值
        }
        val result = deferred.await() // 挂起当前协程的执行流程,直到协程任务执行完毕后恢复
        println("exit  $result")
    }
    
    start Thread:main @coroutine#2
    end   Thread:main @coroutine#2
    exit  bqt
    

    协程的结构化并发

    Kotlin 协程的结构化并发,是 Kotlin 协程的第二大优势,其重要性仅次于 挂起函数

    结构化并发,简单来说就是:带有结构和层级的并发

    线程之间是不存在父子关系的,但协程之间是会存在父子关系的。Job 源码中有两个 API 是用来描述父子关系的:

    public interface Job : CoroutineContext.Element {
        public val children: Sequence<Job>             // 一个惰性的集合,可以对它的子 Job 进行遍历
        @InternalCoroutinesApi
        public fun attachChild(child: ChildJob): ChildHandle // 协程内部的 API,用于绑定 ChildJob
    }
    
    • 调用 parentJob 的 join() 方法后,它会等待其内部的子 Job 全部执行完毕,才会恢复执行
    • 调用 parentJob 的 cancel() 方法后,它内部的协程任务也全都被取消了

    join 案例

    fun main() = runBlocking {
        val parentJob: Job
        var job1: Job? = null
        var job2: Job? = null
        var job3: Job? = null
    
        parentJob = launch {       // 在外部创建了 1 个父 Job
            job1 = launch {        // 在内部创建了 3 个子 Job
                println("1 start")
                delay(100L)
                println("1 end")
            }
            job2 = launch {
                println("2 start")
                delay(2000L)
                println("2 end")
            }
            job3 = launch {
                println("3 start")
                delay(5000L)
                println("3 end")
            }
        }
    
        delay(50L) // 确保所有子 Job 已正常启动,且尚未结束(否则下面的遍历会错误)
        parentJob.children.forEachIndexed { index, job ->    // 遍历 parentJob 的子 Job
            when (index) {
                0 -> println("job is job1: ${job1 === job}") // 判断引用是否相等,即是否是同一个对象,结果为 true
                1 -> println("job is job2: ${job2 === job}")
                2 -> println("job is job3: ${job3 === job}")
            }
        }
    
        parentJob.join() // 会等待其内部的子 Job 全部执行完毕,才会恢复执行
        println("Process end!")
    }
    

    调用 parentJob 的 join() 方法后,它会等待其内部的子 Job 全部执行完毕,才会恢复执行。

    1 start
    2 start
    3 start
    job is job1: true
    job is job2: true
    job is job3: true
    1 end
    2 end
    3 end
    Process end!
    

    cancel 案例

    将上面的 join() 改为 cancel() 后,

    fun main() = runBlocking {
        val parentJob: Job = launch {
            launch {
                println("1 start")
                delay(100L)
                println("1 end")
            }
            launch {
                println("2 start")
                delay(2000L)
                println("2 end")
            }
            launch {
                println("3 start")
                delay(5000L)
                println("3 end")
            }
        }
    
        delay(500L)        // 确保子 Job 已正常启动。注意,在 delay 期间,第一个子协程以已经执行完成了
        parentJob.cancel() // 调用 parentJob 的 cancel() 方法后,它内部的协程任务也全都被取消了
        println("Process end!")
    }
    

    调用 parentJob 的 cancel() 方法后,它内部的协程任务也全都被取消了。

    1 start
    2 start
    3 start
    1 end
    Process end!
    

    实战:使用 async 优化并发

    下面代码里定义了三个挂起函数(异步任务),假设它们之间的运行结果互不相干,且各自都会耗时 1000 毫秒,请问整个过程执行大约耗时多少时间?

    fun main() = runBlocking {
        suspend fun getResult1(): String {          // 挂起函数
            delay(1000L).also { return "Result1" }  // 异步任务
        }
        suspend fun getResult2(): String {
            delay(1000L).also { return "Result2" }
        }
        suspend fun getResult3(): String {
            delay(1000L).also { return "Result3" }
        }
    
        val results = mutableListOf<String>()
        val time = kotlin.system.measureTimeMillis { // 计算总耗时
            results.add(getResult1())
            results.add(getResult2())
            results.add(getResult3())
        }
    
        println("Time: $time") // Time: 3007
        println(results)       // [Result1, Result2, Result3]
    }
    

    上面代码整个过程大约需要消耗 3000 毫秒,也就是这几个函数耗时的总和。请问该如何优化上面的代码?

    对于这样的情况,我们其实完全可以使用 async 来优化:

    fun main() = runBlocking {
        // ...
        val results: List<String>
        val time = kotlin.system.measureTimeMillis {
            val deferred1: Deferred<String> = async { getResult1() }
            val deferred2: Deferred<String> = async { getResult2() }
            val deferred3: Deferred<String> = async { getResult3() }
            results = listOf(deferred1.await(), deferred2.await(), deferred3.await())
        }
    
        println("Time: $time") // Time: 1034
        println(results)       // [Result1, Result2, Result3]
    }
    

    当我们总是拿 launch 和 async 来做对比的时候,就会不自觉地认为 async 是用来替代 launch 的。但实际上,async 最常见的使用场景是:与挂起函数结合,优化并发

    请不要小看这个场景,在实际工作中,如果你仔细去分析嵌套的异步代码,你会发现,很多异步任务之间都是没有互相依赖的,这样的代码结合挂起函数后,再通过 async 并发来执行,是可以大大提升代码运行效率的。

    小结

    这节课,我们主要学习了 Job、Deferred,通过对它们两者的学习,我们知道了,协程是有生命周期的,同时也发现,协程其实是结构化的

    • Job 相当于协程的句柄,在 Job 的内部,维护了一系列的生命周期状态,它也对应着协程的生命周期状态
    • 可以通过 Job 监测协程的状态,也可以一定程度地操控协程的状态
    • 可以通过 Job.invokeOnCompletion {} 监听协程执行完毕的事件,通过 Job.join() 挂起当前协程的执行流程,等到协程执行完毕以后,再恢复执行后面的代码
    • Deferred.await() 的行为模式和 Job.join() 类似,只是它还会返回协程的执行结果
    • 协程是结构化的并发,这是它的第二大优势,一个 Job 可以拥有多个 ChildJob;对应的,协程也可拥有多个 子协程
    • 结构化并发带来的最大优势就在于,我们可以实现只控制 父协程,从而达到控制一堆子协程的目的
    • parentJob.join() 不仅会等待它自身执行完毕,还会等待它内部的全部子 Job 执行完毕,parentJob.cancel() 同理

    2016-11-12

  • 相关阅读:
    CentOS查看CPU信息、位数、多核信息
    Linux常用命令大全
    chmod命令详细用法
    tar命令的详细解释
    yum和rpm命令详解
    LeetCode 241. Different Ways to Add Parentheses
    LeetCode 139. Word Break
    LeetCode 201. Bitwise AND of Numbers Range
    LeetCode 486. Predict the Winner
    LeetCode 17. Letter Combinations of a Phone Number
  • 原文地址:https://www.cnblogs.com/baiqiantao/p/6056520.html
Copyright © 2011-2022 走看看