Skip to content

Commit 9243c3f

Browse files
Implement updated buffer clearing rules (RTO4d, RTO4b5, RTO5a2b)
Per spec commit 997584f, buffered object operations must now be cleared on every ATTACHED message (new RTO4d), rather than when a new OBJECT_SYNC sequence starts (old RTO5a2b) or in the no-HAS_OBJECTS path specifically (old RTO4b5). The key insight is that on ATTACHED, either: - HAS_OBJECTS is set, meaning the subsequent sync sequence includes all operations up to the attach point, making previously buffered operations redundant - HAS_OBJECTS is not set, meaning the client performs an implicit sync and clears local state anyway Production changes: - nosync_onChannelAttached: replace `if` with `switch` to clear buffer on every ATTACHED (RTO4d), even when already SYNCING - nosync_handleObjectSyncProtocolMessage: stop clearing buffer on new sync sequence (RTO5a2b replaced by RTO4d) Test changes: - doesNotModifyStateWhenHasObjectsIsTrue: renamed, now asserts buffer is cleared (RTO4d) whilst pool and sync sequence remain unchanged - newSequenceIdDiscardsInFlightSync: removed RTO5a2b annotation and buffered ops assertion, now only covers RTO5a2a Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 1a67249 commit 9243c3f

File tree

2 files changed

+36
-17
lines changed

2 files changed

+36
-17
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,17 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
8484
}
8585
}
8686

87+
/// Returns the number of buffered object operations if in the SYNCING state, or nil otherwise.
88+
internal var testsOnly_bufferedObjectOperationsCount: Int? {
89+
mutableStateMutex.withSync { mutableState in
90+
if case let .syncing(syncingData) = mutableState.state {
91+
syncingData.bufferedObjectOperations.count
92+
} else {
93+
nil
94+
}
95+
}
96+
}
97+
8798
// These drive the testsOnly_waitingForSyncEvents property that informs the test suite when `getRoot()` is waiting for the object sync sequence to complete per RTO1c.
8899
private let waitingForSyncEvents: AsyncStream<Void>
89100
private let waitingForSyncEventsContinuation: AsyncStream<Void>.Continuation
@@ -728,8 +739,12 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
728739
onChannelAttachedHasObjects = hasObjects
729740

730741
// We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below
731-
if state.toObjectsSyncState != .syncing {
732-
// RTO4c
742+
switch state {
743+
case let .syncing(syncingData):
744+
// RTO4d: Clear buffered object operations on every ATTACHED (already SYNCING per RTO4c)
745+
syncingData.bufferedObjectOperations = []
746+
case .initialized, .synced:
747+
// RTO4c (RTO4d is inherently satisfied by the empty buffer)
733748
transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: nil)), userCallbackQueue: userCallbackQueue)
734749
}
735750

@@ -743,7 +758,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
743758

744759
// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.
745760

746-
// RTO4b3, RTO4b4, RTO4b5, RTO5c3, RTO5c4, RTO5c5, RTO5c9, RTO5c8
761+
// RTO4b3, RTO4b4, RTO5c3, RTO5c4, RTO5c5, RTO5c9, RTO5c8 (RTO4b5 replaced by RTO4d)
747762
appliedOnAckSerials.removeAll()
748763
transition(to: .synced, userCallbackQueue: userCallbackQueue)
749764

@@ -784,9 +799,9 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
784799
// Figure out whether to continue any existing sync sequence or start a new one
785800
let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID
786801
if isNewSyncSequence {
787-
// RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
802+
// RTO5a2a: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
803+
// (RTO5a2b replaced by RTO4d — buffer is now only cleared on ATTACHED, not on new sync sequence)
788804
syncingData.syncSequence = nil
789-
syncingData.bufferedObjectOperations = []
790805
}
791806
}
792807

