Skip to content

Commit 35850d9

Browse files
Switch CoreSDK to use callback-based methods
Replace CoreSDK's `publish` and `fetchServerTime` methods to use callbacks and expect to be called on the internal queue. The usage of `async` methods was not a good mix with the rest of this plugin's heavy usage of the internal queue as its synchronisation mechanism, and led to a lot of switching between normal code and `async` code which was hard to read. Each caller now enters the internal queue once via withCheckedContinuation + dispatchQueue.async and stays there through the whole operation chain, eliminating repeated queue hops. I've left the overridable publish implementations (used by the tests) as `async` functions, because Claude ran into all sorts of concurrency checking issues when trying to port the JS integration tests to use callbacks for this, and I didn't really feel like trying to fix it (nor do these implementations need access to the plugin's internal state at the moment so there's no compelling reason for them to do their work on the internal queue). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c45cbe8 commit 35850d9

File tree

6 files changed

+255
-182
lines changed

6 files changed

+255
-182
lines changed

Sources/AblyLiveObjects/Internal/CoreSDK.swift

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ import Ably
66
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to `_AblyPluginSupportPrivate`'s Objective-C API (i.e. boxing).
77
internal protocol CoreSDK: AnyObject, Sendable {
88
/// Implements the internal `#publish` method of RTO15.
9-
func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo)
9+
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void)
1010

1111
/// Implements the server time fetch of RTO16, including the storing and usage of the local clock offset.
12-
func fetchServerTime() async throws(ARTErrorInfo) -> Date
12+
func nosync_fetchServerTime(callback: @escaping @Sendable (Result<Date, ARTErrorInfo>) -> Void)
1313

