Kotlin:深入理解StateFlow与SharedFlow,StateFlow和LiveData使用差异区分,SharedFlow实现源码解析。

本文接上一篇博文:Kotlin:Flow 全面详细指南,附带源码解析。

StateFlow、SharedFlow

先看一下Google对于StateFlow和SharedFlow的介绍?‍♀️:

StateFlow and SharedFlow are Flow APIs that enable flows to optimally emit state updates and emit values to multiple consumers.

StateFlow 和 SharedFlow 是 Flow API,它们使流能够以最佳方式发出状态更新并向多个消费者发出值

StateFlow和SharedFlow是一种很特殊的Flow,它们是热流。介绍Flow的时候有说过,它是冷流,再不调用终端操作符的情况下,Flow构建块的代码是不会执行的,每一个消费者调用一次Flow,则构建块的代码会从头到尾执行一次。而热流可以不依赖消费者而存活,可以在流之外生成数据,然后传递给流

真所谓,由简入繁,繁尽至简?。

我们先从StateFlow和SharedFlow的使用讲起(会稍带涉及到StateFlow和LiveData的区别),而后对其核心实现进行源码分析。

StateFlow使用

StateFlow简介

StateFlow 是一个状态持有者可观察流,它向其收集器发出当前和新状态更新。 当前状态值也可以通过其 value 属性读取。 要更新状态并将其发送到流。

StateFlow提供可读可写仅可读两个版本,这一点和LiveData相似,如下表格?‍♀️。

仅可读 可读可写
LiveData LiveData MutableLiveData
StateFlow StateFlow SharedFlow

StateFlow的用法

StateFlow 非常适合需要维护可观察的可变状态的类。

简单使用举例?

    val value = AtomicInteger(0)
    val stateFlow = MutableStateFlow(value.incrementAndGet()) //+1
    stateFlow.value = value.incrementAndGet()// +1
    runBlocking {
        GlobalScope.launch (Dispatchers.Default){
            stateFlow.collect {
                log("receiver value $it")
            }
        }
        GlobalScope.launch {
            delay(2000)
            stateFlow.collect {
                log("receiver2 value $it")
            }
        }
        while (true) {
            delay(1000)
            val sendValue = value.incrementAndGet()
            log("sendValue$sendValue")
            stateFlow.emit(sendValue)
        }
    }
//输出
21:42:34:789 [DefaultDispatcher-worker-1] receiver value 2
21:42:35:781 [main] sendValue3
21:42:35:781 [DefaultDispatcher-worker-1] receiver value 3
21:42:36:784 [main] sendValue4
21:42:36:784 [DefaultDispatcher-worker-1] receiver value 4
21:42:36:784 [DefaultDispatcher-worker-1] receiver2 value 4
21:42:37:786 [main] sendValue5
21:42:37:786 [DefaultDispatcher-worker-1] receiver value 5
21:42:37:786 [DefaultDispatcher-worker-2] receiver2 value 5
...

下面来解释一下?‍♀️:

  • 我们创建了AtomicInteger初始值为0,我们调用incrementAndGet获取值,然后传入MutableStateFlow构造方法,构建StateFlow。之后设置StateFlow的属性value = incrementAndGet。注意:上面说的一系列操作都是没有消费者订阅的情况下进行的,之后启动第一个协程进行collect,打印出来的第一个值是2。说明:在没有订阅者的情况下,流依旧进行了执行。
  • 再看代码最下面写了死循环,每隔1秒钟emit一个值给到StateFlow,所以第一个协程手机是没有问题的,也是每隔一秒钟打印一次。看第二个协程,实在延迟两秒中之后,才进行收集,我们看到第二个收集器,是从4开始打印的,且和第一个同步。说明:StateFlow只保留一个值且是最新值;可以允许有多个订阅者,会将此时最新的值,发送给此时订阅的所有存活的订阅者。

LiveData与StateFlow差异对比

liveData可参考:Jetpack:LiveData使用指南,实现原理详细解析!

可以发现基本和LiveData基本一样?‍♀️,但是还是有不同的,下面总结一下StateFlow和LiveData的差异性

相同点:?

  1. 提供可读可写仅可读两个版本
  2. 值是唯一的
  3. 允许被多个观察者观察
  4. 永远会把最新的值给到观察者,即使没有观察者,也会更新自己的值
  5. 都会产生粘性事件问题
  6. 都可能产生丢失值的问题

不同点:?

  1. StateFlow必须在构建的时候传入初始值,LiveData不需要
  2. StateFlow默认是防抖的,LiveData默认不防抖
  3. 对于Android来说StateFlow默认没有和生命周期绑定,直接使用会有问题,请继续向下看?‍♀️

