Skip to content

Commit 6bcc320

Browse files
nicklaslclaudevahidlazio
authored
fix(java): use writeLock for close/flush to prevent concurrent WASM access (#303)
Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: vahid torkaman <vahidt@spotify.com>
1 parent 227ea1e commit 6bcc320

File tree

2 files changed

+132
-4
lines changed

2 files changed

+132
-4
lines changed

openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/WasmResolveApi.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void setResolverState(byte[] state, String accountId) {
102102
}
103103

104104
public void close() {
105-
wasmLock.readLock().lock();
105+
wasmLock.writeLock().lock();
106106
try {
107107
final var voidRequest = Messages.Void.getDefaultInstance();
108108

@@ -125,12 +125,12 @@ public void close() {
125125
writeFlagLogs.writeSync(request);
126126
isConsumed = true;
127127
} finally {
128-
wasmLock.readLock().unlock();
128+
wasmLock.writeLock().unlock();
129129
}
130130
}
131131

132132
public void flushAssignLogs() {
133-
wasmLock.readLock().lock();
133+
wasmLock.writeLock().lock();
134134
try {
135135
if (isConsumed) {
136136
return;
@@ -141,7 +141,7 @@ public void flushAssignLogs() {
141141
final var request = consumeResponse(respPtr, WriteFlagLogsRequest::parseFrom);
142142
writeFlagLogs.write(request);
143143
} finally {
144-
wasmLock.readLock().unlock();
144+
wasmLock.writeLock().unlock();
145145
}
146146
}
147147

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package com.spotify.confidence.sdk;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.google.protobuf.Struct;
6+
import com.google.protobuf.Value;
7+
import com.spotify.confidence.sdk.flags.resolver.v1.ResolveFlagsRequest;
8+
import com.spotify.confidence.sdk.flags.resolver.v1.ResolveProcessRequest;
9+
import com.spotify.confidence.sdk.flags.resolver.v1.Sdk;
10+
import com.spotify.confidence.sdk.flags.resolver.v1.SdkId;
11+
import com.spotify.confidence.sdk.flags.resolver.v1.WriteFlagLogsRequest;
12+
import java.util.concurrent.CountDownLatch;
13+
import org.junit.jupiter.api.BeforeAll;
14+
import org.junit.jupiter.api.Test;
15+
16+
/**
17+
* Regression test for the race condition between {@link WasmResolveApi#flushAssignLogs()} and
18+
* {@link WasmResolveApi#close()}. Previously both methods acquired a readLock, allowing concurrent
19+
* execution on the same WASM instance. When they raced, flag assignments were lost — close() found
20+
* 0 entries because flushAssignLogs() already drained them (or vice versa, with corrupted WASM
21+
* memory access). The fix changes both methods to use writeLock for exclusive access.
22+
*/
23+
class WasmResolveApiFlushCloseRaceTest {
24+
private static final String FLAG_CLIENT_SECRET = "ti5Sipq5EluCYRG7I5cdbpWC3xq7JTWv";
25+
private static final String TARGETING_KEY = "test-race";
26+
27+
private static byte[] resolverState;
28+
private static String accountId;
29+
30+
@BeforeAll
31+
static void fetchState() {
32+
final var stateProvider =
33+
new FlagsAdminStateFetcher(FLAG_CLIENT_SECRET, new DefaultHttpClientFactory());
34+
stateProvider.reload();
35+
resolverState = stateProvider.provide();
36+
accountId = stateProvider.accountId();
37+
}
38+
39+
@Test
40+
void concurrentFlushAndCloseShouldNotLoseAssignments() throws Exception {
41+
final int iterations = 100;
42+
int lostAssigns = 0;
43+
44+
for (int i = 0; i < iterations; i++) {
45+
final var logger = new CapturingWasmFlagLogger();
46+
final var api = new WasmResolveApi(logger);
47+
api.setResolverState(resolverState, accountId);
48+
49+
// Resolve a flag to create a flag assignment in the WASM buffer
50+
api.resolveProcess(buildResolveRequest());
51+
52+
// Race: flushAssignLogs and close concurrently
53+
final var startLatch = new CountDownLatch(1);
54+
final var doneLatch = new CountDownLatch(2);
55+
56+
final Thread flusher =
57+
new Thread(
58+
() -> {
59+
try {
60+
startLatch.await();
61+
api.flushAssignLogs();
62+
} catch (InterruptedException e) {
63+
Thread.currentThread().interrupt();
64+
} finally {
65+
doneLatch.countDown();
66+
}
67+
});
68+
69+
final Thread closer =
70+
new Thread(
71+
() -> {
72+
try {
73+
startLatch.await();
74+
api.close();
75+
} catch (Exception e) {
76+
// close() may fail due to corrupted WASM state from concurrent access
77+
} finally {
78+
doneLatch.countDown();
79+
}
80+
});
81+
82+
flusher.start();
83+
closer.start();
84+
startLatch.countDown();
85+
doneLatch.await();
86+
87+
// Count total flag assignments across all captured requests
88+
final int totalAssigns =
89+
logger.getCapturedRequests().stream()
90+
.mapToInt(WriteFlagLogsRequest::getFlagAssignedCount)
91+
.sum();
92+
93+
if (totalAssigns == 0) {
94+
lostAssigns++;
95+
}
96+
}
97+
98+
assertThat(lostAssigns)
99+
.as(
100+
"Flag assignments were lost in %d out of %d iterations due to concurrent "
101+
+ "WASM memory access in flushAssignLogs() and close()",
102+
lostAssigns, iterations)
103+
.isEqualTo(0);
104+
}
105+
106+
private static ResolveProcessRequest buildResolveRequest() {
107+
final var resolveFlagsRequest =
108+
ResolveFlagsRequest.newBuilder()
109+
.addFlags("flags/web-sdk-e2e-flag")
110+
.setApply(true)
111+
.setClientSecret(FLAG_CLIENT_SECRET)
112+
.setEvaluationContext(
113+
Struct.newBuilder()
114+
.putFields(
115+
"targeting_key", Value.newBuilder().setStringValue(TARGETING_KEY).build())
116+
.build())
117+
.setSdk(
118+
Sdk.newBuilder()
119+
.setId(SdkId.SDK_ID_JAVA_LOCAL_PROVIDER)
120+
.setVersion(Version.VERSION)
121+
.build())
122+
.build();
123+
124+
return ResolveProcessRequest.newBuilder()
125+
.setWithoutMaterializations(resolveFlagsRequest)
126+
.build();
127+
}
128+
}

0 commit comments

Comments
 (0)