Skip to content

Commit e183c0e

Browse files
Apply operations on ACK (RTO20)
Instead of waiting for the server to echo back an operation before applying it locally, operations are now applied immediately upon receiving the ACK from Realtime. Implements the behaviours from spec commit 56a0bba and ports the corresponding integration tests from ably-js commit 6b1c2de. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 35850d9 commit e183c0e

22 files changed

+1321
-238
lines changed

AblyLiveObjects.xcworkspace/xcshareddata/swiftpm/Package.resolved

Lines changed: 5 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.resolved

Lines changed: 5 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ let package = Package(
1919
],
2020
dependencies: [
2121
.package(
22-
url: "https://github.com/ably/ably-cocoa",
23-
from: "1.2.55",
22+
url: "https://github.com/ably/ably-cocoa.git",
23+
revision: "5ce0a5866ce691b9bbdbd62b8c03d3adac356ba6",
2424
),
2525
.package(
26-
url: "https://github.com/ably/ably-cocoa-plugin-support",
27-
from: "1.0.0",
26+
url: "https://github.com/ably/ably-cocoa-plugin-support.git",
27+
revision: "242fac1d4a829c8a63f9b3f96a71809e1f6eeffc",
2828
),
2929
.package(
3030
url: "https://github.com/apple/swift-argument-parser",

Sources/AblyLiveObjects/Internal/CoreSDK.swift

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,21 @@ import Ably
66
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to `_AblyPluginSupportPrivate`'s Objective-C API (i.e. boxing).
77
internal protocol CoreSDK: AnyObject, Sendable {
88
/// Implements the internal `#publish` method of RTO15.
9-
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void)
9+
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void)
1010

1111
/// Implements the server time fetch of RTO16, including the storing and usage of the local clock offset.
1212
func nosync_fetchServerTime(callback: @escaping @Sendable (Result<Date, ARTErrorInfo>) -> Void)
1313

1414
/// Replaces the implementation of ``nosync_publish(objectMessages:callback:)``.
1515
///
1616
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
17-
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void)
17+
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult)
1818

1919
/// Returns the current state of the Realtime channel that this wraps.
2020
var nosync_channelState: _AblyPluginSupportPrivate.RealtimeChannelState { get }
21+
22+
/// Returns the `siteCode` from the latest `connectionDetails`, or nil if unavailable.
23+
func nosync_siteCode() -> String?
2124
}
2225

2326
internal final class DefaultCoreSDK: CoreSDK {
@@ -34,7 +37,7 @@ internal final class DefaultCoreSDK: CoreSDK {
3437
/// This enables the `testsOnly_overridePublish(with:)` test hook.
3538
///
3639
/// - Note: This should be `throws(ARTErrorInfo)` but that causes a compilation error of "Runtime support for typed throws function types is only available in macOS 15.0.0 or newer".
37-
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> Void)?
40+
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> PublishResult)?
3841

