Kotlin:Flow 全面详细指南,附带源码解析。

Flow

来了来了它终于来了,这篇本应在好几个个月前就需要发布的文章?。一拖再拖?,毕竟对于flow还是的有敬畏之心的,不好好研究一下真心不敢乱写,有什么问题,欢迎指出,欢迎私信技术交流?。那么现在就正式进入Flow的世界吧!

Flow简介

Flow是什么❓

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.

简单来说:挂起函数可以异步处理并且返回单个值,但是如果要返回多个异步计算的值呢?这就是flow的用处了。下面从如何返回多个值开始一步一步深入了解flow

如何返回多个值?

For example 1 : list

我们可以有一个简单的函数,它返回一个包含三个数字的列表,然后使用 forEach 将它们全部打印出来:

fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    simple().forEach { value -> println(value) } 
}

结果

1
2
3

For example 2 : Sequences

如果我们需要阻塞代码来计算数字(每次计算需要 100 毫秒),那么我们可以使用序列来表示数字:

fun simple(): Sequence<Int> = sequence { // sequence 构建
    for (i in 1..3) {
        //模拟计算耗时
        Thread.sleep(100)
        println("send$i")
        // yield 下一个值
        yield(i) 
    }
}

fun main() {
    simple().forEach { value -> println("receiver$value") }
    println("end")
}

结果

send1
receiver1
send2
receiver2
send3
receiver3
end

For example 3 : 异步计算并返回?引入flow

有什么办法可以异步计算多个值并且返回的吗?当然可以,我们可以使用挂起函数,使计算过程在异步线程执行,最终以list的形式返回。

举个?

suspend fun simple(): List<Int> {
	//模拟耗时操作
    delay(1000)
    return listOf(1, 2, 3)
}

fun main()= runBlocking {
    val job = launch {
        simple().forEach { value -> println(value) }
    }
    launch {
        println("other operate")
    }
    job.join()
}

结果:从结果来看,耗时操作并没有影响主线程的运行?

other operate
1
2
3

但是,这样就够了吗?no no no !?‍♀️

使用list意味着我们只能一次性的返回所有的值。所以为了表示流的计算,引入了flow。就像可以使用Sequence类型用于同步计算值一样。

Flow使用

Flow 简单使用

上面介绍了flow要解决什么问题,那么我们就开始使用起来吧。

先看一个简单的?

fun simple(): Flow<Int> = flow { // flow 构建
    for (i in 1..3) {
        //模拟异步耗时计算
        delay(100)
        //发射值
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    // launch 一个协程 同时延时100毫秒打印 校验主线程是否阻塞
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            //主线程在这个时间段可以干别的事情
            delay(100)
        }
    }
    // collect flow value
    simple().collect { println(it) }
}

结果:通过线程打印 I'm not blocked证明异步计算不会阻塞主线程,计算成功之后会resume到collect里面继续执行。

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

通过上述代码,需要注意一下几点:?‍♀️

  • 使用flow代码块构建出来的类型为Flow
  • flow代码块里面允许写挂起函数。比如上面的,delay emit
  • 使用emit进行值的发射,使用collect进行值的收集

Flow 构建

除了上面使用的flow{}进行构建之外,还可以使用其他的方式进行构建。

  1. 使用flowOf可以定义一组固定的值

    fun simple(): Flow<Int> = flowOf(1, 2, 3)
    
  2. 可以使用 asFlow() 扩展函数将各种集合和序列转换为流。

    // 将list转换为flow
    listOf(1,2,3).asFlow().collect { value -> println(value) }
    

Flow 冷流

Flow是冷流,构建器代码在调用collect之前是不会进行调用的,对于多个调用者,都会重新走一遍构建器的代码。

废话不多说,上?

fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

结果

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

每次收集流时都会开始,这就是为什么我们再次调用 collect 时会看到“Flow started”的原因。

Flow 取消

如何取消一个Flow呢?

Kotlin官方并没有提供flow取消的函数。啊 这????听到这个是不是还满疑惑。且听我细细道来。

Flow需要在协程里面使用,因为collect是挂起函数,另外基于冷流的特性,不调用collect构建器的代码压根不会走。所以只能是协程。那 我取消协程不就行了吗??。好像之前有看到过有开发者提出过,是否要给flow单独加一个取消的函数,被Jetbrains无情的拒绝了,哈哈哈哈很搞笑。下面引用Kotlin官方的一段话。

Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be cancelled when the flow is suspended in a cancellable suspending function (like delay).

