Skip to content

Commit 183c690

Browse files
Implement partial object sync (RTO5f)
Implements the changes from spec commits 1f22417, 9f4d7de, and 963ec30, which allow the server to split a large object across multiple OBJECT_SYNC protocol messages. The new integration test is ported from ably-js commit d0bc431. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b22659e commit 183c690

File tree

8 files changed

+410
-102
lines changed

8 files changed

+410
-102
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -790,14 +790,6 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
790790
}
791791
}
792792

793-
let syncObjectsPoolEntries = objectMessages.compactMap { objectMessage in
794-
if let object = objectMessage.object {
795-
SyncObjectsPool.Entry(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
796-
} else {
797-
nil
798-
}
799-
}
800-
801793
// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
802794
let completedSyncObjectsPool: SyncObjectsPool?
803795
// The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC.
@@ -810,14 +802,20 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
810802
nil
811803
}
812804
var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: .init())
813-
// RTO5b
814-
updatedSyncSequence.syncObjectsPool.append(contentsOf: syncObjectsPoolEntries)
805+
// RTO5f
806+
for objectMessage in objectMessages {
807+
updatedSyncSequence.syncObjectsPool.accumulate(objectMessage: objectMessage, logger: logger)
808+
}
815809
syncSequenceForSyncingState = updatedSyncSequence
816810

817811
completedSyncObjectsPool = syncCursor.isEndOfSequence ? updatedSyncSequence.syncObjectsPool : nil
818812
} else {
819813
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
820-
completedSyncObjectsPool = SyncObjectsPool(entries: syncObjectsPoolEntries)
814+
var pool = SyncObjectsPool()
815+
for objectMessage in objectMessages {
816+
pool.accumulate(objectMessage: objectMessage, logger: logger)
817+
}
818+
completedSyncObjectsPool = pool
821819
syncSequenceForSyncingState = nil
822820
}
823821

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ internal struct ObjectsPool {
314314
logger.log("Creating new object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
315315

316316
// RTO5c1b1: Create a new LiveObject using the data from ObjectState and add it to the internal ObjectsPool:
317-
let newEntry: Entry?
317+
let newEntry: Entry
318318

319319
if syncObjectsPoolEntry.state.counter != nil {
320320
// RTO5c1b1a: If ObjectState.counter is present, create a zero-value LiveCounter,
@@ -350,15 +350,12 @@ internal struct ObjectsPool {
350350
)
351351
newEntry = .map(map)
352352
} else {
353-
// RTO5c1b1c: Otherwise, log a warning that an unsupported object state message has been received, and discard the current ObjectState without taking any action
354-
logger.log("Unsupported object state message received for objectId: \(syncObjectsPoolEntry.state.objectId)", level: .warn)
355-
newEntry = nil
353+
// See SyncObjectsPool.Entry.state documentation.
354+
preconditionFailure("SyncObjectsPool entry for objectId \(syncObjectsPoolEntry.state.objectId) has neither counter nor map")
356355
}
357356

358-
if let newEntry {
359-
// Note that we will never replace the root object here, and thus never break the RTO3b invariant that the root object is always a map. This is because the pool always contains a root object and thus we always go through the RTO5c1a branch of the `if` above.
360-
entries[syncObjectsPoolEntry.state.objectId] = newEntry
361-
}
357+
// Note that we will never replace the root object here, and thus never break the RTO3b invariant that the root object is always a map. This is because the pool always contains a root object and thus we always go through the RTO5c1a branch of the `if` above.
358+
entries[syncObjectsPoolEntry.state.objectId] = newEntry
362359
}
363360
}
364361

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,101 @@
11
import Foundation
22