3942
internal init(
4043
channel: _AblyPluginSupportPrivate.RealtimeChannel,
@@ -50,7 +53,7 @@ internal final class DefaultCoreSDK: CoreSDK {
5053

5154
// MARK: - CoreSDK conformance
5255

53-
internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void) {
56+
internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void) {
5457
logger.log("nosync_publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
5558

5659
// Use the overridden implementation if supplied
@@ -61,8 +64,8 @@ internal final class DefaultCoreSDK: CoreSDK {
6164
let queue = pluginAPI.internalQueue(for: client)
6265
Task {
6366
do {
64-
try await overriddenImplementation(objectMessages)
65-
queue.async { callback(.success(())) }
67+
let publishResult = try await overriddenImplementation(objectMessages)
68+
queue.async { callback(.success((publishResult))) }
6669
} catch {
6770
guard let artErrorInfo = error as? ARTErrorInfo else {
6871
preconditionFailure("Expected ARTErrorInfo, got \(error)")
@@ -83,7 +86,7 @@ internal final class DefaultCoreSDK: CoreSDK {
8386
)
8487
}
8588

86-
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void) {
89+
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult) {
8790
mutex.withLock {
8891
overriddenPublishImplementation = newImplementation
8992
}
@@ -109,6 +112,10 @@ internal final class DefaultCoreSDK: CoreSDK {
109112
internal var nosync_channelState: _AblyPluginSupportPrivate.RealtimeChannelState {
110113
pluginAPI.nosync_state(for: channel)
111114
}
115+
116+
internal func nosync_siteCode() -> String? {
117+
DefaultInternalPlugin.siteCode(client: client, pluginAPI: pluginAPI)
118+
}
112119
}
113120

114121
// MARK: - Channel State Validation

Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
7070
/// A class that wraps an object message.
7171
///
7272
/// We need this intermediate type because we want object messages to be structs — because they're nicer to work with internally — but a struct can't conform to the class-bound `_AblyPluginSupportPrivate.ObjectMessageProtocol`.
73-
private final class ObjectMessageBox<T>: _AblyPluginSupportPrivate.ObjectMessageProtocol where T: Sendable {
73+
internal final class ObjectMessageBox<T>: _AblyPluginSupportPrivate.ObjectMessageProtocol where T: Sendable {
7474
internal let objectMessage: T
7575

76-
init(objectMessage: T) {
76+
internal init(objectMessage: T) {
7777
self.objectMessage = objectMessage
7878
}
7979
}
@@ -143,6 +143,11 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
143143
)
144144
}
145145

146+
internal func nosync_onChannelStateChanged(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, toState state: _AblyPluginSupportPrivate.RealtimeChannelState, reason: (any _AblyPluginSupportPrivate.PublicErrorInfo)?) {
147+
let errorReason = reason.map { ARTErrorInfo.castPluginPublicErrorInfo($0) }
148+
nosync_realtimeObjects(for: channel).nosync_onChannelStateChanged(toState: state, reason: errorReason)
149+
}
150+
146151
internal func nosync_onConnected(withConnectionDetails connectionDetails: (any ConnectionDetailsProtocol)?, channel: any RealtimeChannel) {
147152
let gracePeriod = connectionDetails?.objectsGCGracePeriod?.doubleValue ?? InternalDefaultRealtimeObjects.GarbageCollectionOptions.defaultGracePeriod
148153

@@ -157,22 +162,53 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
157162
channel: _AblyPluginSupportPrivate.RealtimeChannel,
158163
client: _AblyPluginSupportPrivate.RealtimeClient,
159164
pluginAPI: PluginAPIProtocol,
160-
callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void,
165+
callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void,
161166
) {
162167
let objectMessageBoxes: [ObjectMessageBox<OutboundObjectMessage>] = objectMessages.map { .init(objectMessage: $0) }
163168
let internalQueue = pluginAPI.internalQueue(for: client)
169+
170+
let selector = NSSelectorFromString("nosync_sendObjectWithObjectMessages:channel:completionWithResult:")
171+
guard (pluginAPI as AnyObject).responds(to: selector) else {
172+
fatalError("ably-cocoa does not implement nosync_sendObjectWithObjectMessages:channel:completionWithResult:. Please update ably-cocoa to a version that supports apply-on-ACK.")
173+
}
164174

165-
pluginAPI.nosync_sendObject(
175+
pluginAPI.nosync_sendObject!(
166176
withObjectMessages: objectMessageBoxes,
167177
channel: channel,
168-
) { error in
178+
) { error, pluginPublishResult in
169179
dispatchPrecondition(condition: .onQueue(internalQueue))
170180

171181
if let error {
172182
callback(.failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
173183
} else {
174-
callback(.success(()))
184+
guard let pluginPublishResult else {
185+
preconditionFailure("Got nil publishResult and nil error")
186+
}
187+
188+
let publishResult = PublishResult(pluginPublishResult: pluginPublishResult)
189+
callback(.success((publishResult)))
175190
}
176191
}
177192
}
193+
194+
// MARK: - Connection Details
195+
196+
/// Returns the `siteCode` from the latest `connectionDetails`, or nil if unavailable.
197+
internal static func siteCode(
198+
client: _AblyPluginSupportPrivate.RealtimeClient,
199+
pluginAPI: PluginAPIProtocol,
200+
) -> String? {
201+
guard let connectionDetails = pluginAPI.nosync_latestConnectionDetails(for: client) else {
202+
return nil
203+
}
204+
205+
// siteCode is @optional on the protocol; use responds(to:) to distinguish
206+
// "not implemented" from "returns nil"
207+
let selector = NSSelectorFromString("siteCode")
208+
guard (connectionDetails as AnyObject).responds(to: selector) else {
209+
fatalError("ably-cocoa's connectionDetails does not implement siteCode. Please update ably-cocoa to a version that supports apply-on-ACK.")
210+
}
211+
212+
return connectionDetails.siteCode?()
213+
}
178214
}

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
103103
}
104104
}
105105

106-
internal func increment(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
106+
internal func increment(amount: Double, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) {
107107
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
108108
do throws(ARTErrorInfo) {
109109
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
@@ -131,8 +131,8 @@ internal final class InternalDefaultLiveCounter: Sendable {
131131
),
132132
)
133133

134-
// RTLC12f
135-
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
134+
// RTLC12g
135+
realtimeObjects.nosync_publishAndApply(objectMessages: [objectMessage]) { result in
136136
continuation.resume(returning: result)
137137
}
138138
}
@@ -142,9 +142,9 @@ internal final class InternalDefaultLiveCounter: Sendable {
142142
}.get()
143143
}
144144

145-
internal func decrement(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
145+
internal func decrement(amount: Double, coreSDK: CoreSDK, realtimeObjects: InternalDefaultRealtimeObjects) async throws(ARTErrorInfo) {
146146
// RTLC13b
147-
try await increment(amount: -amount, coreSDK: coreSDK)
147+
try await increment(amount: -amount, coreSDK: coreSDK, realtimeObjects: realtimeObjects)
148148
}
149149

150150
@discardableResult
@@ -245,13 +245,17 @@ internal final class InternalDefaultLiveCounter: Sendable {
245245
}
246246

247247
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
248+
///
249+
/// - Returns: `true` if the operation was applied, `false` if it was skipped (RTLC7g).
250+
@discardableResult
248251
internal func nosync_apply(
249252
_ operation: ObjectOperation,
250253
objectMessageSerial: String?,
251254
objectMessageSiteCode: String?,
252255
objectMessageSerialTimestamp: Date?,
253256
objectsPool: inout ObjectsPool,
254-
) {
257+
source: ObjectsOperationSource,
258+
) -> Bool {
255259
mutableStateMutex.withoutSync { mutableState in
256260
mutableState.apply(
257261
operation,
@@ -262,6 +266,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
262266
logger: logger,
263267
clock: clock,
264268
userCallbackQueue: userCallbackQueue,
269+
source: source,
265270
)
266271
}
267272
}
@@ -379,6 +384,8 @@ internal final class InternalDefaultLiveCounter: Sendable {
379384
}
380385

381386
/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
387+
///
388+
/// - Returns: `true` if the operation was applied, `false` if skipped (RTLC7g).
382389
internal mutating func apply(
383390
_ operation: ObjectOperation,
384391
objectMessageSerial: String?,
@@ -388,20 +395,22 @@ internal final class InternalDefaultLiveCounter: Sendable {
388395
logger: Logger,
389396
clock: SimpleClock,
390397
userCallbackQueue: DispatchQueue,
391-
) {
398+
source: ObjectsOperationSource,
399+
) -> Bool {
392400
guard let applicableOperation = liveObjectMutableState.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
393401
// RTLC7b
394402
logger.log("Operation \(operation) (serial: \(String(describing: objectMessageSerial)), siteCode: \(String(describing: objectMessageSiteCode))) should not be applied; discarding", level: .debug)
395-
return
403+
return false
396404
}
397405

398-
// RTLC7c
399-
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
406+
// RTLC7c: Only update siteTimeserials for channel-sourced operations
407+
if source == .channel {
408+
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
409+
}
400410

401411
// RTLC7e
402-
// TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854
403412
if liveObjectMutableState.isTombstone {
404-
return
413+
return false
405414
}
406415

407416
switch operation.action {
@@ -413,11 +422,15 @@ internal final class InternalDefaultLiveCounter: Sendable {
413422
)
414423
// RTLC7d1a
415424
liveObjectMutableState.emit(update, on: userCallbackQueue)
425+
// RTLC7d1b
426+
return true
416427
case .known(.counterInc):
417428
// RTLC7d2
418429
let update = applyCounterIncOperation(operation.counterOp)
419430
// RTLC7d2a
420431
liveObjectMutableState.emit(update, on: userCallbackQueue)
432+
// RTLC7d2b
433+
return true
421434
case .known(.objectDelete):
422435
let dataBeforeApplyingOperation = data
423436

@@ -431,9 +444,12 @@ internal final class InternalDefaultLiveCounter: Sendable {
431444

432445
// RTLC7d4a
433446
liveObjectMutableState.emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue)
447+
// RTLC7d4b
448+
return true
434449
default:
435450
// RTLC7d3
436451
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
452+
return false
437453
}
438454
}
439455

0 commit comments

Comments
 (0)