Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ let package = Package(
.package(
url: "https://github.com/ably/ably-cocoa",
// TODO: Unpin before next release
revision: "23954b55bea7fa24beaa6e43beb580d501e6b6e1",
revision: "25d8ef094290aed4571c878da44b9c293b9f8e85",
),
.package(
url: "https://github.com/ably/ably-cocoa-plugin-support",
// Be sure to use `exact` here and not `from`; SPM does not have any special handling of 0.x versions and will resolve 'from: "0.2.0"' to anything less than 1.0.0.
// TODO: Unpin before next release
revision: "37ad19df2cc6063c74ec88ecefc2ddf491db5ebf",
revision: "c034504a5ef426f64e7b27534e86a0c547f3b1e8",
),
.package(
url: "https://github.com/apple/swift-argument-parser",
Expand Down
19 changes: 18 additions & 1 deletion Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,24 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
let callbackQueue = pluginAPI.callbackQueue(for: client)
let options = ARTClientOptions.castPluginPublicClientOptions(pluginAPI.options(for: client))

let garbageCollectionOptions = options.garbageCollectionOptions ?? {
if let latestConnectionDetails = pluginAPI.nosync_latestConnectionDetails(for: client), let gracePeriod = latestConnectionDetails.objectsGCGracePeriod {
// If we already have connection details, then use its grace period per RTO10b2
.init(gracePeriod: .dynamic(gracePeriod.doubleValue))
} else {
// Use the default grace period
.init()
}
}()

let logger = DefaultLogger(pluginLogger: pluginLogger, pluginAPI: pluginAPI)
logger.log("LiveObjects.DefaultInternalPlugin received prepare(_:)", level: .debug)
let liveObjects = InternalDefaultRealtimeObjects(
logger: logger,
internalQueue: internalQueue,
userCallbackQueue: callbackQueue,
clock: DefaultSimpleClock(),
garbageCollectionOptions: options.garbageCollectionOptions ?? .init(),
garbageCollectionOptions: garbageCollectionOptions,
)
pluginAPI.nosync_setPluginDataValue(liveObjects, forKey: Self.pluginDataKey, channel: channel)
}
Expand Down Expand Up @@ -133,6 +143,13 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
)
}

internal func nosync_onConnected(withConnectionDetails connectionDetails: (any ConnectionDetailsProtocol)?, channel: any RealtimeChannel) {
let gracePeriod = connectionDetails?.objectsGCGracePeriod?.doubleValue ?? InternalDefaultRealtimeObjects.GarbageCollectionOptions.defaultGracePeriod

// RTO10b
nosync_realtimeObjects(for: channel).nosync_setGarbageCollectionGracePeriod(gracePeriod)
}

// MARK: - Sending `OBJECT` ProtocolMessage

internal static func sendObject(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo

/// The RTO10a interval at which we will perform garbage collection.
private let garbageCollectionInterval: TimeInterval
/// The RTO10b grace period for which we will retain tombstoned objects and map entries.
private nonisolated(unsafe) var garbageCollectionGracePeriod: TimeInterval
// The task that runs the periodic garbage collection described in RTO10.
private nonisolated(unsafe) var garbageCollectionTask: Task<Void, Never>!

Expand All @@ -29,10 +27,30 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
/// The default value comes from the suggestion in RTO10a.
internal var interval: TimeInterval = 5 * 60

/// The initial RTO10b grace period for which we will retain tombstoned objects and map entries. This value may later get overridden by the `gcGracePeriod` of a `CONNECTED` `ProtocolMessage` from Realtime.
/// The initial RTO10b grace period for which we will retain tombstoned objects and map entries. This value may later get overridden by the `objectsGCGracePeriod` of a `CONNECTED` `ProtocolMessage` from Realtime.
///
/// This default value comes from RTO10b3; can be overridden for testing.
internal var gracePeriod: TimeInterval = 24 * 60 * 60
internal var gracePeriod: GracePeriod = .dynamic(Self.defaultGracePeriod)

/// The default value from RTO10b3.
internal static let defaultGracePeriod: TimeInterval = 24 * 60 * 60

internal enum GracePeriod: Encodable, Hashable {
/// The client will always use this grace period, and will not update the grace period from the `objectsGCGracePeriod` of a `CONNECTED` `ProtocolMessage`.
///
/// - Important: This should only be used in tests.
case fixed(TimeInterval)

/// The client will use this grace period, which may be subsequently updated by the `objectsGCGracePeriod` of a `CONNECTED` `ProtocolMessage`.
case dynamic(TimeInterval)

internal var toTimeInterval: TimeInterval {
switch self {
case let .fixed(timeInterval), let .dynamic(timeInterval):
timeInterval
}
}
}
}