adheres 坚持

Flow 坚持协程的一般协作取消。 像往常一样,当流在可取消的挂起函数(如延迟)中被挂起时,可以取消流收集。

这个adheres好像就像是在回复广大的开发者,你取消协程就行了???。

好了,下面看取消的?

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("emit $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    val job = launch {
        simple().collect { println(it) }
    }
    delay(250)
    job.cancel(CancellationException("timeout 250"))
    println("done")
}

结果:看我们只需要取消对应的协程即可,对应的flow也会被取消收集。

emit 1
1
emit 2
2
done

这里引申一点,对于timeout,官方有提供专用的操作函数,withTimeout系列。不需要我们手动delay然后继续调用取消,毕竟不是很优雅。

上述代码也可以写成如下的形式

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("emit $i")
        emit(i)
    }
}
fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) {
        simple().collect { value -> println(value) }
    }
    println("done")
}

看到了吗?直接将launch替换为withTimeoutOrNull就可以做到延时取消效果了,这里简单做一下源码分析。

源码
public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? {
    ...
    try {
        return suspendCoroutineUninterceptedOrReturn { uCont ->
            val timeoutCoroutine = TimeoutCoroutine(timeMillis, uCont)
            coroutine = timeoutCoroutine
            setupTimeout<T?, T?>(timeoutCoroutine, block)
        }
    } catch (e: TimeoutCancellationException) {
       ...
    }
}
private class TimeoutCoroutine<U, in T: U>(
    @JvmField val time: Long,
    uCont: Continuation<U> // unintercepted continuation
) : ScopeCoroutine<T>(uCont.context, uCont), Runnable {
    override fun run() {
        cancelCoroutine(TimeoutCancellationException(time, this))
    }
	...
}

private fun <U, T: U> setupTimeout(
    coroutine: TimeoutCoroutine<U, T>,
    block: suspend CoroutineScope.() -> T
): Any? {
    // schedule cancellation of this coroutine on time
    coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
    ...
}
    
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        DefaultDelay.invokeOnTimeout(timeMillis, block, context)

省略了一大堆非核心的代码,我们直接看延时取消的操作,这里简单分析一下:

  • 创建TimeoutCoroutine对象,它同时实现了Runnable,在run里面调用了取消函数,抛出TimeoutCancellationException。
  • 调用当前context的invokeOnTimeout函数,该函数需要一个Runnable,传入了timeoutCoroutine。此实现使用内置的单线程调度执行器服务。会在延时对应的事件后调用Runnable的run函数,然后就会取消当前的协程。
  • 在取消协程之后,会取消掉当前上下文的所有将在完成时调用的回调,disposeOnCompletion函数被调用。

Flow 相关操作符

这一块的操作符,其实是比较多的。但是如果您熟悉RxJava的话,其实都是差不多的。这一块这里就不做源码分析了,只是看一下怎么使用即可。内部其实是创建了新的流返回出来了,有兴趣的话可以自行查看一下源码。

中间流操作符

map