StateFlow特别说明

下面就上方的几点特别的做一下说明:?

StateFlow会产生粘性事件:这个其实在上面的例子就可以说明了,StateFlow是必有值的,只要有订阅者订阅,这个值立马就会发送过去。具体可以理解为,在ViewModel中放了一个StateFlow之后在Activity中对值进行了调用弹出一个SnackBar,在屏幕旋转之后会再次弹出SnackBar。

StateFlow丢失值问题:当上游发射的速度,大于订阅者处理的速度的时候,中间值就会丢失了,下面具体看一个例子?:

    val stateFlow = MutableStateFlow(value.get())
    runBlocking {
        stateFlow.onEach {
            //模拟处理耗时
            delay(3000)
            log("receiver:$it")
        }.launchIn(this)
        while (true) {
            delay(1000)
            val v = value.incrementAndGet()
            log("send:$v")
            stateFlow.emit(v)
        }
    }
//结果
10:58:26:004 [main] send:1
10:58:27:031 [main] send:2
10:58:27:990 [main] receiver:0
10:58:28:038 [main] send:3
10:58:29:054 [main] send:4
10:58:30:057 [main] send:5
10:58:30:996 [main] receiver:2
10:58:31:070 [main] send:6
10:58:32:073 [main] send:7
10:58:33:076 [main] send:8
10:58:34:004 [main] receiver:5
...

上方模拟处理耗时是3秒,发送则是每隔一秒钟发一次。处理耗时是发送的三倍,那么中间的值就丢失了。

至于之前讲Flow时说到的三种背压处理策略,这里只能使用collectLatest,发送新值时取消之前的操作,可以参考之前Flow里面的详细介绍。

StateFlow默认是防抖的

这一点可以看一下源码:?‍♀️

	emit()调用了setValue
    setValue 调用了 updateState
    看一下 updateState
    private fun updateState(expectedState: Any?, newState: Any): Boolean {
        var curSequence = 0
        var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
        synchronized(this) {
            val oldState = _state.value
            if (expectedState != null && oldState != expectedState) return false // CAS support
            if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
            _state.value = newState
            curSequence = sequence
            if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                curSequence++ // make it odd
                sequence = curSequence
            } else {
                // update is already in process, notify it, and return
                sequence = curSequence + 2 // change sequence to notify, keep it odd
                return true // updated
            }
            curSlots = slots // read current reference to collectors under lock
        }

关注一下这一行代码if (oldState == newState) return true,更新数据时,会判断当前值与新值是否相同,如果相同则不更新数据。

关于可读可写仅可读的使用官方给出一下的例子,可以参考:?‍♀️

class CounterModel {
    private val _counter = MutableStateFlow(0) // private mutable state flow
    val counter = _counter.asStateFlow() // publicly exposed as read-only state flow

    fun inc() {
        _counter.value++
    }
}

另外注意一点:StateFlow 是一个热流。所以只要流被收集或从垃圾收集根存在对它的任何其他引用,它就会保留在内存中。

Android中使用StateFlow实践

我们知道直接在UI中对收集流时不安全的,所以需要判断判断UI的生命周期,来决定是否开启或者取消流的收集,对应于Flow就是是否启动或者取消收集的协程。我们知道LiveData自身有一个LifecycleBoundObserver需要传入当前的LifecycleOwner用于绑定声明周期,在onStop之后就会停止收集,那么StateFlow有没有呢?没有!?

但是Google官方在有一个扩展函数,对生命周期进行了管理,方便使用。就是repeatOnLifecycle,它会在传入的生命周期开始启动一个协程,然后在与之对应的另一个状态取消携程,它是一个挂起函数,在onDestroy之后才会恢复下方的执行,举个例子如下所示?

	override fun onCreate(savedInstanceState: Bundle?) {
        ...
        // 使用lifecycleScope启动协程
        lifecycleScope.launch {
            // 每次生命周期处于 STARTED 状态(或更高状态)时,repeatOnLifecycle 在新的协程中启动块,并在它停止时取消它。
            lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
                // 
                ViewModel.uiState.collect { uiState ->
                    // todo
                }
            }
            //如果这里被执行,则代表生命周期已经走到了onDestroy,因为repeatOnLifecycle是挂起函数,在生命周期为onDestroy的时候进行了恢复。
        }
    }

注意:repeatOnLifecycle API 仅在 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01 库及更高版本中可用。后续会有文章会对repeatOnLifecycle做源码分析,敬请期待!

SharedFlow使用

