-
Notifications
You must be signed in to change notification settings - Fork 1.5k
JAVA-5950 Update Transactions Convenient API with exponential backoff on retries #1852
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: backpressure
Are you sure you want to change the base?
Changes from all commits
1ed116f
eb8b4ad
b8b0e1a
c05ce05
aa96659
98fc57b
f98262e
bfc89fc
d9405ef
5de452b
1867ff5
f89d62d
3d646ae
9b4bf15
da83704
cb95167
ef734a0
96b5ed7
43eda52
a4193dd
f2d8263
f3daab0
7afd9d4
ccb8d03
bbb9a68
c44872d
f0dd916
0e00c90
36ecbf9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| * Copyright 2008-present MongoDB, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.mongodb.internal.time; | ||
|
|
||
| import com.mongodb.internal.VisibleForTesting; | ||
|
|
||
| import java.util.concurrent.ThreadLocalRandom; | ||
| import java.util.function.DoubleSupplier; | ||
|
|
||
| import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; | ||
|
|
||
| /** | ||
| * Implements exponential backoff with jitter for retry scenarios. | ||
| */ | ||
| public enum ExponentialBackoff { | ||
| TRANSACTION(5.0, 500.0, 1.5); | ||
|
|
||
| private final double baseMs, maxMs, growth; | ||
|
|
||
| // TODO remove this global state once https://jira.mongodb.org/browse/JAVA-6060 is done | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
https://jira.mongodb.org/browse/JAVA-6060 is about introducing
|
||
| private static DoubleSupplier testJitterSupplier = null; | ||
|
|
||
| ExponentialBackoff(final double baseMs, final double maxMs, final double growth) { | ||
| this.baseMs = baseMs; | ||
| this.maxMs = maxMs; | ||
| this.growth = growth; | ||
| } | ||
|
|
||
| /** | ||
| * Calculate the next delay in milliseconds based on the retry count. | ||
| * | ||
| * @param retryCount The number of retries that have occurred. | ||
| * @return The calculated delay in milliseconds. | ||
| */ | ||
| public long calculateDelayBeforeNextRetryMs(final int retryCount) { | ||
| double jitter = testJitterSupplier != null | ||
| ? testJitterSupplier.getAsDouble() | ||
| : ThreadLocalRandom.current().nextDouble(); | ||
| double backoff = Math.min(baseMs * Math.pow(growth, retryCount), maxMs); | ||
| return Math.round(jitter * backoff); | ||
| } | ||
|
Comment on lines
+43
to
+55
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The class name uses the term "backoff", but the method uses the term "delay" (both in its name and in the documentation comment). Let's not use two different terms to refer to the same thing. I am guessing the above is an indirect result of you deciding to call
Comment on lines
+43
to
+55
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method accepts
Let's change this method so that it also operates 0-based attempt number by accepting |
||
|
|
||
| /** | ||
| * Calculate the next delay in milliseconds based on the retry count and a provided jitter. | ||
| * | ||
| * @param retryCount The number of retries that have occurred. | ||
| * @param jitter A double in the range [0, 1) to apply as jitter. | ||
| * @return The calculated delay in milliseconds. | ||
| */ | ||
| public long calculateDelayBeforeNextRetryMs(final int retryCount, final double jitter) { | ||
| double backoff = Math.min(baseMs * Math.pow(growth, retryCount), maxMs); | ||
| return Math.round(jitter * backoff); | ||
| } | ||
|
Comment on lines
+57
to
+67
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| /** | ||
| * Set a custom jitter supplier for testing purposes. | ||
| * | ||
| * @param supplier A DoubleSupplier that returns values in [0, 1) range. | ||
| */ | ||
| @VisibleForTesting(otherwise = PRIVATE) | ||
| public static void setTestJitterSupplier(final DoubleSupplier supplier) { | ||
| testJitterSupplier = supplier; | ||
| } | ||
|
|
||
| /** | ||
| * Clear the test jitter supplier, reverting to default ThreadLocalRandom behavior. | ||
| */ | ||
| @VisibleForTesting(otherwise = PRIVATE) | ||
| public static void clearTestJitterSupplier() { | ||
| testJitterSupplier = null; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| /* | ||
| * Copyright 2008-present MongoDB, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.mongodb.internal; | ||
|
|
||
| import com.mongodb.internal.time.ExponentialBackoff; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
|
||
| public class ExponentialBackoffTest { | ||
|
|
||
| @Test | ||
| void testTransactionRetryBackoff() { | ||
| // Test that the backoff sequence follows the expected pattern with growth factor 1.5 | ||
| // Expected sequence (without jitter): 5, 7.5, 11.25, ... | ||
| // With jitter, actual values will be between 0 and these maxima | ||
| double[] expectedMaxValues = {5.0, 7.5, 11.25, 16.875, 25.3125, 37.96875, 56.953125, 85.4296875, 128.14453125, 192.21679688, 288.32519531, 432.48779297, 500.0}; | ||
|
|
||
| ExponentialBackoff backoff = ExponentialBackoff.TRANSACTION; | ||
| for (int retry = 0; retry < expectedMaxValues.length; retry++) { | ||
| long delay = backoff.calculateDelayBeforeNextRetryMs(retry); | ||
| assertTrue(delay >= 0 && delay <= Math.round(expectedMaxValues[retry]), String.format("Retry %d: delay should be 0-%d ms, got: %d", retry, Math.round(expectedMaxValues[retry]), delay)); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| void testTransactionRetryBackoffRespectsMaximum() { | ||
| ExponentialBackoff backoff = ExponentialBackoff.TRANSACTION; | ||
|
|
||
| // Even at high retry counts, delay should never exceed 500ms | ||
| for (int retry = 0; retry < 25; retry++) { | ||
| long delay = backoff.calculateDelayBeforeNextRetryMs(retry); | ||
| assertTrue(delay >= 0 && delay <= 500, String.format("Retry %d: delay should be capped at 500 ms, got: %d ms", retry, delay)); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| void testCustomJitter() { | ||
| ExponentialBackoff backoff = ExponentialBackoff.TRANSACTION; | ||
|
|
||
| // Expected delays with jitter=1.0 and growth factor 1.5 | ||
| double[] expectedDelays = {5.0, 7.5, 11.25, 16.875, 25.3125, 37.96875, 56.953125, 85.4296875, 128.14453125, 192.21679688, 288.32519531, 432.48779297, 500.0}; | ||
| double jitter = 1.0; | ||
|
|
||
| for (int retry = 0; retry < expectedDelays.length; retry++) { | ||
| long delay = backoff.calculateDelayBeforeNextRetryMs(retry, jitter); | ||
| long expected = Math.round(expectedDelays[retry]); | ||
| assertEquals(expected, delay, String.format("Retry %d: with jitter=1.0, delay should be %d ms", retry, expected)); | ||
| } | ||
|
|
||
| // With jitter = 0, all delays should be 0 | ||
| jitter = 0; | ||
| for (int retry = 0; retry < 10; retry++) { | ||
| long delay = backoff.calculateDelayBeforeNextRetryMs(retry, jitter); | ||
| assertEquals(0, delay, "With jitter=0, delay should always be 0 ms"); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,8 @@ | |
| import com.mongodb.client.ClientSession; | ||
| import com.mongodb.client.TransactionBody; | ||
| import com.mongodb.internal.TimeoutContext; | ||
| import com.mongodb.internal.observability.micrometer.TracingManager; | ||
| import com.mongodb.internal.observability.micrometer.TransactionSpan; | ||
| import com.mongodb.internal.operation.AbortTransactionOperation; | ||
| import com.mongodb.internal.operation.CommitTransactionOperation; | ||
| import com.mongodb.internal.operation.OperationHelper; | ||
|
|
@@ -36,8 +38,7 @@ | |
| import com.mongodb.internal.operation.WriteOperation; | ||
| import com.mongodb.internal.session.BaseClientSessionImpl; | ||
| import com.mongodb.internal.session.ServerSessionPool; | ||
| import com.mongodb.internal.observability.micrometer.TracingManager; | ||
| import com.mongodb.internal.observability.micrometer.TransactionSpan; | ||
| import com.mongodb.internal.time.ExponentialBackoff; | ||
| import com.mongodb.lang.Nullable; | ||
|
|
||
| import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; | ||
|
|
@@ -46,6 +47,7 @@ | |
| import static com.mongodb.assertions.Assertions.assertTrue; | ||
| import static com.mongodb.assertions.Assertions.isTrue; | ||
| import static com.mongodb.assertions.Assertions.notNull; | ||
| import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; | ||
|
|
||
| final class ClientSessionImpl extends BaseClientSessionImpl implements ClientSession { | ||
|
|
||
|
|
@@ -251,13 +253,21 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra | |
| notNull("transactionBody", transactionBody); | ||
| long startTime = ClientSessionClock.INSTANCE.now(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [just a comment on a code that wasn't changed in this PR] I have just noticed this |
||
| TimeoutContext withTransactionTimeoutContext = createTimeoutContext(options); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an explanatory comment for the changes proposed in stIncMale@08171aa. Taking into account
we should replace it with a correct internal API that uses monotonic clock. We already have such API in the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an explanatory comment for the changes proposed in stIncMale@08171aa. We should create See also the comment stIncMale@08171aa#r175640155 for additional related explanations. 1 |
||
| ExponentialBackoff transactionBackoff = ExponentialBackoff.TRANSACTION; | ||
| int transactionAttempt = 0; | ||
| MongoException lastError = null; | ||
|
|
||
| try { | ||
| outer: | ||
| while (true) { | ||
| if (transactionAttempt > 0) { | ||
| backoff(transactionBackoff, transactionAttempt, startTime, lastError); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an explanatory comment for the changes proposed in stIncMale@08171aa.
|
||
| } | ||
| T retVal; | ||
| try { | ||
| startTransaction(options, withTransactionTimeoutContext.copyTimeoutContext()); | ||
| transactionAttempt++; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an explanatory comment for the changes proposed in stIncMale@08171aa.
|
||
|
|
||
| if (transactionSpan != null) { | ||
| transactionSpan.setIsConvenientTransaction(); | ||
| } | ||
|
|
@@ -266,14 +276,17 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra | |
| if (transactionState == TransactionState.IN) { | ||
| abortTransaction(); | ||
| } | ||
| if (e instanceof MongoException && !(e instanceof MongoOperationTimeoutException)) { | ||
| MongoException exceptionToHandle = OperationHelper.unwrap((MongoException) e); | ||
| if (exceptionToHandle.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL) | ||
| && ClientSessionClock.INSTANCE.now() - startTime < MAX_RETRY_TIME_LIMIT_MS) { | ||
| if (transactionSpan != null) { | ||
| transactionSpan.spanFinalizing(false); | ||
| if (e instanceof MongoException) { | ||
| lastError = (MongoException) e; | ||
| if (!(e instanceof MongoOperationTimeoutException)) { | ||
| MongoException exceptionToHandle = OperationHelper.unwrap((MongoException) e); | ||
| if (exceptionToHandle.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL) | ||
| && ClientSessionClock.INSTANCE.now() - startTime < MAX_RETRY_TIME_LIMIT_MS) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The The changes I propose for |
||
| if (transactionSpan != null) { | ||
| transactionSpan.spanFinalizing(false); | ||
| } | ||
| continue; | ||
| } | ||
| continue; | ||
| } | ||
| } | ||
| throw e; | ||
|
|
@@ -296,6 +309,7 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra | |
| if (transactionSpan != null) { | ||
| transactionSpan.spanFinalizing(true); | ||
| } | ||
| lastError = e; | ||
| continue outer; | ||
| } | ||
| } | ||
|
|
@@ -359,4 +373,22 @@ private TimeoutContext createTimeoutContext(final TransactionOptions transaction | |
| TransactionOptions.merge(transactionOptions, getOptions().getDefaultTransactionOptions()), | ||
| operationExecutor.getTimeoutSettings())); | ||
| } | ||
|
|
||
| private static void backoff(final ExponentialBackoff exponentialBackoff, final int transactionAttempt, final long startTime, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an explanatory comment for the changes proposed in stIncMale@08171aa. What is the reason behind passing |
||
| final MongoException lastError) { | ||
| long backoffMs = exponentialBackoff.calculateDelayBeforeNextRetryMs(transactionAttempt - 1); | ||
| if (ClientSessionClock.INSTANCE.now() + backoffMs - startTime >= MAX_RETRY_TIME_LIMIT_MS) { | ||
| if (lastError != null) { | ||
| throw lastError; | ||
| } | ||
| throw new MongoClientException("Transaction retry timeout exceeded"); | ||
| } | ||
| try { | ||
| if (backoffMs > 0) { | ||
| Thread.sleep(backoffMs); | ||
| } | ||
| } catch (InterruptedException e) { | ||
| throw interruptAndCreateMongoInterruptedException("Transaction retry interrupted", e); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,15 +22,20 @@ | |
| import com.mongodb.TransactionOptions; | ||
| import com.mongodb.client.internal.ClientSessionClock; | ||
| import com.mongodb.client.model.Sorts; | ||
| import com.mongodb.internal.time.ExponentialBackoff; | ||
| import org.bson.BsonDocument; | ||
| import org.bson.Document; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.DisplayName; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| import static com.mongodb.ClusterFixture.TIMEOUT; | ||
| import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; | ||
| import static com.mongodb.ClusterFixture.isSharded; | ||
| import static com.mongodb.client.Fixture.getPrimary; | ||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
|
@@ -203,6 +208,76 @@ public void testTimeoutMSAndLegacySettings() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * See | ||
| * <a href="https://github.com/mongodb/specifications/blob/master/source/transactions-convenient-api/tests/README.md#retry-backoff-is-enforceds">Convenient API Prose Tests</a>. | ||
| */ | ||
| @DisplayName("Retry Backoff is Enforced") | ||
| @Test | ||
| public void testRetryBackoffIsEnforced() throws InterruptedException { | ||
| // Run with jitter = 0 (no backoff) | ||
| ExponentialBackoff.setTestJitterSupplier(() -> 0.0); | ||
|
|
||
| BsonDocument failPointDocument = BsonDocument.parse("{'configureFailPoint': 'failCommand', 'mode': {'times': 13}, " | ||
| + "'data': {'failCommands': ['commitTransaction'], 'errorCode': 251}}"); | ||
|
|
||
| long noBackoffTime; | ||
| try (ClientSession session = client.startSession(); | ||
| FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) { | ||
| long startNanos = System.nanoTime(); | ||
| session.withTransaction(() -> collection.insertOne(session, Document.parse("{}"))); | ||
| noBackoffTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); | ||
|
Comment on lines
+227
to
+229
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an explanatory comment for the changes proposed in stIncMale@08171aa. We have StartTime startTime = StartTime.now();
...
noBackoffTime = startTime.elapsed().toMillis(); |
||
| } finally { | ||
| // Clear the test jitter supplier to avoid affecting other tests | ||
| ExponentialBackoff.clearTestJitterSupplier(); | ||
| } | ||
|
|
||
| // Run with jitter = 1 (full backoff) | ||
| ExponentialBackoff.setTestJitterSupplier(() -> 1.0); | ||
|
|
||
| failPointDocument = BsonDocument.parse("{'configureFailPoint': 'failCommand', 'mode': {'times': 13}, " | ||
| + "'data': {'failCommands': ['commitTransaction'], 'errorCode': 251}}"); | ||
|
|
||
| long withBackoffTime; | ||
| try (ClientSession session = client.startSession(); | ||
| FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) { | ||
| long startNanos = System.nanoTime(); | ||
| session.withTransaction(() -> collection.insertOne(session, Document.parse("{}"))); | ||
| withBackoffTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); | ||
| } finally { | ||
| ExponentialBackoff.clearTestJitterSupplier(); | ||
| } | ||
|
|
||
| long expectedWithBackoffTime = noBackoffTime + 1800; | ||
| long actualDifference = Math.abs(withBackoffTime - expectedWithBackoffTime); | ||
|
|
||
| assertTrue(actualDifference < 1000, String.format("Expected withBackoffTime to be ~% dms (noBackoffTime %d ms + 1800 ms), but" | ||
| + " got %d ms. Difference: %d ms (tolerance: 1000 ms per spec)", expectedWithBackoffTime, noBackoffTime, withBackoffTime, | ||
| actualDifference)); | ||
| } | ||
|
|
||
| /** | ||
| * This test is not from the specification. | ||
| */ | ||
| @Test | ||
| public void testExponentialBackoffOnTransientError() throws InterruptedException { | ||
| BsonDocument failPointDocument = BsonDocument.parse("{'configureFailPoint': 'failCommand', 'mode': {'times': 3}, " | ||
| + "'data': {'failCommands': ['insert'], 'errorCode': 112, " | ||
| + "'errorLabels': ['TransientTransactionError']}}"); | ||
|
|
||
| try (ClientSession session = client.startSession(); | ||
| FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) { | ||
| AtomicInteger attemptsCount = new AtomicInteger(0); | ||
|
|
||
| session.withTransaction(() -> { | ||
| attemptsCount.incrementAndGet(); // Count the attempt before the operation that might fail | ||
| return collection.insertOne(session, Document.parse("{}")); | ||
| }); | ||
|
|
||
| assertEquals(4, attemptsCount.get(), "Expected 1 initial attempt + 3 retries"); | ||
| } | ||
| } | ||
|
|
||
| private boolean canRunTests() { | ||
| return isSharded() || isDiscoverableReplicaSet(); | ||
| } | ||
|
Comment on lines
281
to
283
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an explanatory comment for the changes proposed in stIncMale@08171aa. This method can be |
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't use this declaration style in the Java driver codebase. Let's declare each instance field separately.