Kotlin Flows
本文包含的内容:
- Flow是什么, 基本概念和用法.
- Flow的不同类型, StateFlow和SharedFlow比较.
- Flow在Android中的使用
- 安全收集.
- 操作符
stateIn
,shareIn
的用法和区别.
本文被收录在集合中: https://github.com/mengdd/KotlinTutorials
Coroutines Flow Basics
Flow是什么
Flow可以按顺序发送多个值, 概念上是一个数据流, 发射的值必须是同一个类型.
Flow使用suspend方法来生产/消费值, 数据流可以做异步计算.
几个基本知识点:
- 创建flow: 通过flow builders
- Flow数据流通过
emit()
来发射元素. - 可以通过各种操作符对flow的数据进行处理. 注意中间的操作符都不会触发flow的数据发送.
- Flow默认是cold flow, 即需要通过被观察才能激活, 最常用的操作符是
collect()
. - Flow的
CoroutineContext
, 不指定的情况下是collect()
的CoroutineContext
, 如果想要更改, 用flowOn
改之前的.
关于Flow的基本用法, 19年底写的这篇coroutines flow in Android可以温故知新.
Flow的操作符
一个Flow操作符的可视化小网站: FlowMarbles.
Flow的不同类型
SharedFlow and StateFlow
应用程序里比较常用的类型是SharedFlow和StateFlow.
Android官方有一篇专门的文档来介绍二者: StateFlow and SharedFlow
StateFlow继承于SharedFlow, SharedFlow继承于Flow.
基本关系如下:
-
Flow
基类. Cold.
Flow的两大特性: Context preservation; Exception transparency. -
SharedFlow
继承Flow, 是一种hot flow, 所有collectors共享它的值, 永不终止, 是一种广播的方式.
一个shared flow上的活跃collector被叫作subscriber.
在sharedFlow上的collect call永远不会正常complete, 还有Flow.launchIn.
可以配置replay and buffer overflow strategy.
如果subscriber suspend了, sharedflow会suspend这个stream, buffer这个要发射的元素, 等待subscriber resume.
Because onBufferOverflow is set with BufferOverflow.SUSPEND
, the flow will suspend until it can deliver the event to all subscribers.
默认参数:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)
total buffer是: replay + extraBufferCapacity
.
如果total buffer是0, 那么onBufferOverflow只能是onBufferOverflow = BufferOverflow.SUSPEND
.
关于reply和buffer, 这个文章
有详细的解释, 并且配有动图.
- StateFlow
继承SharedFlow, hot flow, 和是否有collector收集无关, 永不complete.
可以通过value
属性访问当前值.
有conflated特性, 会跳过太快的更新, 永远返回最新值.
Strong equality-based conflation: 会通过equals()
来判断值是否发生改变, 如果没有改变, 则不会通知collector.
因为conflated的特性, StateFlow赋值的时候要注意使用不可变的值.
cold vs hot
cold stream 可以重复收集, 每次收集, 会对每一个收集者单独开启一次.
hot stream 永远发射不同的值, 和是否有人收集无关, 永远不会终止.
StateFlow vs SharedFlow
共性:
StateFlow
和SharedFlow
永远都不会停止. 不能指望它们的onCompletionCallback
.
不同点:
StateFlow
可以通过value
属性读到最新的值, 但SharedFlow
却不行.StateFlow
是conflated: 如果新的值和旧的值一样, 不会传播.SharedFlow
需要合理设置buffer和replay策略.
互相转换:
SharedFlow
用了distinctUntilChanged
以后变成StateFlow
.
// MutableStateFlow(initialValue) is a shared flow with the following parameters:
val shared = MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior
RxJava的等价替代:
PublishSubject
->SharedFlow
.BehaviorSubject
->StateFlow
.
Use Flow in Android
发送事件(Event或Effects): SharedFlow
因为SharedFlow没有conflated特性, 所以适合发送事件, 即便值变化得快也是每个都发送.
private val _sharedViewEffects = MutableSharedFlow<SharedViewEffects>() // 1
val sharedViewEffects = _sharedViewEffects.asSharedFlow() // 2
这里用了asSharedFlow
来创建一个ReadonlySharedFlow
.
SharedFlow发射元素有两个方法:
emit
: suspend方法.tryEmit
: 非suspend方法.
因为tryEmit
是非suspend的, 适用于有buffer的情况.
保存暴露UI状态: StateFlow
StateFlow
是一个state-holder, 可以通过value
读到当前状态值.
一般会有一个MutableStateFlow
类型的Backing property.
StateFlow
是hot的, collect并不会触发producer code.
当有新的consumer时, 新的consumer会接到上次的状态和后续的状态.
使用StateFlow时, 发射新元素只需要赋值:
mutableState.value = newState
注意这里新值和旧的值要equals
判断不相等才能发射出去.
StateFlow vs LiveData
StateFlow
和LiveData
很像.
StateFlow
和LiveData
的相同点:
- 永远有一个值.
- 只有一个值.
- 支持多个观察者.
- 在订阅的瞬间, replay最新的值.
有一点点不同:
StateFlow
需要一个初始值.LiveData
会自动解绑, flow要达到相同效果, collect要在Lifecycle.repeatOnLifecycle
里.
Flow的安全收集
关于收集Flow的方法, 主要还是关注一下生命周期的问题, 因为SharedFlow和StateFlow都是hot的.
在这个文章里有详细的讨论: A safer way to collect flows from Android UIs
在UI层收集的时候注意要用repeatOnLifecycle
:
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
//...
// Start a coroutine in the lifecycle scope
lifecycleScope.launch {
// repeatOnLifecycle launches the block in a new coroutine every time the
// lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Trigger the flow and start listening for values.
// Note that this happens when lifecycle is STARTED and stops
// collecting when the lifecycle is STOPPED
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
这个文章里有个扩展方法也挺好的:
class FlowObserver<T> (
lifecycleOwner: LifecycleOwner,
private val flow: Flow<T>,
private val collector: suspend (T) -> Unit
) {
private var job: Job? = null
init {
lifecycleOwner.lifecycle.addObserver(LifecycleEventObserver {
source: LifecycleOwner, event: Lifecycle.Event ->
when (event) {
Lifecycle.Event.ON_START -> {
job = source.lifecycleScope.launch {
flow.collect { collector(it) }
}
}
Lifecycle.Event.ON_STOP -> {
job?.cancel()
job = null
}
else -> { }
}
})
}
}
inline fun <reified T> Flow<T>.observeOnLifecycle(
lifecycleOwner: LifecycleOwner,
noinline collector: suspend (T) -> Unit
) = FlowObserver(lifecycleOwner, this, collector)
inline fun <reified T> Flow<T>.observeInLifecycle(
lifecycleOwner: LifecycleOwner
) = FlowObserver(lifecycleOwner, this, {})
看了一下官方的repeatOnLifecycle
其实大概也是这个意思:
public suspend fun Lifecycle.repeatOnLifecycle(
state: Lifecycle.State,
block: suspend CoroutineScope.() -> Unit
) {
require(state !== Lifecycle.State.INITIALIZED) {
"repeatOnLifecycle cannot start work with the INITIALIZED lifecycle state."
}
if (currentState === Lifecycle.State.DESTROYED) {
return
}
// This scope is required to preserve context before we move to Dispatchers.Main
coroutineScope {
withContext(Dispatchers.Main.immediate) {
// Check the current state of the lifecycle as the previous check is not guaranteed
// to be done on the main thread.
if (currentState === Lifecycle.State.DESTROYED) return@withContext
// Instance of the running repeating coroutine
var launchedJob: Job? = null
// Registered observer
var observer: LifecycleEventObserver? = null
try {
// Suspend the coroutine until the lifecycle is destroyed or
// the coroutine is cancelled
suspendCancellableCoroutine<Unit> { cont ->
// Lifecycle observers that executes `block` when the lifecycle reaches certain state, and
// cancels when it falls below that state.
val startWorkEvent = Lifecycle.Event.upTo(state)
val cancelWorkEvent = Lifecycle.Event.downFrom(state)
val mutex = Mutex()
observer = LifecycleEventObserver { _, event ->
if (event == startWorkEvent) {
// Launch the repeating work preserving the calling context
launchedJob = this@coroutineScope.launch {
// Mutex makes invocations run serially,
// coroutineScope ensures all child coroutines finish
mutex.withLock {
coroutineScope {
block()
}
}
}
return@LifecycleEventObserver
}
if (event == cancelWorkEvent) {
launchedJob?.cancel()
launchedJob = null
}
if (event == Lifecycle.Event.ON_DESTROY) {
cont.resume(Unit)
}
}
this@repeatOnLifecycle.addObserver(observer as LifecycleEventObserver)
}
} finally {
launchedJob?.cancel()
observer?.let {
this@repeatOnLifecycle.removeObserver(it)
}
}
}
}
}
既然官方已经推出了, 我们就用官方的repeatOnLifecycle
方法吧.
shareIn
和stateIn
前面提过这两个操作符是用来做flow转换的:
shareIn
可以保证只有一个数据源被创造, 并且被所有collectors收集.
比如:
class LocationRepository(
private val locationDataSource: LocationDataSource,
private val externalScope: CoroutineScope
) {
val locations: Flow<Location> =
locationDataSource.locationsSource.shareIn(externalScope, WhileSubscribed())
}
WhileSubscribed
这个策略是说, 当无人观测时, 上游的flow就被取消.
实际使用时可以用WhileSubscribed(5000)
, 让上游的flow即便在无人观测的情况下, 也能继续保持5秒.
这样可以在某些情况(比如旋转屏幕)时避免重建上游资源, 适用于上游资源创建起来很expensive的情况.
如果我们的需求是, 永远保持一个最新的cache值.
class LocationRepository(
private val locationDataSource: LocationDataSource,
private val externalScope: CoroutineScope
) {
val locations: Flow<Location> =
locationDataSource.locationsSource.stateIn(externalScope, WhileSubscribed(), EmptyLocation)
}
Flow.stateIn
将会缓存最后一个值, 并且有新的collector时, 将这个最新值传给它.
shareIn
, stateIn
使用注意事项
永远不要在方法里面调用shareIn
和stateIn
, 因为方法每次被调用, 它们都会创建新的流.
这些流没有被复用, 会存在内存里面, 直到scope被取消或者没有引用时被GC.
推荐的使用方式是在property上用:
class UserRepository(
private val userLocalDataSource: UserLocalDataSource,
private val externalScope: CoroutineScope
) {
// DO NOT USE shareIn or stateIn in a function like this.
// It creates a new SharedFlow/StateFlow per invocation which is not reused!
fun getUser(): Flow<User> =
userLocalDataSource.getUser()
.shareIn(externalScope, WhileSubscribed())
// DO USE shareIn or stateIn in a property
val user: Flow<User> =
userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())
}
StateFlow使用总结
从ViewModel暴露数据到UI, 用StateFlow
的两种方式:
- 暴露一个StateFlow属性, 用
WhileSubscribed
加上一个timeout.
class MyViewModel(...) : ViewModel() {
val result = userId.mapLatest { newUserId ->
repository.observeItem(newUserId)
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.Loading
)
}
- 用
repeatOnLifecycle
收集.
onCreateView(...) {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
myViewModel.myUiState.collect { ... }
}
}
}
其他的组合都会保持上游的活跃, 浪费资源:
- 用
WhileSubscribed
暴露属性, 在lifecycleScope.launch/launchWhenX
里收集. - 通过
Lazily/Eagerly
暴露, 用repeatOnLifecycle
收集.
References
- Kotlin flows on Android
- StateFlow and SharedFlow
- A safer way to collect flows from Android UIs
- Things to know about Flow’s shareIn and stateIn operators
- Shared flows, broadcast channels
- Kotlin SharedFlow or: How I learned to stop using RxJava and love the Flow
- Migrating from LiveData to Kotlin’s Flow
- Substituting Android’s LiveData: StateFlow or SharedFlow?
- Learning State & Shared Flows with Unit Tests
- Reactive Streams on Kotlin: SharedFlow and StateFlow
- Reading Coroutine official guide thoroughly — Part 0