SharedFlow简介

SharedFlow也是一个热流。它是StateFlow 的高度可配置的泛化,提供了更多的能力。换句话说,StateFlow是一个特殊的SharedFlow。

SharedFlow同样有两个版本,SharedFlowMutableSharedFlow

那为什么说StateFlow是一个特殊的SharedFlow呢??我们来看一下继承关系就知道了

StateFlow 继承自 SharedFlow MutableStateFlow 继承自 MutableSharedFlow

SharedFlow的使用

在这之前,我们先看一下StateFlow与SharedFlow有什么不同

  • SharedFlow没有默认值
  • SharedFlow可以保存旧的数据,根据配置可以将旧的数据回播给新的订阅者
  • SharedFlow使用emit/tryEmit发射数据,StateFlow内部其实都是调用的setValue
  • SharedFlow会挂起直到所有的订阅者处理完成。

看一下MutableSharedFlow的构建函数

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)

可以看得出,MutableSharedFlow不需要传入默认值。

解析一下构建MutableSharedFlow的三个参数的含义

  • replay:重播给新订阅者的值的数量(不能为负,默认为零)
  • extraBufferCapacity:除重播外缓冲的值的数量。 当有剩余缓冲区空间时,emit 不会挂起(可选,不能为负,默认为零)。
  • onBufferOverflow:配置缓冲区溢出的操作(可选,默认BufferOverflow.SUSPEND,缓存溢出时挂起;另外还有DROP_OLDEST与DROP_LATEST,分别是溢出时删除缓冲区中最旧的值,将新值添加到缓冲区,不要挂起。与在缓冲区溢出时删除当前添加到缓冲区的最新值(以便缓冲区内容保持不变),不要挂起。)

