Skip to content

Commit 3d02fe7

Browse files
fixes from Claude
1 parent 183c690 commit 3d02fe7

File tree

5 files changed

+109
-82
lines changed

5 files changed

+109
-82
lines changed

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -803,18 +803,14 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
803803
}
804804
var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: .init())
805805
// RTO5f
806-
for objectMessage in objectMessages {
807-
updatedSyncSequence.syncObjectsPool.accumulate(objectMessage: objectMessage, logger: logger)
808-
}
806+
updatedSyncSequence.syncObjectsPool.accumulate(objectMessages, logger: logger)
809807
syncSequenceForSyncingState = updatedSyncSequence
810808

811809
completedSyncObjectsPool = syncCursor.isEndOfSequence ? updatedSyncSequence.syncObjectsPool : nil
812810
} else {
813811
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
814812
var pool = SyncObjectsPool()
815-
for objectMessage in objectMessages {
816-
pool.accumulate(objectMessage: objectMessage, logger: logger)
817-
}
813+
pool.accumulate(objectMessages, logger: logger)
818814
completedSyncObjectsPool = pool
819815
syncSequenceForSyncingState = nil
820816
}

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -293,69 +293,71 @@ internal struct ObjectsPool {
293293
var updatesToExistingObjects: [ObjectsPool.Entry.DeferredUpdate] = []
294294

295295
// RTO5c1: For each ObjectState member in the SyncObjectsPool list
296-
for syncObjectsPoolEntry in syncObjectsPool {
297-
receivedObjectIds.insert(syncObjectsPoolEntry.state.objectId)
296+
for objectMessage in syncObjectsPool {
297+
// Every message yielded by SyncObjectsPool is guaranteed to have a non-nil `.object` with `.map` or `.counter`.
298+
let state = objectMessage.object!
299+
receivedObjectIds.insert(state.objectId)
298300

299301
// RTO5c1a: If an object with ObjectState.objectId exists in the internal ObjectsPool
300-
if let existingEntry = entries[syncObjectsPoolEntry.state.objectId] {
301-
logger.log("Updating existing object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
302+
if let existingEntry = entries[state.objectId] {
303+
logger.log("Updating existing object with ID: \(state.objectId)", level: .debug)
302304

303305
// RTO5c1a1: Override the internal data for the object as per RTLC6, RTLM6
304306
let deferredUpdate = existingEntry.nosync_replaceData(
305-
using: syncObjectsPoolEntry.state,
306-
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
307+
using: state,
308+
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
307309
objectsPool: &self,
308310
userCallbackQueue: userCallbackQueue,
309311
)
310312
// RTO5c1a2: Store this update to emit at end
311313
updatesToExistingObjects.append(deferredUpdate)
312314
} else {
313315
// RTO5c1b: If an object with ObjectState.objectId does not exist in the internal ObjectsPool
314-
logger.log("Creating new object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
316+
logger.log("Creating new object with ID: \(state.objectId)", level: .debug)
315317

316318
// RTO5c1b1: Create a new LiveObject using the data from ObjectState and add it to the internal ObjectsPool:
317319
let newEntry: Entry
318320

319-
if syncObjectsPoolEntry.state.counter != nil {
321+
if state.counter != nil {
320322
// RTO5c1b1a: If ObjectState.counter is present, create a zero-value LiveCounter,
321323
// set its private objectId equal to ObjectState.objectId and override its internal data per RTLC6
322324
let counter = InternalDefaultLiveCounter.createZeroValued(
323-
objectID: syncObjectsPoolEntry.state.objectId,
325+
objectID: state.objectId,
324326
logger: logger,
325327
internalQueue: internalQueue,
326328
userCallbackQueue: userCallbackQueue,
327329
clock: clock,
328330
)
329331
_ = counter.nosync_replaceData(
330-
using: syncObjectsPoolEntry.state,
331-
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
332+
using: state,
333+
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
332334
)
333335
newEntry = .counter(counter)
334-
} else if let objectsMap = syncObjectsPoolEntry.state.map {
336+
} else if let objectsMap = state.map {
335337
// RTO5c1b1b: If ObjectState.map is present, create a zero-value LiveMap,
336338
// set its private objectId equal to ObjectState.objectId, set its private semantics
337339
// equal to ObjectState.map.semantics and override its internal data per RTLM6
338340
let map = InternalDefaultLiveMap.createZeroValued(
339-
objectID: syncObjectsPoolEntry.state.objectId,
341+
objectID: state.objectId,
340342
semantics: objectsMap.semantics,
341343
logger: logger,
342344
internalQueue: internalQueue,
343345
userCallbackQueue: userCallbackQueue,
344346
clock: clock,
345347
)
346348
_ = map.nosync_replaceData(
347-
using: syncObjectsPoolEntry.state,
348-
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
349+
using: state,
350+
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
349351
objectsPool: &self,
350352
)
351353
newEntry = .map(map)
352354
} else {
353-
// See SyncObjectsPool.Entry.state documentation.
354-
preconditionFailure("SyncObjectsPool entry for objectId \(syncObjectsPoolEntry.state.objectId) has neither counter nor map")
355+
// SyncObjectsPool guarantees every yielded message has `.map` or `.counter`.
356+
preconditionFailure("SyncObjectsPool entry for objectId \(state.objectId) has neither counter nor map")
355357
}
356358

357359
// Note that we will never replace the root object here, and thus never break the RTO3b invariant that the root object is always a map. This is because the pool always contains a root object and thus we always go through the RTO5c1a branch of the `if` above.
358-
entries[syncObjectsPoolEntry.state.objectId] = newEntry
360+
entries[state.objectId] = newEntry
359361
}
360362
}
361363

Sources/AblyLiveObjects/Internal/SyncObjectsPool.swift

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,30 @@ import Foundation
22

33
/// The RTO5f collection of objects gathered during an `OBJECT_SYNC` sequence, ready to be applied to the `ObjectsPool`.
44
///
5-
/// Internally stores `InboundObjectMessage` values keyed by `objectId`. The `accumulate` method implements the RTO5f
6-
/// merge logic for partial object sync.
5+
/// Every stored message is guaranteed to have a non-nil `.object` with either `.map` or `.counter` populated.
76
internal struct SyncObjectsPool: Sequence {
8-
/// A computed view of a stored `InboundObjectMessage`, yielded during iteration.
9-
///
10-
/// Preserves backward compatibility with the consumption side in `ObjectsPool.nosync_applySyncObjectsPool`.
11-
internal struct Entry {
12-
/// Guaranteed to have either `.map` or `.counter` populated.
13-
internal var state: ObjectState
14-
/// The `serialTimestamp` of the `ObjectMessage` that generated this entry.
15-
internal var objectMessageSerialTimestamp: Date?
16-
}
17-
7+
/// Keyed by `objectId`. Every value has a non-nil `.object` with either `.map` or `.counter` populated; the
8+
/// `accumulate` method enforces this invariant.
189
private var objectMessages: [String: InboundObjectMessage]
1910

2011
/// Creates an empty pool.
2112
internal init() {
2213
objectMessages = [:]
2314
}
2415

25-
/// Accumulates an `ObjectMessage` into the pool per RTO5f.
16+
/// Accumulates object messages into the pool per RTO5f.
2617
internal mutating func accumulate(
27-
objectMessage: InboundObjectMessage,
18+
_ objectMessages: [InboundObjectMessage],
19+
logger: Logger,
20+
) {
21+
for objectMessage in objectMessages {
22+
accumulate(objectMessage, logger: logger)
23+
}
24+
}
25+
26+
/// Accumulates a single `ObjectMessage` into the pool per RTO5f.
27+
private mutating func accumulate(
28+
_ objectMessage: InboundObjectMessage,
2829
logger: Logger,
2930
) {
3031
guard let object = objectMessage.object else {
@@ -33,7 +34,8 @@ internal struct SyncObjectsPool: Sequence {
3334

3435
let objectId = object.objectId
3536

36-
// RTO5f3: Reject unsupported object types before pool lookup. This provides the guarantee documented on Entry.state.
37+
// RTO5f3: Reject unsupported object types before pool lookup. Only messages whose `.object` has `.map` or `.counter`
38+
// are stored, which callers of the iteration can rely on.
3739
guard object.map != nil || object.counter != nil else {
3840
logger.log("Skipping unsupported object type during sync for objectId \(objectId)", level: .warn)
3941
return
@@ -51,7 +53,13 @@ internal struct SyncObjectsPool: Sequence {
5153
var merged = existing
5254
if let incomingEntries = object.map?.entries {
5355
var mergedObject = merged.object!
54-
var mergedMap = mergedObject.map!
56+
guard var mergedMap = mergedObject.map else {
57+
// Not a specified scenario — the server won't send a map and a non-map for the same
58+
// objectId in practice. Guard defensively rather than force-unwrapping.
59+
logger.log("Existing entry for objectId \(objectId) is not a map; replacing with incoming message", level: .error)
60+
objectMessages[objectId] = objectMessage
61+
return
62+
}
5563
var mergedEntries = mergedMap.entries ?? [:]
5664
mergedEntries.merge(incomingEntries) { _, new in new }
5765
mergedMap.entries = mergedEntries
@@ -82,16 +90,8 @@ internal struct SyncObjectsPool: Sequence {
8290
self.underlying = underlying
8391
}
8492

85-
internal mutating func next() -> Entry? {
86-
guard let message = underlying.next() else {
87-
return nil
88-
}
89-
90-
// We only store messages whose `object` is non-nil (see `accumulate`).
91-
return Entry(
92-
state: message.object!,
93-
objectMessageSerialTimestamp: message.serialTimestamp,
94-
)
93+
internal mutating func next() -> InboundObjectMessage? {
94+
underlying.next()
9595
}
9696
}
9797

Tests/AblyLiveObjectsTests/ObjectsPoolTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,21 @@ import _AblyPluginSupportPrivate
22
@testable import AblyLiveObjects
33
import Testing
44

5-
extension SyncObjectsPool {
5+
private extension SyncObjectsPool {
66
/// Test-only convenience to create a `SyncObjectsPool` from an array of `(state, serialTimestamp)` pairs,
77
/// wrapping each in an `InboundObjectMessage` and calling `accumulate`.
88
static func testsOnly_fromStates(
99
_ states: [(state: ObjectState, serialTimestamp: Date?)],
1010
logger: AblyLiveObjects.Logger = TestLogger(),
1111
) -> SyncObjectsPool {
1212
var pool = SyncObjectsPool()
13-
for pair in states {
14-
let message = TestFactories.inboundObjectMessage(
13+
let messages = states.map { pair in
14+
TestFactories.inboundObjectMessage(
1515
object: pair.state,
1616
serialTimestamp: pair.serialTimestamp,
1717
)
18-
pool.accumulate(objectMessage: message, logger: logger)
1918
}
19+
pool.accumulate(messages, logger: logger)
2020
return pool
2121
}
2222
}

0 commit comments

Comments
 (0)