Skip to content

Commit 0b46f2b

Browse files
authored
Merge pull request #9 from tunjid/tj/suspending-mutator
Add suspending mutator variants
2 parents dd063b4 + c5ecf58 commit 0b46f2b

File tree

9 files changed

+414
-14
lines changed

9 files changed

+414
-14
lines changed

core/src/commonMain/kotlin/com/tunjid/mutator/Mutator.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,23 @@ typealias Mutation<State> = State.() -> State
2323

2424
typealias StateHolder<State> = StateMutator<State>
2525

26-
interface StateMutator<State : Any> {
26+
/**
27+
* A type that holds a state of type [State].
28+
*/
29+
interface StateMutator<out State : Any> {
30+
/**
31+
* The current state of the mutator.
32+
*/
2733
val state: State
2834
}
2935

36+
/**
37+
* A [StateMutator] that can accept actions of type [Action] to mutate its state.
38+
*/
3039
interface ActionStateMutator<Action : Any, State : Any> : StateMutator<State> {
40+
/**
41+
* Accepts an action to mutate the state.
42+
*/
3143
val accept: (Action) -> Unit
3244
}
3345

coroutines/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ kotlin {
3030
val commonTest by getting {
3131
dependencies {
3232
implementation(kotlin("test"))
33-
implementation(libs.kotlinx.coroutines.test)
33+
implementation(libs.compose.multiplatform.runtime)
3434
implementation(libs.cashapp.turbine)
35+
implementation(libs.kotlinx.coroutines.test)
3536
}
3637
}
3738
all {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.tunjid.mutator.coroutines
18+
19+
import com.tunjid.mutator.ActionStateMutator
20+
import kotlinx.coroutines.CoroutineScope
21+
import kotlinx.coroutines.channels.Channel
22+
import kotlinx.coroutines.flow.Flow
23+
import kotlinx.coroutines.flow.SharingStarted
24+
import kotlinx.coroutines.flow.receiveAsFlow
25+
import kotlinx.coroutines.launch
26+
27+
/**
28+
* An [ActionStateMutator] that can be suspended.
29+
*
30+
* This interface combines the capabilities of [ActionStateMutator] and [SuspendingStateMutator],
31+
* allowing for state production to be driven by actions and tied to the lifecycle of the collector.
32+
*/
33+
interface ActionSuspendingStateMutator<Action : Any, State : Any> :
34+
ActionStateMutator<Action, State>,
35+
SuspendingStateMutator<State>
36+
37+
/**
38+
* Creates an [ActionSuspendingStateMutator] that derives its state from a [producer] which
39+
* consumes a stream of [Action]s.
40+
*
41+
* @param initialState The initial state of the mutator.
42+
* @param started The [SharingStarted] strategy to control when the producer is active.
43+
* @param producer A suspending lambda that produces state changes. It is invoked when the
44+
* [started] strategy dictates that the producer should be active. It receives the current state
45+
* and a [Flow] of actions.
46+
*/
47+
fun <Action : Any, State : Any> CoroutineScope.actionSuspendingStateMutator(
48+
initialState: State,
49+
started: SharingStarted = SharingStarted.WhileSubscribed(DEFAULT_STOP_TIMEOUT_MILLIS),
50+
producer: suspend CoroutineScope.(State, Flow<Action>) -> Unit,
51+
): ActionSuspendingStateMutator<Action, State> = DelegatingActionSuspendingStateMutator(
52+
coroutineScope = this,
53+
initialState = initialState,
54+
started = started,
55+
producer = producer,
56+
)
57+
58+
private class DelegatingActionSuspendingStateMutator<Action : Any, State : Any>(
59+
coroutineScope: CoroutineScope,
60+
initialState: State,
61+
started: SharingStarted,
62+
producer: suspend CoroutineScope.(State, Flow<Action>) -> Unit,
63+
) : ActionSuspendingStateMutator<Action, State> {
64+
65+
private val actions = Channel<Action>()
66+
67+
val mutator = coroutineScope.suspendingStateMutator(
68+
state = initialState,
69+
started = started,
70+
producer = { currentState ->
71+
producer(
72+
currentState,
73+
actions.receiveAsFlow(),
74+
)
75+
},
76+
)
77+
78+
override val state: State
79+
get() = mutator.state
80+
81+
override val accept: (Action) -> Unit = { action ->
82+
coroutineScope.launch {
83+
actions.send(action)
84+
}
85+
}
86+
87+
override suspend fun collect() =
88+
mutator.collect()
89+
}

coroutines/src/commonMain/kotlin/com/tunjid/mutator/coroutines/FlowMutationStream.kt

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@
1717
package com.tunjid.mutator.coroutines
1818

1919
import com.tunjid.mutator.Mutation
20+
import kotlinx.coroutines.CoroutineScope
2021
import kotlinx.coroutines.channels.BufferOverflow
2122
import kotlinx.coroutines.channels.Channel
2223
import kotlinx.coroutines.flow.Flow
2324
import kotlinx.coroutines.flow.channelFlow
25+
import kotlinx.coroutines.flow.collect
2426
import kotlinx.coroutines.flow.flatMapMerge
27+
import kotlinx.coroutines.flow.flow
2528
import kotlinx.coroutines.flow.onStart
2629
import kotlinx.coroutines.flow.receiveAsFlow
30+
import kotlinx.coroutines.launch
2731

2832
/**
2933
* Class holding the context of the [Action] emitted that is being split out into
@@ -32,7 +36,7 @@ import kotlinx.coroutines.flow.receiveAsFlow
3236
* Use typically involves invoking [type] to identify the [Action] stream being transformed, and
3337
* subsequently invoking [flow] to perform a custom transformation on the split out [Flow].
3438
*/
35-
data class TransformationContext<Action : Any>(
39+
class TransformationContext<Action : Any>(
3640
private val type: Action,
3741
val backing: Flow<Action>,
3842
) {
@@ -63,12 +67,12 @@ data class TransformationContext<Action : Any>(
6367
* [onBufferOverflow]: The behavior of the [Channel] on overflow. See the [BufferOverflow]
6468
* for details.
6569
*
66-
* [keySelector]: The mapping for the [Action] to the key used to identify it. This is useful
67-
* for nested class hierarchies. By default each distinct type will be split out, but if you want
68-
* to treat certain subtypes as one type, this lets you do that.
70+
* [keySelector]: The mapping for the [Action] to the key used to identify it. This is useful
71+
* for nested class hierarchies. By default each distinct type will be split out, but if you want
72+
* to treat certain subtypes as one type, this lets you do that.
6973
*
70-
* [transform]: a function for mapping independent [Flow]s of [Action] to [Flow]s of [State]
71-
* [Mutation]s
74+
* [transform]: a function for mapping independent [Flow]s of [Action] to [Flow]s of [State]
75+
* [Mutation]s
7276
* @see [splitByType]
7377
*/
7478
fun <Action : Any, State : Any> Flow<Action>.toMutationStream(
@@ -85,6 +89,53 @@ fun <Action : Any, State : Any> Flow<Action>.toMutationStream(
8589
transform = transform,
8690
)
8791

92+
/**
93+
* Processes a [Flow] of [Action] in the provided [productionScope], allowing for finer grained
94+
* processing on subtypes of [Action]. This allows for certain actions to be processed differently
95+
* than others. For example: a certain action may need to be only processed on distinct
96+
* emissions, whereas other actions may need to use more complex [Flow] transformations like
97+
* [Flow.flatMapMerge] and so on. It does so by creating a [Channel] for each subtype.
98+
*
99+
* [productionScope]: The [CoroutineScope] for processing flows transformed.
100+
* [capacity]: The capacity for the [Channel] created for each subtype. See the [Channel] factory
101+
* function for details.
102+
*
103+
* [onBufferOverflow]: The behavior of the [Channel] on overflow. See the [BufferOverflow]
104+
* for details.
105+
*
106+
* [keySelector]: The mapping for the [Action] to the key used to identify it. This is useful
107+
* for nested class hierarchies. By default each distinct type will be split out, but if you want
108+
* to treat certain subtypes as one type, this lets you do that.
109+
*
110+
* [transform]: a function for processing independent [Flow]s of [Action]. Each independent flow should be collected
111+
* in this block.
112+
*
113+
* @see [splitByType]
114+
*/
115+
fun <Action : Any> Flow<Action>.launchMutationsIn(
116+
productionScope: CoroutineScope,
117+
capacity: Int = Channel.BUFFERED,
118+
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
119+
keySelector: (Action) -> String = Any::defaultKeySelector,
120+
transform: suspend TransformationContext<Action>.() -> Unit,
121+
) {
122+
productionScope.launch {
123+
splitByType(
124+
capacity = capacity,
125+
onBufferOverflow = onBufferOverflow,
126+
typeSelector = { it },
127+
keySelector = keySelector,
128+
transform = transformation@{
129+
flow {
130+
transform(this@transformation)
131+
emit(Unit)
132+
}
133+
},
134+
)
135+
.collect()
136+
}
137+
}
138+
88139
/**
89140
* Transforms a [Flow] of [Input] to a [Flow] of [Output] by splitting the original into [Flow]s
90141
* of type [Selector]. Each independent [Flow] of the [Selector] type can then be transformed
@@ -125,10 +176,11 @@ fun <Input : Any, Selector : Any, Output : Any> Flow<Input>.splitByType(
125176
}
126177

127178
else -> {
128-
existingHolder.internalSharedFlow.send(selected)
179+
existingHolder.channel.send(selected)
129180
}
130181
}
131182
}
183+
keysToFlowHolders.values.forEach { it.channel.close() }
132184
}
133185
.flatMapMerge(
134186
concurrency = Int.MAX_VALUE,
@@ -144,16 +196,16 @@ private data class FlowHolder<Action>(
144196
val onBufferOverflow: BufferOverflow,
145197
val firstEmission: Action,
146198
) {
147-
val internalSharedFlow: Channel<Action> = Channel(
199+
val channel: Channel<Action> = Channel(
148200
capacity = capacity,
149201
onBufferOverflow = onBufferOverflow,
150202
)
151-
val exposedFlow: Flow<Action> = internalSharedFlow
203+
val exposedFlow: Flow<Action> = channel
152204
.receiveAsFlow()
153205
.onStart { emit(firstEmission) }
154206
}
155207

156-
private fun Any.defaultKeySelector(): String = this::class.simpleName
208+
private fun Any.defaultKeySelector(): String = this::class.qualifiedName
157209
?: throw IllegalArgumentException(
158210
"Only well defined classes can be split or specify a different key selector",
159211
)
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.tunjid.mutator.coroutines
18+
19+
import com.tunjid.mutator.StateMutator
20+
import kotlinx.coroutines.CoroutineScope
21+
import kotlinx.coroutines.Job
22+
import kotlinx.coroutines.awaitCancellation
23+
import kotlinx.coroutines.flow.MutableStateFlow
24+
import kotlinx.coroutines.flow.SharingCommand
25+
import kotlinx.coroutines.flow.SharingStarted
26+
import kotlinx.coroutines.flow.update
27+
import kotlinx.coroutines.launch
28+
29+
/**
30+
* A [StateMutator] that can be suspended.
31+
*
32+
* This interface allows for the state production to be tied to the lifecycle of the collector.
33+
* When [collect] is called, it signals that the state production should be active.
34+
*/
35+
interface SuspendingStateMutator<State : Any> : StateMutator<State> {
36+
/**
37+
* Suspends and keeps the upstream producer active for as long as this function
38+
* is running.
39+
*
40+
* Callers should launch this in a coroutine. When the coroutine is cancelled,
41+
* the reference count is decremented.
42+
* * This function does not return normally; it suspends until cancellation.
43+
*/
44+
suspend fun collect()
45+
}
46+
47+
/**
48+
* Creates a [SuspendingStateMutator] that derives its state from a [producer].
49+
*
50+
* @param state The initial state of the mutator.
51+
* @param started The [SharingStarted] strategy to control when the producer is active.
52+
* @param producer A suspending lambda that produces state changes. It is invoked when the
53+
* [started] strategy dictates that the producer should be active.
54+
*/
55+
fun <State : Any> CoroutineScope.suspendingStateMutator(
56+
state: State,
57+
started: SharingStarted = SharingStarted.WhileSubscribed(DEFAULT_STOP_TIMEOUT_MILLIS),
58+
producer: suspend CoroutineScope.(State) -> Unit,
59+
): SuspendingStateMutator<State> {
60+
val subscriptionCount = MutableStateFlow(0)
61+
val mutator = RefCountingSuspendingStateMutator(
62+
state = state,
63+
subscriptionCount = subscriptionCount,
64+
)
65+
66+
launch {
67+
var producerJob: Job? = null
68+
started.command(subscriptionCount).collect { command ->
69+
when (command) {
70+
SharingCommand.START -> {
71+
if (producerJob == null || producerJob?.isActive == false) {
72+
producerJob = launch { producer(state) }
73+
}
74+
}
75+
76+
SharingCommand.STOP,
77+
SharingCommand.STOP_AND_RESET_REPLAY_CACHE,
78+
-> {
79+
producerJob?.cancel()
80+
producerJob = null
81+
}
82+
}
83+
}
84+
}
85+
86+
return mutator
87+
}
88+
89+
private class RefCountingSuspendingStateMutator<State : Any>(
90+
override val state: State, // The stable state holder
91+
private val subscriptionCount: MutableStateFlow<Int>,
92+
) : SuspendingStateMutator<State> {
93+
override suspend fun collect() {
94+
try {
95+
subscriptionCount.update { it + 1 }
96+
awaitCancellation()
97+
} finally {
98+
subscriptionCount.update { it - 1 }
99+
}
100+
}
101+
}

coroutines/src/commonTest/kotlin/com/tunjid/mutator/coroutines/ActionStateMutatorKtTest.kt renamed to coroutines/src/commonTest/kotlin/com/tunjid/mutator/coroutines/ActionStateFlowMutatorTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import kotlinx.coroutines.test.resetMain
3737
import kotlinx.coroutines.test.runTest
3838
import kotlinx.coroutines.test.setMain
3939

40-
class ActionStateMutatorKtTest {
40+
class ActionStateFlowMutatorTest {
4141

4242
private val testDispatcher = UnconfinedTestDispatcher()
4343

0 commit comments

Comments
 (0)