下面看几个使用的例子?‍♀️

  • 默认情况下,SharedFlow没有粘性事件。如下所示:?‍♀️

        var v = 0
        val sharedFlow = MutableSharedFlow<Int>()
        runBlocking {
            sharedFlow.emit(++v)
            val job = sharedFlow.onEach {
                log("receiver$it")
            }.launchIn(this)
            delay(5000)
            job.cancel()
            log("end")
        }
        //输出
        20:38:19:818 [main] end
    

    因为默认情况下,replay的值为0,也就是说在订阅的时候,将会有0个值回播给订阅者,所以就没有粘性事件了!

  • 再看一下正常使用的情况

        val sharedFlow = MutableSharedFlow<Int>()
        val value = AtomicInteger(0)
        runBlocking {
            GlobalScope.launch {
                sharedFlow.collect {
                    log("receiver value $it")
                }
            }
            GlobalScope.launch {
                delay(3000)
                sharedFlow.collect {
                    log("receiver2 value $it")
                }
            }
            while (true) {
                delay(1000)
                sharedFlow.emit(value.incrementAndGet())
            }
        }
        //结果
    20:55:34:565 [DefaultDispatcher-worker-2] receiver value 1
    20:55:35:552 [DefaultDispatcher-worker-2] receiver value 2
    20:55:36:560 [DefaultDispatcher-worker-2] receiver value 3
    20:55:36:560 [DefaultDispatcher-worker-1] receiver2 value 3
    20:55:37:570 [DefaultDispatcher-worker-1] receiver value 4
    20:55:37:570 [DefaultDispatcher-worker-2] receiver2 value 4
    ...
    

    默认的使用除了粘性事件之外,其他的和StateFlow就没有什么区别了。所以如果为了解决粘性事件的问题,可以使用SharedFlow。但是注意一点:SharedFlow是不防抖的

  • SharedFlow默认是要等到订阅者全部接收到并且处理完成之后,才会进行下一次发送,否则就是挂起

    var value = 0
        val sharedFlow = MutableSharedFlow<Int>()
        runBlocking {
            sharedFlow.onEach {
                delay(3000)
                log("receiver1:$it")
            }.launchIn(GlobalScope)
            sharedFlow.onEach {
                delay(1000)
                log("receiver2:$it")
            }.launchIn(GlobalScope)
            while (true) {
                val v = ++value
                log("send:$v")
                sharedFlow.emit(v)
                delay(1000)
            }
        }
        //输出
    22:35:45:787 [main] send:1
    22:35:46:822 [main] send:2
    22:35:46:822 [DefaultDispatcher-worker-2] receiver2:1
    22:35:47:833 [DefaultDispatcher-worker-2] receiver2:2
    22:35:48:823 [DefaultDispatcher-worker-2] receiver1:1
    22:35:49:829 [main] send:3
    22:35:50:834 [DefaultDispatcher-worker-2] receiver2:3
    22:35:51:825 [DefaultDispatcher-worker-2] receiver1:2
    22:35:52:837 [main] send:4
    22:35:53:843 [DefaultDispatcher-worker-2] receiver2:4
    22:35:54:832 [DefaultDispatcher-worker-2] receiver1:3
    22:35:55:838 [main] send:5
    ...
    

    可以看得出,我们仅仅设置send的延迟为1秒钟,但是因为第二个订阅者模拟处理时间为3秒钟,所以emit会等最后一个处理完成才会进行下一次的发射,可以看到从send:2以后,每次发射都间隔三秒钟了,保证所有的订阅者全部接收到数据。如果是StateFlow的话,是不管订阅者是否处理完成的,依旧会保持值的替换

  • 其中一个订阅者出现异常怎么办呢

    这个就不放代码了,异常的传播机制和协程的异常传播机制还是一样的。如果订阅者和发送的在同一个作用域则异常传播机制同协程作用域。如果和发送者分别在不同的作用域则订阅者出现异常,影响不到别的订阅者和发送的协程。

  • SharedFlow同一个作用域下丢失值的问题,和启动协程有关,这里简单介绍一下什么情况下会出现值丢失的情况,下一篇会详细分析协程的恢复与挂起实现原理,会讲到具体的原因,可以关注博主

        var value = 0
        val sharedFlow = MutableSharedFlow<Int>()
        runBlocking {
            sharedFlow.onEach {
                log("receiver1:$it")
            }.launchIn(this)
            while (true) {
                val v = ++value
                log("send:$v")
                sharedFlow.emit(v)
                delay(1000)
            }
        }
        //结果
    09:27:08:434 [main] send:1
    09:27:09:471 [main] send:2
    09:27:09:471 [main] receiver1:2
    09:27:10:477 [main] send:3
    09:27:10:477 [main] receiver1:3
    09:27:11:493 [main] send:4
    09:27:11:493 [main] receiver1:4
    ...
    

    显而易见,第一个值没有被接受到,这是为什么呢?我查看了一下源码,按照顺序执行下来的话,在相同的作用域下启动协程,如果下面不挂起的话,上面的协程是不会启动的,所以第一个值就没有被发送了(SharedFlow在无订阅者的时候会丢失值);如果launchIn的作用域不是当前的作用域的话,也就是可以单独设置一个GlobalScope的话,此时就是没有问题的,所以上面的情况需要注意下一下。

  • 如果需要回播就值给新的订阅者,可以设置replay

        var value = 0
        val sharedFlow = MutableSharedFlow<Int>(replay = 4)
        runBlocking {
            launch {
                sharedFlow.collect {
                    log("receiver1:$it")
                }
            }
            launch {
                delay(2000)
                sharedFlow.collect {
                    log("receiver2:$it")
                }
            }
            launch {
                delay(4000)
                sharedFlow.collect {
                    log("receiver3:$it")
                }
            }
            while (true) {
                val v = ++value
                log("send:$v")
                sharedFlow.emit(v)
                delay(1000)
            }
        }
    //结果
    11:17:13:977 [main] send:1
    11:17:14:002 [main] receiver1:1
    11:17:15:009 [main] send:2
    11:17:15:009 [main] receiver1:2
    11:17:16:019 [main] receiver2:1
    11:17:16:019 [main] receiver2:2
    11:17:16:019 [main] send:3
    11:17:16:019 [main] receiver1:3
    11:17:16:019 [main] receiver2:3
    11:17:17:024 [main] send:4
    11:17:17:024 [main] receiver1:4
    11:17:17:024 [main] receiver2:4
    11:17:18:017 [main] receiver3:1
    11:17:18:017 [main] receiver3:2
    11:17:18:017 [main] receiver3:3
    11:17:18:017 [main] receiver3:4
    11:17:18:033 [main] send:5
    ...
    

    看得出,设置replay之后,如果当前发射缓存的值不够的话,会将当前已发射的值全部回播给新的订阅者。否则,就回播replay数量的值。

  • extraBufferCapacity 可以在去除重播的数量之外进行额外的缓存。上面的例子我们看到,如果有一个订阅者执行的很慢的话,那么emit就会挂起等待最慢的订阅者执行完,但是如果配置了额外缓存且缓存有剩余空间的话,此时就不会挂起。看如下一个例子。

        var value = 0
        val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 3)
        runBlocking {
            sharedFlow.onEach {
                //模拟处理耗时
                delay(5000)
                log("receiver$it")
            }.launchIn(GlobalScope)
            while (true) {
                delay(1000)
                val v = ++value
                sharedFlow.emit(v)
                log("send$v")
            }
        }
        //结果
    12:57:23:347 [main] send1
    12:57:24:372 [main] send2
    12:57:25:374 [main] send3
    12:57:26:377 [main] send4
    12:57:28:350 [DefaultDispatcher-worker-1] receiver1
    12:57:28:350 [main] send5
    12:57:33:353 [DefaultDispatcher-worker-1] receiver2
    12:57:33:353 [main] send6
    

    可以看得到replay为0,模拟耗时是3秒,理论上来说send需要挂起等到订阅者处理完。但是因为额外的缓存配置了3,所以在订阅者没有处理完的时候,又连续发射了3个值,直到第四个才挂起等待订阅者处理完成。

    所以,总的缓存数量应该等于:replay+extraBufferCapacity;

  • 如果在replay和extraBufferCapacity都存在的情况下,新的订阅者取值的策略是什么呢?看一下如下的逻辑?‍♀️

        var value = 0
        val sharedFlow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 3)
        runBlocking {
            sharedFlow.onEach {
                //模拟处理耗时
                delay(5000)
                log("receiver$it")
            }.launchIn(GlobalScope)
            GlobalScope.launch {
                delay(5000)
                sharedFlow.collect{
                    log("receiver2:$it")
                }
            }
            while (true) {
                delay(1000)
                val v = ++value
                sharedFlow.emit(v)
                log("send$v")
            }
        }
        //结果
    13:12:06:167 [main] send1
    13:12:07:192 [main] send2
    13:12:08:195 [main] send3
    13:12:09:197 [main] send4
    13:12:10:157 [DefaultDispatcher-worker-2] receiver2:3
    13:12:10:157 [DefaultDispatcher-worker-2] receiver2:4
    13:12:10:200 [main] send5
    13:12:10:200 [DefaultDispatcher-worker-2] receiver2:5
    13:12:11:170 [DefaultDispatcher-worker-2] receiver1
    13:12:11:203 [main] send6
    ...
    

    看receiver2,显而易见,会按照当前已发送的最近的replay个进行回播给新的订阅者

