Skip to content

Commit 5efb77f

Browse files
Merge pull request #103 from ably/80-reconcile-redundant-sync-state
Remove redundancy in sync-related data
2 parents fcf3f1d + fba49ef commit 5efb77f

File tree

2 files changed

+119
-55
lines changed

2 files changed

+119
-55
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 100 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
6262
/// If this returns false, it means that there is currently no stored sync sequence ID, SyncObjectsPool, or BufferedObjectOperations.
6363
internal var testsOnly_hasSyncSequence: Bool {
6464
mutableStateMutex.withSync { mutableState in
65-
mutableState.syncSequence != nil
65+
if case let .syncing(syncingData) = mutableState.state, syncingData.syncSequence != nil {
66+
true
67+
} else {
68+
false
69+
}
6670
}
6771
}
6872

@@ -86,26 +90,6 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
8690
internal var bufferedObjectOperations: [InboundObjectMessage]
8791
}
8892

89-
/// Tracks whether an object sync sequence has happened yet. This allows us to wait for a sync before returning from `getRoot()`, per RTO1c.
90-
private struct SyncStatus {
91-
private(set) var isSyncComplete = false
92-
private let syncCompletionEvents: AsyncStream<Void>
93-
private let syncCompletionContinuation: AsyncStream<Void>.Continuation
94-
95-
internal init() {
96-
(syncCompletionEvents, syncCompletionContinuation) = AsyncStream.makeStream()
97-
}
98-
99-
internal mutating func signalSyncComplete() {
100-
isSyncComplete = true
101-
syncCompletionContinuation.yield()
102-
}
103-
104-
internal func waitForSyncCompletion() async {
105-
await syncCompletionEvents.first { _ in true }
106-
}
107-
}
108-
10993
internal init(
11094
logger: Logger,
11195
internalQueue: DispatchQueue,
@@ -168,18 +152,23 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
168152
// MARK: - Internal methods that power RealtimeObjects conformance
169153

170154
internal func getRoot(coreSDK: CoreSDK) async throws(ARTErrorInfo) -> InternalDefaultLiveMap {
171-
let syncStatus = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
155+
let state = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
172156
// RTO1b: If the channel is in the DETACHED or FAILED state, the library should indicate an error with code 90001
173157
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed], operationDescription: "getRoot")
174158

175-
return mutableState.syncStatus
159+
return mutableState.state
176160
}
177161

178-
if !syncStatus.isSyncComplete {
162+
if state.toObjectsSyncState != .synced {
179163
// RTO1c
180164
waitingForSyncEventsContinuation.yield()
181165
logger.log("getRoot started waiting for sync sequence to complete", level: .debug)
182-
await syncStatus.waitForSyncCompletion()
166+
await withCheckedContinuation { continuation in
167+
onInternal(event: .synced) { subscription in
168+
subscription.off()
169+
continuation.resume()
170+
}
171+
}
183172
logger.log("getRoot completed waiting for sync sequence to complete", level: .debug)
184173
}
185174

@@ -283,6 +272,24 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
283272
}
284273
}
285274