映射。看?(传入请求流可以使用 map 操作符映射到结果,即使执行请求是由挂起函数实现的长时间运行的操作)

          
suspend fun performRequest(request: Int): String {
    delay(1000) // 模拟长时间的异步工作
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // 转换为flow
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

结果

response 1
response 2
response 3
filter

过滤操作,看?

suspend fun performRequest(request: Int): String {
    delay(1000) // 模拟长时间的异步工作
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // 转换为flow
        .map { request -> performRequest(request) }
        .filter { it == "response 1" }
        .collect { response -> println(response) }
}

结果:仅返回匹配到的值

response 1

变换操作符

在流变换算子中,最通用的一种叫做变换。 它可以用来模仿简单的转换,比如 map 和 filter,也可以实现更复杂的转换。 使用transform,我们可以发出任意次数的任意值。

例如?,使用transform,我们可以在执行长时间运行的异步请求之前发出一个字符串,并在其后响应:

(1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }

结果

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

size限制操作符

顾名思义,限制收集的数量。使用运算符take

它会在判断当发射的值在达到相应限制时取消流程的执行。 因为协程中的取消总是通过抛出异常来执行。所以需要考虑进行相应的异常捕获来保证后续的流畅正常进行不被取消掉

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("not execute")
        emit(3)    
    } finally {
        println("finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take 两个值
        .collect { value -> println(value) }
}            

结果

1
2
finally in numbers

终端操作符

终端操作符可以启动一个流,最基础的就是上述常提到的collect。但是还有一些其他的终端操作符,它可能会让一些操作变得更简单:

转换为各种集合, toList 和 toSet。
    flowOf(1,2).toList().forEach { 
        println(it)
    }
    flowOf(1,2).toSet().forEach(::println)
first , 确保获取且进获取第一个值

返回流发出的第一个元素然后取消流的集合的终端运算符。 如果流为空,则抛出 NoSuchElementException。

    val value : Int = flowOf(1, 2).first()
    println(value)
使用 reduce 和 fold 将流合并到一个值。
    val sum = (1..5).asFlow()
        .map { it * it } //平方
        .reduce { a, b -> a + b } // 进行累加
    println(sum)
    //结果
    55

fold和reduce使用起来差不多,区别就是fold可以定义初始化,其实很简单,reduce传入的lambda前一个参数是每次计算的结果累计,后一个参数是当前需要传入的值,不明白可以去瞅一眼源码,这里不在引申。

onEach

这个操作符也较为常用,这里也介绍一下,返回在上游流的每个值向下游发出之前调用给定操作的流。

?

(1..5)
.asFlow()
.onEach {
	println("onEach$it")
}.collect()
//结果
onEach1
onEach2
onEach3
onEach4
onEach5

操作符的顺序

除非使用对多个流进行操作的特殊运算符,否则流的每个单独集合都按顺序执行。 该集合直接在调用终端运算符的协程中工作。 默认情况下不会启动新的协程。 每个发出的值都由从上游到下游的所有中间操作符处理,然后交付给终端操作符。

?

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }    
//结果  按照顺序没有值依次向下发射
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Flow 调度器切换

对于UI驱动型的程序来说,需要将长时间计算的任务放在异步线程处理,UI展示工作需要放在主线程处理。也就是说需要将构建器的代码放到异步线程执行,但是终端操作符,比如collect需要在主线程获取,那么怎么做呢?

使用flowOn操作符

在这之前您可能的了解一下,协程的调度器。可以简单参考之前写的一篇文章,有对调度器做简单介绍:https://blog.csdn.net/weixin_44235109/article/details/119981210