SharedFlow使用实战?‍♀️

  • 可以封装为事件总线

    SharedFlow 可用于将应用程序内部发生的事件广播给可以来来去去的订阅者。 例如,以下类封装了一个事件总线,该总线以集合方式将事件分发给所有订阅者,挂起直到所有订阅者处理每个事件:

    class EventBus {
        private val _events = MutableSharedFlow<Event>() // private mutable shared flow
        val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
    
        suspend fun produceEvent(event: Event) {
            _events.emit(event) // suspends until all subscribers receive it
        }
    }
    
  • Android中使用,比如定时刷新新闻:

    // 当应用程序中所有需要集中刷新的类,都可以订阅tickFlow来获取同一时间的刷新
    class TickHandler(
        private val externalScope: CoroutineScope,
        private val tickIntervalMs: Long = 5000
    ) {
        private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
        val tickFlow: SharedFlow<Event<String>> = _tickFlow
    
        init {
            externalScope.launch {
                while(true) {
                    _tickFlow.emit(Unit)
                    delay(tickIntervalMs)
                }
            }
        }
    }
    
    class NewsRepository(
        private val tickHandler: TickHandler,
        private val externalScope: CoroutineScope
    ) {
        init {
            externalScope.launch {
                // 监听更新   collect会挂起  所以该协程一直不会取消
                tickHandler.tickFlow.collect {
                    //todo 刷新 新闻
                }
            }
        }
        ...
    }
    
  • MutableSharedFlow 还有一个 subscriptionCount 属性,其中包含活动收集器的数量,可以用来协助处理很多事情,比如判断当前SharedFlow是否被激活

    sharedFlow.subscriptionCount
        .map { count -> count > 0 } // map count into active/inactive flag
        .distinctUntilChanged() // only react to true<->false changes
        .onEach { isActive -> // configure an action
            if (isActive) onActive() else onInactive()
        }
        .launchIn(scope) // launch it
    

另外补充一点:MutableSharedFlow 还包含一个 resetReplayCache 函数,可以用来重置回播的数量。可以自行了解一下,这里不再举例。

SharedFlow、StateFlow的使用区别,换句话说,事件和状态的区别?

  • 事件:并不全是希望将最新一条之前的事件全部丢弃。绝大部分情况是希望每条事件都能被执行
  • 状态:可以允许丢弃,展示最新的状态给UI展示即可,比如通知更新进度。

StateFlow可以更新值,不会关心当前消费者是否消费完毕上一个值(丢失中间值)。适用于状态。