internal var testsOnly_objectsPool: ObjectsPool {
Expand Down Expand Up @@ -111,10 +129,10 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
userCallbackQueue: userCallbackQueue,
clock: clock,
),
garbageCollectionGracePeriod: garbageCollectionOptions.gracePeriod,
),
)
garbageCollectionInterval = garbageCollectionOptions.interval
garbageCollectionGracePeriod = garbageCollectionOptions.gracePeriod

garbageCollectionTask = Task { [weak self, garbageCollectionInterval] in
do {
Expand Down Expand Up @@ -354,7 +372,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
internal func performGarbageCollection() {
mutableStateMutex.withSync { mutableState in
mutableState.objectsPool.nosync_performGarbageCollection(
gracePeriod: garbageCollectionGracePeriod,
gracePeriod: mutableState.garbageCollectionGracePeriod.toTimeInterval,
clock: clock,
logger: logger,
eventsContinuation: completedGarbageCollectionEventsWithoutBufferingContinuation,
Expand All @@ -370,6 +388,29 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
completedGarbageCollectionEventsWithoutBuffering
}

/// Sets the garbage collection grace period.
///
/// Call this upon receiving a `CONNECTED` `ProtocolMessage`, per RTO10b2.
///
/// - Note: If the `.fixed` grace period option was chosen on instantiation, this is a no-op.
internal func nosync_setGarbageCollectionGracePeriod(_ gracePeriod: TimeInterval) {
mutableStateMutex.withoutSync { mutableState in
switch mutableState.garbageCollectionGracePeriod {
case .fixed:
// no-op
break
case .dynamic:
mutableState.garbageCollectionGracePeriod = .dynamic(gracePeriod)
}
}
}

internal var testsOnly_gcGracePeriod: TimeInterval {
mutableStateMutex.withSync { mutableState in
mutableState.garbageCollectionGracePeriod.toTimeInterval
}
}

// MARK: - Testing

/// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment:
Expand All @@ -395,6 +436,9 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectsPoo
internal var onChannelAttachedHasObjects: Bool?
internal var objectsEventSubscriptionStorage = SubscriptionStorage<ObjectsEvent, Void>()

/// The RTO10b grace period for which we will retain tombstoned objects and map entries.
internal var garbageCollectionGracePeriod: GarbageCollectionOptions.GracePeriod

/// The state that drives the emission of the `syncing` and `synced` events.
///
/// This manipulation of this value is based on https://github.com/ably/ably-js/blob/0c5baa9273ca87aec6ca594833d59c4c4d2dddbb/src/plugins/objects/objects.ts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,8 @@ internal final class PublicDefaultRealtimeObjects: RealtimeObjects {
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void) {
coreSDK.testsOnly_overridePublish(with: newImplementation)
}

internal var testsOnly_gcGracePeriod: TimeInterval {
proxied.testsOnly_gcGracePeriod
}
}
11 changes: 11 additions & 0 deletions Tests/AblyLiveObjectsTests/Helpers/Ably+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,14 @@ extension ARTRestProtocol {
}.get()
}
}

