Skip to content

Commit 36782f0

Browse files
Merge pull request #119 from ably/make-CoreSDK-use-callbacks
Make `CoreSDK` callback-based
2 parents db28b94 + 35850d9 commit 36782f0

File tree

6 files changed

+263
-184
lines changed

6 files changed

+263
-184
lines changed

Sources/AblyLiveObjects/Internal/CoreSDK.swift

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ import Ably
66
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to `_AblyPluginSupportPrivate`'s Objective-C API (i.e. boxing).
77
internal protocol CoreSDK: AnyObject, Sendable {
88
/// Implements the internal `#publish` method of RTO15.
9-
func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo)
9+
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void)
1010

1111
/// Implements the server time fetch of RTO16, including the storing and usage of the local clock offset.
12-
func fetchServerTime() async throws(ARTErrorInfo) -> Date
12+
func nosync_fetchServerTime(callback: @escaping @Sendable (Result<Date, ARTErrorInfo>) -> Void)
1313

14-
/// Replaces the implementation of ``publish(objectMessages:)``.
14+
/// Replaces the implementation of ``nosync_publish(objectMessages:callback:)``.
1515
///
1616
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
1717
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void)
@@ -29,7 +29,7 @@ internal final class DefaultCoreSDK: CoreSDK {
2929
private let pluginAPI: PluginAPIProtocol
3030
private let logger: Logger
3131

32-
/// If set to true, ``publish(objectMessages:)`` will behave like a no-op.
32+
/// If set, ``publish(objectMessages:)`` delegates to this implementation.
3333
///
3434
/// This enables the `testsOnly_overridePublish(with:)` test hook.
3535
///
@@ -50,31 +50,36 @@ internal final class DefaultCoreSDK: CoreSDK {
5050

5151
// MARK: - CoreSDK conformance
5252

53-
internal func publish(objectMessages: [OutboundObjectMessage]) async throws(ARTErrorInfo) {
54-
logger.log("publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
53+
internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void) {
54+
logger.log("nosync_publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
5555

5656
// Use the overridden implementation if supplied
5757
let overriddenImplementation = mutex.withLock {
5858
overriddenPublishImplementation
5959
}
6060
if let overriddenImplementation {
61-
do {
62-
try await overriddenImplementation(objectMessages)
63-
} catch {
64-
guard let artErrorInfo = error as? ARTErrorInfo else {
65-
preconditionFailure("Expected ARTErrorInfo, got \(error)")
61+
let queue = pluginAPI.internalQueue(for: client)
62+
Task {
63+
do {
64+
try await overriddenImplementation(objectMessages)
65+
queue.async { callback(.success(())) }
66+
} catch {
67+
guard let artErrorInfo = error as? ARTErrorInfo else {
68+
preconditionFailure("Expected ARTErrorInfo, got \(error)")
69+
}
70+
queue.async { callback(.failure(artErrorInfo)) }
6671
}
67-
throw artErrorInfo
6872
}
6973
return
7074
}
7175

7276
// TODO: Implement message size checking (https://github.com/ably/ably-liveobjects-swift-plugin/issues/13)
73-
try await DefaultInternalPlugin.sendObject(
77+
DefaultInternalPlugin.nosync_sendObject(
7478
objectMessages: objectMessages,
7579
channel: channel,
7680
client: client,
7781
pluginAPI: pluginAPI,
82+
callback: callback,
7883
)
7984
}
8085

@@ -84,26 +89,21 @@ internal final class DefaultCoreSDK: CoreSDK {
8489
}
8590
}
8691

87-
internal func fetchServerTime() async throws(ARTErrorInfo) -> Date {
88-
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Date, ARTErrorInfo>, _>) in
89-
let internalQueue = pluginAPI.internalQueue(for: client)
90-
91-
internalQueue.async { [client, pluginAPI] in
92-
pluginAPI.nosync_fetchServerTime(for: client) { serverTime, error in
93-
// We don't currently rely on this documented behaviour of `noSync_fetchServerTime` but we may do later, so assert it to be sure it's happening.
94-
dispatchPrecondition(condition: .onQueue(internalQueue))
95-
96-
if let error {
97-
continuation.resume(returning: .failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
98-
} else {
99-
guard let serverTime else {
100-
preconditionFailure("nosync_fetchServerTime gave nil serverTime and nil error")
101-
}
102-
continuation.resume(returning: .success(serverTime))
103-
}
92+
internal func nosync_fetchServerTime(callback: @escaping @Sendable (Result<Date, ARTErrorInfo>) -> Void) {
93+
let internalQueue = pluginAPI.internalQueue(for: client)
94+
95+
pluginAPI.nosync_fetchServerTime(for: client) { serverTime, error in
96+
dispatchPrecondition(condition: .onQueue(internalQueue))
97+
98+
if let error {
99+
callback(.failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
100+
} else {
101+
guard let serverTime else {
102+
preconditionFailure("nosync_fetchServerTime gave nil serverTime and nil error")
104103
}
104+
callback(.success(serverTime))
105105
}
106-
}.get()
106+
}
107107
}
108108

109109
internal var nosync_channelState: _AblyPluginSupportPrivate.RealtimeChannelState {

Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -152,32 +152,27 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
152152

153153
// MARK: - Sending `OBJECT` ProtocolMessage
154154

155-
internal static func sendObject(
155+
internal static func nosync_sendObject(
156156
objectMessages: [OutboundObjectMessage],
157157
channel: _AblyPluginSupportPrivate.RealtimeChannel,
158158
client: _AblyPluginSupportPrivate.RealtimeClient,
159159
pluginAPI: PluginAPIProtocol,
160-
) async throws(ARTErrorInfo) {
160+
callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void,
161+
) {
161162
let objectMessageBoxes: [ObjectMessageBox<OutboundObjectMessage>] = objectMessages.map { .init(objectMessage: $0) }
163+
let internalQueue = pluginAPI.internalQueue(for: client)
164+
165+
pluginAPI.nosync_sendObject(
166+
withObjectMessages: objectMessageBoxes,
167+
channel: channel,
168+
) { error in
169+
dispatchPrecondition(condition: .onQueue(internalQueue))
162170

163-
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
164-
let internalQueue = pluginAPI.internalQueue(for: client)
165-
166-
internalQueue.async {
167-
pluginAPI.nosync_sendObject(
168-
withObjectMessages: objectMessageBoxes,
169-
channel: channel,
170-
) { error in
171-
// We don't currently rely on this documented behaviour of `nosync_sendObject` but we may do later, so assert it to be sure it's happening.
172-
dispatchPrecondition(condition: .onQueue(internalQueue))
173-
174-
if let error {
175-
continuation.resume(returning: .failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
176-
} else {
177-
continuation.resume(returning: .success(()))
178-
}
179-
}
171+
if let error {
172+
callback(.failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
173+
} else {
174+
callback(.success(()))
180175
}
181-
}.get()
176+
}
182177
}
183178
}

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -104,34 +104,42 @@ internal final class InternalDefaultLiveCounter: Sendable {
104104
}
105105

106106
internal func increment(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
107-
let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
108-
// RTLC12c
109-
try coreSDK.nosync_validateChannelState(
110-
notIn: [.detached, .failed, .suspended],
111-
operationDescription: "LiveCounter.increment",
112-
)
113-
114-
// RTLC12e1
115-
if !amount.isFinite {
116-
throw LiveObjectsError.counterIncrementAmountInvalid(amount: amount).toARTErrorInfo()
107+
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
108+
do throws(ARTErrorInfo) {
109+
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
110+
// RTLC12e1
111+
if !amount.isFinite {
112+
throw LiveObjectsError.counterIncrementAmountInvalid(amount: amount).toARTErrorInfo()
113+
}
114+
115+
// RTLC12c
116+
try coreSDK.nosync_validateChannelState(
117+
notIn: [.detached, .failed, .suspended],
118+
operationDescription: "LiveCounter.increment",
119+
)
120+
121+
let objectMessage = OutboundObjectMessage(
122+
operation: .init(
123+
// RTLC12e2
124+
action: .known(.counterInc),
125+
// RTLC12e3
126+
objectId: mutableState.liveObjectMutableState.objectID,
127+
counterOp: .init(
128+
// RTLC12e4
129+
amount: .init(value: amount),
130+
),
131+
),
132+
)
133+
134+
// RTLC12f
135+
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
136+
continuation.resume(returning: result)
137+
}
138+
}
139+
} catch {
140+
continuation.resume(returning: .failure(error))
117141
}
118-
119-
return OutboundObjectMessage(
120-
operation: .init(
121-
// RTLC12e2
122-
action: .known(.counterInc),
123-
// RTLC12e3
124-
objectId: mutableState.liveObjectMutableState.objectID,
125-
counterOp: .init(
126-
// RTLC12e4
127-
amount: .init(value: amount),
128-
),
129-
),
130-
)
131-
}
132-
133-
// RTLC12f
134-
try await coreSDK.publish(objectMessages: [objectMessage])
142+
}.get()
135143
}
136144

137145
internal func decrement(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {

Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -160,50 +160,66 @@ internal final class InternalDefaultLiveMap: Sendable {
160160
}
161161

162162
internal func set(key: String, value: InternalLiveMapValue, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
163-
let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
164-
// RTLM20c
165-
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.set")
166-
167-
return OutboundObjectMessage(
168-
operation: .init(
169-
// RTLM20e2
170-
action: .known(.mapSet),
171-
// RTLM20e3
172-
objectId: mutableState.liveObjectMutableState.objectID,
173-
mapOp: .init(
174-
// RTLM20e4
175-
key: key,
176-
// RTLM20e5
177-
data: value.nosync_toObjectData,
178-
),
179-
),
180-
)
181-
}
182-
183-
try await coreSDK.publish(objectMessages: [objectMessage])
163+
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
164+
do throws(ARTErrorInfo) {
165+
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
166+
// RTLM20c
167+
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.set")
168+
169+
let objectMessage = OutboundObjectMessage(
170+
operation: .init(
171+
// RTLM20e2
172+
action: .known(.mapSet),
173+
// RTLM20e3
174+
objectId: mutableState.liveObjectMutableState.objectID,
175+
mapOp: .init(
176+
// RTLM20e4
177+
key: key,
178+
// RTLM20e5
179+
data: value.nosync_toObjectData,
180+
),
181+
),
182+
)
183+
184+
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
185+
continuation.resume(returning: result)
186+
}
187+
}
188+
} catch {
189+
continuation.resume(returning: .failure(error))
190+
}
191+
}.get()
184192
}
185193

186194
internal func remove(key: String, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
187-
let objectMessage = try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
188-
// RTLM21c
189-
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.remove")
190-
191-
return OutboundObjectMessage(
192-
operation: .init(
193-
// RTLM21e2
194-
action: .known(.mapRemove),
195-
// RTLM21e3
196-
objectId: mutableState.liveObjectMutableState.objectID,
197-
mapOp: .init(
198-
// RTLM21e4
199-
key: key,
200-
),
201-
),
202-
)
203-
}
204-
205-
// RTLM21f
206-
try await coreSDK.publish(objectMessages: [objectMessage])
195+
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
196+
do throws(ARTErrorInfo) {
197+
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
198+
// RTLM21c
199+
try coreSDK.nosync_validateChannelState(notIn: [.detached, .failed, .suspended], operationDescription: "LiveMap.remove")
200+
201+
let objectMessage = OutboundObjectMessage(
202+
operation: .init(
203+
// RTLM21e2
204+
action: .known(.mapRemove),
205+
// RTLM21e3
206+
objectId: mutableState.liveObjectMutableState.objectID,
207+
mapOp: .init(
208+
// RTLM21e4
209+
key: key,
210+
),
211+
),
212+
)
213+
214+
// RTLM21f
215+
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
216+
continuation.resume(returning: result)
217+
}
218+
}
219+
} catch {
220+
continuation.resume(returning: .failure(error))
221+
}
222+
}.get()
207223
}
208224

209225
@discardableResult

0 commit comments

Comments
 (0)