fun main() = runBlocking {
    flow {
        for (i in 1..3) {
            //模拟异步处理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)//使用flowOn传入Default的调度器
        .collect { value ->
            log("Collected $value")
        }
}

结果

16:39:21:954 [DefaultDispatcher-worker-1] Emitting 1
16:39:21:969 [main] Collected 1
16:39:22:071 [DefaultDispatcher-worker-1] Emitting 2
16:39:22:071 [main] Collected 2
16:39:22:178 [DefaultDispatcher-worker-1] Emitting 3
16:39:22:178 [main] Collected 3

可以很明显的看出,构建模块被调度到异步线程处理了。而收集的工作还在主线程进行。

flowOn负责构建的模块调度,那么收集的谁负责呢?

其实和异常处理类似,collect受调用它的协程上下文限制,所以最后的执行线程和当前协程上下文的调度器有关,目前我使用的是idea测试的,默认runBlocking的调度器就是主线程。如果是android上面的话,runBlocking可能就需要传入Dispatchers.Main了。

其实和RxJava还是非常相似的?。

注意一点,此时其实已经改变流执行的顺序了

官方的解释如下:

Another thing to observe here is that the flowOn operator has changed the default sequential nature of the flow. Now collection happens in one coroutine (“coroutine#1”) and emission happens in another coroutine (“coroutine#2”) that is running in another thread concurrently with the collecting coroutine. The flowOn operator creates another coroutine for an upstream flow when it has to change the CoroutineDispatcher in its context.

这里要注意的另一件事是 flowOn 运算符更改了流的默认顺序性质。 现在收集发生在一个协程(“coroutine#1”)中,发射发生在另一个协程(“coroutine#2”)中,该协程与收集协程同时运行在另一个线程中。 当必须在其上下文中更改 CoroutineDispatcher 时, flowOn 运算符为上游流创建另一个协程。

这一块我简单看了一下源码,这里面不同的调度器会遇到多线程的问题,最里面使用了channel进行了调度处理。具体的核心类是ChannelFlow。后面会对flow进行简单源码分析,但篇幅有限不对这一块过深入分析,感兴趣可以自行查看,或找博主私下探讨。

其实上面的看不出来会改变流执行的顺序,下面改变一下代码,验证一下看看?

fun main() = runBlocking {
    flow {
        for (i in 1..3) {
            //模拟异步处理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)
        .collect { value ->
            delay(200)
            log("Collected $value")
        }
}

结果

16:52:33:258 [DefaultDispatcher-worker-1] Emitting 1
16:52:33:386 [DefaultDispatcher-worker-1] Emitting 2
16:52:33:482 [main] Collected 1
16:52:33:493 [DefaultDispatcher-worker-1] Emitting 3
16:52:33:684 [main] Collected 2
16:52:33:887 [main] Collected 3

我们只需要将,collect里面增加一个delay即可,发现其实这时候就是发射归发射,收集归收集了。不似上面我们写的程序,发射一个值只有到终端操作符之后才会发射第二个。这里面肯定就会对值进行缓存。那么就会牵扯到一个问题。老生常谈?‍♀️,背压处理

Flow 背压处理

对于背压处理,Kotlin 提供三种解决方案:

操作符 含义
buffer 指定固定容量缓存
conflate 保留最新的值
collectLatest 新值发送时取消之前的

buffer

这里有必要看一下buffer函数的源码定义

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND)

可以看出需要两个参数,都有默认值。

  • 第一个好理解容量,默认等于BUFFERED,这个值其实时64,可以按照使用时需要自己指定具体的数字。
  • 第二个是指定,当buffer溢出时的操作,默认的操作是挂起。还要两个操作分别是删除最旧的值不要挂起或者删除当前最新的值不要挂起,可以自行查看源码,这里不再引申。

使用?

    flow {
        for (i in 1..3) {
            //模拟异步处理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)
        .buffer()
        .collect { value ->
            delay(200)
            log("Collected $value")
        }
//结果       
17:17:28:420 [DefaultDispatcher-worker-1] Emitting 1
17:17:28:536 [DefaultDispatcher-worker-1] Emitting 2
17:17:28:646 [main] Collected 1
17:17:28:646 [DefaultDispatcher-worker-1] Emitting 3
17:17:28:846 [main] Collected 2
17:17:29:049 [main] Collected 3

conflate

这个只获取最新值也比较好理解,应用场景,比如说获取下载进度,对于用户来说其实每次只需要获取当前最新的进度就好了,不需要把之前的值再去获取一遍,下面也举一个例子?

fun main() = runBlocking {
    flow {
        for (i in 1..3) {
            //模拟异步处理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)
        .conflate()
        .collect { value ->
            delay(300)//模拟下游处理比较慢
            log("Collected $value")
        }
}  
//结果   第一个值肯定可以拿到 当地一个值处理完成之后  最新的值就是3了 所以丢弃了2
17:21:42:916 [DefaultDispatcher-worker-1] Emitting 1
17:21:43:034 [DefaultDispatcher-worker-1] Emitting 2
17:21:43:140 [DefaultDispatcher-worker-1] Emitting 3
17:21:43:236 [main] Collected 1
17:21:43:546 [main] Collected 3

collectLatest

说明一点:这玩意其实也是一个终端操作符

前两个可能都比较好理解,那新值发送时取消之前的是什么意思呢?为了便于理解,直接上例子,按照结果说明:

?

fun main() = runBlocking {
    flow {
        for (i in 1..3) {
            //模拟异步处理
            delay(100)
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default)
        .collectLatest {
            delay(300)
            log("Collected $it")
        }
}
//结果
17:25:33:916 [DefaultDispatcher-worker-1] Emitting 1
17:25:34:038 [DefaultDispatcher-worker-1] Emitting 2
17:25:34:150 [DefaultDispatcher-worker-1] Emitting 3
17:25:34:453 [main] Collected 3

对比上面的例子,这里只是将collect替换成了collectLatest而已。为什么1没有了呢?

显而易见了,这玩意会在最新的到来会直接取消下游上一个消费的处理,因为有delay所以1还没有来得及打印,就因为下一个值发射了,然后就被取消了!!!您可真霸道呢??‍♀️

Flow 异常处理

当操作符内的发射器或代码抛出异常时,流收集可以以异常结束。 有几种方法可以处理这些异常。

try…catch

较为简单,上?

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}
//结果  
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            if (value>1){
                throw IllegalStateException("exception value is $value")
            }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

显而易见,抛出异常之后,收集结束,如果是UI驱动程序比如:Android还是推荐主从作用域,异常不会向上传播。

思考一个问题,刚刚异常是发生在收集端,如果在构建的时候发生异常呢?

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
        .map { value ->
            if (value>1){
                throw IllegalStateException("exception value is $value")
            }
            "string $value"
        }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//结果
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: exception value is 2

完美:异常仍被捕获并停止收集

上述代码的问题就是不够优雅,还有异常对于流来说必须是透明的,使用try … catch 显然违反了透明性,所以Kotlin 封装了try catch.

catch 相关运算符

catch 运算符的主体可以分析异常并根据捕获的异常以不同的方式对其做出反应:

  • 可以使用 throw 重新抛出异常。
  • 可以使用 catch 主体中的发射将异常转换为值的发射。
  • 异常可以被其他代码忽略、记录或处理。

看?

simple()
    .catch { e -> emit("Caught $e") } // 我不仅捕获到了异常,我还能继续发射!!!!
    .collect { value -> println(value) }

结果

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: exception value is 2

当然因为抛出了异常,协程还是会终止,但是此时异常是以发生的形式传递下去的。

注意:catch 中间运算符,尊重异常透明性,只捕获上游异常(即来自 catch 之上的所有运算符的异常,但不在其之下)。 如果 collect { … } 中的块(位于 catch 下方)抛出异常,则它不会捕获:

?

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // 不会捕获到下游的异常
        .collect { value ->
            if(value <= 1) throw IllegalStateException("Collected $value")
            println(value)
        }
}
//
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at org.example.zxf.kotlin11.flow.TestFlowKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)

这个时候可以利用onEach将需要可能捕获异常的地方前置,如下所示。

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()

onCompletion 相关操作

我们知道和try … catch 搭配的 还有一个finally。所以 flow当然也有一个差不多的了,叫onCompletion

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
}
//结果
Emitting 1
1
Emitting 2
2
Emitting 3
3
Done

抛个异常试试,onCompletion可以通过cause进行判断是否是正常结束.

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
    emit(2)
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause ->
            if (cause != null) {
                println("Flow completed exceptionally")
            }
        }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//结果
1
Flow completed exceptionally
Caught exception

与 catch 运算符的另一个区别是 onCompletion 会看到所有异常,并且仅在成功完成上游流(没有取消或失败)时才会收到空异常。

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            if (value>1){
                throw IllegalStateException("exception value is $value")
            }
            println(value)
        }
}
//结果
1
Flow completed with java.lang.IllegalStateException: exception value is 2
Exception in thread "main" java.lang.IllegalStateException: exception value is 2

注意一点:虽然看到了,但是并没有进行捕获。异常还是抛出了

Flow 启动

最后提一点和启动相关的,先看下面两个例子:

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- collect 挂起函数阻塞
    println("Done")
}   
//结果
Event: 1
Event: 2
Event: 3
Done

对于上述,没有异议吧,Done的打印需要等待collect恢复,因为在一个协程内。如果需要不等待呢?那就需要另起一个协程了

?

fun main() = runBlocking<Unit> {
    launch {
        events()
            .onEach { event -> println("Event: $event") }
            .collect() // <--- collect 挂起函数阻塞
    }
    println("Done")
}
//结果
Done
Event: 1
Event: 2
Event: 3

但是上述写法可以等价于,下面的写法,flow这玩意封装了另一种启动方式

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this)
    println("Done")
}
//结果
Done
Event: 1
Event: 2
Event: 3