Tests/AblyLiveObjectsTests/InternalDefaultRealtimeObjectsTests.swift

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ struct InternalDefaultRealtimeObjectsTests {
126126

127127
// @spec RTO5a2
128128
// @spec RTO5a2a
129-
// @spec RTO5a2b
130129
@Test
131130
func newSequenceIdDiscardsInFlightSync() async throws {
132131
let internalQueue = TestFactories.createInternalQueue()
@@ -145,13 +144,6 @@ struct InternalDefaultRealtimeObjectsTests {
145144

146145
#expect(realtimeObjects.testsOnly_hasSyncSequence)
147146

148-
// Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b
149-
internalQueue.ably_syncNoDeadlock {
150-
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
151-
TestFactories.mapCreateOperationMessage(objectId: "map:3@789"),
152-
])
153-
}
154-
155147
// Start new sequence with different ID (RTO5a2)
156148
let secondMessages = [TestFactories.simpleMapMessage(objectId: "map:2@456")]
157149
internalQueue.ably_syncNoDeadlock {
@@ -175,7 +167,6 @@ struct InternalDefaultRealtimeObjectsTests {
175167
// Verify only the second sequence's objects were applied (RTO5a2a - previous cleared)
176168
let pool = realtimeObjects.testsOnly_objectsPool
177169
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence
178-
#expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b)
179170
#expect(pool.entries["map:2@456"] != nil) // From completed second sequence
180171
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
181172
}
@@ -366,9 +357,10 @@ struct InternalDefaultRealtimeObjectsTests {
366357
struct OnChannelAttachedTests {
367358
// MARK: - RTO4a Tests
368359

369-
// @spec RTO4a - Checks that when the `HAS_OBJECTS` flag is 1 (i.e. the server will shortly perform an `OBJECT_SYNC` sequence) we don't modify any internal state
360+
// @spec RTO4a - Checks that when the `HAS_OBJECTS` flag is 1 (i.e. the server will shortly perform an `OBJECT_SYNC` sequence) we don't modify the objects pool or sync sequence
361+
// @spec RTO4d - Checks that buffered object operations are cleared on ATTACHED
370362
@Test
371-
func doesNotModifyStateWhenHasObjectsIsTrue() {
363+
func doesNotModifyPoolOrSyncSequenceWhenHasObjectsIsTrue() {
372364
let internalQueue = TestFactories.createInternalQueue()
373365
let realtimeObjects = InternalDefaultRealtimeObjectsTests.createDefaultRealtimeObjects(internalQueue: internalQueue)
374366

@@ -389,12 +381,21 @@ struct InternalDefaultRealtimeObjectsTests {
389381

390382
#expect(realtimeObjects.testsOnly_hasSyncSequence)
391383

384+
// Inject a buffered OBJECT operation
385+
internalQueue.ably_syncNoDeadlock {
386+
realtimeObjects.nosync_handleObjectProtocolMessage(objectMessages: [
387+
TestFactories.mapCreateOperationMessage(objectId: "map:buffered@789"),
388+
])
389+
}
390+
391+
#expect(realtimeObjects.testsOnly_bufferedObjectOperationsCount == 1)
392+
392393
// When: onChannelAttached is called with hasObjects = true
393394
internalQueue.ably_syncNoDeadlock {
394395
realtimeObjects.nosync_onChannelAttached(hasObjects: true)
395396
}
396397

397-
// Then: Nothing should be modified
398+
// Then:
398399
#expect(realtimeObjects.testsOnly_onChannelAttachedHasObjects == true)
399400

400401
// Verify ObjectsPool is unchanged
@@ -405,6 +406,9 @@ struct InternalDefaultRealtimeObjectsTests {
405406

406407
// Verify sync sequence is still active
407408
#expect(realtimeObjects.testsOnly_hasSyncSequence)
409+
410+
// RTO4d: Verify buffered object operations were cleared
411+
#expect(realtimeObjects.testsOnly_bufferedObjectOperationsCount == 0)
408412
}
409413

410414
// MARK: - RTO4b Tests

0 commit comments

Comments
 (0)