Skip to content

Commit 8114e17

Browse files
Merge pull request #109 from ably/AIT-254-buffer-when-SYNCING
[AIT-254] Buffer operations whilst `SYNCING` per updated RTO8a
2 parents 3c54331 + 6a34938 commit 8114e17

File tree

1 file changed

+50
-51
lines changed

1 file changed

+50
-51
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 50 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,6 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
8585

8686
/// The `ObjectMessage`s gathered during this sync sequence.
8787
internal var syncObjectsPool: [SyncObjectsPoolEntry]
88-
89-
/// `OBJECT` ProtocolMessages that were received during this sync sequence, to be applied once the sync sequence is complete, per RTO7a.
90-
internal var bufferedObjectOperations: [InboundObjectMessage]
9188
}
9289

9390
internal init(
@@ -459,12 +456,16 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
459456
/// Note: We follow the same pattern as used in the WIP ably-swift: a state's associated data is a class instance and the convention is that to update the associated data for the current state you mutate the existing instance instead of creating a new one.
460457
enum AssociatedData {
461458
class Syncing {
459+
/// `OBJECT` ProtocolMessages that were received whilst SYNCING, to be applied once the sync sequence is complete, per RTO7a.
460+
var bufferedObjectOperations: [InboundObjectMessage]
461+
462462
/// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and so no other operations that might wish to query this property can occur concurrently with the handling of these cases.
463463
///
464464
/// It is optional because there are times that we transition to SYNCING even when the sync data is contained in a single ProtocolMessage.
465465
var syncSequence: SyncSequence?
466466

467-
init(syncSequence: SyncSequence?) {
467+
init(bufferedObjectOperations: [InboundObjectMessage], syncSequence: SyncSequence?) {
468+
self.bufferedObjectOperations = bufferedObjectOperations
468469
self.syncSequence = syncSequence
469470
}
470471
}
@@ -509,7 +510,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
509510
// We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below
510511
if state.toObjectsSyncState != .syncing {
511512
// RTO4c
512-
transition(to: .syncing(.init(syncSequence: nil)), userCallbackQueue: userCallbackQueue)
513+
transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: nil)), userCallbackQueue: userCallbackQueue)
513514
}
514515

515516
// We only care about the case where HAS_OBJECTS is not set (RTO4b); if it is set then we're going to shortly receive an OBJECT_SYNC instead (RTO4a)
@@ -540,71 +541,65 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
540541

541542
receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages)
542543

543-
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
544-
let completedSyncObjectsPool: [SyncObjectsPoolEntry]?
545-
// If populated, this contains a set of buffered inbound OBJECT messages that should be applied.
546-
let completedSyncBufferedObjectOperations: [InboundObjectMessage]?
547-
// The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC.
548-
let syncSequenceForSyncingState: SyncSequence?
549-
544+
let syncCursor: SyncCursor?
550545
if let protocolMessageChannelSerial {
551-
let syncCursor: SyncCursor
552546
do {
553547
// RTO5a
554548
syncCursor = try SyncCursor(channelSerial: protocolMessageChannelSerial)
555549
} catch {
556550
logger.log("Failed to parse sync cursor: \(error)", level: .error)
557551
return
558552
}
553+
} else {
554+
syncCursor = nil
555+
}
559556

