Skip to content

Commit 52ecf38

Browse files
Apply operations on ACK (RTO20)
Instead of waiting for the server to echo back an operation before applying it locally, operations are now applied immediately upon receiving the ACK from Realtime. Implements the behaviours from spec commit 56a0bba and ports the corresponding integration tests from ably-js commit 6b1c2de. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 35850d9 commit 52ecf38

24 files changed

+1446
-265
lines changed

AblyLiveObjects.xcworkspace/xcshareddata/swiftpm/Package.resolved

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.resolved

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ let package = Package(
1818
),
1919
],
2020
dependencies: [
21+
// TODO: Unpin before release
2122
.package(
22-
url: "https://github.com/ably/ably-cocoa",
23-
from: "1.2.55",
23+
url: "https://github.com/ably/ably-cocoa.git",
24+
revision: "dbdd4db5c0c64f4330e200ff2ca9bc9528598ff3",
2425
),
26+
// TODO: Unpin before release
2527
.package(
2628
url: "https://github.com/ably/ably-cocoa-plugin-support",
27-
from: "1.0.0",
29+
revision: "242fac1d4a829c8a63f9b3f96a71809e1f6eeffc",
2830
),
2931
.package(
3032
url: "https://github.com/apple/swift-argument-parser",

Sources/AblyLiveObjects/Internal/CoreSDK.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@ import Ably
66
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to `_AblyPluginSupportPrivate`'s Objective-C API (i.e. boxing).
77
internal protocol CoreSDK: AnyObject, Sendable {
88
/// Implements the internal `#publish` method of RTO15.
9-
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void)
9+
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void)
1010

1111
/// Implements the server time fetch of RTO16, including the storing and usage of the local clock offset.
1212
func nosync_fetchServerTime(callback: @escaping @Sendable (Result<Date, ARTErrorInfo>) -> Void)
1313

1414
/// Replaces the implementation of ``nosync_publish(objectMessages:callback:)``.
1515
///
1616
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
17-
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void)
17+
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult)
1818

1919
/// Returns the current state of the Realtime channel that this wraps.
2020
var nosync_channelState: _AblyPluginSupportPrivate.RealtimeChannelState { get }
@@ -34,7 +34,7 @@ internal final class DefaultCoreSDK: CoreSDK {
3434
/// This enables the `testsOnly_overridePublish(with:)` test hook.
3535
///
3636
/// - Note: This should be `throws(ARTErrorInfo)` but that causes a compilation error of "Runtime support for typed throws function types is only available in macOS 15.0.0 or newer".
37-
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> Void)?
37+
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> PublishResult)?
3838

