Skip to content

Commit a77505b

Browse files
Apply operations on ACK
Instead of waiting for the server to echo back an operation before applying it locally, operations are now applied immediately upon receiving the ACK from Realtime. Implements the behaviours from spec commit 56a0bba and ports the corresponding integration tests from ably-js commit 6b1c2de, plus the test fix in ably-js commit b6eec92. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 35850d9 commit a77505b

26 files changed

+1989
-393
lines changed

AblyLiveObjects.xcworkspace/xcshareddata/swiftpm/Package.resolved

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.resolved

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ let package = Package(
1818
),
1919
],
2020
dependencies: [
21+
// TODO: Unpin before release
2122
.package(
22-
url: "https://github.com/ably/ably-cocoa",
23-
from: "1.2.55",
23+
url: "https://github.com/ably/ably-cocoa.git",
24+
revision: "dbdd4db5c0c64f4330e200ff2ca9bc9528598ff3",
2425
),
26+
// TODO: Unpin before release
2527
.package(
2628
url: "https://github.com/ably/ably-cocoa-plugin-support",
27-
from: "1.0.0",
29+
revision: "242fac1d4a829c8a63f9b3f96a71809e1f6eeffc",
2830
),
2931
.package(
3032
url: "https://github.com/apple/swift-argument-parser",

Sources/AblyLiveObjects/Internal/CoreSDK.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@ import Ably
66
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to `_AblyPluginSupportPrivate`'s Objective-C API (i.e. boxing).
77
internal protocol CoreSDK: AnyObject, Sendable {
88
/// Implements the internal `#publish` method of RTO15.
9-
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void)
9+
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void)
1010

1111
/// Implements the server time fetch of RTO16, including the storing and usage of the local clock offset.
1212
func nosync_fetchServerTime(callback: @escaping @Sendable (Result<Date, ARTErrorInfo>) -> Void)
1313

1414
/// Replaces the implementation of ``nosync_publish(objectMessages:callback:)``.
1515
///
1616
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
17-
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void)
17+
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult)
1818

1919
/// Returns the current state of the Realtime channel that this wraps.
2020
var nosync_channelState: _AblyPluginSupportPrivate.RealtimeChannelState { get }
@@ -34,7 +34,7 @@ internal final class DefaultCoreSDK: CoreSDK {
3434
/// This enables the `testsOnly_overridePublish(with:)` test hook.
3535
///
3636
/// - Note: This should be `throws(ARTErrorInfo)` but that causes a compilation error of "Runtime support for typed throws function types is only available in macOS 15.0.0 or newer".
37-
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> Void)?
37+
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> PublishResult)?
3838

