Skip to content

Commit 78b7423

Browse files
nicklaslclaude
andauthored
perf(java): combine AOT compilation, shared module, and bounded flush (#284)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c97d701 commit 78b7423

File tree

8 files changed

+134
-249
lines changed

8 files changed

+134
-249
lines changed

openfeature-provider/java/pom.xml

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,6 @@
155155
<artifactId>runtime</artifactId>
156156
<version>1.4.0</version>
157157
</dependency>
158-
<dependency>
159-
<groupId>com.dylibso.chicory</groupId>
160-
<artifactId>compiler</artifactId>
161-
<version>1.4.0</version>
162-
</dependency>
163158
<dependency>
164159
<groupId>org.junit.jupiter</groupId>
165160
<artifactId>junit-jupiter-api</artifactId>
@@ -279,14 +274,50 @@
279274
<sources>
280275
<source>${project.build.directory}/generated-sources/protobuf/java</source>
281276
<source>${project.build.directory}/generated-sources/protobuf/grpc-java</source>
277+
<source>${project.build.directory}/generated-sources/chicory-compiler</source>
282278
</sources>
283279
</configuration>
284280
</execution>
281+
<execution>
282+
<id>add-chicory-classes</id>
283+
<phase>generate-sources</phase>
284+
<goals>
285+
<goal>add-resource</goal>
286+
</goals>
287+
<configuration>
288+
<resources>
289+
<resource>
290+
<directory>${project.build.directory}/generated-resources/chicory-compiler</directory>
291+
</resource>
292+
</resources>
293+
</configuration>
294+
</execution>
285295
</executions>
286296
</plugin>
287297

288298
<!-- Makefile copies local WASM into resources; no download/clean plugin needed -->
289-
299+
300+
<!-- AOT compile WASM to Java bytecode at build time -->
301+
<plugin>
302+
<groupId>com.dylibso.chicory</groupId>
303+
<artifactId>chicory-compiler-maven-plugin</artifactId>
304+
<version>1.4.0</version>
305+
<executions>
306+
<execution>
307+
<id>compile-wasm</id>
308+
<phase>generate-sources</phase>
309+
<goals>
310+
<goal>compile</goal>
311+
</goals>
312+
<configuration>
313+
<wasmFile>src/main/resources/wasm/confidence_resolver.wasm</wasmFile>
314+
<name>com.spotify.confidence.sdk.ConfidenceResolverModule</name>
315+
<interpreterFallback>WARN</interpreterFallback>
316+
</configuration>
317+
</execution>
318+
</executions>
319+
</plugin>
320+
290321
<!-- JUnit 5 - Unit tests -->
291322
<plugin>
292323
<groupId>org.apache.maven.plugins</groupId>

openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/GrpcWasmFlagLogger.java

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import com.spotify.confidence.sdk.flags.resolver.v1.WriteFlagLogsRequest;
88
import io.grpc.*;
99
import java.time.Duration;
10-
import java.util.ArrayList;
11-
import java.util.List;
1210
import java.util.concurrent.ExecutorService;
1311
import java.util.concurrent.Executors;
1412
import java.util.concurrent.TimeUnit;
@@ -22,8 +20,6 @@ interface FlagLogWriter {
2220

2321
public class GrpcWasmFlagLogger implements WasmFlagLogger {
2422
private static final Logger logger = LoggerFactory.getLogger(GrpcWasmFlagLogger.class);
25-
// Max number of flag_assigned entries per chunk to avoid exceeding gRPC max message size
26-
private static final int MAX_FLAG_ASSIGNED_PER_CHUNK = 1000;
2723
private static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
2824
private final InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub stub;
2925
private final ExecutorService executorService;
@@ -81,53 +77,6 @@ public void write(WriteFlagLogsRequest request) {
8177
return;
8278
}
8379

84-
final int flagAssignedCount = request.getFlagAssignedCount();
85-
86-
// If flag_assigned list is small enough, send everything as-is
87-
if (flagAssignedCount <= MAX_FLAG_ASSIGNED_PER_CHUNK) {
88-
sendAsync(request);
89-
return;
90-
}
91-
92-
// Split flag_assigned into chunks and send each chunk asynchronously
93-
logger.debug(
94-
"Splitting {} flag_assigned entries into chunks of {}",
95-
flagAssignedCount,
96-
MAX_FLAG_ASSIGNED_PER_CHUNK);
97-
98-
final List<WriteFlagLogsRequest> chunks = createFlagAssignedChunks(request);
99-
for (WriteFlagLogsRequest chunk : chunks) {
100-
sendAsync(chunk);
101-
}
102-
}
103-
104-
private List<WriteFlagLogsRequest> createFlagAssignedChunks(WriteFlagLogsRequest request) {
105-
final List<WriteFlagLogsRequest> chunks = new ArrayList<>();
106-
final int totalFlags = request.getFlagAssignedCount();
107-
108-
for (int i = 0; i < totalFlags; i += MAX_FLAG_ASSIGNED_PER_CHUNK) {
109-
final int end = Math.min(i + MAX_FLAG_ASSIGNED_PER_CHUNK, totalFlags);
110-
final WriteFlagLogsRequest.Builder chunkBuilder =
111-
WriteFlagLogsRequest.newBuilder()
112-
.addAllFlagAssigned(request.getFlagAssignedList().subList(i, end));
113-
114-
// Include telemetry and resolve info only in the first chunk
115-
if (i == 0) {
116-
if (request.hasTelemetryData()) {
117-
chunkBuilder.setTelemetryData(request.getTelemetryData());
118-
}
119-
chunkBuilder
120-
.addAllClientResolveInfo(request.getClientResolveInfoList())
121-
.addAllFlagResolveInfo(request.getFlagResolveInfoList());
122-
}
123-
124-
chunks.add(chunkBuilder.build());
125-
}
126-
127-
return chunks;
128-
}
129-
130-
private void sendAsync(WriteFlagLogsRequest request) {
13180
writer.write(request);
13281
}
13382

@@ -140,27 +89,6 @@ public void writeSync(WriteFlagLogsRequest request) {
14089
return;
14190
}
14291

143-
final int flagAssignedCount = request.getFlagAssignedCount();
144-
145-
// If flag_assigned list is small enough, send everything as-is
146-
if (flagAssignedCount <= MAX_FLAG_ASSIGNED_PER_CHUNK) {
147-
sendSync(request);
148-
return;
149-
}
150-
151-
// Split flag_assigned into chunks and send each chunk synchronously
152-
logger.debug(
153-
"Synchronously splitting {} flag_assigned entries into chunks of {}",
154-
flagAssignedCount,
155-
MAX_FLAG_ASSIGNED_PER_CHUNK);
156-
157-
final List<WriteFlagLogsRequest> chunks = createFlagAssignedChunks(request);
158-
for (WriteFlagLogsRequest chunk : chunks) {
159-
sendSync(chunk);
160-
}
161-
}
162-
163-
private void sendSync(WriteFlagLogsRequest request) {
16492
try {
16593
stub.clientWriteFlagLogs(request);
16694
logger.debug("Synchronously sent flag log with {} entries", request.getFlagAssignedCount());

openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/OpenFeatureLocalResolveProvider.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,14 @@ public class OpenFeatureLocalResolveProvider implements FeatureProvider {
5353
private final MaterializationStore materializationStore;
5454
private final ResolverApi wasmResolveApi;
5555
private static final Duration POLL_LOG_INTERVAL = Duration.ofSeconds(10);
56+
private static final Duration ASSIGN_LOG_FLUSH_INTERVAL = Duration.ofMillis(100);
5657
private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofSeconds(30);
5758
private final ScheduledExecutorService flagsFetcherExecutor =
5859
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
5960
private final ScheduledExecutorService logPollExecutor =
6061
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
62+
private final ScheduledExecutorService assignLogExecutor =
63+
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
6164
private final AccountStateProvider stateProvider;
6265
private final AtomicReference<ProviderState> state =
6366
new AtomicReference<>(ProviderState.NOT_READY);
@@ -226,6 +229,16 @@ public void initialize(EvaluationContext evaluationContext) {
226229
POLL_LOG_INTERVAL.getSeconds(),
227230
POLL_LOG_INTERVAL.getSeconds(),
228231
TimeUnit.SECONDS);
232+
233+
assignLogExecutor.scheduleAtFixedRate(
234+
() -> {
235+
if (wasmResolveApi.isInitialized()) {
236+
wasmResolveApi.flushAssignLogs();
237+
}
238+
},
239+
ASSIGN_LOG_FLUSH_INTERVAL.toMillis(),
240+
ASSIGN_LOG_FLUSH_INTERVAL.toMillis(),
241+
TimeUnit.MILLISECONDS);
229242
}
230243

231244
private void scheduleStateRefresh(
@@ -332,6 +345,7 @@ public void shutdown() {
332345
log.debug("Shutting down scheduled executors");
333346
flagsFetcherExecutor.shutdown();
334347
logPollExecutor.shutdown();
348+
assignLogExecutor.shutdown();
335349

336350
try {
337351
if (!flagsFetcherExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
@@ -342,10 +356,15 @@ public void shutdown() {
342356
log.warn("Log poll executor did not terminate gracefully");
343357
logPollExecutor.shutdownNow();
344358
}
359+
if (!assignLogExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
360+
log.warn("Assign log executor did not terminate gracefully");
361+
assignLogExecutor.shutdownNow();
362+
}
345363
} catch (InterruptedException e) {
346364
log.warn("Interrupted while waiting for scheduled executors to shut down", e);
347365
flagsFetcherExecutor.shutdownNow();
348366
logPollExecutor.shutdownNow();
367+
assignLogExecutor.shutdownNow();
349368
Thread.currentThread().interrupt();
350369
}
351370

openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/ResolverApi.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ interface ResolverApi {
2929
*/
3030
void updateStateAndFlushLogs(byte[] state, String accountId);
3131

32+
/** Flushes pending assignment logs using bounded flush. */
33+
void flushAssignLogs();
34+
3235
/** Closes the resolver and releases any resources. */
3336
void close();
3437

openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/SwapWasmResolverApi.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ public SwapWasmResolverApi(
2323
this.materializationStore = materializationStore;
2424
this.flagLogger = flagLogger;
2525

26-
// Create initial instance
2726
final WasmResolveApi initialInstance = new WasmResolveApi(flagLogger);
2827
initialInstance.setResolverState(initialState, accountId);
2928
this.wasmResolverApiRef.set(initialInstance);
@@ -41,7 +40,6 @@ public boolean isInitialized() {
4140

4241
@Override
4342
public void updateStateAndFlushLogs(byte[] state, String accountId) {
44-
// Create new instance with updated state
4543
final WasmResolveApi newInstance = new WasmResolveApi(flagLogger);
4644
newInstance.setResolverState(state, accountId);
4745

@@ -52,6 +50,18 @@ public void updateStateAndFlushLogs(byte[] state, String accountId) {
5250
}
5351
}
5452

53+
@Override
54+
public void flushAssignLogs() {
55+
final WasmResolveApi currentInstance = wasmResolverApiRef.get();
56+
if (currentInstance != null) {
57+
currentInstance.flushAssignLogs();
58+
}
59+
}
60+
61+
/**
62+
* Closes the current WasmResolveApi instance, flushing any pending logs. This ensures all
63+
* buffered log data is sent before shutdown completes.
64+
*/
5565
@Override
5666
public void close() {
5767
final WasmResolveApi currentInstance = wasmResolverApiRef.getAndSet(null);

openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/ThreadLocalSwapWasmResolverApi.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public void updateStateAndFlushLogs(byte[] state, String accountId) {
9696
CompletableFutures.allAsList(futures).join();
9797
}
9898

99+
@Override
100+
public void flushAssignLogs() {
101+
resolverInstances.values().forEach(SwapWasmResolverApi::flushAssignLogs);
102+
}
103+
99104
/**
100105
* Maps the current thread to a resolver instance using round-robin assignment. Each thread gets
101106
* assigned to an instance index when first accessed, ensuring even distribution across available

0 commit comments

Comments
 (0)