看一下launchIn源码!

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

好吧,其实不就是封装了一下嘛。主要是源码好多地方使用launchIn来着,所以这里提一下?‍♀️。

Flow原理解析

好吧,激动人心的时刻终于来了,做一些源码分析。

先写一个小?

fun main() = runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
    }.collect {
        println(it)
    }
}

好了,我们就分析这玩意怎么实现的!往深篇幅有限写不下了?。如果您想深入交流,欢迎私信交流。

所以,我先列出来flow函数构建的源码,只贴出核心代码:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

可以看出flow构建了一个SafeFlowblock传入。并且重写了collectSafely方法,调用了block()。这里需要注意一点SafeFlow继承自AbstractFlow.

好,我们在看一调用collect之后发生了什么。

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }	
    }
    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

看的出,这时将collect的block封装成了FlowCollector类型(重写emit方法用于执行block),此时会调用到SafeFlowcollect方法,具体在AbstractFlow里面实现。这里面干了什么呢?看的出,创建了SafeCollector类型的东西,然后将SafeCollector传递给了collectSafely。而在上面的分析中,我们知道collectSafely在构建flow时进行了实现,SafeCollector作为receiver以扩展函数的形式调用了,flow构建器的block(这里面执行了,我们手动的emit操作)。这么一看是不是有点联系起来了?