3939
internal init(
4040
channel: _AblyPluginSupportPrivate.RealtimeChannel,
@@ -50,7 +50,7 @@ internal final class DefaultCoreSDK: CoreSDK {
5050

5151
// MARK: - CoreSDK conformance
5252

53-
internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void) {
53+
internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void) {
5454
logger.log("nosync_publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
5555

5656
// Use the overridden implementation if supplied
@@ -61,8 +61,8 @@ internal final class DefaultCoreSDK: CoreSDK {
6161
let queue = pluginAPI.internalQueue(for: client)
6262
Task {
6363
do {
64-
try await overriddenImplementation(objectMessages)
65-
queue.async { callback(.success(())) }
64+
let publishResult = try await overriddenImplementation(objectMessages)
65+
queue.async { callback(.success(publishResult)) }
6666
} catch {
6767
guard let artErrorInfo = error as? ARTErrorInfo else {
6868
preconditionFailure("Expected ARTErrorInfo, got \(error)")
@@ -83,7 +83,7 @@ internal final class DefaultCoreSDK: CoreSDK {
8383
)
8484
}
8585

86-
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void) {
86+
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult) {
8787
mutex.withLock {
8888
overriddenPublishImplementation = newImplementation
8989
}

Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
7070
/// A class that wraps an object message.
7171
///
7272
/// We need this intermediate type because we want object messages to be structs — because they're nicer to work with internally — but a struct can't conform to the class-bound `_AblyPluginSupportPrivate.ObjectMessageProtocol`.
73-
private final class ObjectMessageBox<T>: _AblyPluginSupportPrivate.ObjectMessageProtocol where T: Sendable {
73+
internal final class ObjectMessageBox<T>: _AblyPluginSupportPrivate.ObjectMessageProtocol where T: Sendable {
7474
internal let objectMessage: T
7575

76-
init(objectMessage: T) {
76+
internal init(objectMessage: T) {
7777
self.objectMessage = objectMessage
7878
}
7979
}
@@ -143,11 +143,32 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
143143
)
144144
}
145145

146+
internal func nosync_onChannelStateChanged(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, toState state: _AblyPluginSupportPrivate.RealtimeChannelState, reason: (any _AblyPluginSupportPrivate.PublicErrorInfo)?) {
147+
let errorReason = reason.map { ARTErrorInfo.castPluginPublicErrorInfo($0) }
148+
nosync_realtimeObjects(for: channel).nosync_onChannelStateChanged(toState: state, reason: errorReason)
149+
}
150+
146151
internal func nosync_onConnected(withConnectionDetails connectionDetails: (any ConnectionDetailsProtocol)?, channel: any RealtimeChannel) {
147-
let gracePeriod = connectionDetails?.objectsGCGracePeriod?.doubleValue ?? InternalDefaultRealtimeObjects.GarbageCollectionOptions.defaultGracePeriod
152+
let realtimeObjects = nosync_realtimeObjects(for: channel)
148153

154+
let gracePeriod = connectionDetails?.objectsGCGracePeriod?.doubleValue ?? InternalDefaultRealtimeObjects.GarbageCollectionOptions.defaultGracePeriod
149155
// RTO10b
150-
nosync_realtimeObjects(for: channel).nosync_setGarbageCollectionGracePeriod(gracePeriod)
156+
realtimeObjects.nosync_setGarbageCollectionGracePeriod(gracePeriod)
157+
158+
// Push the siteCode from connectionDetails
159+
let siteCode: String? = {
160+
guard let connectionDetails else {
161+
return nil
162+
}
163+
164+
// This is a fallback; our ably-cocoa dependency version should ensure that this is never triggered.
165+
guard (connectionDetails as AnyObject).responds(to: #selector(ConnectionDetailsProtocol.siteCode)) else {
166+
preconditionFailure("ably-cocoa's connectionDetails does not implement siteCode. Please update ably-cocoa to a version that supports apply-on-ACK.")
167+
}
168+
169+
return connectionDetails.siteCode?()
170+
}()
171+
realtimeObjects.nosync_setSiteCode(siteCode)
151172
}
152173

153174
// MARK: - Sending `OBJECT` ProtocolMessage
@@ -157,21 +178,31 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
157178
channel: _AblyPluginSupportPrivate.RealtimeChannel,
158179
client: _AblyPluginSupportPrivate.RealtimeClient,
159180
pluginAPI: PluginAPIProtocol,
160-
callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void,
181+
callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void,
161182
) {
162183
let objectMessageBoxes: [ObjectMessageBox<OutboundObjectMessage>] = objectMessages.map { .init(objectMessage: $0) }
163184
let internalQueue = pluginAPI.internalQueue(for: client)
164185

165-
pluginAPI.nosync_sendObject(
186+
// This is a fallback; our ably-cocoa dependency version should ensure that this is never triggered.
187+
guard (pluginAPI as AnyObject).responds(to: #selector(PluginAPIProtocol.nosync_sendObject(withObjectMessages:channel:completionWithResult:))) else {
188+
preconditionFailure("ably-cocoa does not implement nosync_sendObjectWithObjectMessages:channel:completionWithResult:. Please update ably-cocoa to a version that supports apply-on-ACK.")
189+
}
190+
191+
pluginAPI.nosync_sendObject!(
166192
withObjectMessages: objectMessageBoxes,
167193
channel: channel,
168-
) { error in
194+
) { error, pluginPublishResult in
169195
dispatchPrecondition(condition: .onQueue(internalQueue))
170196

171197
if let error {
172198
callback(.failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
173199
} else {
174-
callback(.success(()))
200+
guard let pluginPublishResult else {
201+
preconditionFailure("Got nil publishResult and nil error")
202+
}
203+
204+
let publishResult = PublishResult(pluginPublishResult: pluginPublishResult)
205+
callback(.success(publishResult))
175206
}
176207
}
177208
}

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
103103
}
104104
}
105105

106-
internal func increment(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
106+
internal func increment(amount: Double, coreSDK: CoreSDK, realtimeObjects: any InternalRealtimeObjectsProtocol) async throws(ARTErrorInfo) {
107107
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
108108
do throws(ARTErrorInfo) {
109109
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
@@ -131,8 +131,8 @@ internal final class InternalDefaultLiveCounter: Sendable {
131131
),
132132
)
133133

134-
// RTLC12f
135-
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
134+
// RTLC12g
135+
realtimeObjects.nosync_publishAndApply(objectMessages: [objectMessage], coreSDK: coreSDK) { result in
136136
continuation.resume(returning: result)
137137
}
138138
}
@@ -142,9 +142,9 @@ internal final class InternalDefaultLiveCounter: Sendable {
142142
}.get()
143143
}
144144

145-
internal func decrement(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
145+
internal func decrement(amount: Double, coreSDK: CoreSDK, realtimeObjects: any InternalRealtimeObjectsProtocol) async throws(ARTErrorInfo) {
146146
// RTLC13b
147-
try await increment(amount: -amount, coreSDK: coreSDK)
147+
try await increment(amount: -amount, coreSDK: coreSDK, realtimeObjects: realtimeObjects)
148148
}
149149

150150
@discardableResult
@@ -245,13 +245,16 @@ internal final class InternalDefaultLiveCounter: Sendable {
245245
}
246246

247247
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
248+
///
249+
/// - Returns: `true` if the operation was applied, `false` if it was skipped (RTLC7g).
248250
internal func nosync_apply(
249251
_ operation: ObjectOperation,
250252
objectMessageSerial: String?,
251253
objectMessageSiteCode: String?,
252254
objectMessageSerialTimestamp: Date?,
253255
objectsPool: inout ObjectsPool,
254-
) {
256+
source: ObjectsOperationSource,
257+
) -> Bool {
255258
mutableStateMutex.withoutSync { mutableState in
256259
mutableState.apply(
257260
operation,
@@ -262,6 +265,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
262265
logger: logger,
263266
clock: clock,
264267
userCallbackQueue: userCallbackQueue,
268+
source: source,
265269
)
266270
}
267271
}
@@ -379,6 +383,8 @@ internal final class InternalDefaultLiveCounter: Sendable {
379383
}
380384

381385
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
386+
///
387+
/// - Returns: `true` if the operation was applied, `false` if skipped (RTLC7g).
382388
internal mutating func apply(
383389
_ operation: ObjectOperation,
384390
objectMessageSerial: String?,
@@ -388,20 +394,22 @@ internal final class InternalDefaultLiveCounter: Sendable {
388394
logger: Logger,
389395
clock: SimpleClock,
390396
userCallbackQueue: DispatchQueue,
391-
) {
397+
source: ObjectsOperationSource,
398+
) -> Bool {
392399
guard let applicableOperation = liveObjectMutableState.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
393400
// RTLC7b
394401
logger.log("Operation \(operation) (serial: \(String(describing: objectMessageSerial)), siteCode: \(String(describing: objectMessageSiteCode))) should not be applied; discarding", level: .debug)
395-
return
402+
return false
396403
}
397404

398-
// RTLC7c
399-
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
405+
// RTLC7c: Only update siteTimeserials for channel-sourced operations
406+
if source == .channel {
407+
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
408+
}
400409

401410
// RTLC7e
402-
// TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854
403411
if liveObjectMutableState.isTombstone {
404-
return
412+
return false
405413
}
406414

407415
switch operation.action {
@@ -413,11 +421,15 @@ internal final class InternalDefaultLiveCounter: Sendable {
413421
)
414422
// RTLC7d1a
415423
liveObjectMutableState.emit(update, on: userCallbackQueue)
424+
// RTLC7d1b
425+
return true
416426
case .known(.counterInc):
417427
// RTLC7d2
418428
let update = applyCounterIncOperation(operation.counterOp)
419429
// RTLC7d2a
420430
liveObjectMutableState.emit(update, on: userCallbackQueue)
431+
// RTLC7d2b
432+
return true
421433
case .known(.objectDelete):
422434
let dataBeforeApplyingOperation = data
423435

@@ -431,9 +443,12 @@ internal final class InternalDefaultLiveCounter: Sendable {
431443

432444
// RTLC7d4a
433445
liveObjectMutableState.emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue)
446+
// RTLC7d4b
447+
return true
434448
default:
435449
// RTLC7d3
436450
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
451+
return false
437452
}
438453
}
439454

0 commit comments

Comments
 (0)