SharedFlow会挂起直到,最后一个订阅者处理完成事件,才会发射下一个值。适用于事件消费。

StateFlow、SharedFlow源码解析

又到了激动人心的时刻了,源码分析它来了!!!?‍♀️

StateFlow实现和SharedFlow实现相对来说大差不大,相对来说SharedFlow会更复杂一点,篇幅有限,所以这里就只对SharedFlow做一波实现分析,StateFlow大家可以根据SharedFlow的源码自行跟一波?。

认识几个变量

先看一下MutableSharedFlow函数吧,可以构建一个SharedFlow

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    ...
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 
    //传进去 replay , bufferCapacity = replay + extraBufferCapacity  , onBufferOverflow
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

此时会有几个变量,传到SharedFlowImpl构建了SharedFlow,这几个变量通过上面的介绍应该比较熟悉了,下面简单列一下。

  • replay 回播给新订阅者数量
  • bufferCapacity 真正缓存容量 = replay + extraBufferCapacity
  • onBufferOverflow 当缓存池满了之后的处理策略

接下来,需要看SharedFlowImpl里面涉及到的变量,由于SharedFlowImpl里面涉及到的代码量过多,这里就不贴代码,只列一下变量的名字以及含义。

  • buffer: Array<Any?>? 缓存发射的值(有两种,一种是直接是发射的值,另外是值的包装类Emitters),按需分配,分配的大小总是二的幂
  • replayIndex :Int 对于新的收集器来说,最小的回播序号。默认值为0
  • minCollectorIndex :Int 对于已经激活的收集器来说,最小的索引。如果没有等于replayIndex
  • bufferSize :Int 缓存的值 的size
  • queueSize :Int 挂起排队发射 的size
  • head : Long = minOf(minCollectorIndex, replayIndex) 缓存队列的头
  • replaySize :Int 需要回播的size
  • totalSize : Int 缓存的值数量和排队挂起发射值数量总和
  • bufferEndIndex :Long 缓存值的最后一个的序号索引
  • queueEndIndex :Long 排队发射器队列最后一个的序号索引

官方buffer缓存池,逻辑结构

                  buffered values
             /-----------------------
                          replayCache      queued emitters
                          /----------/----------------------
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
         |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
               ^           ^           ^                      ^
               |           |           |                      |
              head         |      head + bufferSize     head + totalSize
               |           |           |
     index of the slowest  |    index of the fastest
      possible collector   |     possible collector
               |           |
               |     replayIndex == new collector's index
               ---------------------- /
          range of possible minCollectorIndex

          head == minOf(minCollectorIndex, replayIndex) // by definition
          totalSize == bufferSize + queueSize // by definition

       INVARIANTS:
          minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
          replayIndex <= head + bufferSize

熟悉一下涉及到的几个其他类

在阅读主流程代码之前还要分析几个以及方法,辅助主流程代码的进行。?‍♀️

在上面的代码中,我们知道SharedFlow具体实现类是SharedFlowImpl,所以我们先分析SharedFlowImpl的父类和接口。

先看一下MutableSharedFlow接口?

public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    override suspend fun emit(value: T)
    public fun tryEmit(value: T): Boolean
    public val subscriptionCount: StateFlow<Int>
    @ExperimentalCoroutinesApi
    public fun resetReplayCache()
}
public interface SharedFlow<out T> : Flow<T> {
    public val replayCache: List<T>
}
public interface Flow<out T> {
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

看得出提供了一些方法或者常量的实现接口方法。

在看一下AbstractSharedFlow抽象类?

internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
}

首先看到需要一个AbstractSharedFlowSlot的泛型参数,SharedFlowImpl继承AbstractSharedFlow时传递的是,SharedFlowSlot所以我们先看一下,这是什么东西。

private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
    @JvmField
    var index = -1L //当前收集器发射的索引

    @JvmField
    var cont: Continuation<Unit>? = null // collector waiting for new value
	//首次调用collect 会调用该函数 分配index
    override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
        if (index >= 0) return false // not free
        //将当前索引初始化为,新收集器需要回播的最初的序号replayIndex和minCollectorIndex的最小值  具体在SharedFlowImpl实现
        index = flow.updateNewCollectorIndexLocked()
        return true
    }
	//清除卡槽  当前订阅者 协程被取消或者终止时会调用到
    override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
        assert { index >= 0 }
        val oldIndex = index
        index = -1L
        cont = null // cleanup continuation reference
        //当收集器消失或更改索引时调用,返回锁定后要恢复的延续列表(返回可以进行恢复的emit列表)  具体在SharedFlowImpl实现
        return flow.updateCollectorIndexLocked(oldIndex)
    }
}