好的,接下来我们继续分析一下,SafeCollector,那这玩意的源码其实也是有点多的,我们应该看哪一个函数呢?通过上面的分析可知,调用collect之后,会调用到SafeFlowcollect方法,进而会以扩展函数的形式调用到flow的block,而在block里面就是我们自己写的emit了,所以receiver既然是SafeCollector,那肯定就是调用SafeCollector的emit了,我们去瞅一瞅SafeCollector的emit函数相关的调用链:?

	override suspend fun emit(value: T) {
        	...
            try {
                emit(uCont, value)
            } catch (e: Throwable) {
                ...
            }
    }
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        ...
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }
    private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
    

好了,经过简化之后,就是上面这些玩意儿了。

首先,需要分清几个东西。

collectorthis的区别

  • collector:还记得上面的源码吗?我们调用collect的block封装到了一个FlowCollector,然后传递给了SafeCollector,collector就指FlowCollector。这里面重写了emit,emit里面调用了block,这个block就是我们自己写的了,这个时间简单理解调用collector的emit就可以调用到collect的block了。
  • this:理解了collector,this就指代当前的SafeCollector了。

现在我们继续看上面的代码:额 此时好像还需要理解一个东西emitFun

我们把它翻译成java看一下

   private static final Function3 emitFun = (Function3)TypeIntrinsics.beforeCheckcastToFunctionOfArity(new Function3() {
      // $FF: synthetic method
      // $FF: bridge method
      public Object invoke(Object var1, Object var2, Object var3) {
         return this.invoke((FlowCollector)var1, var2, (Continuation)var3);
      }

      @Nullable
      public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
         return p1.emit(p2, continuation);
      }
   }, 3);

简单明了,就是调用了collector的emit函数,但是你可能会说,emit不是支持一个参数吗?上面传进去俩啊,但是你别忘了,emit是挂起函数,默认有一个参数的传递就是continuation,这一块Kotlin替我们做了,但是对于Java来说必须要传递continuation进去。通过这种方式传入一样的continuation。保证了continuation的统一。

好了,总结一下:

分析下来,简单就是,利用扩展函数的性质,调用到flow的block进而调用了SafeCollector的emit,而这里的emit会调用到传进来的FlowCollector的emit,而传进来的emit函数被重写调用block,所以就会调用到collect的block了。

另外因为,只有调用collect之后,进而才会调用到SafeFlow的collect函数,进而才会调用到collectSafely函数去执行flow的代码。所以不调用collect的话,flow的代码构建块的代码是不会执行的,最多就是返回一个SafeFlow的对象而已

对于各种操作符的源码,这里不带着大家看了,不是很难,里面就是饶了一圈,又返回了流而已。可以自己查看。

另外对于flowOn或者其他操作符,导致调度器不一致时,此时底层将不再上上面这一套的逻辑,内部是使用ChannelFlow实现。这一块感兴趣的也可以瞅瞅源码,也可以与博主私信交流,里面还是比较绕。。。

还有还有补充一点,对于ShareFlowStateFlow,博主目前也在积极的筹划中,想了解的可以关注一波哈。如果您看到这里的话,觉得不错,希望您可以毫不吝啬手中的赞?,鼓励一下博主,感谢!
在这里插入图片描述

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>