3-
/// The RTO5b collection of objects gathered during an `OBJECT_SYNC` sequence, ready to be applied to the `ObjectsPool`.
4-
internal struct SyncObjectsPool: Collection {
5-
/// The contents of the spec's `SyncObjectsPool` that is built during an `OBJECT_SYNC` sync sequence.
3+
/// The RTO5f collection of objects gathered during an `OBJECT_SYNC` sequence, ready to be applied to the `ObjectsPool`.
4+
///
5+
/// Internally stores `InboundObjectMessage` values keyed by `objectId`. The `accumulate` method implements the RTO5f
6+
/// merge logic for partial object sync.
7+
internal struct SyncObjectsPool: Sequence {
8+
/// A computed view of a stored `InboundObjectMessage`, yielded during iteration.
9+
///
10+
/// Preserves backward compatibility with the consumption side in `ObjectsPool.nosync_applySyncObjectsPool`.
611
internal struct Entry {
12+
/// Guaranteed to have either `.map` or `.counter` populated.
713
internal var state: ObjectState
814
/// The `serialTimestamp` of the `ObjectMessage` that generated this entry.
915
internal var objectMessageSerialTimestamp: Date?
10-
11-
// We replace the default memberwise initializer because we don't want a default argument for objectMessageSerialTimestamp (want to make sure we don't forget to set it whenever we create an entry).
12-
// swiftlint:disable:next unneeded_synthesized_initializer
13-
internal init(state: ObjectState, objectMessageSerialTimestamp: Date?) {
14-
self.state = state
15-
self.objectMessageSerialTimestamp = objectMessageSerialTimestamp
16-
}
1716
}
1817

19-
private var entries: [Entry]
18+
private var objectMessages: [String: InboundObjectMessage]
2019

2120
/// Creates an empty pool.
2221
internal init() {
23-
entries = []
22+
objectMessages = [:]
2423
}
2524

26-
/// Creates a pool from the given entries.
27-
internal init(entries: [Entry]) {
28-
self.entries = entries
29-
}
25+
/// Accumulates an `ObjectMessage` into the pool per RTO5f.
26+
internal mutating func accumulate(
27+
objectMessage: InboundObjectMessage,
28+
logger: Logger,
29+
) {
30+
guard let object = objectMessage.object else {
31+
return
32+
}
33+
34+
let objectId = object.objectId
3035

31-
/// Accumulates entries from a sync message per RTO5b.
32-
internal mutating func append(contentsOf newEntries: [Entry]) {
33-
entries.append(contentsOf: newEntries)
36+
// RTO5f3: Reject unsupported object types before pool lookup. This provides the guarantee documented on Entry.state.
37+
guard object.map != nil || object.counter != nil else {
38+
logger.log("Skipping unsupported object type during sync for objectId \(objectId)", level: .warn)
39+
return
40+
}
41+
42+
if let existing = objectMessages[objectId] {
43+
// RTO5f2: An entry already exists for this objectId (partial object state).
44+
if object.map != nil {
45+
// RTO5f2a: Incoming message has a map.
46+
if object.tombstone {
47+
// RTO5f2a1: Incoming tombstone is true — replace the entire entry.
48+
objectMessages[objectId] = objectMessage
49+
} else {
50+
// RTO5f2a2: Merge map entries into the existing message.
51+
var merged = existing
52+
if let incomingEntries = object.map?.entries {
53+
var mergedObject = merged.object!
54+
var mergedMap = mergedObject.map!
55+
var mergedEntries = mergedMap.entries ?? [:]
56+
mergedEntries.merge(incomingEntries) { _, new in new }
57+
mergedMap.entries = mergedEntries
58+
mergedObject.map = mergedMap
59+
merged.object = mergedObject
60+
}
61+
objectMessages[objectId] = merged
62+
}
63+
} else {
64+
// RTO5f2b: Incoming message has a counter — log error, skip.
65+
logger.log("Received partial counter sync for objectId \(objectId); skipping", level: .error)
66+
}
67+
} else {
68+
// RTO5f1: No entry exists for this objectId — store the message.
69+
objectMessages[objectId] = objectMessage
70+
}
3471
}
3572

36-
// MARK: - Collection conformance
73+
internal var count: Int { objectMessages.count }
74+
internal var isEmpty: Bool { objectMessages.isEmpty }
75+
76+
// MARK: - Sequence conformance
3777

38-
internal var startIndex: Int { entries.startIndex }
39-
internal var endIndex: Int { entries.endIndex }
40-
internal func index(after i: Int) -> Int { entries.index(after: i) }
41-
internal subscript(position: Int) -> Entry { entries[position] }
78+
internal struct Iterator: IteratorProtocol {
79+
private var underlying: Dictionary<String, InboundObjectMessage>.Values.Iterator
80+
81+
fileprivate init(_ underlying: Dictionary<String, InboundObjectMessage>.Values.Iterator) {
82+
self.underlying = underlying
83+
}
84+
85+
internal mutating func next() -> Entry? {
86+
guard let message = underlying.next() else {
87+
return nil
88+
}
89+
90+
// We only store messages whose `object` is non-nil (see `accumulate`).
91+
return Entry(
92+
state: message.object!,
93+
objectMessageSerialTimestamp: message.serialTimestamp,
94+
)
95+
}
96+
}
97+
98+
internal func makeIterator() -> Iterator {
99+
Iterator(objectMessages.values.makeIterator())
100+
}
42101
}

Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ struct TestFactories {
268268
object: ObjectState? = nil,
269269
serial: String? = nil,
270270
siteCode: String? = nil,
271+
serialTimestamp: Date? = nil,
271272
) -> InboundObjectMessage {
272273
InboundObjectMessage(
273274
id: id,
@@ -279,6 +280,7 @@ struct TestFactories {
279280
object: object,
280281
serial: serial,
281282
siteCode: siteCode,
283+
serialTimestamp: serialTimestamp,
282284
)
283285
}
284286

Tests/AblyLiveObjectsTests/InternalDefaultRealtimeObjectsTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ struct InternalDefaultRealtimeObjectsTests {
6060
// @spec RTO5a1
6161
// @spec RTO5a3
6262
// @spec RTO5a4
63-
// @spec RTO5b
63+
// @spec RTO5f
6464
// @spec RTO5c3
6565
// @spec RTO5c4
6666
// @spec RTO5c5

Tests/AblyLiveObjectsTests/JS Integration Tests/ObjectsIntegrationTests.swift

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,107 @@ private struct ObjectsIntegrationTests {
815815
#expect(try #require(map.get(key: "baz")?.stringValue) == "qux", "Check map has materialised entries")
816816
},
817817
),
818+
.init(
819+
disabled: false,
820+
allTransportsAndProtocols: false,
821+
description: "partial OBJECT_SYNC merges map entries across multiple messages for the same objectId",
822+
action: { ctx throws in
823+
let root = ctx.root
824+
let objectsHelper = ctx.objectsHelper
825+
let channel = ctx.channel
826+
827+
let mapId = objectsHelper.fakeMapObjectId()
828+
829+
// assign map object to root
830+
await objectsHelper.processObjectStateMessageOnChannel(
831+
channel: channel,
832+
syncSerial: "serial:cursor1",
833+
state: [
834+
objectsHelper.mapObject(
835+
objectId: "root",
836+
siteTimeserials: ["aaa": lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0)],
837+
initialEntries: [
838+
"map": .object([
839+
"timeserial": .string(lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0)),
840+
"data": .object(["objectId": .string(mapId)]),
841+
]),
842+
],
843+
),
844+
],
845+
)
846+
847+
// send partial sync messages for the same map object, each with different materialised entries.
848+
// initialEntries are identical across all partial messages for the same object — a server guarantee.
849+
let partialMessages: [(syncSerial: String, materialisedEntries: [String: WireValue])] = [
850+
(
851+
syncSerial: "serial:cursor2",
852+
materialisedEntries: [
853+
"key1": .object([
854+
"timeserial": .string(lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0)),
855+
"data": .object(["number": .number(1)]),
856+
]),
857+
"key2": .object([
858+
"timeserial": .string(lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0)),
859+
"data": .object(["string": .string("two")]),
860+
]),
861+
]
862+
),
863+
(
864+
syncSerial: "serial:cursor3",
865+
materialisedEntries: [
866+
"key3": .object([
867+
"timeserial": .string(lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0)),
868+
"data": .object(["number": .number(3)]),
869+
]),
870+
"key4": .object([
871+
"timeserial": .string(lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0)),
872+
"data": .object(["boolean": .bool(true)]),
873+
]),
874+
]
875+
),
876+
(
877+
syncSerial: "serial:", // end sync sequence
878+
materialisedEntries: [
879+
"key5": .object([
880+
"timeserial": .string(lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0)),
881+
"data": .object(["string": .string("five")]),
882+
]),
883+
]
884+
),
885+
]
886+
887+
for partial in partialMessages {
888+
await objectsHelper.processObjectStateMessageOnChannel(
889+
channel: channel,
890+
syncSerial: partial.syncSerial,
891+
state: [
892+
objectsHelper.mapObject(
893+
objectId: mapId,
894+
siteTimeserials: ["aaa": lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0)],
895+
initialEntries: [
896+
"initialKey": .object([
897+
"timeserial": .string(lexicoTimeserial(seriesId: "aaa", timestamp: 0, counter: 0)),
898+
"data": .object(["string": .string("initial")]),
899+
]),
900+
],
901+
materialisedEntries: partial.materialisedEntries,
902+
),
903+
],
904+
)
905+
}
906+
907+
let map = try #require(root.get(key: "map")?.liveMapValue)
908+
909+
#expect(try #require(map.get(key: "initialKey")?.stringValue) == "initial", "Check keys from the create operation are present")
910+
911+
// check that materialised entries from all partial messages were merged
912+
#expect(try #require(map.get(key: "key1")?.numberValue) == 1, "Check key1 from first partial sync")
913+
#expect(try #require(map.get(key: "key2")?.stringValue) == "two", "Check key2 from first partial sync")
914+
#expect(try #require(map.get(key: "key3")?.numberValue) == 3, "Check key3 from second partial sync")
915+
#expect(try #require(map.get(key: "key4")?.boolValue as Bool?) == true, "Check key4 from second partial sync")
916+
#expect(try #require(map.get(key: "key5")?.stringValue) == "five", "Check key5 from third partial sync")
917+
},
918+
),
818919
.init(
819920
disabled: false,
820921
allTransportsAndProtocols: false,

0 commit comments

Comments
 (0)