From 2af45566cf998d3b9c0ab00bbb83f9d863be3c45 Mon Sep 17 00:00:00 2001 From: rahullohra Date: Wed, 17 Jun 2026 13:44:27 +0530 Subject: [PATCH 1/6] improve: fix sending correct ice state --- .../core/analytics/call/CallAnalytics.kt | 10 +- .../call/observer/PeerConnectionAnalytics.kt | 179 ++++++++---- .../analytics/reporting/ClientEventFactory.kt | 5 +- .../reporting/ClientEventReporter.kt | 74 ++--- .../core/analytics/call/CallAnalyticsTest.kt | 5 +- .../observer/PeerConnectionAnalyticsTest.kt | 257 +++++++++++++++++- .../reporting/ClientEventReporterTest.kt | 49 ++-- 7 files changed, 454 insertions(+), 125 deletions(-) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/CallAnalytics.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/CallAnalytics.kt index be91a5909c..ea046203f2 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/CallAnalytics.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/CallAnalytics.kt @@ -29,7 +29,9 @@ import io.getstream.video.android.core.analytics.call.observer.PeerConnectionAna import io.getstream.video.android.core.analytics.call.observer.SfuAnalytics import io.getstream.video.android.core.analytics.call.observer.SfuAnalyticsStateHolder import io.getstream.video.android.core.analytics.call.observer.VideoAnalytics +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.Stage +import io.getstream.video.android.core.analytics.call.observer.toVideoAnalyticsIceState import io.getstream.video.android.core.analytics.reporting.ClientEventReporter import io.getstream.video.android.core.analytics.reporting.model.AnalyticsCallAbortReason import io.getstream.video.android.core.call.RtcSession @@ -130,8 +132,12 @@ internal class CallAnalytics( Pair(AnalyticsCallAbortReason.CUSTOM, callLeaveReason.message) } } - val publisherIceState = session.value?.publisher?.value?.iceState?.value - val subscriberIceState = session.value?.subscriber?.value?.iceState?.value + val publisherIceState = + session.value?.publisher?.value?.iceState?.value?.toVideoAnalyticsIceState() + ?: VideoAnalyticsIceState.NOT_CONNECTED + val subscriberIceState = + session.value?.subscriber?.value?.iceState?.value?.toVideoAnalyticsIceState() + ?: VideoAnalyticsIceState.NOT_CONNECTED eventReporter.abortAllPostCallInFlight( publisherIceState, subscriberIceState, diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt index 3f491585ad..5ae2a6b27e 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt @@ -21,12 +21,19 @@ import io.getstream.video.android.core.analytics.call.observer.model.Stage import io.getstream.video.android.core.analytics.reporting.ClientEventReporter import io.getstream.video.android.core.analytics.reporting.model.PeerConnectionRole import io.getstream.video.android.core.call.RtcSession +import io.getstream.video.android.core.call.connection.StreamPeerConnection import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.mapLatest +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeoutOrNull import org.webrtc.PeerConnection internal class PeerConnectionAnalytics( @@ -39,7 +46,14 @@ internal class PeerConnectionAnalytics( val stateHolder: PeerConnectionAnalyticsStateHolder = PeerConnectionAnalyticsStateHolder(), ) { - val allowedStates = listOf( + companion object { + /** + * How long a CONNECTED peer connection waits for its ICE state to reach + * [PeerConnection.IceConnectionState.CONNECTED] before reporting the current ICE state. + */ + private const val ICE_CONNECTED_GRACE_MILLIS = 2_000L + } + val allowedPcStates = listOf( PeerConnection.PeerConnectionState.CONNECTING, PeerConnection.PeerConnectionState.FAILED, PeerConnection.PeerConnectionState.CONNECTED, @@ -49,73 +63,105 @@ internal class PeerConnectionAnalytics( stateHolder.state.value.peerConnectionObserverJob?.cancel() val peerConnectionObserverJob = observerScope.launch { stateHolder.state.value.publisherJob?.cancel() - val publisherJob = launch { - session.filterNotNull() - .flatMapLatest { it.publisher.filterNotNull() } - .flatMapLatest { - it.state.filter { it -> - allowedStates.contains( - it, - ) - }.filterNotNull() - }.filter { - val existingStage = stateHolder.state.value.publisherStage - val newStage = getStage(it) - val isExistingStageAndNewStageAreCompleted = (existingStage == Stage.COMPLETED && newStage == Stage.COMPLETED) - !isExistingStageAndNewStageAreCompleted - } - .collect { state -> - val publisherStage = getStage(state) - publisherStage?.let { - stateHolder.updatePublisherStage(publisherStage) - observerScope.launch { - session.value?.publisher?.value?.let { publisher -> - onPeerConnectionStateChanged( - publisher.hashCode(), - role = PeerConnectionRole.PUBLISH, - iceState = publisher.iceState.value, - peerConnectionState = state, - ) - } - } - } - } - } + val publisherJob = observeConnection( + session = session, + role = PeerConnectionRole.PUBLISH, + connectionOf = { it.publisher }, + currentStage = { stateHolder.state.value.publisherStage }, + updateStage = stateHolder::updatePublisherStage, + ) stateHolder.updatePublisherJob(publisherJob) + stateHolder.state.value.subscriberJob?.cancel() - val subscriberJob = launch { - session.filterNotNull() - .flatMapLatest { it.subscriber.filterNotNull() } - .flatMapLatest { - it.state.filter { it -> allowedStates.contains(it) }.filterNotNull() - }.filter { - val existingStage = stateHolder.state.value.subscriberStage - val newStage = getStage(it) + val subscriberJob = observeConnection( + session = session, + role = PeerConnectionRole.SUBSCRIBE, + connectionOf = { it.subscriber }, + currentStage = { stateHolder.state.value.subscriberStage }, + updateStage = stateHolder::updateSubscriberStage, + ) + stateHolder.updateSubscriberJob(subscriberJob) + } + stateHolder.updatePeerConnectionObserverJob(peerConnectionObserverJob) + } + + /** + * Observes a single peer connection ([connectionOf]) for analytics. + * + * The peer-connection [StreamPeerConnection.state] drives the stage tracking: whenever it + * enters one of the [allowedPcStates], the stage is updated (this is independent of the ICE + * state, since the stage is also consumed by the call-leave flow). + * + * Once the peer connection enters an allowed state, the ICE state reported alongside it depends + * on the peer-connection state: + * - [PeerConnection.PeerConnectionState.CONNECTING] / [PeerConnection.PeerConnectionState.FAILED]: + * report the current ICE state immediately. + * - [PeerConnection.PeerConnectionState.CONNECTED]: wait up to [ICE_CONNECTED_GRACE_MILLIS] for + * the ICE state to become [PeerConnection.IceConnectionState.CONNECTED]; if it does not, report + * whatever the ICE state is when the grace period elapses. + * + * [mapLatest] makes the ICE resolution follow the latest peer-connection state (a newer + * peer-connection state cancels an in-progress grace wait), and [distinctUntilChanged] avoids + * emitting the same combination twice. + */ + private fun CoroutineScope.observeConnection( + session: StateFlow, + role: PeerConnectionRole, + connectionOf: (RtcSession) -> StateFlow, + currentStage: () -> Stage, + updateStage: (Stage) -> Unit, + ): Job = launch { + session.filterNotNull() + .flatMapLatest { connectionOf(it).filterNotNull() } + .flatMapLatest { connection -> + connection.state + .filter { allowedPcStates.contains(it) } + .filterNotNull() + .onEach { pcState -> + val existingStage = currentStage() + val newStage = getStage(pcState) val isExistingStageAndNewStageAreCompleted = (existingStage == Stage.COMPLETED && newStage == Stage.COMPLETED) - !isExistingStageAndNewStageAreCompleted + if (newStage != null && !isExistingStageAndNewStageAreCompleted) { + updateStage(newStage) + } } - .collect { state -> - val stage = getStage(state) - stage?.let { - stateHolder.updateSubscriberStage(stage) - observerScope.launch { - session.value?.subscriber?.value?.let { subscriber -> - onPeerConnectionStateChanged( - subscriber.hashCode(), - role = PeerConnectionRole.SUBSCRIBE, - iceState = subscriber.iceState.value, - peerConnectionState = state, - ) + .mapLatest { pcState -> + val iceState = if (pcState == PeerConnection.PeerConnectionState.CONNECTED) { + // Give ICE a grace period to reach CONNECTED before reporting the + // connected peer connection; otherwise report the current ICE state. + withTimeoutOrNull(ICE_CONNECTED_GRACE_MILLIS) { + connection.iceState.first { + it == PeerConnection.IceConnectionState.CONNECTED } - } + } ?: connection.iceState.value + } else { + // CONNECTING / FAILED: report the current ICE state immediately. + connection.iceState.value } + PeerConnectionSnapshot( + connection.hashCode(), + pcState, + iceState.toVideoAnalyticsIceState(), + ) } } - stateHolder.updateSubscriberJob(subscriberJob) - } - stateHolder.updatePeerConnectionObserverJob(peerConnectionObserverJob) + .distinctUntilChanged() + .collect { snapshot -> + onPeerConnectionStateChanged( + peerConnectionHashCode = snapshot.peerConnectionHashCode, + role = role, + iceState = snapshot.iceState, + peerConnectionState = snapshot.peerConnectionState, + ) + } } + private data class PeerConnectionSnapshot( + val peerConnectionHashCode: Int, + val peerConnectionState: PeerConnection.PeerConnectionState, + val iceState: VideoAnalyticsIceState, + ) + private fun getStage(peerConnectionState: PeerConnection.PeerConnectionState): Stage? { return when (peerConnectionState) { PeerConnection.PeerConnectionState.CONNECTING -> { @@ -135,7 +181,7 @@ internal class PeerConnectionAnalytics( internal fun onPeerConnectionStateChanged( peerConnectionHashCode: Int, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState?, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState?, ) { reporter.onPeerConnectionStateChanged( @@ -162,3 +208,18 @@ internal class PeerConnectionAnalytics( observePeerConnections(session) } } +internal fun PeerConnection.IceConnectionState?.toVideoAnalyticsIceState(): VideoAnalyticsIceState { + return when (this) { + PeerConnection.IceConnectionState.CONNECTED, + PeerConnection.IceConnectionState.COMPLETED, + -> VideoAnalyticsIceState.CONNECTED + + PeerConnection.IceConnectionState.FAILED -> VideoAnalyticsIceState.FAILED + + else -> VideoAnalyticsIceState.NOT_CONNECTED + } +} + +internal enum class VideoAnalyticsIceState(val text: String) { + CONNECTED("CONNECTED"), FAILED("FAILED"), NOT_CONNECTED("NOT_CONNECTED") +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventFactory.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventFactory.kt index 0396d3c74f..4dd8cf56b8 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventFactory.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventFactory.kt @@ -18,6 +18,7 @@ package io.getstream.video.android.core.analytics.reporting import io.getstream.android.video.generated.models.ClientEvent import io.getstream.video.android.core.StreamVideo +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.JoinReason import io.getstream.video.android.core.analytics.reporting.model.EventOutcome import io.getstream.video.android.core.analytics.reporting.model.EventStage @@ -46,7 +47,7 @@ internal class ClientEventFactory(val sdkVersion: String, val userAgent: () -> S sfuId: String? = null, peerConnection: PeerConnectionRole? = null, wasPreviouslyConnected: Boolean? = null, - iceState: PeerConnection.IceConnectionState? = null, + iceState: VideoAnalyticsIceState? = null, peerConnectionState: PeerConnection.PeerConnectionState? = null, userSessionId: String? = null, screenShareAllowed: Boolean? = null, @@ -66,7 +67,7 @@ internal class ClientEventFactory(val sdkVersion: String, val userAgent: () -> S userId = StreamVideo.Companion.instanceOrNull()?.userId, callSessionId = callSessionId, elapsedTime = elapsedTime?.toInt(), - iceState = iceState?.name, + iceState = iceState?.text, outcome = outcome?.value, peerConnection = peerConnection?.value, previouslyConnectedTimestamp = null, diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt index bd82586114..49e9a84afd 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt @@ -21,6 +21,7 @@ import io.getstream.android.video.generated.apis.ProductvideoApi import io.getstream.android.video.generated.models.ClientEvent import io.getstream.log.taggedLogger import io.getstream.video.android.core.BuildConfig +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.JoinReason import io.getstream.video.android.core.analytics.reporting.dispatcher.EventDispatcher import io.getstream.video.android.core.analytics.reporting.dispatcher.ImmediateEventDispatcher @@ -284,9 +285,13 @@ internal class ClientEventReporter( sfuId: String, joinReason: JoinReason, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState?, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState?, ) { + Log.d( + "Noob", + "[onPeerConnectionStateChanged], iceState: $iceState, peerConnectionState: $peerConnectionState, role: $role", + ) when (peerConnectionState) { PeerConnection.PeerConnectionState.CONNECTING -> { handleOnPeerConnectionConnectingState( @@ -302,38 +307,37 @@ internal class ClientEventReporter( peerConnectionState, ) } + PeerConnection.PeerConnectionState.CONNECTED -> { - iceState?.let { - handleOnPeerConnectionConnectedState( - peerConnectionHashCode, - callId, - callType, - joinStageAttemptId, - callSessionId, - sfuId, - joinReason, - role, - iceState, - peerConnectionState, - ) - } + handleOnPeerConnectionConnectedState( + peerConnectionHashCode, + callId, + callType, + joinStageAttemptId, + callSessionId, + sfuId, + joinReason, + role, + iceState, + peerConnectionState, + ) } + PeerConnection.PeerConnectionState.FAILED -> { - iceState?.let { - handleOnPeerConnectionFailedState( - peerConnectionHashCode, - callId, - callType, - joinStageAttemptId, - callSessionId, - sfuId, - joinReason, - role, - iceState, - peerConnectionState, - ) - } + handleOnPeerConnectionFailedState( + peerConnectionHashCode, + callId, + callType, + joinStageAttemptId, + callSessionId, + sfuId, + joinReason, + role, + iceState, + peerConnectionState, + ) } + else -> {} } } @@ -347,7 +351,7 @@ internal class ClientEventReporter( sfuId: String, joinReason: JoinReason, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState?, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState, ) { val wasPrevConnected = pcEverConnected[role] != null @@ -397,7 +401,7 @@ internal class ClientEventReporter( sfuId: String, joinReason: JoinReason, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState, ) { @@ -428,7 +432,7 @@ internal class ClientEventReporter( sfuId: String, joinReason: JoinReason, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState, ) { val pcState = pcEventReporterStateHolder.map.remove(peerConnectionHashCode) ?: return @@ -458,7 +462,7 @@ internal class ClientEventReporter( callSessionId: String, joinReason: JoinReason, success: Boolean, - iceState: PeerConnection.IceConnectionState, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState?, failureReason: String? = null, failureCode: String? = null, @@ -573,8 +577,8 @@ internal class ClientEventReporter( } internal fun abortAllPostCallInFlight( - publisherIceState: PeerConnection.IceConnectionState?, - subscriberIceState: PeerConnection.IceConnectionState?, + publisherIceState: VideoAnalyticsIceState, + subscriberIceState: VideoAnalyticsIceState, failCode: String, failMessage: String, ) { diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/CallAnalyticsTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/CallAnalyticsTest.kt index 8c2eb1fb51..0adf12fdb4 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/CallAnalyticsTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/CallAnalyticsTest.kt @@ -21,6 +21,7 @@ import io.getstream.video.android.core.CallLeaveReason import io.getstream.video.android.core.ParticipantState import io.getstream.video.android.core.RealtimeConnection import io.getstream.video.android.core.UserActionCause +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.Stage import io.getstream.video.android.core.analytics.reporting.ClientEventReporter import io.getstream.video.android.core.analytics.reporting.model.AnalyticsCallAbortReason @@ -170,8 +171,8 @@ class CallAnalyticsTest { verify(exactly = 1) { reporter.abortAllPostCallInFlight( - PeerConnection.IceConnectionState.CONNECTED, - PeerConnection.IceConnectionState.DISCONNECTED, + VideoAnalyticsIceState.CONNECTED, + VideoAnalyticsIceState.NOT_CONNECTED, AnalyticsCallAbortReason.CLIENT_ABORTED.name, any(), ) diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt index 60a78e406b..1d892d90a0 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt @@ -32,6 +32,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runCurrent import kotlinx.coroutines.test.runTest import org.junit.Assert.assertEquals @@ -88,7 +89,7 @@ class PeerConnectionAnalyticsTest { analytics(CoroutineScope(Dispatchers.Unconfined)).onPeerConnectionStateChanged( peerConnectionHashCode = 42, role = PeerConnectionRole.SUBSCRIBE, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, peerConnectionState = PeerConnection.PeerConnectionState.CONNECTING, ) @@ -102,17 +103,17 @@ class PeerConnectionAnalyticsTest { sfuId = "sfu-7", joinReason = JoinReason.ReJoin, role = PeerConnectionRole.SUBSCRIBE, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, peerConnectionState = PeerConnection.PeerConnectionState.CONNECTING, ) } } @Test - fun `a connecting publisher marks the stage in progress and notifies the reporter`() = runTest { + fun `a connecting publisher reports its current ice state immediately and marks the stage in progress`() = runTest { val session = mockSession( publisherState = PeerConnection.PeerConnectionState.CONNECTING, - publisherIceState = PeerConnection.IceConnectionState.CHECKING, + publisherIceState = PeerConnection.IceConnectionState.CONNECTED, ) val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) @@ -130,13 +131,259 @@ class PeerConnectionAnalyticsTest { sfuId = any(), joinReason = any(), role = PeerConnectionRole.PUBLISH, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.CONNECTED, + peerConnectionState = PeerConnection.PeerConnectionState.CONNECTING, + ) + } + scope.cancel() + } + + @Test + fun `a connected publisher whose ice never connects reports the current ice after the grace period`() = runTest { + val publisher = mockk() + every { publisher.state } returns + MutableStateFlow(PeerConnection.PeerConnectionState.CONNECTED) + // ICE stays DISCONNECTED (never reaches CONNECTED), so the grace period will elapse. + every { publisher.iceState } returns + MutableStateFlow(PeerConnection.IceConnectionState.DISCONNECTED) + + val subscriber = mockk() + every { subscriber.state } returns + MutableStateFlow(null) + every { subscriber.iceState } returns + MutableStateFlow(null) + + val session = mockk() + every { session.publisher } returns MutableStateFlow(publisher) + every { session.subscriber } returns MutableStateFlow(subscriber) + + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + analytics(scope).observePeerConnections(MutableStateFlow(session)) + runCurrent() + + // Stage is tracked immediately, but nothing is reported while the grace period is running. + assertEquals(Stage.COMPLETED, stateHolder.state.value.publisherStage) + verify(exactly = 0) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = any(), + callType = any(), + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = any(), + peerConnectionState = any(), + ) + } + + // Once the grace period elapses, the current (non-connected) ICE state is reported. + advanceUntilIdle() + + verify(exactly = 1) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = "call-1", + callType = "default", + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, + peerConnectionState = PeerConnection.PeerConnectionState.CONNECTED, + ) + } + scope.cancel() + } + + @Test + fun `a connected publisher reports connected when ice connects within the grace period`() = runTest { + val publisher = mockk() + // ICE has not reached CONNECTED yet, so the connected peer connection keeps waiting. + val iceState = + MutableStateFlow( + PeerConnection.IceConnectionState.DISCONNECTED, + ) + every { publisher.state } returns + MutableStateFlow(PeerConnection.PeerConnectionState.CONNECTED) + every { publisher.iceState } returns iceState + + val subscriber = mockk() + every { subscriber.state } returns + MutableStateFlow(null) + every { subscriber.iceState } returns + MutableStateFlow(null) + + val session = mockk() + every { session.publisher } returns MutableStateFlow(publisher) + every { session.subscriber } returns MutableStateFlow(subscriber) + + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + analytics(scope).observePeerConnections(MutableStateFlow(session)) + runCurrent() + + verify(exactly = 0) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = any(), + callType = any(), + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = any(), + peerConnectionState = any(), + ) + } + + iceState.value = PeerConnection.IceConnectionState.CONNECTED + runCurrent() + + verify(exactly = 1) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = "call-1", + callType = "default", + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = VideoAnalyticsIceState.CONNECTED, + peerConnectionState = PeerConnection.PeerConnectionState.CONNECTED, + ) + } + scope.cancel() + } + + @Test + fun `a connecting publisher with a new ice state reports not connected`() = runTest { + val session = mockSession( + publisherState = PeerConnection.PeerConnectionState.CONNECTING, + // NEW is allowed and maps to NOT_CONNECTED. + publisherIceState = PeerConnection.IceConnectionState.NEW, + ) + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + + analytics(scope).observePeerConnections(MutableStateFlow(session)) + runCurrent() + + assertEquals(Stage.IN_PROGRESS, stateHolder.state.value.publisherStage) + verify(exactly = 1) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = "call-1", + callType = "default", + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, peerConnectionState = PeerConnection.PeerConnectionState.CONNECTING, ) } scope.cancel() } + @Test + fun `a failed ice state reports failed`() = runTest { + val session = mockSession( + publisherState = PeerConnection.PeerConnectionState.FAILED, + publisherIceState = PeerConnection.IceConnectionState.FAILED, + ) + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + + analytics(scope).observePeerConnections(MutableStateFlow(session)) + runCurrent() + + assertEquals(Stage.COMPLETED, stateHolder.state.value.publisherStage) + verify(exactly = 1) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = "call-1", + callType = "default", + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = VideoAnalyticsIceState.FAILED, + peerConnectionState = PeerConnection.PeerConnectionState.FAILED, + ) + } + scope.cancel() + } + + @Test + fun `toVideoAnalyticsIceState maps webrtc ice states to the analytics enum`() { + assertEquals( + VideoAnalyticsIceState.CONNECTED, + PeerConnection.IceConnectionState.CONNECTED.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.CONNECTED, + PeerConnection.IceConnectionState.COMPLETED.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.FAILED, + PeerConnection.IceConnectionState.FAILED.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + PeerConnection.IceConnectionState.NEW.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + PeerConnection.IceConnectionState.CHECKING.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + PeerConnection.IceConnectionState.DISCONNECTED.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + PeerConnection.IceConnectionState.CLOSED.toVideoAnalyticsIceState(), + ) + val nullIceState: PeerConnection.IceConnectionState? = null + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + nullIceState.toVideoAnalyticsIceState(), + ) + } + + @Test + fun `a connected publisher with ice already connected reports connected without waiting`() = runTest { + val session = mockSession( + publisherState = PeerConnection.PeerConnectionState.CONNECTED, + publisherIceState = PeerConnection.IceConnectionState.CONNECTED, + ) + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + + analytics(scope).observePeerConnections(MutableStateFlow(session)) + // No time is advanced: the grace period must not delay an already-connected ICE. + runCurrent() + + verify(exactly = 1) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = "call-1", + callType = "default", + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = VideoAnalyticsIceState.CONNECTED, + peerConnectionState = PeerConnection.PeerConnectionState.CONNECTED, + ) + } + scope.cancel() + } + @Test fun `a connected publisher resets the stage back to completed`() = runTest { val session = mockSession( diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporterTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporterTest.kt index c9802ebc62..411c7eec2e 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporterTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporterTest.kt @@ -17,6 +17,7 @@ package io.getstream.video.android.core.analytics.reporting import io.getstream.android.video.generated.models.ClientEvent +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.JoinReason import io.getstream.video.android.core.analytics.reporting.dispatcher.EventDispatcher import io.getstream.video.android.core.analytics.reporting.model.AnalyticsCallAbortReason @@ -79,7 +80,7 @@ class ClientEventReporterTest { private fun pcStateChanged( pcState: PeerConnection.PeerConnectionState?, - iceState: PeerConnection.IceConnectionState? = null, + iceState: VideoAnalyticsIceState, role: PeerConnectionRole = PeerConnectionRole.PUBLISH, pcHashCode: Int = 100, ) = reporter.onPeerConnectionStateChanged( @@ -243,7 +244,7 @@ class ClientEventReporterTest { fun `CONNECTING opens a session and CONNECTED completes it as success`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, ) assertEquals(1, dispatcher.sent.size) @@ -255,7 +256,7 @@ class ClientEventReporterTest { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTED, - iceState = PeerConnection.IceConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.CONNECTED, ) assertEquals(2, dispatcher.sent.size) @@ -269,12 +270,12 @@ class ClientEventReporterTest { fun `FAILED completes the session as a connectivity failure`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.FAILED, - iceState = PeerConnection.IceConnectionState.FAILED, + iceState = VideoAnalyticsIceState.FAILED, ) val completed = dispatcher.sent.last() @@ -284,33 +285,41 @@ class ClientEventReporterTest { } @Test - fun `CONNECTED with a null ice state is ignored`() { + fun `CONNECTED completes the session as success even when ice is not connected`() { + // NOTE: the reporter routes purely on the peer-connection state, so a CONNECTED peer + // connection is reported as a success regardless of its ICE state (including NOT_CONNECTED). pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, ) - pcStateChanged(pcState = PeerConnection.PeerConnectionState.CONNECTED, iceState = null) + pcStateChanged( + pcState = PeerConnection.PeerConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, + ) - assertEquals(1, dispatcher.sent.size) + assertEquals(2, dispatcher.sent.size) + val completed = dispatcher.sent.last() + assertEquals(EventType.COMPLETED.value, completed.eventType) + assertEquals(EventOutcome.SUCCESS.value, completed.outcome) } @Test fun `a reconnect after CONNECTED is flagged as previously connected`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, pcHashCode = 1, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTED, - iceState = PeerConnection.IceConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.CONNECTED, pcHashCode = 1, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, pcHashCode = 2, ) @@ -323,7 +332,7 @@ class ClientEventReporterTest { fun `CONNECTED without an open session sends nothing`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTED, - iceState = PeerConnection.IceConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.CONNECTED, ) assertTrue(dispatcher.sent.isEmpty()) @@ -333,19 +342,19 @@ class ClientEventReporterTest { fun `sessions are completed per peer connection instance`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, pcHashCode = 1, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, pcHashCode = 2, ) val firstStageId = dispatcher.sent.first().stageId pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTED, - iceState = PeerConnection.IceConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.CONNECTED, pcHashCode = 1, ) @@ -359,13 +368,13 @@ class ClientEventReporterTest { fun `publisher and subscriber sessions are tracked independently`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, role = PeerConnectionRole.PUBLISH, pcHashCode = 1, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, role = PeerConnectionRole.SUBSCRIBE, pcHashCode = 2, ) @@ -384,8 +393,8 @@ class ClientEventReporterTest { dispatcher.sent.clear() reporter.abortAllPostCallInFlight( - publisherIceState = null, - subscriberIceState = null, + publisherIceState = VideoAnalyticsIceState.NOT_CONNECTED, + subscriberIceState = VideoAnalyticsIceState.NOT_CONNECTED, AnalyticsCallAbortReason.CLIENT_ABORTED.name, "user left the call", ) From ca4ca85b6b5045eabb74e26a19eae925630d9d5e Mon Sep 17 00:00:00 2001 From: rahullohra Date: Wed, 17 Jun 2026 16:22:48 +0530 Subject: [PATCH 2/6] improve: correctly update analytics stage --- .../call/observer/PeerConnectionAnalytics.kt | 56 +++++++++++-------- .../observer/PeerConnectionAnalyticsTest.kt | 10 +++- 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt index 5ae2a6b27e..c6c8747c3b 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt @@ -31,7 +31,6 @@ import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.mapLatest -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeoutOrNull import org.webrtc.PeerConnection @@ -88,9 +87,12 @@ internal class PeerConnectionAnalytics( /** * Observes a single peer connection ([connectionOf]) for analytics. * - * The peer-connection [StreamPeerConnection.state] drives the stage tracking: whenever it - * enters one of the [allowedPcStates], the stage is updated (this is independent of the ICE - * state, since the stage is also consumed by the call-leave flow). + * The stage is updated in lockstep with the reported event (in the terminal collector), not + * when the raw peer-connection state changes. It therefore stays IN_PROGRESS while the + * peer-connection session is open and only flips to COMPLETED once the completed event is + * actually emitted. This keeps the call-leave flow's "is a stage still in progress?" check + * correct during the CONNECTED grace wait, so leaving mid-grace still aborts the open session + * instead of treating it as already completed. * * Once the peer connection enters an allowed state, the ICE state reported alongside it depends * on the peer-connection state: @@ -117,13 +119,14 @@ internal class PeerConnectionAnalytics( connection.state .filter { allowedPcStates.contains(it) } .filterNotNull() - .onEach { pcState -> + .filter { + // Skip a peer-connection state that would only repeat an already-completed + // stage (e.g. a CONNECTED after a FAILED). Gating here — before the grace + // wait and the collector — means we don't do any work for such transitions. val existingStage = currentStage() - val newStage = getStage(pcState) + val newStage = getStage(it) val isExistingStageAndNewStageAreCompleted = (existingStage == Stage.COMPLETED && newStage == Stage.COMPLETED) - if (newStage != null && !isExistingStageAndNewStageAreCompleted) { - updateStage(newStage) - } + !isExistingStageAndNewStageAreCompleted } .mapLatest { pcState -> val iceState = if (pcState == PeerConnection.PeerConnectionState.CONNECTED) { @@ -131,7 +134,8 @@ internal class PeerConnectionAnalytics( // connected peer connection; otherwise report the current ICE state. withTimeoutOrNull(ICE_CONNECTED_GRACE_MILLIS) { connection.iceState.first { - it == PeerConnection.IceConnectionState.CONNECTED + it == PeerConnection.IceConnectionState.CONNECTED || + it == PeerConnection.IceConnectionState.COMPLETED } } ?: connection.iceState.value } else { @@ -147,21 +151,23 @@ internal class PeerConnectionAnalytics( } .distinctUntilChanged() .collect { snapshot -> - onPeerConnectionStateChanged( - peerConnectionHashCode = snapshot.peerConnectionHashCode, - role = role, - iceState = snapshot.iceState, - peerConnectionState = snapshot.peerConnectionState, - ) + // Update the stage in lockstep with the reported event (not when the raw + // peer-connection state changed), so it only flips to COMPLETED once the completed + // event is actually emitted. The COMPLETED -> COMPLETED case is already dropped by + // the upstream filter, so no extra guard is needed here. + val pcAnalyticsStage = getStage(snapshot.peerConnectionState) + pcAnalyticsStage?.let { + updateStage(pcAnalyticsStage) + onPeerConnectionStateChanged( + peerConnectionHashCode = snapshot.peerConnectionHashCode, + role = role, + iceState = snapshot.iceState, + peerConnectionState = snapshot.peerConnectionState, + ) + } } } - private data class PeerConnectionSnapshot( - val peerConnectionHashCode: Int, - val peerConnectionState: PeerConnection.PeerConnectionState, - val iceState: VideoAnalyticsIceState, - ) - private fun getStage(peerConnectionState: PeerConnection.PeerConnectionState): Stage? { return when (peerConnectionState) { PeerConnection.PeerConnectionState.CONNECTING -> { @@ -220,6 +226,12 @@ internal fun PeerConnection.IceConnectionState?.toVideoAnalyticsIceState(): Vide } } +private data class PeerConnectionSnapshot( + val peerConnectionHashCode: Int, + val peerConnectionState: PeerConnection.PeerConnectionState, + val iceState: VideoAnalyticsIceState, +) + internal enum class VideoAnalyticsIceState(val text: String) { CONNECTED("CONNECTED"), FAILED("FAILED"), NOT_CONNECTED("NOT_CONNECTED") } diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt index 1d892d90a0..749a0f08e4 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt @@ -161,8 +161,10 @@ class PeerConnectionAnalyticsTest { analytics(scope).observePeerConnections(MutableStateFlow(session)) runCurrent() - // Stage is tracked immediately, but nothing is reported while the grace period is running. - assertEquals(Stage.COMPLETED, stateHolder.state.value.publisherStage) + // While the grace period is running nothing is reported, and the stage is NOT yet marked + // completed: the stage flips only when the completed event is actually emitted, so a leave + // mid-grace still sees the session as in progress. + assertEquals(Stage.NOT_STARTED, stateHolder.state.value.publisherStage) verify(exactly = 0) { reporter.onPeerConnectionStateChanged( peerConnectionHashCode = any(), @@ -178,9 +180,11 @@ class PeerConnectionAnalyticsTest { ) } - // Once the grace period elapses, the current (non-connected) ICE state is reported. + // Once the grace period elapses, the current (non-connected) ICE state is reported and the + // stage is marked completed in lockstep. advanceUntilIdle() + assertEquals(Stage.COMPLETED, stateHolder.state.value.publisherStage) verify(exactly = 1) { reporter.onPeerConnectionStateChanged( peerConnectionHashCode = any(), From b7d018c8b127238b11d2a65e603016b6ad319684 Mon Sep 17 00:00:00 2001 From: rahullohra Date: Wed, 17 Jun 2026 17:12:38 +0530 Subject: [PATCH 3/6] improve: refactor --- .../android/core/analytics/reporting/ClientEventReporter.kt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt index 49e9a84afd..8f5294403a 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt @@ -288,10 +288,6 @@ internal class ClientEventReporter( iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState?, ) { - Log.d( - "Noob", - "[onPeerConnectionStateChanged], iceState: $iceState, peerConnectionState: $peerConnectionState, role: $role", - ) when (peerConnectionState) { PeerConnection.PeerConnectionState.CONNECTING -> { handleOnPeerConnectionConnectingState( From d665edb551cf5ea0d53e159432948357c468189a Mon Sep 17 00:00:00 2001 From: rahullohra Date: Wed, 17 Jun 2026 18:56:30 +0530 Subject: [PATCH 4/6] fix: remove grace period to wait for ice state --- .../call/observer/PeerConnectionAnalytics.kt | 24 +--- .../observer/PeerConnectionAnalyticsTest.kt | 113 ++---------------- 2 files changed, 9 insertions(+), 128 deletions(-) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt index c6c8747c3b..8685ea67f5 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt @@ -28,11 +28,9 @@ import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull -import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.mapLatest import kotlinx.coroutines.launch -import kotlinx.coroutines.withTimeoutOrNull import org.webrtc.PeerConnection internal class PeerConnectionAnalytics( @@ -45,13 +43,6 @@ internal class PeerConnectionAnalytics( val stateHolder: PeerConnectionAnalyticsStateHolder = PeerConnectionAnalyticsStateHolder(), ) { - companion object { - /** - * How long a CONNECTED peer connection waits for its ICE state to reach - * [PeerConnection.IceConnectionState.CONNECTED] before reporting the current ICE state. - */ - private const val ICE_CONNECTED_GRACE_MILLIS = 2_000L - } val allowedPcStates = listOf( PeerConnection.PeerConnectionState.CONNECTING, PeerConnection.PeerConnectionState.FAILED, @@ -129,23 +120,10 @@ internal class PeerConnectionAnalytics( !isExistingStageAndNewStageAreCompleted } .mapLatest { pcState -> - val iceState = if (pcState == PeerConnection.PeerConnectionState.CONNECTED) { - // Give ICE a grace period to reach CONNECTED before reporting the - // connected peer connection; otherwise report the current ICE state. - withTimeoutOrNull(ICE_CONNECTED_GRACE_MILLIS) { - connection.iceState.first { - it == PeerConnection.IceConnectionState.CONNECTED || - it == PeerConnection.IceConnectionState.COMPLETED - } - } ?: connection.iceState.value - } else { - // CONNECTING / FAILED: report the current ICE state immediately. - connection.iceState.value - } PeerConnectionSnapshot( connection.hashCode(), pcState, - iceState.toVideoAnalyticsIceState(), + connection.iceState.value.toVideoAnalyticsIceState(), ) } } diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt index 749a0f08e4..4f74c1e8d2 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt @@ -32,7 +32,6 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.test.UnconfinedTestDispatcher -import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runCurrent import kotlinx.coroutines.test.runTest import org.junit.Assert.assertEquals @@ -139,51 +138,17 @@ class PeerConnectionAnalyticsTest { } @Test - fun `a connected publisher whose ice never connects reports the current ice after the grace period`() = runTest { - val publisher = mockk() - every { publisher.state } returns - MutableStateFlow(PeerConnection.PeerConnectionState.CONNECTED) - // ICE stays DISCONNECTED (never reaches CONNECTED), so the grace period will elapse. - every { publisher.iceState } returns - MutableStateFlow(PeerConnection.IceConnectionState.DISCONNECTED) - - val subscriber = mockk() - every { subscriber.state } returns - MutableStateFlow(null) - every { subscriber.iceState } returns - MutableStateFlow(null) - - val session = mockk() - every { session.publisher } returns MutableStateFlow(publisher) - every { session.subscriber } returns MutableStateFlow(subscriber) - + fun `a connected publisher whose ice is not connected reports not connected immediately`() = runTest { + val session = mockSession( + publisherState = PeerConnection.PeerConnectionState.CONNECTED, + // ICE has not reached CONNECTED, so the reported ICE is the current (non-connected) one. + publisherIceState = PeerConnection.IceConnectionState.DISCONNECTED, + ) val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + analytics(scope).observePeerConnections(MutableStateFlow(session)) runCurrent() - // While the grace period is running nothing is reported, and the stage is NOT yet marked - // completed: the stage flips only when the completed event is actually emitted, so a leave - // mid-grace still sees the session as in progress. - assertEquals(Stage.NOT_STARTED, stateHolder.state.value.publisherStage) - verify(exactly = 0) { - reporter.onPeerConnectionStateChanged( - peerConnectionHashCode = any(), - callId = any(), - callType = any(), - joinStageAttemptId = any(), - callSessionId = any(), - sfuId = any(), - joinReason = any(), - role = PeerConnectionRole.PUBLISH, - iceState = any(), - peerConnectionState = any(), - ) - } - - // Once the grace period elapses, the current (non-connected) ICE state is reported and the - // stage is marked completed in lockstep. - advanceUntilIdle() - assertEquals(Stage.COMPLETED, stateHolder.state.value.publisherStage) verify(exactly = 1) { reporter.onPeerConnectionStateChanged( @@ -202,67 +167,6 @@ class PeerConnectionAnalyticsTest { scope.cancel() } - @Test - fun `a connected publisher reports connected when ice connects within the grace period`() = runTest { - val publisher = mockk() - // ICE has not reached CONNECTED yet, so the connected peer connection keeps waiting. - val iceState = - MutableStateFlow( - PeerConnection.IceConnectionState.DISCONNECTED, - ) - every { publisher.state } returns - MutableStateFlow(PeerConnection.PeerConnectionState.CONNECTED) - every { publisher.iceState } returns iceState - - val subscriber = mockk() - every { subscriber.state } returns - MutableStateFlow(null) - every { subscriber.iceState } returns - MutableStateFlow(null) - - val session = mockk() - every { session.publisher } returns MutableStateFlow(publisher) - every { session.subscriber } returns MutableStateFlow(subscriber) - - val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) - analytics(scope).observePeerConnections(MutableStateFlow(session)) - runCurrent() - - verify(exactly = 0) { - reporter.onPeerConnectionStateChanged( - peerConnectionHashCode = any(), - callId = any(), - callType = any(), - joinStageAttemptId = any(), - callSessionId = any(), - sfuId = any(), - joinReason = any(), - role = PeerConnectionRole.PUBLISH, - iceState = any(), - peerConnectionState = any(), - ) - } - - iceState.value = PeerConnection.IceConnectionState.CONNECTED - runCurrent() - - verify(exactly = 1) { - reporter.onPeerConnectionStateChanged( - peerConnectionHashCode = any(), - callId = "call-1", - callType = "default", - joinStageAttemptId = any(), - callSessionId = any(), - sfuId = any(), - joinReason = any(), - role = PeerConnectionRole.PUBLISH, - iceState = VideoAnalyticsIceState.CONNECTED, - peerConnectionState = PeerConnection.PeerConnectionState.CONNECTED, - ) - } - scope.cancel() - } - @Test fun `a connecting publisher with a new ice state reports not connected`() = runTest { val session = mockSession( @@ -360,7 +264,7 @@ class PeerConnectionAnalyticsTest { } @Test - fun `a connected publisher with ice already connected reports connected without waiting`() = runTest { + fun `a connected publisher with connected ice reports connected`() = runTest { val session = mockSession( publisherState = PeerConnection.PeerConnectionState.CONNECTED, publisherIceState = PeerConnection.IceConnectionState.CONNECTED, @@ -368,7 +272,6 @@ class PeerConnectionAnalyticsTest { val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) analytics(scope).observePeerConnections(MutableStateFlow(session)) - // No time is advanced: the grace period must not delay an already-connected ICE. runCurrent() verify(exactly = 1) { From a40f57a7b230093c48d70b883f6ff69ce824e2ee Mon Sep 17 00:00:00 2001 From: rahullohra Date: Wed, 17 Jun 2026 19:01:33 +0530 Subject: [PATCH 5/6] fix: refactor --- .../call/observer/PeerConnectionAnalytics.kt | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt index 8685ea67f5..dca0257305 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt @@ -29,7 +29,7 @@ import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.mapLatest +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import org.webrtc.PeerConnection @@ -78,24 +78,18 @@ internal class PeerConnectionAnalytics( /** * Observes a single peer connection ([connectionOf]) for analytics. * + * Each allowed peer-connection state ([allowedPcStates]) is reported together with the ICE + * state as it stands at that moment ([StreamPeerConnection.iceState] value, mapped via + * [toVideoAnalyticsIceState]). So a CONNECTED peer connection reports CONNECTED only when its + * ICE is already connected, otherwise NOT_CONNECTED. The upstream filter drops a transition + * that would merely repeat an already-completed stage (e.g. CONNECTED after FAILED), and + * [distinctUntilChanged] avoids emitting the same combination twice. + * * The stage is updated in lockstep with the reported event (in the terminal collector), not * when the raw peer-connection state changes. It therefore stays IN_PROGRESS while the * peer-connection session is open and only flips to COMPLETED once the completed event is - * actually emitted. This keeps the call-leave flow's "is a stage still in progress?" check - * correct during the CONNECTED grace wait, so leaving mid-grace still aborts the open session - * instead of treating it as already completed. - * - * Once the peer connection enters an allowed state, the ICE state reported alongside it depends - * on the peer-connection state: - * - [PeerConnection.PeerConnectionState.CONNECTING] / [PeerConnection.PeerConnectionState.FAILED]: - * report the current ICE state immediately. - * - [PeerConnection.PeerConnectionState.CONNECTED]: wait up to [ICE_CONNECTED_GRACE_MILLIS] for - * the ICE state to become [PeerConnection.IceConnectionState.CONNECTED]; if it does not, report - * whatever the ICE state is when the grace period elapses. - * - * [mapLatest] makes the ICE resolution follow the latest peer-connection state (a newer - * peer-connection state cancels an in-progress grace wait), and [distinctUntilChanged] avoids - * emitting the same combination twice. + * actually emitted, which keeps the call-leave flow's "is a stage still in progress?" check + * correct. */ private fun CoroutineScope.observeConnection( session: StateFlow, @@ -112,14 +106,14 @@ internal class PeerConnectionAnalytics( .filterNotNull() .filter { // Skip a peer-connection state that would only repeat an already-completed - // stage (e.g. a CONNECTED after a FAILED). Gating here — before the grace - // wait and the collector — means we don't do any work for such transitions. + // stage (e.g. a CONNECTED after a FAILED). Gating here — before the ICE read + // and the collector — means we don't do any work for such transitions. val existingStage = currentStage() val newStage = getStage(it) val isExistingStageAndNewStageAreCompleted = (existingStage == Stage.COMPLETED && newStage == Stage.COMPLETED) !isExistingStageAndNewStageAreCompleted } - .mapLatest { pcState -> + .map { pcState -> PeerConnectionSnapshot( connection.hashCode(), pcState, From 779ae00487e87354f6dddfb18fe5a9ec2d513738 Mon Sep 17 00:00:00 2001 From: rahullohra Date: Fri, 19 Jun 2026 17:45:15 +0530 Subject: [PATCH 6/6] fix: refactor --- .../call/observer/PeerConnectionAnalytics.kt | 15 -------- .../call/observer/VideoAnalyticsIceState.kt | 35 +++++++++++++++++++ 2 files changed, 35 insertions(+), 15 deletions(-) create mode 100644 stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/VideoAnalyticsIceState.kt diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt index dca0257305..e01dfe68ce 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt @@ -186,24 +186,9 @@ internal class PeerConnectionAnalytics( observePeerConnections(session) } } -internal fun PeerConnection.IceConnectionState?.toVideoAnalyticsIceState(): VideoAnalyticsIceState { - return when (this) { - PeerConnection.IceConnectionState.CONNECTED, - PeerConnection.IceConnectionState.COMPLETED, - -> VideoAnalyticsIceState.CONNECTED - - PeerConnection.IceConnectionState.FAILED -> VideoAnalyticsIceState.FAILED - - else -> VideoAnalyticsIceState.NOT_CONNECTED - } -} private data class PeerConnectionSnapshot( val peerConnectionHashCode: Int, val peerConnectionState: PeerConnection.PeerConnectionState, val iceState: VideoAnalyticsIceState, ) - -internal enum class VideoAnalyticsIceState(val text: String) { - CONNECTED("CONNECTED"), FAILED("FAILED"), NOT_CONNECTED("NOT_CONNECTED") -} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/VideoAnalyticsIceState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/VideoAnalyticsIceState.kt new file mode 100644 index 0000000000..ad50ce8009 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/VideoAnalyticsIceState.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.analytics.call.observer + +import org.webrtc.PeerConnection + +internal enum class VideoAnalyticsIceState(val text: String) { + CONNECTED("CONNECTED"), FAILED("FAILED"), NOT_CONNECTED("NOT_CONNECTED") +} + +internal fun PeerConnection.IceConnectionState?.toVideoAnalyticsIceState(): VideoAnalyticsIceState { + return when (this) { + PeerConnection.IceConnectionState.CONNECTED, + PeerConnection.IceConnectionState.COMPLETED, + -> VideoAnalyticsIceState.CONNECTED + + PeerConnection.IceConnectionState.FAILED -> VideoAnalyticsIceState.FAILED + + else -> VideoAnalyticsIceState.NOT_CONNECTED + } +}