主要是订阅者的卡槽,每个订阅者都有一个属于自己的SharedFlowSlot对象,记录当前的自身所需值的index以及Continuation

那我们继续看AbstractSharedFlow类?

    @Suppress("UNCHECKED_CAST")
    protected var slots: Array<S?>? = null // 记录当前被分配的卡槽,按需分配,以2的幂次方增长
        private set
    protected var nCollectors = 0 // 订阅者的数量
        private set
    private var nextIndex = 0 // 下一个空闲卡槽的索引
    private var _subscriptionCount: MutableStateFlow<Int>? = null // init on first need
	//订阅者的数量 其实时用StateFlow来进行管理的  
    val subscriptionCount: StateFlow<Int>
        get() = synchronized(this) {
            // allocate under lock in sync with nCollectors variable
            _subscriptionCount ?: MutableStateFlow(nCollectors).also {
                _subscriptionCount = it
            }
        }
	//创建卡槽  在SharedFlowImpl中实现,具体返回了一个SharedFlowSlot
    protected abstract fun createSlot(): S
	//创建卡槽数组  在SharedFlowImpl中实现,具体返回了一个空ArrayList
    protected abstract fun createSlotArray(size: Int): Array<S?>

	//主要是分配了卡槽,相关状态进行了增减,代码比较简单,就是比较长,这里就不放了
    @Suppress("UNCHECKED_CAST")
    protected fun allocateSlot(): S {
        ...
    }
	//释放卡槽,同样,代码不难,比较长篇幅考虑,这里不放出
    @Suppress("UNCHECKED_CAST")
    protected fun freeSlot(slot: S) {
       ...
    }
	//封装的工具方法,迭代卡槽执行block
    protected inline fun forEachSlotLocked(block: (S) -> Unit) {
        if (nCollectors == 0) return
        slots?.forEach { slot ->
            if (slot != null) block(slot)
        }
    }

看得出,AbstractSharedFlow主要是对slot进行了一波管理。

在看一个类Emitter,用于发射挂起,记录发射的continuation等相关信息。

    private class Emitter(
        @JvmField val flow: SharedFlowImpl<*>,
        @JvmField var index: Long,
        @JvmField val value: Any?,
        @JvmField val cont: Continuation<Unit>
    ) : DisposableHandle {
        //发射取消,一些操作
        override fun dispose() = flow.cancelEmitter(this)
    }

记录了值的序号,值,continuation。另外它继承了DisposableHandle,所以可以直接加入当前协程的dispose的回调。可以在取消是触发dispose()。

下面再看个方法,用于对buffer值的添加和获取?

    // 将值放进buffer数组里面,若为null,则创建,会议2的倍数进行增长
    private fun enqueueLocked(item: Any?) {
        val curSize = totalSize
        val buffer = when (val curBuffer = buffer) {
            null -> growBuffer(null, 0, 2)
            else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
        }
        buffer.setBufferAt(head + curSize, item)
    }
//获取值
private fun getPeekedValueLockedAt(index: Long): Any? =
        when (val item = buffer!!.getBufferAt(index)) {
            is Emitter -> item.value
            else -> item
        }

主流程实现分析

上面的知识了解之后,下面进行主流程的分析。

我们先看一下发射emit

    override suspend fun emit(value: T) {
        if (tryEmit(value)) return // fast-path
        emitSuspend(value)
    }

好的看得出,会先进行tryEmit这个过程没有挂起,如果可以这时候发射的话,就return了,所以我们看一下tryEmit

override fun tryEmit(value: T): Boolean {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitted = synchronized(this) {
            if (tryEmitLocked(value)) {
                //返回了 需要恢复的订阅者 主要便利了所有的卡槽,将需要恢复的订阅者continuation拉出来
                resumes = findSlotsToResumeLocked(resumes)
                true
            } else {
                false
            }
        }
    	//恢复需要恢复的订阅者列表
        for (cont in resumes) cont?.resume(Unit)
        return emitted
    }
   private fun tryEmitLocked(value: T): Boolean {
        // Fast path without collectors -> no buffering
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
        // 看的出只有当前缓存容量有剩余空间的话或者DROP_OLDEST策略,才会继续后面的流程
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
            }
        }
       //缓存控件有剩余或者DROP_OLDEST策略,允许发射值,发送进入buffer队列
        enqueueLocked(value)
       //其他状态调整,包括buffersize等相关状态
        ....
       //返回true,代码发生成功
        return true
    }

