@@ -352,15 +352,14 @@ private final class AckInterceptor: @unchecked Sendable {
352352 }
353353}
354354
355- /// Injects an ATTACHED protocol message into the channel, optionally with the HAS_OBJECTS flag.
356- /// This triggers a sync sequence when `hasObjects` is true.
357- private func injectAttachedMessage( channel: ARTRealtimeChannel , hasObjects: Bool ) async {
355+ /// Injects an ATTACHED protocol message into the channel with the given flags.
356+ private func injectAttachedMessage( channel: ARTRealtimeChannel , flags: Int64 ) async {
358357 await withCheckedContinuation { ( continuation: CheckedContinuation < Void , Never > ) in
359358 channel. internal. queue. async {
360359 let pm = ARTProtocolMessage ( )
361360 pm. action = . attached
362361 pm. channel = channel. name
363- pm. flags = hasObjects ? Int64 ( 1 << 7 ) : 0 // ARTProtocolMessageFlagHasObjects
362+ pm. flags = flags
364363 channel. internal. onChannelMessage ( pm)
365364 continuation. resume ( )
366365 }
@@ -615,7 +614,7 @@ private struct ObjectsIntegrationTests {
615614 #expect( try #require( ctx. root. get ( key: " foo " ) ? . stringValue) == " bar " , " Check root has key before ATTACHED " )
616615
617616 // inject ATTACHED without HAS_OBJECTS flag
618- await injectAttachedMessage ( channel: ctx. channel, hasObjects : false )
617+ await injectAttachedMessage ( channel: ctx. channel, flags : 0 )
619618
620619 // local state should be cleared — root should have no keys
621620 #expect( try ctx. root. size == 0 , " Check root has no keys after ATTACHED without HAS_OBJECTS " )
@@ -2272,64 +2271,206 @@ private struct ObjectsIntegrationTests {
22722271 . init(
22732272 disabled: false ,
22742273 allTransportsAndProtocols: false ,
2275- description: " buffered object operation messages are discarded when new OBJECT_SYNC sequence starts " ,
2274+ description: " buffered object operation messages are discarded on ATTACHED " ,
22762275 action: { ctx in
22772276 let root = ctx. root
22782277 let objectsHelper = ctx. objectsHelper
22792278 let channel = ctx. channel
2280- let client = ctx. client
22812279
22822280 // Start new sync sequence with a cursor so client will wait for the next OBJECT_SYNC messages
22832281 await objectsHelper. processObjectStateMessageOnChannel (
22842282 channel: channel,
22852283 syncSerial: " serial:cursor " ,
22862284 )
22872285
2288- // Inject operations, expect them to be discarded when sync with new sequence id starts
2289- // Note that unlike in the JS test we do not perform this concurrently because if we were to do that in Swift Concurrency we would not be able to guarantee that the operations are applied in the correct order (if they're not then messages will be discarded due to serials being out of order)
2290- for (i, keyData) in primitiveKeyData. enumerated ( ) {
2291- var wireData = keyData. data. mapValues { WireValue ( jsonValue: $0) }
2286+ // Inject operation during sync sequence, expect it to be discarded when ATTACHED arrives
2287+ await objectsHelper. processObjectOperationMessageOnChannel (
2288+ channel: channel,
2289+ serial: lexicoTimeserial ( seriesId: " aaa " , timestamp: 0 , counter: 0 ) ,
2290+ siteCode: " aaa " ,
2291+ state: [ objectsHelper. mapSetOp ( objectId: " root " , key: " foo " , data: . object( [ " string " : . string( " bar " ) ] ) ) ] ,
2292+ )
22922293
2293- if let bytesValue = wireData [ " bytes " ] , client. internal. options. useBinaryProtocol {
2294- let bytesString = try #require( bytesValue. stringValue)
2295- wireData [ " bytes " ] = try . data( #require( . init( base64Encoded: bytesString) ) )
2296- }
2294+ // Any ATTACHED message must clear buffered operations and start a new sync sequence
2295+ await injectAttachedMessage ( channel: channel, flags: 1 << 7 ) // HAS_OBJECTS flag is bit 7
22972296
2298- await objectsHelper . processObjectOperationMessageOnChannel (
2299- channel : channel ,
2300- serial : lexicoTimeserial ( seriesId : " aaa " , timestamp : Int64 ( i ) , counter : 0 ) ,
2301- siteCode : " aaa " ,
2302- state : [ objectsHelper . mapSetOp ( objectId : " root " , key : keyData . key , data : . object ( wireData ) ) ] ,
2303- )
2304- }
2297+ // Inject another operation that should be applied when sync ends
2298+ await objectsHelper . processObjectOperationMessageOnChannel (
2299+ channel : channel ,
2300+ serial : lexicoTimeserial ( seriesId : " bbb " , timestamp : 0 , counter : 0 ) ,
2301+ siteCode : " bbb " ,
2302+ state : [ objectsHelper . mapSetOp ( objectId : " root " , key : " baz " , data : . object ( [ " string " : . string ( " qux " ) ] ) ) ] ,
2303+ )
23052304
2306- // Start new sync with new sequence id
2305+ // End sync
23072306 await objectsHelper. processObjectStateMessageOnChannel (
23082307 channel: channel,
2309- syncSerial: " otherserial:cursor " ,
2308+ syncSerial: " serial: " ,
2309+ )
2310+
2311+ // Check root doesn't have data from operations received before ATTACHED
2312+ let fooValue = try root. get ( key: " foo " )
2313+ #expect( fooValue == nil , " Check buffered ops before ATTACHED were discarded and not applied on root " )
2314+
2315+ // Check root has data from operations received after ATTACHED
2316+ #expect( try #require( root. get ( key: " baz " ) ? . stringValue) == " qux " , " Check root has data from operations received after ATTACHED " )
2317+ } ,
2318+ ) ,
2319+ . init(
2320+ // Note: This comment re regression test is preserved from the JS test it's copied from, but this bug never actually existed in the Swift implementation.
2321+ // Regression test: an earlier implementation did not clear buffered operations when receiving
2322+ // an ATTACHED with RESUMED=true on an already-attached channel. The RESUMED flag is irrelevant
2323+ // — buffering is determined by HAS_OBJECTS, and any ATTACHED must clear buffered operations.
2324+ disabled: false ,
2325+ allTransportsAndProtocols: false ,
2326+ description: " buffered object operation messages are discarded when already-attached channel receives ATTACHED with RESUMED flag " ,
2327+ action: { ctx in
2328+ let root = ctx. root
2329+ let objectsHelper = ctx. objectsHelper
2330+ let channel = ctx. channel
2331+
2332+ // Channel is already attached from the test setup
2333+ #expect( channel. state == . attached, " Check channel is already attached before test begins " )
2334+
2335+ // Start new sync sequence with a cursor so client will wait for the next OBJECT_SYNC messages
2336+ await objectsHelper. processObjectStateMessageOnChannel (
2337+ channel: channel,
2338+ syncSerial: " serial:cursor " ,
2339+ )
2340+
2341+ // Inject operation, expect it to be discarded when ATTACHED arrives (even with RESUMED)
2342+ await objectsHelper. processObjectOperationMessageOnChannel (
2343+ channel: channel,
2344+ serial: lexicoTimeserial ( seriesId: " aaa " , timestamp: 0 , counter: 0 ) ,
2345+ siteCode: " aaa " ,
2346+ state: [ objectsHelper. mapSetOp ( objectId: " root " , key: " foo " , data: . object( [ " string " : . string( " bar " ) ] ) ) ] ,
23102347 )
23112348
2312- // Inject another operation that should be applied when latest sync ends
2349+ // The RESUMED flag is irrelevant for LiveObjects buffering — any ATTACHED must clear
2350+ // buffered operations and start a new sync sequence
2351+ await injectAttachedMessage ( channel: channel, flags: ( 1 << 7 ) | ( 1 << 2 ) ) // HAS_OBJECTS and RESUMED flags
2352+
2353+ // Inject another operation after ATTACHED
23132354 await objectsHelper. processObjectOperationMessageOnChannel (
23142355 channel: channel,
23152356 serial: lexicoTimeserial ( seriesId: " bbb " , timestamp: 0 , counter: 0 ) ,
23162357 siteCode: " bbb " ,
2358+ state: [ objectsHelper. mapSetOp ( objectId: " root " , key: " baz " , data: . object( [ " string " : . string( " qux " ) ] ) ) ] ,
2359+ )
2360+
2361+ // End sync
2362+ await objectsHelper. processObjectStateMessageOnChannel (
2363+ channel: channel,
2364+ syncSerial: " serial: " ,
2365+ )
2366+
2367+ // Check root doesn't have data from operations received before ATTACHED
2368+ let fooValue = try root. get ( key: " foo " )
2369+ #expect( fooValue == nil , " Check buffered ops before RESUMED ATTACHED were discarded and not applied on root " )
2370+
2371+ // Check root has data from operations received after ATTACHED
2372+ #expect( try #require( root. get ( key: " baz " ) ? . stringValue) == " qux " , " Check root has data from operations received after RESUMED ATTACHED " )
2373+ } ,
2374+ ) ,
2375+ . init(
2376+ // Regression test: an earlier implementation incorrectly cleared buffered operations when a new
2377+ // OBJECT_SYNC sequence started. Only an ATTACHED message should clear buffered operations, not
2378+ // a new OBJECT_SYNC sequence.
2379+ disabled: false ,
2380+ allTransportsAndProtocols: false ,
2381+ description: " buffered object operation messages are NOT discarded on new OBJECT_SYNC sequence " ,
2382+ action: { ctx in
2383+ let root = ctx. root
2384+ let objectsHelper = ctx. objectsHelper
2385+ let channel = ctx. channel
2386+
2387+ // Start new sync sequence with a cursor so client will wait for the next OBJECT_SYNC messages
2388+ await objectsHelper. processObjectStateMessageOnChannel (
2389+ channel: channel,
2390+ syncSerial: " serial:cursor " ,
2391+ )
2392+
2393+ // Inject operation during first sync sequence
2394+ await objectsHelper. processObjectOperationMessageOnChannel (
2395+ channel: channel,
2396+ serial: lexicoTimeserial ( seriesId: " aaa " , timestamp: 0 , counter: 0 ) ,
2397+ siteCode: " aaa " ,
23172398 state: [ objectsHelper. mapSetOp ( objectId: " root " , key: " foo " , data: . object( [ " string " : . string( " bar " ) ] ) ) ] ,
23182399 )
23192400
2401+ // Start new sync with new sequence id — buffered operations should NOT be discarded
2402+ await objectsHelper. processObjectStateMessageOnChannel (
2403+ channel: channel,
2404+ syncSerial: " otherserial:cursor " ,
2405+ )
2406+
2407+ // Inject another operation during second sync sequence
2408+ await objectsHelper. processObjectOperationMessageOnChannel (
2409+ channel: channel,
2410+ serial: lexicoTimeserial ( seriesId: " bbb " , timestamp: 0 , counter: 0 ) ,
2411+ siteCode: " bbb " ,
2412+ state: [ objectsHelper. mapSetOp ( objectId: " root " , key: " baz " , data: . object( [ " string " : . string( " qux " ) ] ) ) ] ,
2413+ )
2414+
23202415 // End sync
23212416 await objectsHelper. processObjectStateMessageOnChannel (
23222417 channel: channel,
23232418 syncSerial: " otherserial: " ,
23242419 )
23252420
2326- // Check root doesn't have data from operations received during first sync
2327- for keyData in primitiveKeyData {
2328- #expect( try root. get ( key: keyData. key) == nil , " Check \" \( keyData. key) \" key doesn't exist on root when OBJECT_SYNC has ended " )
2329- }
2421+ // Check root has data from operations received during first sync sequence
2422+ let fooStringValue = try #require( root. get ( key: " foo " ) ? . stringValue)
2423+ #expect( fooStringValue == " bar " , " Check root has data from operations received during first OBJECT_SYNC sequence " )
23302424
23312425 // Check root has data from operations received during second sync
2332- #expect( try #require( root. get ( key: " foo " ) ? . stringValue) == " bar " , " Check root has data from operations received during second OBJECT_SYNC sequence " )
2426+ let bazStringValue = try #require( root. get ( key: " baz " ) ? . stringValue)
2427+ #expect( bazStringValue == " qux " , " Check root has data from operations received during second OBJECT_SYNC sequence " )
2428+ } ,
2429+ ) ,
2430+ . init(
2431+ disabled: false ,
2432+ allTransportsAndProtocols: false ,
2433+ description: " operations are buffered when OBJECT_SYNC is received after completed sync without expected preceding ATTACHED " ,
2434+ action: { ctx in
2435+ let root = ctx. root
2436+ let objectsHelper = ctx. objectsHelper
2437+ let channel = ctx. channel
2438+
2439+ // Complete an initial sync sequence first
2440+ await objectsHelper. processObjectStateMessageOnChannel (
2441+ channel: channel,
2442+ syncSerial: " serial: " ,
2443+ )
2444+
2445+ // Simulate receiving OBJECT_SYNC without preceding ATTACHED.
2446+ // Normally, for server-initiated resync the server is expected to send ATTACHED with RESUMED=false first.
2447+ // However, if that doesn't happen, the client handles it as a best-effort case by starting to buffer from this point.
2448+ await objectsHelper. processObjectStateMessageOnChannel (
2449+ channel: channel,
2450+ syncSerial: " resync:cursor " ,
2451+ )
2452+
2453+ // Inject operations during this server-initiated resync — they should be buffered
2454+ await objectsHelper. processObjectOperationMessageOnChannel (
2455+ channel: channel,
2456+ serial: lexicoTimeserial ( seriesId: " aaa " , timestamp: 0 , counter: 0 ) ,
2457+ siteCode: " aaa " ,
2458+ state: [ objectsHelper. mapSetOp ( objectId: " root " , key: " foo " , data: . object( [ " string " : . string( " bar " ) ] ) ) ] ,
2459+ )
2460+
2461+ // Check root doesn't have data yet — operations should be buffered during resync
2462+ let fooValueDuringResync = try root. get ( key: " foo " )
2463+ #expect( fooValueDuringResync == nil , " Check \" foo \" key doesn't exist during server-initiated resync " )
2464+
2465+ // End the resync
2466+ await objectsHelper. processObjectStateMessageOnChannel (
2467+ channel: channel,
2468+ syncSerial: " resync: " ,
2469+ )
2470+
2471+ // Check buffered operations are now applied
2472+ let fooStringValue = try #require( root. get ( key: " foo " ) ? . stringValue)
2473+ #expect( fooStringValue == " bar " , " Check root has correct value for \" foo \" key after server-initiated resync completed " )
23332474 } ,
23342475 ) ,
23352476 . init(
@@ -2491,7 +2632,7 @@ private struct ObjectsIntegrationTests {
24912632 . init(
24922633 disabled: false ,
24932634 allTransportsAndProtocols: false ,
2494- description: " subsequent object operation messages are applied immediately after OBJECT_SYNC ended and buffers are applied " ,
2635+ description: " subsequent object operation messages are applied immediately after OBJECT_SYNC ended and buffered operations are applied " ,
24952636 action: { ctx in
24962637 let root = ctx. root
24972638 let objectsHelper = ctx. objectsHelper
@@ -4466,7 +4607,7 @@ private struct ObjectsIntegrationTests {
44664607 let counterId = internalCounter. proxied. testsOnly_objectID
44674608
44684609 // Inject ATTACHED with HAS_OBJECTS to trigger SYNCING state
4469- await injectAttachedMessage ( channel: channel, hasObjects : true )
4610+ await injectAttachedMessage ( channel: channel, flags : 1 << 7 ) // HAS_OBJECTS flag is bit 7
44704611
44714612 // Set up ACK interceptor so we can control when the ACK is delivered
44724613 let ackInterceptor = AckInterceptor ( client: client)
@@ -4540,7 +4681,7 @@ private struct ObjectsIntegrationTests {
45404681 let counterId = internalCounter. proxied. testsOnly_objectID
45414682
45424683 // Inject ATTACHED+HAS_OBJECTS to trigger sync
4543- await injectAttachedMessage ( channel: channel, hasObjects : true )
4684+ await injectAttachedMessage ( channel: channel, flags : 1 << 7 ) // HAS_OBJECTS flag is bit 7
45444685
45454686 // Complete sync with state that uses a fake siteCode.
45464687 // Using a clearly fake siteCode ensures the echo (which has the real siteCode)
@@ -4605,7 +4746,7 @@ private struct ObjectsIntegrationTests {
46054746 try await root. set ( key: " counter " , value: . liveCounter( counter) )
46064747
46074748 // Inject ATTACHED+HAS_OBJECTS to trigger SYNCING state
4608- await injectAttachedMessage ( channel: channel, hasObjects : true )
4749+ await injectAttachedMessage ( channel: channel, flags : 1 << 7 ) // HAS_OBJECTS flag is bit 7
46094750
46104751 // Set up ACK interceptor and start increment
46114752 let ackInterceptor = AckInterceptor ( client: client)
0 commit comments