3939
internal init(
4040
channel: _AblyPluginSupportPrivate.RealtimeChannel,
@@ -50,7 +50,7 @@ internal final class DefaultCoreSDK: CoreSDK {
5050

5151
// MARK: - CoreSDK conformance
5252

53-
internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void) {
53+
internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void) {
5454
logger.log("nosync_publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
5555

5656
// Use the overridden implementation if supplied
@@ -61,8 +61,8 @@ internal final class DefaultCoreSDK: CoreSDK {
6161
let queue = pluginAPI.internalQueue(for: client)
6262
Task {
6363
do {
64-
try await overriddenImplementation(objectMessages)
65-
queue.async { callback(.success(())) }
64+
let publishResult = try await overriddenImplementation(objectMessages)
65+
queue.async { callback(.success(publishResult)) }
6666
} catch {
6767
guard let artErrorInfo = error as? ARTErrorInfo else {
6868
preconditionFailure("Expected ARTErrorInfo, got \(error)")
@@ -83,7 +83,7 @@ internal final class DefaultCoreSDK: CoreSDK {
8383
)
8484
}
8585

86-
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void) {
86+
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult) {
8787
mutex.withLock {
8888
overriddenPublishImplementation = newImplementation
8989
}

Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
7070
/// A class that wraps an object message.
7171
///
7272
/// We need this intermediate type because we want object messages to be structs — because they're nicer to work with internally — but a struct can't conform to the class-bound `_AblyPluginSupportPrivate.ObjectMessageProtocol`.
73-
private final class ObjectMessageBox<T>: _AblyPluginSupportPrivate.ObjectMessageProtocol where T: Sendable {
73+
internal final class ObjectMessageBox<T>: _AblyPluginSupportPrivate.ObjectMessageProtocol where T: Sendable {
7474
internal let objectMessage: T
7575

76-
init(objectMessage: T) {
76+
internal init(objectMessage: T) {
7777
self.objectMessage = objectMessage
7878
}
7979
}
@@ -143,11 +143,32 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
143143
)
144144
}
145145

146+
internal func nosync_onChannelStateChanged(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, toState state: _AblyPluginSupportPrivate.RealtimeChannelState, reason: (any _AblyPluginSupportPrivate.PublicErrorInfo)?) {
147+
let errorReason = reason.map { ARTErrorInfo.castPluginPublicErrorInfo($0) }
148+
nosync_realtimeObjects(for: channel).nosync_onChannelStateChanged(toState: state, reason: errorReason)
149+
}
150+
146151
internal func nosync_onConnected(withConnectionDetails connectionDetails: (any ConnectionDetailsProtocol)?, channel: any RealtimeChannel) {
147-
let gracePeriod = connectionDetails?.objectsGCGracePeriod?.doubleValue ?? InternalDefaultRealtimeObjects.GarbageCollectionOptions.defaultGracePeriod
152+
let realtimeObjects = nosync_realtimeObjects(for: channel)
148153

154+
let gracePeriod = connectionDetails?.objectsGCGracePeriod?.doubleValue ?? InternalDefaultRealtimeObjects.GarbageCollectionOptions.defaultGracePeriod
149155
// RTO10b
150-
nosync_realtimeObjects(for: channel).nosync_setGarbageCollectionGracePeriod(gracePeriod)
156+
realtimeObjects.nosync_setGarbageCollectionGracePeriod(gracePeriod)
157+
158+
// Push the siteCode from connectionDetails
159+
let siteCode: String? = {
160+
guard let connectionDetails else {
161+
return nil
162+
}
163+
164+
// This is a fallback; our ably-cocoa dependency version should ensure that this is never triggered.
165+
guard (connectionDetails as AnyObject).responds(to: #selector(ConnectionDetailsProtocol.siteCode)) else {
166+
preconditionFailure("ably-cocoa's connectionDetails does not implement siteCode. Please update ably-cocoa to a version that supports apply-on-ACK.")
167+
}
168+
169+
return connectionDetails.siteCode?()
170+
}()
171+
realtimeObjects.nosync_setSiteCode(siteCode)
151172
}
152173

153174
// MARK: - Sending `OBJECT` ProtocolMessage
@@ -157,21 +178,31 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
157178
channel: _AblyPluginSupportPrivate.RealtimeChannel,
158179
client: _AblyPluginSupportPrivate.RealtimeClient,
159180
pluginAPI: PluginAPIProtocol,
160-
callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void,
181+
callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void,
161182
) {
162183
let objectMessageBoxes: [ObjectMessageBox<OutboundObjectMessage>] = objectMessages.map { .init(objectMessage: $0) }
163184
let internalQueue = pluginAPI.internalQueue(for: client)
164185

165-
pluginAPI.nosync_sendObject(
186+
// This is a fallback; our ably-cocoa dependency version should ensure that this is never triggered.
187+
guard (pluginAPI as AnyObject).responds(to: #selector(PluginAPIProtocol.nosync_sendObject(withObjectMessages:channel:completionWithResult:))) else {
188+
preconditionFailure("ably-cocoa does not implement nosync_sendObjectWithObjectMessages:channel:completionWithResult:. Please update ably-cocoa to a version that supports apply-on-ACK.")
189+
}
190+
191+
pluginAPI.nosync_sendObject!(
166192
withObjectMessages: objectMessageBoxes,
167193
channel: channel,
168-
) { error in
194+
) { error, pluginPublishResult in
169195
dispatchPrecondition(condition: .onQueue(internalQueue))
170196

171197
if let error {
172198
callback(.failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
173199
} else {
174-
callback(.success(()))
200+
guard let pluginPublishResult else {
201+
preconditionFailure("Got nil publishResult and nil error")
202+
}
203+
204+
let publishResult = PublishResult(pluginPublishResult: pluginPublishResult)
205+
callback(.success(publishResult))
175206
}
176207
}
177208
}

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
103103
}
104104
}
105105

106-
internal func increment(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
106+
internal func increment(amount: Double, coreSDK: CoreSDK, realtimeObjects: any InternalRealtimeObjectsProtocol) async throws(ARTErrorInfo) {
107107
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
108108
do throws(ARTErrorInfo) {
109109
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
@@ -131,8 +131,8 @@ internal final class InternalDefaultLiveCounter: Sendable {
131131
),
132132
)
133133

134-
// RTLC12f
135-
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
134+
// RTLC12g
135+
realtimeObjects.nosync_publishAndApply(objectMessages: [objectMessage], coreSDK: coreSDK) { result in
136136
continuation.resume(returning: result)
137137
}
138138
}
@@ -142,9 +142,9 @@ internal final class InternalDefaultLiveCounter: Sendable {
142142
}.get()
143143
}
144144

145-
internal func decrement(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
145+
internal func decrement(amount: Double, coreSDK: CoreSDK, realtimeObjects: any InternalRealtimeObjectsProtocol) async throws(ARTErrorInfo) {
146146
// RTLC13b
147-
try await increment(amount: -amount, coreSDK: coreSDK)
147+
try await increment(amount: -amount, coreSDK: coreSDK, realtimeObjects: realtimeObjects)
148148
}
149149

150150
@discardableResult
@@ -245,16 +245,20 @@ internal final class InternalDefaultLiveCounter: Sendable {
245245
}
246246

247247
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
248+
///
249+
/// - Returns: `true` if the operation was applied, `false` if it was skipped (RTLC7g).
248250
internal func nosync_apply(
249251
_ operation: ObjectOperation,
252+
source: ObjectsOperationSource,
250253
objectMessageSerial: String?,
251254
objectMessageSiteCode: String?,
252255
objectMessageSerialTimestamp: Date?,
253256
objectsPool: inout ObjectsPool,
254-
) {
257+
) -> Bool {
255258
mutableStateMutex.withoutSync { mutableState in
256259
mutableState.apply(
257260
operation,
261+
source: source,
258262
objectMessageSerial: objectMessageSerial,
259263
objectMessageSiteCode: objectMessageSiteCode,
260264
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
@@ -379,29 +383,34 @@ internal final class InternalDefaultLiveCounter: Sendable {
379383
}
380384

381385
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
386+
///
387+
/// - Returns: `true` if the operation was applied, `false` if skipped (RTLC7g).
382388
internal mutating func apply(
383389
_ operation: ObjectOperation,
390+
source: ObjectsOperationSource,
384391
objectMessageSerial: String?,
385392
objectMessageSiteCode: String?,
386393
objectMessageSerialTimestamp: Date?,
387394
objectsPool: inout ObjectsPool,
388395
logger: Logger,
389396
clock: SimpleClock,
390397
userCallbackQueue: DispatchQueue,
391-
) {
398+
) -> Bool {
392399
guard let applicableOperation = liveObjectMutableState.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
393400
// RTLC7b
394401
logger.log("Operation \(operation) (serial: \(String(describing: objectMessageSerial)), siteCode: \(String(describing: objectMessageSiteCode))) should not be applied; discarding", level: .debug)
395-
return
402+
return false
396403
}
397404

398405
// RTLC7c
399-
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
406+
if source == .channel {
407+
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
408+
}
400409

401410
// RTLC7e
402411
// TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854
403412
if liveObjectMutableState.isTombstone {
404-
return
413+
return false
405414
}
406415

407416
switch operation.action {
@@ -413,11 +422,15 @@ internal final class InternalDefaultLiveCounter: Sendable {
413422
)
414423
// RTLC7d1a
415424
liveObjectMutableState.emit(update, on: userCallbackQueue)
425+
// RTLC7d1b
426+
return true
416427
case .known(.counterInc):
417428
// RTLC7d2
418429
let update = applyCounterIncOperation(operation.counterOp)
419430
// RTLC7d2a
420431
liveObjectMutableState.emit(update, on: userCallbackQueue)
432+
// RTLC7d2b
433+
return true
421434
case .known(.objectDelete):
422435
let dataBeforeApplyingOperation = data
423436

@@ -431,9 +444,12 @@ internal final class InternalDefaultLiveCounter: Sendable {
431444

432445
// RTLC7d4a
433446
liveObjectMutableState.emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue)
447+
// RTLC7d4b
448+
return true
434449
default:
435450
// RTLC7d3
436451
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
452+
return false
437453
}
438454
}
439455

0 commit comments

Comments
 (0)