14-
/// Replaces the implementation of ``publish(objectMessages:)``.
14+
/// Replaces the implementation of ``nosync_publish(objectMessages:callback:)``.
1515
///
1616
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
1717
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void)
@@ -50,31 +50,36 @@ internal final class DefaultCoreSDK: CoreSDK {
5050

5151
// MARK: - CoreSDK conformance
5252

53-
internal func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo) {
54-
logger.log("publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
53+
internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void) {
54+
logger.log("nosync_publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
5555

5656
// Use the overridden implementation if supplied
5757
let overriddenImplementation = mutex.withLock {
5858
overriddenPublishImplementation
5959
}
6060
if let overriddenImplementation {
61-
do {
62-
try await overriddenImplementation(objectMessages)
63-
} catch {
64-
guard let artErrorInfo = error as? ARTErrorInfo else {
65-
preconditionFailure("Expected ARTErrorInfo, got \(error)")
61+
let queue = pluginAPI.internalQueue(for: client)
62+
Task {
63+
do {
64+
try await overriddenImplementation(objectMessages)
65+
queue.async { callback(.success(())) }
66+
} catch {
67+
guard let artErrorInfo = error as? ARTErrorInfo else {
68+
preconditionFailure("Expected ARTErrorInfo, got \(error)")
69+
}
70+
queue.async { callback(.failure(artErrorInfo)) }
6671
}
67-
throw artErrorInfo
6872
}
6973
return
7074
}
7175

7276
// TODO: Implement message size checking (https://github.com/ably/ably-liveobjects-swift-plugin/issues/13)
73-
try await DefaultInternalPlugin.sendObject(
77+
DefaultInternalPlugin.nosync_sendObject(
7478
objectMessages: objectMessages,
7579
channel: channel,
7680
client: client,
7781
pluginAPI: pluginAPI,
82+
callback: callback,
7883
)
7984
}
8085

@@ -84,26 +89,21 @@ internal final class DefaultCoreSDK: CoreSDK {
8489
}
8590
}
8691

87-
internal func fetchServerTime() async throws(ARTErrorInfo) -> Date {
88-
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Date, ARTErrorInfo>, _>) in
89-
let internalQueue = pluginAPI.internalQueue(for: client)
90-
91-
internalQueue.async { [client, pluginAPI] in
92-
pluginAPI.nosync_fetchServerTime(for: client) { serverTime, error in
93-
// We don't currently rely on this documented behaviour of `noSync_fetchServerTime` but we may do later, so assert it to be sure it's happening.
94-
dispatchPrecondition(condition: .onQueue(internalQueue))
95-
96-
if let error {
97-
continuation.resume(returning: .failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
98-
} else {
99-
guard let serverTime else {
100-
preconditionFailure("nosync_fetchServerTime gave nil serverTime and nil error")
101-
}
102-
continuation.resume(returning: .success(serverTime))
103-
}
92+
internal func nosync_fetchServerTime(callback: @escaping @Sendable (Result<Date, ARTErrorInfo>) -> Void) {
93+
let internalQueue = pluginAPI.internalQueue(for: client)
94+
95+
pluginAPI.nosync_fetchServerTime(for: client) { serverTime, error in
96+
dispatchPrecondition(condition: .onQueue(internalQueue))
97+
98+
if let error {
99+
callback(.failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
100+
} else {
101+
guard let serverTime else {
102+
preconditionFailure("nosync_fetchServerTime gave nil serverTime and nil error")
104103
}
104+
callback(.success(serverTime))
105105
}
106-
}.get()
106+
}
107107
}
108108

109109
internal var nosync_channelState: _AblyPluginSupportPrivate.RealtimeChannelState {

Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -152,32 +152,27 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
152152

153153
// MARK: - Sending `OBJECT` ProtocolMessage
154154

155-
internal static func sendObject(
155+
internal static func nosync_sendObject(
156156
objectMessages: [OutboundObjectMessage],
157157
channel: _AblyPluginSupportPrivate.RealtimeChannel,
158158
client: _AblyPluginSupportPrivate.RealtimeClient,
159159
pluginAPI: PluginAPIProtocol,
160-
) async throws(ARTErrorInfo) {
160+
callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void,
161+
) {
161162
let objectMessageBoxes: [ObjectMessageBox<OutboundObjectMessage>] = objectMessages.map { .init(objectMessage: $0) }
163+
let internalQueue = pluginAPI.internalQueue(for: client)
164+
165+
pluginAPI.nosync_sendObject(
166+
withObjectMessages: objectMessageBoxes,
167+
channel: channel,
168+
) { error in
169+
dispatchPrecondition(condition: .onQueue(internalQueue))
162170

163-
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
164-
let internalQueue = pluginAPI.internalQueue(for: client)
165-
166-
internalQueue.async {
167-
pluginAPI.nosync_sendObject(
168-
withObjectMessages: objectMessageBoxes,
169-
channel: channel,
170-
) { error in
171-
// We don't currently rely on this documented behaviour of `nosync_sendObject` but we may do later, so assert it to be sure it's happening.
172-
dispatchPrecondition(condition: .onQueue(internalQueue))
173-
174-
if let error {
175-
continuation.resume(returning: .failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
176-
} else {
177-
continuation.resume(returning: .success(()))
178-
}
179-
}
171+
if let error {
172+
callback(.failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
173+
} else {
174+
callback(.success(()))
180175
}
181-
}.get()
176+
}
182177
}
183178
}

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -104,34 +104,42 @@ internal final class InternalDefaultLiveCounter: Sendable {
104104
}
105105

106106
internal func increment(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
107-
let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
108-
// RTLC12c
109-
try coreSDK.nosync_validateChannelState(
110-
notIn: [.detached, .failed, .suspended],
111-
operationDescription: "LiveCounter.increment",
112-
)
113-
114-
// RTLC12e1
115-
if !amount.isFinite {
116-
throw LiveObjectsError.counterIncrementAmountInvalid(amount: amount).toARTErrorInfo()
107+
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
108+
do throws(ARTErrorInfo) {
109+
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
110+
// RTLC12e1
111+
if !amount.isFinite {
112+
throw LiveObjectsError.counterIncrementAmountInvalid(amount: amount).toARTErrorInfo()
113+
}
114+
115+
// RTLC12c
116+
try coreSDK.nosync_validateChannelState(
117+
notIn: [.detached, .failed, .suspended],
118+
operationDescription: "LiveCounter.increment",
119+
)
120+
121+
let objectMessage = OutboundObjectMessage(
122+
operation: .init(
123+
// RTLC12e2
124+
action: .known(.counterInc),
125+
// RTLC12e3
126+
objectId: mutableState.liveObjectMutableState.objectID,
127+
counterOp: .init(
128+
// RTLC12e4
129+
amount: .init(value: amount),
130+
),
131+
),
132+
)
133+
134+
// RTLC12f
135+
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
136+
continuation.resume(returning: result)
137+
}
138+
}
139+
} catch {
140+
continuation.resume(returning: .failure(error))
117141
}
118-
119-
return OutboundObjectMessage(
120-
operation: .init(
121-
// RTLC12e2
122-
action: .known(.counterInc),
123-
// RTLC12e3
124-
objectId: mutableState.liveObjectMutableState.objectID,
125-
counterOp: .init(
126-
// RTLC12e4
127-
amount: .init(value: amount),
128-
),
129-
),
130-
)
131-
}
132-
133-
// RTLC12f
134-
try await coreSDK.publish(objectMessages: [objectMessage])
142+
}.get()
135143
}
136144

137145
internal func decrement(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {

Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -160,50 +160,66 @@ internal final class InternalDefaultLiveMap: Sendable {
160160
}
161161

162162
internal func set(key: String, value: InternalLiveMapValue, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
163-
let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
164-
// RTLM20c
165-
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.set")
166-
167-
return OutboundObjectMessage(
168-
operation: .init(
169-
// RTLM20e2
170-
action: .known(.mapSet),
171-
// RTLM20e3
172-
objectId: mutableState.liveObjectMutableState.objectID,
173-
mapOp: .init(
174-
// RTLM20e4
175-
key: key,
176-
// RTLM20e5
177-
data: value.nosync_toObjectData,
178-
),
179-
),
180-
)
181-
}
182-
183-
try await coreSDK.publish(objectMessages: [objectMessage])
163+
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
164+
do throws(ARTErrorInfo) {
165+
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
166+
// RTLM20c
167+
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.set")
168+
169+
let objectMessage = OutboundObjectMessage(
170+
operation: .init(
171+
// RTLM20e2
172+
action: .known(.mapSet),
173+
// RTLM20e3
174+
objectId: mutableState.liveObjectMutableState.objectID,
175+
mapOp: .init(
176+
// RTLM20e4
177+
key: key,
178+
// RTLM20e5
179+
data: value.nosync_toObjectData,
180+
),
181+
),
182+
)
183+
184+
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
185+
continuation.resume(returning: result)
186+
}
187+
}
188+
} catch {
189+
continuation.resume(returning: .failure(error))
190+
}
191+
}.get()
184192
}
185193

186194
internal func remove(key: String, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
187-
let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
188-
// RTLM21c
189-
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.remove")
190-
191-
return OutboundObjectMessage(
192-
operation: .init(
193-
// RTLM21e2
194-
action: .known(.mapRemove),
195-
// RTLM21e3
196-
objectId: mutableState.liveObjectMutableState.objectID,
197-
mapOp: .init(
198-
// RTLM21e4
199-
key: key,
200-
),
201-
),
202-
)
203-
}
204-
205-
// RTLM21f
206-
try await coreSDK.publish(objectMessages: [objectMessage])
195+
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
196+
do throws(ARTErrorInfo) {
197+
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
198+
// RTLM21c
199+
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.remove")
200+
201+
let objectMessage = OutboundObjectMessage(
202+
operation: .init(
203+
// RTLM21e2
204+
action: .known(.mapRemove),
205+
// RTLM21e3
206+
objectId: mutableState.liveObjectMutableState.objectID,
207+
mapOp: .init(
208+
// RTLM21e4
209+
key: key,
210+
),
211+
),
212+
)
213+
214+
// RTLM21f
215+
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
216+
continuation.resume(returning: result)
217+
}
218+
}
219+
} catch {
220+
continuation.resume(returning: .failure(error))
221+
}
222+
}.get()
207223
}
208224

209225
@discardableResult

0 commit comments

Comments
 (0)