extension ARTConnectionProtocol {
@discardableResult
func onceAsync(_ event: ARTRealtimeConnectionEvent) async -> ARTConnectionStateChange {
await withCheckedContinuation { continuation in
once(event) { stateChange in
continuation.resume(returning: stateChange)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3730,6 +3730,63 @@ private struct ObjectsIntegrationTests {

// TODO: Implement the remaining scenarios

// MARK: - GC Grace Period

@Test("gcGracePeriod is set from connectionDetails.objectsGCGracePeriod")
func gcGracePeriod_isSetFromConnectionDetails() async throws {
let client = try await realtimeWithObjects(options: .init())

try await monitorConnectionThenCloseAndFinishAsync(client) {
await client.connection.onceAsync(.connected)

let channel = client.channels.get("channel", options: channelOptionsWithObjects())
let objects = try #require(channel.objects as? PublicDefaultRealtimeObjects)
let connectionDetails = client.internal.latestConnectionDetails

// gcGracePeriod should be set after the initial connection
let initialConnectionDetailsGracePeriod = try #require(connectionDetails?.objectsGCGracePeriod)
#expect(objects.testsOnly_gcGracePeriod == initialConnectionDetailsGracePeriod.doubleValue, "Check gcGracePeriod is set after initial connection from connectionDetails.objectsGCGracePeriod")

let testProxyTransport = try #require(client.internal.transport as? TestProxyTransport)
let connectedProtocolMessage = ARTProtocolMessage()
connectedProtocolMessage.action = .connected
connectedProtocolMessage.connectionDetails = .init(clientId: nil, connectionKey: nil, maxMessageSize: 10, maxFrameSize: 10, maxInboundRate: 10, connectionStateTtl: 10, serverId: "", maxIdleInterval: 10, objectsGCGracePeriod: 0.999) // all arbitrary except objectsGCGracePeriod
client.internal.queue.ably_syncNoDeadlock {
testProxyTransport.receive(connectedProtocolMessage)
}

#expect(objects.testsOnly_gcGracePeriod == 0.999, "Check gcGracePeriod is updated on new CONNECTED event")
}
}

@Test("gcGracePeriod has a default value if connectionDetails.objectsGCGracePeriod is missing")
func gcGracePeriod_usesDefaultValue() async throws {
let client = try await realtimeWithObjects(options: .init())

try await monitorConnectionThenCloseAndFinishAsync(client) {
await client.connection.onceAsync(.connected)

let channel = client.channels.get("channel", options: channelOptionsWithObjects())
let objects = try #require(channel.objects as? PublicDefaultRealtimeObjects)

client.internal.queue.ably_syncNoDeadlock {
objects.testsOnly_proxied.nosync_setGarbageCollectionGracePeriod(0.999)
}
#expect(objects.testsOnly_gcGracePeriod == 0.999)

// send a CONNECTED event without objectsGCGracePeriod, it should use the default value instead
let testProxyTransport = try #require(client.internal.transport as? TestProxyTransport)
let connectedProtocolMessage = ARTProtocolMessage()
connectedProtocolMessage.action = .connected
connectedProtocolMessage.connectionDetails = .init(clientId: nil, connectionKey: nil, maxMessageSize: 10, maxFrameSize: 10, maxInboundRate: 10, connectionStateTtl: 10, serverId: "", maxIdleInterval: 10, objectsGCGracePeriod: nil) // all arbitrary except objectsGCGracePeriod
client.internal.queue.ably_syncNoDeadlock {
testProxyTransport.receive(connectedProtocolMessage)
}

#expect(objects.testsOnly_gcGracePeriod == InternalDefaultRealtimeObjects.GarbageCollectionOptions.defaultGracePeriod, "Check gcGracePeriod is set to a default value if connectionDetails.objectsGCGracePeriod is missing")
}
}

// MARK: - Tombstones GC Scenarios

enum TombstonesGCScenarios: Scenarios {
Expand Down Expand Up @@ -3895,7 +3952,7 @@ private struct ObjectsIntegrationTests {
var options = testCase.options
let garbageCollectionOptions = InternalDefaultRealtimeObjects.GarbageCollectionOptions(
interval: 0.5,
gracePeriod: 0.25,
gracePeriod: .fixed(0.25),
)
options.garbageCollectionOptions = garbageCollectionOptions

Expand All @@ -3913,7 +3970,7 @@ private struct ObjectsIntegrationTests {
let internallyTypedObjects = try #require(objects as? PublicDefaultRealtimeObjects)
let waitForTombstonedObjectsToBeCollected: @Sendable (Date) async throws -> Void = { (tombstonedAt: Date) in
// Sleep until we're sure we're past tombstonedAt + gracePeriod
let timeUntilGracePeriodExpires = (tombstonedAt + garbageCollectionOptions.gracePeriod).timeIntervalSince(.init())
let timeUntilGracePeriodExpires = (tombstonedAt + garbageCollectionOptions.gracePeriod.toTimeInterval).timeIntervalSince(.init())
if timeUntilGracePeriodExpires > 0 {
try await Task.sleep(nanoseconds: UInt64(timeUntilGracePeriodExpires * Double(NSEC_PER_SEC)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class TestProxyTransport: ARTWebSocketTransport, @unchecked Sendable {
msg.action = .connected
msg.connectionId = "x-xxxxxxxx"
msg.connectionKey = "xxxxxxx-xxxxxxxxxxxxxx-xxxxxxxx"
msg.connectionDetails = ARTConnectionDetails(clientId: clientId, connectionKey: "a8c10!t-3D0O4ejwTdvLkl-b33a8c10", maxMessageSize: 16384, maxFrameSize: 262_144, maxInboundRate: 250, connectionStateTtl: 60, serverId: "testServerId", maxIdleInterval: 15000)
msg.connectionDetails = ARTConnectionDetails(clientId: clientId, connectionKey: "a8c10!t-3D0O4ejwTdvLkl-b33a8c10", maxMessageSize: 16384, maxFrameSize: 262_144, maxInboundRate: 250, connectionStateTtl: 60, serverId: "testServerId", maxIdleInterval: 15000, objectsGCGracePeriod: 86_400_000)
super.receive(msg)
}
}
Expand Down