可以发现在缓存区有空闲剩余的时候,允许不挂起直接发射值。

这里注意一点?,比如设置了replay和extraBufferCapacity均为0时,即直接构造的SharedFlow,此时bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex == true所以使用tryEmit的话,永远发射不了值

如果缓存区不满足,比如上面的情况,此时就必须要挂起了,所以继续看emitSuspend函数。

    private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        //加了锁  所以是线程安全的
        val emitter = synchronized(this) lock@{
            // 这一步重试的之前的tryEmit
            if (tryEmitLocked(value)) {
                cont.resume(Unit)
                resumes = findSlotsToResumeLocked(resumes)
                return@lock null
            }
            // 发现还不行 创建Emitter对象,记录相关信息
            Emitter(this, head + totalSize, value, cont).also {
                enqueueLocked(it)
                queueSize++ // 发射器队列++
                // 查找可以恢复的订阅者
                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
            }
        }
        // 将Emitter加入取消回调,在当前协程取消时会触发onDispose函数
        emitter?.let { cont.disposeOnCancellation(it) }
        // 恢复通知订阅者
        for (cont in resumes) cont?.resume(Unit)
    }

如果前面的东西都理解的话,其实也比较简单的吧。主要是将当前的值封装存到数组里面,然后恢复通知订阅者,自身挂起。等待订阅者调用恢复。

接下来看一下订阅者collect的实现

    override suspend fun collect(collector: FlowCollector<T>) {
        //调用父类(AbstractSharedFlow)allocateSlot()  分配卡槽
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            while (true) {
                var newValue: Any?
                while (true) {
                    //查找值
                    newValue = tryTakeValue(slot)
                    //如果查找到值 break当前循环
                    if (newValue !== NO_VALUE) break
                    //没有找到值可供发射,则挂起
                    awaitValue(slot) 
                }
                collectorJob?.ensureActive()
                //当上一个循环退出时,代表找到了值,调用collector的emit进行真正的发射,此时就会调用到外部的collect程序体了。如果这一块不清楚,可以看博主之前对于flow篇的详解。
                collector.emit(newValue as T)
            }
        } finally {
            //卡槽释放
            freeSlot(slot)
        }
    }

看的出,其实就是一个循环的过程。如果缓存区一直有值,就一直会发射值出去。如果没有值则挂起等待恢复。所以有两个关键的方法tryTakeValue查找值以及awaitValue挂起。

下面我们分析一下这两个方法的实现?。

tryTakeValue

    private fun tryTakeValue(slot: SharedFlowSlot): Any? {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val value = synchronized(this) {
            //获取当前卡槽的索引index,如果index非法则返回-1,代表没有找到值
            val index = tryPeekLocked(slot)
            if (index < 0) {
                NO_VALUE
            } else {
                //走这个分支,可以找到正常的index
                val oldIndex = slot.index
                //根据索引,在buffer中获取值
                val newValue = getPeekedValueLockedAt(index)
                slot.index = index + 1 // 指向下一个值
                //该方法里面各种逻辑太多,就不展开分析了。
                //主要是获取了可以恢复的发射器,根据缓存的size和目前的值的数量,判断可以恢复恢复的发射器的数量。
                //根据排队挂起的发射器,获取对应的数量,进行恢复。另外在该方法后面还调用了updateBufferLocked方法,进行buffer数组各种状态的重新设置,比如head等相关状态。
                resumes = updateCollectorIndexLocked(oldIndex)
                newValue
            }
        }
        //恢复相应的发射器
        for (resume in resumes) resume?.resume(Unit)
        return value
    }

主要是对获取卡槽,通过索引获取值,另外判断此时有多少发射器可以恢复,然后进行通知发射器的恢复。

awaitValue

    private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
        synchronized(this) lock@{
            val index = tryPeekLocked(slot) // 重新使用tryPeekLocked判断是否存在可用的值
            if (index < 0) {
                slot.cont = cont // 没有值挂起
            } else {
                cont.resume(Unit) // 有值不需要挂起
                return@lock
            }
            slot.cont = cont // 挂起等待
        }
    }

这个比较简单,判断可以挂起的话,直接将continuation赋值给卡槽。挂起了,等待发射器的恢复。

总结?

以上就是SharedFlow基本的流程了,可能比较绕吧,各种状态的,博主也是花了一些时间才完全理清楚。各位再看的时候,可以结合源码进行分析,有些博主没有列举出来的方法,建议需要点进去看一看。

另外,有不理解的地方随时欢迎,私信博主。如有错误也欢迎指正。?‍♀️

下一篇预告:深入理解协程的挂起和恢复?

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>