557+
if case let .syncing(syncingData) = state {
560558
// Figure out whether to continue any existing sync sequence or start a new one
561-
var updatedSyncSequence: SyncSequence = if case let .syncing(syncingData) = state, let syncSequence = syncingData.syncSequence {
562-
if syncCursor.sequenceID == syncSequence.id {
563-
// RTO5a3: Continue existing sync sequence
564-
syncSequence
565-
} else {
566-
// RTO5a2a, RTO5a2b: new sequence started, discard previous
567-
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
568-
}
559+
let isNewSyncSequence = syncCursor == nil || syncingData.syncSequence?.id != syncCursor?.sequenceID
560+
if isNewSyncSequence {
561+
// RTO5a2a, RTO5a2b: new sequence started, discard previous. Else we continue the existing sequence per RTO5a3
562+
syncingData.syncSequence = nil
563+
syncingData.bufferedObjectOperations = []
564+
}
565+
}
566+
567+
let syncObjectsPoolEntries = objectMessages.compactMap { objectMessage in
568+
if let object = objectMessage.object {
569+
SyncObjectsPoolEntry(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
569570
} else {
570-
// There's no current sync sequence; start one
571-
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
571+
nil
572572
}
573+
}
573574

574-
// RTO5b
575-
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap { objectMessage in
576-
if let object = objectMessage.object {
577-
.init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
578-
} else {
579-
nil
580-
}
581-
})
575+
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
576+
let completedSyncObjectsPool: [SyncObjectsPoolEntry]?
577+
// The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC.
578+
let syncSequenceForSyncingState: SyncSequence?
582579

583-
(completedSyncObjectsPool, completedSyncBufferedObjectOperations) = if syncCursor.isEndOfSequence {
584-
(updatedSyncSequence.syncObjectsPool, updatedSyncSequence.bufferedObjectOperations)
580+
if let syncCursor {
581+
let syncSequenceToContinue: SyncSequence? = if case let .syncing(syncingData) = state {
582+
syncingData.syncSequence
585583
} else {
586-
(nil, nil)
584+
nil
587585
}
588-
586+
var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: [])
587+
// RTO5b
588+
updatedSyncSequence.syncObjectsPool.append(contentsOf: syncObjectsPoolEntries)
589589
syncSequenceForSyncingState = updatedSyncSequence
590+
591+
completedSyncObjectsPool = syncCursor.isEndOfSequence ? updatedSyncSequence.syncObjectsPool : nil
590592
} else {
591593
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
592-
completedSyncObjectsPool = objectMessages.compactMap { objectMessage in
593-
if let object = objectMessage.object {
594-
.init(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
595-
} else {
596-
nil
597-
}
598-
}
599-
completedSyncBufferedObjectOperations = nil
594+
completedSyncObjectsPool = syncObjectsPoolEntries
600595
syncSequenceForSyncingState = nil
601596
}
602597

603598
if case let .syncing(syncingData) = state {
604599
syncingData.syncSequence = syncSequenceForSyncingState
605600
} else {
606601
// RTO5e
607-
transition(to: .syncing(.init(syncSequence: syncSequenceForSyncingState)), userCallbackQueue: userCallbackQueue)
602+
transition(to: .syncing(.init(bufferedObjectOperations: [], syncSequence: syncSequenceForSyncingState)), userCallbackQueue: userCallbackQueue)
608603
}
609604

610605
if let completedSyncObjectsPool {
@@ -618,9 +613,14 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
618613
)
619614

620615
// RTO5c6
621-
if let completedSyncBufferedObjectOperations, !completedSyncBufferedObjectOperations.isEmpty {
622-
logger.log("Applying \(completedSyncBufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug)
623-
for objectMessage in completedSyncBufferedObjectOperations {
616+
guard case let .syncing(syncingData) = state else {
617+
// We put ourselves into SYNCING above
618+
preconditionFailure()
619+
}
620+
let bufferedObjectOperations = syncingData.bufferedObjectOperations
621+
if !bufferedObjectOperations.isEmpty {
622+
logger.log("Applying \(bufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug)
623+
for objectMessage in bufferedObjectOperations {
624624
nosync_applyObjectProtocolMessageObjectMessage(
625625
objectMessage,
626626
logger: logger,
@@ -649,12 +649,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
649649

650650
logger.log("handleObjectProtocolMessage(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
651651

652-
if case let .syncing(syncingData) = state, let existingSyncSequence = syncingData.syncSequence {
652+
if case let .syncing(syncingData) = state {
653653
// RTO8a: Buffer the OBJECT message, to be handled once the sync completes
654+
// Note that RTO8a says to buffer if "not SYNCED" (i.e. it includes the INITIALIZED state). But, "if SYNCING" is an equivalent check since we will only receive operations once attached, and we become SYNCING upon receipt of ATTACHED
654655
logger.log("Buffering OBJECT message due to in-progress sync", level: .debug)
655-
var newSyncSequence = existingSyncSequence
656-
newSyncSequence.bufferedObjectOperations.append(contentsOf: objectMessages)
657-
syncingData.syncSequence = newSyncSequence
656+
syncingData.bufferedObjectOperations.append(contentsOf: objectMessages)
658657
} else {
659658
// RTO8b: Handle the OBJECT message immediately
660659
for objectMessage in objectMessages {

0 commit comments

Comments
 (0)