275+
/// Adds a subscriber to the ``internalObjectsEventSubscriptionStorage`` (i.e. unaffected by `offAll()`).
276+
@discardableResult
277+
internal func onInternal(event: ObjectsEvent, callback: @escaping ObjectsEventCallback) -> any OnObjectsEventResponse {
278+
// TODO: Looking at this again later the whole process for adding a subscriber is really verbose and boilerplate-y, and I think the unfortunate result of me trying to be clever at some point; revisit in https://github.com/ably/ably-liveobjects-swift-plugin/issues/102
279+
mutableStateMutex.withSync { mutableState in
280+
// swiftlint:disable:next trailing_closure
281+
mutableState.onInternal(event: event, callback: callback, updateSelfLater: { [weak self] action in
282+
guard let self else {
283+
return
284+
}
285+
286+
mutableStateMutex.withSync { mutableState in
287+
action(&mutableState)
288+
}
289+
})
290+
}
291+
}
292+
286293
internal func offAll() {
287294
mutableStateMutex.withSync { mutableState in
288295
mutableState.offAll()
@@ -430,34 +437,45 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
430437

431438
private struct MutableState {
432439
internal var objectsPool: ObjectsPool
433-
/// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and so no other operations that might wish to query this property can occur concurrently with the handling of these cases.
434-
internal var syncSequence: SyncSequence?
435-
internal var syncStatus = SyncStatus()
436440
internal var onChannelAttachedHasObjects: Bool?
437441
internal var objectsEventSubscriptionStorage = SubscriptionStorage<ObjectsEvent, Void>()
438442

443+
/// Used when the object wishes to subscribe to its own events (i.e. unaffected by `offAll()`); used e.g. to wait for a sync before returning from `getRoot()`, per RTO1c.
444+
internal var internalObjectsEventSubscriptionStorage = SubscriptionStorage<ObjectsEvent, Void>()
445+
439446
/// The RTO10b grace period for which we will retain tombstoned objects and map entries.
440447
internal var garbageCollectionGracePeriod: GarbageCollectionOptions.GracePeriod
441448

442-
/// The state that drives the emission of the `syncing` and `synced` events.
449+
/// The state that drives the emission of the `syncing` and `synced` events and which stores the sync sequence data.
443450
///
444451
/// This manipulation of this value is based on https://github.com/ably/ably-js/blob/0c5baa9273ca87aec6ca594833d59c4c4d2dddbb/src/plugins/objects/objects.ts.
445-
/// TODO: Bring in line with spec once it exists (https://github.com/ably/ably-liveobjects-swift-plugin/issues/80) and reconcile it with the existing state that we have
452+
/// TODO: Bring in line with spec once it exists (https://github.com/ably/ably-liveobjects-swift-plugin/issues/80)
446453
internal var state = State.initialized
447454

448-
/// The state that drives the emission of the `syncing` and `synced` events.
449-
///
450-
/// This type is copied from https://github.com/ably/ably-js/blob/0c5baa9273ca87aec6ca594833d59c4c4d2dddbb/src/plugins/objects/objects.ts.
451-
/// TODO: Bring in line with spec once it exists (https://github.com/ably/ably-liveobjects-swift-plugin/issues/80)
455+
/// Has the same cases as `ObjectsSyncState` but with associated data to store the sync sequence data and represent the constraint that you only have a sync sequence if you're SYNCING.
452456
internal enum State {
453457
case initialized
454-
case syncing
458+
case syncing(AssociatedData.Syncing)
455459
case synced
456460

457-
var toEvent: ObjectsEvent? {
461+
/// Note: We follow the same pattern as used in the WIP ably-swift: a state's associated data is a class instance and the convention is that to update the associated data for the current state you mutate the existing instance instead of creating a new one.
462+
enum AssociatedData {
463+
class Syncing {
464+
/// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and so no other operations that might wish to query this property can occur concurrently with the handling of these cases.
465+
///
466+
/// It is optional because there are times that we transition to SYNCING even when the sync data is contained in a single ProtocolMessage (this behaviour is copied from JS and will be specified in https://github.com/ably/ably-liveobjects-swift-plugin/issues/80).
467+
var syncSequence: SyncSequence?
468+
469+
init(syncSequence: SyncSequence?) {
470+
self.syncSequence = syncSequence
471+
}
472+
}
473+
}
474+
475+
var toObjectsSyncState: ObjectsSyncState {
458476
switch self {
459477
case .initialized:
460-
nil
478+
.initialized
461479
case .syncing:
462480
.syncing
463481
case .synced:
@@ -470,11 +488,11 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
470488
to newState: State,
471489
userCallbackQueue: DispatchQueue,
472490
) {
473-
guard newState != state else {
474-
return
491+
guard newState.toObjectsSyncState != state.toObjectsSyncState else {
492+
preconditionFailure("Cannot transition to the current state")
475493
}
476494
state = newState
477-
guard let event = newState.toEvent else {
495+
guard let event = newState.toObjectsSyncState.toEvent else {
478496
return
479497
}
480498
emitObjectsEvent(event, on: userCallbackQueue)
@@ -489,9 +507,9 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
489507

490508
onChannelAttachedHasObjects = hasObjects
491509

492-
if hasObjects || state == .initialized {
510+
if (hasObjects && state.toObjectsSyncState != .syncing) || state.toObjectsSyncState == .initialized {
493511
// We will subsequently transition to .synced either by the completion of the RTO4a OBJECT_SYNC, or by the RTO4b no-HAS_OBJECTS case below
494-
transition(to: .syncing, userCallbackQueue: userCallbackQueue)
512+
transition(to: .syncing(.init(syncSequence: nil)), userCallbackQueue: userCallbackQueue)
495513
}
496514

497515
// We only care about the case where HAS_OBJECTS is not set (RTO4b); if it is set then we're going to shortly receive an OBJECT_SYNC instead (RTO4a)
@@ -505,9 +523,9 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
505523
// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.
506524

507525
// RTO4b3, RTO4b4, RTO4b5, RTO5c3, RTO5c4, RTO5c5
508-
syncSequence = nil
509-
syncStatus.signalSyncComplete()
510-
transition(to: .synced, userCallbackQueue: userCallbackQueue)
526+
if state.toObjectsSyncState != .synced {
527+
transition(to: .synced, userCallbackQueue: userCallbackQueue)
528+
}
511529
}
512530

513531
/// Implements the `OBJECT_SYNC` handling of RTO5.
@@ -524,12 +542,12 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
524542

525543
receivedObjectSyncProtocolMessagesContinuation.yield(objectMessages)
526544

527-
transition(to: .syncing, userCallbackQueue: userCallbackQueue)
528-
529545
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
530546
let completedSyncObjectsPool: [SyncObjectsPoolEntry]?
531547
// If populated, this contains a set of buffered inbound OBJECT messages that should be applied.
532548
let completedSyncBufferedObjectOperations: [InboundObjectMessage]?
549+
// The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC.
550+
let syncSequenceForSyncingState: SyncSequence?
533551

534552
if let protocolMessageChannelSerial {
535553
let syncCursor: SyncCursor
@@ -542,7 +560,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
542560
}
543561

544562
// Figure out whether to continue any existing sync sequence or start a new one
545-
var updatedSyncSequence: SyncSequence = if let syncSequence {
563+
var updatedSyncSequence: SyncSequence = if case let .syncing(syncingData) = state, let syncSequence = syncingData.syncSequence {
546564
if syncCursor.sequenceID == syncSequence.id {
547565
// RTO5a3: Continue existing sync sequence
548566
syncSequence
@@ -564,13 +582,13 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
564582
}
565583
})
566584

567-
syncSequence = updatedSyncSequence
568-
569585
(completedSyncObjectsPool, completedSyncBufferedObjectOperations) = if syncCursor.isEndOfSequence {
570586
(updatedSyncSequence.syncObjectsPool, updatedSyncSequence.bufferedObjectOperations)
571587
} else {
572588
(nil, nil)
573589
}
590+
591+
syncSequenceForSyncingState = updatedSyncSequence
574592
} else {
575593
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
576594
completedSyncObjectsPool = objectMessages.compactMap { objectMessage in
@@ -581,6 +599,13 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
581599
}
582600
}
583601
completedSyncBufferedObjectOperations = nil
602+
syncSequenceForSyncingState = nil
603+
}
604+
605+
if case let .syncing(syncingData) = state {
606+
syncingData.syncSequence = syncSequenceForSyncingState
607+
} else {
608+
transition(to: .syncing(.init(syncSequence: syncSequenceForSyncingState)), userCallbackQueue: userCallbackQueue)
584609
}
585610

586611
if let completedSyncObjectsPool {
@@ -608,9 +633,6 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
608633
}
609634

610635
// RTO5c3, RTO5c4, RTO5c5
611-
syncSequence = nil
612-
613-
syncStatus.signalSyncComplete()
614636
transition(to: .synced, userCallbackQueue: userCallbackQueue)
615637
}
616638
}
@@ -628,12 +650,12 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
628650

629651
logger.log("handleObjectProtocolMessage(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
630652

631-
if let existingSyncSequence = syncSequence {
653+
if case let .syncing(syncingData) = state, let existingSyncSequence = syncingData.syncSequence {
632654
// RTO8a: Buffer the OBJECT message, to be handled once the sync completes
633655
logger.log("Buffering OBJECT message due to in-progress sync", level: .debug)
634656
var newSyncSequence = existingSyncSequence
635657
newSyncSequence.bufferedObjectOperations.append(contentsOf: objectMessages)
636-
syncSequence = newSyncSequence
658+
syncingData.syncSequence = newSyncSequence
637659
} else {
638660
// RTO8b: Handle the OBJECT message immediately
639661
for objectMessage in objectMessages {
@@ -723,6 +745,28 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
723745
return ObjectsEventResponse(subscription: subscription)
724746
}
725747

748+
/// Adds a subscriber to the ``internalObjectsEventSubscriptionStorage`` (i.e. unaffected by `offAll()`).
749+
@discardableResult
750+
internal mutating func onInternal(event: ObjectsEvent, callback: @escaping ObjectsEventCallback, updateSelfLater: @escaping UpdateMutableState) -> any OnObjectsEventResponse {
751+
// TODO: Looking at this again later the whole process for adding a subscriber is really verbose and boilerplate-y, and I think the unfortunate result of me trying to be clever at some point; revisit in https://github.com/ably/ably-liveobjects-swift-plugin/issues/102
752+
let updateSubscriptionStorage: SubscriptionStorage<ObjectsEvent, Void>.UpdateSubscriptionStorage = { action in
753+
updateSelfLater { mutableState in
754+
action(&mutableState.internalObjectsEventSubscriptionStorage)
755+
}
756+
}
757+
758+
let subscription = internalObjectsEventSubscriptionStorage.subscribe(
759+
listener: { _, subscriptionInCallback in
760+
let response = ObjectsEventResponse(subscription: subscriptionInCallback)
761+
callback(response)
762+
},
763+
eventName: event,
764+
updateSelfLater: updateSubscriptionStorage,
765+
)
766+
767+
return ObjectsEventResponse(subscription: subscription)
768+
}
769+
726770
private struct ObjectsEventResponse: OnObjectsEventResponse {
727771
let subscription: any SubscribeResponse
728772

@@ -737,6 +781,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
737781

738782
internal func emitObjectsEvent(_ event: ObjectsEvent, on queue: DispatchQueue) {
739783
objectsEventSubscriptionStorage.emit(eventName: event, on: queue)
784+
internalObjectsEventSubscriptionStorage.emit(eventName: event, on: queue)
740785
}
741786
}
742787
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/// The type that the spec uses to represent the client's state of syncing its local Objects data with the server.
2+
///
3+
/// (TODO: This isn't actually in the spec yet, will specify in https://github.com/ably/ably-liveobjects-swift-plugin/issues/80; it's currently copied from https://github.com/ably/ably-js/blob/0c5baa9273ca87aec6ca594833d59c4c4d2dddbb/src/plugins/objects/objects.ts)
4+
internal enum ObjectsSyncState {
5+
case initialized
6+
case syncing
7+
case synced
8+
9+
internal var toEvent: ObjectsEvent? {
10+
switch self {
11+
case .initialized:
12+
nil
13+
case .syncing:
14+
.syncing
15+
case .synced:
16+
.synced
17+
}
18+
}
19+
}

0 commit comments

Comments
 (0)