diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/CallAnalytics.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/CallAnalytics.kt index be91a5909c..ea046203f2 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/CallAnalytics.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/CallAnalytics.kt @@ -29,7 +29,9 @@ import io.getstream.video.android.core.analytics.call.observer.PeerConnectionAna import io.getstream.video.android.core.analytics.call.observer.SfuAnalytics import io.getstream.video.android.core.analytics.call.observer.SfuAnalyticsStateHolder import io.getstream.video.android.core.analytics.call.observer.VideoAnalytics +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.Stage +import io.getstream.video.android.core.analytics.call.observer.toVideoAnalyticsIceState import io.getstream.video.android.core.analytics.reporting.ClientEventReporter import io.getstream.video.android.core.analytics.reporting.model.AnalyticsCallAbortReason import io.getstream.video.android.core.call.RtcSession @@ -130,8 +132,12 @@ internal class CallAnalytics( Pair(AnalyticsCallAbortReason.CUSTOM, callLeaveReason.message) } } - val publisherIceState = session.value?.publisher?.value?.iceState?.value - val subscriberIceState = session.value?.subscriber?.value?.iceState?.value + val publisherIceState = + session.value?.publisher?.value?.iceState?.value?.toVideoAnalyticsIceState() + ?: VideoAnalyticsIceState.NOT_CONNECTED + val subscriberIceState = + session.value?.subscriber?.value?.iceState?.value?.toVideoAnalyticsIceState() + ?: VideoAnalyticsIceState.NOT_CONNECTED eventReporter.abortAllPostCallInFlight( publisherIceState, subscriberIceState, diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt index 3f491585ad..e01dfe68ce 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalytics.kt @@ -21,11 +21,15 @@ import io.getstream.video.android.core.analytics.call.observer.model.Stage import io.getstream.video.android.core.analytics.reporting.ClientEventReporter import io.getstream.video.android.core.analytics.reporting.model.PeerConnectionRole import io.getstream.video.android.core.call.RtcSession +import io.getstream.video.android.core.call.connection.StreamPeerConnection import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import org.webrtc.PeerConnection @@ -39,7 +43,7 @@ internal class PeerConnectionAnalytics( val stateHolder: PeerConnectionAnalyticsStateHolder = PeerConnectionAnalyticsStateHolder(), ) { - val allowedStates = listOf( + val allowedPcStates = listOf( PeerConnection.PeerConnectionState.CONNECTING, PeerConnection.PeerConnectionState.FAILED, PeerConnection.PeerConnectionState.CONNECTED, @@ -49,71 +53,91 @@ internal class PeerConnectionAnalytics( stateHolder.state.value.peerConnectionObserverJob?.cancel() val peerConnectionObserverJob = observerScope.launch { stateHolder.state.value.publisherJob?.cancel() - val publisherJob = launch { - session.filterNotNull() - .flatMapLatest { it.publisher.filterNotNull() } - .flatMapLatest { - it.state.filter { it -> - allowedStates.contains( - it, - ) - }.filterNotNull() - }.filter { - val existingStage = stateHolder.state.value.publisherStage - val newStage = getStage(it) - val isExistingStageAndNewStageAreCompleted = (existingStage == Stage.COMPLETED && newStage == Stage.COMPLETED) - !isExistingStageAndNewStageAreCompleted - } - .collect { state -> - val publisherStage = getStage(state) - publisherStage?.let { - stateHolder.updatePublisherStage(publisherStage) - observerScope.launch { - session.value?.publisher?.value?.let { publisher -> - onPeerConnectionStateChanged( - publisher.hashCode(), - role = PeerConnectionRole.PUBLISH, - iceState = publisher.iceState.value, - peerConnectionState = state, - ) - } - } - } - } - } + val publisherJob = observeConnection( + session = session, + role = PeerConnectionRole.PUBLISH, + connectionOf = { it.publisher }, + currentStage = { stateHolder.state.value.publisherStage }, + updateStage = stateHolder::updatePublisherStage, + ) stateHolder.updatePublisherJob(publisherJob) + stateHolder.state.value.subscriberJob?.cancel() - val subscriberJob = launch { - session.filterNotNull() - .flatMapLatest { it.subscriber.filterNotNull() } - .flatMapLatest { - it.state.filter { it -> allowedStates.contains(it) }.filterNotNull() - }.filter { - val existingStage = stateHolder.state.value.subscriberStage + val subscriberJob = observeConnection( + session = session, + role = PeerConnectionRole.SUBSCRIBE, + connectionOf = { it.subscriber }, + currentStage = { stateHolder.state.value.subscriberStage }, + updateStage = stateHolder::updateSubscriberStage, + ) + stateHolder.updateSubscriberJob(subscriberJob) + } + stateHolder.updatePeerConnectionObserverJob(peerConnectionObserverJob) + } + + /** + * Observes a single peer connection ([connectionOf]) for analytics. + * + * Each allowed peer-connection state ([allowedPcStates]) is reported together with the ICE + * state as it stands at that moment ([StreamPeerConnection.iceState] value, mapped via + * [toVideoAnalyticsIceState]). So a CONNECTED peer connection reports CONNECTED only when its + * ICE is already connected, otherwise NOT_CONNECTED. The upstream filter drops a transition + * that would merely repeat an already-completed stage (e.g. CONNECTED after FAILED), and + * [distinctUntilChanged] avoids emitting the same combination twice. + * + * The stage is updated in lockstep with the reported event (in the terminal collector), not + * when the raw peer-connection state changes. It therefore stays IN_PROGRESS while the + * peer-connection session is open and only flips to COMPLETED once the completed event is + * actually emitted, which keeps the call-leave flow's "is a stage still in progress?" check + * correct. + */ + private fun CoroutineScope.observeConnection( + session: StateFlow, + role: PeerConnectionRole, + connectionOf: (RtcSession) -> StateFlow, + currentStage: () -> Stage, + updateStage: (Stage) -> Unit, + ): Job = launch { + session.filterNotNull() + .flatMapLatest { connectionOf(it).filterNotNull() } + .flatMapLatest { connection -> + connection.state + .filter { allowedPcStates.contains(it) } + .filterNotNull() + .filter { + // Skip a peer-connection state that would only repeat an already-completed + // stage (e.g. a CONNECTED after a FAILED). Gating here — before the ICE read + // and the collector — means we don't do any work for such transitions. + val existingStage = currentStage() val newStage = getStage(it) val isExistingStageAndNewStageAreCompleted = (existingStage == Stage.COMPLETED && newStage == Stage.COMPLETED) !isExistingStageAndNewStageAreCompleted } - .collect { state -> - val stage = getStage(state) - stage?.let { - stateHolder.updateSubscriberStage(stage) - observerScope.launch { - session.value?.subscriber?.value?.let { subscriber -> - onPeerConnectionStateChanged( - subscriber.hashCode(), - role = PeerConnectionRole.SUBSCRIBE, - iceState = subscriber.iceState.value, - peerConnectionState = state, - ) - } - } - } + .map { pcState -> + PeerConnectionSnapshot( + connection.hashCode(), + pcState, + connection.iceState.value.toVideoAnalyticsIceState(), + ) } } - stateHolder.updateSubscriberJob(subscriberJob) - } - stateHolder.updatePeerConnectionObserverJob(peerConnectionObserverJob) + .distinctUntilChanged() + .collect { snapshot -> + // Update the stage in lockstep with the reported event (not when the raw + // peer-connection state changed), so it only flips to COMPLETED once the completed + // event is actually emitted. The COMPLETED -> COMPLETED case is already dropped by + // the upstream filter, so no extra guard is needed here. + val pcAnalyticsStage = getStage(snapshot.peerConnectionState) + pcAnalyticsStage?.let { + updateStage(pcAnalyticsStage) + onPeerConnectionStateChanged( + peerConnectionHashCode = snapshot.peerConnectionHashCode, + role = role, + iceState = snapshot.iceState, + peerConnectionState = snapshot.peerConnectionState, + ) + } + } } private fun getStage(peerConnectionState: PeerConnection.PeerConnectionState): Stage? { @@ -135,7 +159,7 @@ internal class PeerConnectionAnalytics( internal fun onPeerConnectionStateChanged( peerConnectionHashCode: Int, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState?, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState?, ) { reporter.onPeerConnectionStateChanged( @@ -162,3 +186,9 @@ internal class PeerConnectionAnalytics( observePeerConnections(session) } } + +private data class PeerConnectionSnapshot( + val peerConnectionHashCode: Int, + val peerConnectionState: PeerConnection.PeerConnectionState, + val iceState: VideoAnalyticsIceState, +) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/VideoAnalyticsIceState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/VideoAnalyticsIceState.kt new file mode 100644 index 0000000000..ad50ce8009 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/call/observer/VideoAnalyticsIceState.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.analytics.call.observer + +import org.webrtc.PeerConnection + +internal enum class VideoAnalyticsIceState(val text: String) { + CONNECTED("CONNECTED"), FAILED("FAILED"), NOT_CONNECTED("NOT_CONNECTED") +} + +internal fun PeerConnection.IceConnectionState?.toVideoAnalyticsIceState(): VideoAnalyticsIceState { + return when (this) { + PeerConnection.IceConnectionState.CONNECTED, + PeerConnection.IceConnectionState.COMPLETED, + -> VideoAnalyticsIceState.CONNECTED + + PeerConnection.IceConnectionState.FAILED -> VideoAnalyticsIceState.FAILED + + else -> VideoAnalyticsIceState.NOT_CONNECTED + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventFactory.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventFactory.kt index 0396d3c74f..4dd8cf56b8 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventFactory.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventFactory.kt @@ -18,6 +18,7 @@ package io.getstream.video.android.core.analytics.reporting import io.getstream.android.video.generated.models.ClientEvent import io.getstream.video.android.core.StreamVideo +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.JoinReason import io.getstream.video.android.core.analytics.reporting.model.EventOutcome import io.getstream.video.android.core.analytics.reporting.model.EventStage @@ -46,7 +47,7 @@ internal class ClientEventFactory(val sdkVersion: String, val userAgent: () -> S sfuId: String? = null, peerConnection: PeerConnectionRole? = null, wasPreviouslyConnected: Boolean? = null, - iceState: PeerConnection.IceConnectionState? = null, + iceState: VideoAnalyticsIceState? = null, peerConnectionState: PeerConnection.PeerConnectionState? = null, userSessionId: String? = null, screenShareAllowed: Boolean? = null, @@ -66,7 +67,7 @@ internal class ClientEventFactory(val sdkVersion: String, val userAgent: () -> S userId = StreamVideo.Companion.instanceOrNull()?.userId, callSessionId = callSessionId, elapsedTime = elapsedTime?.toInt(), - iceState = iceState?.name, + iceState = iceState?.text, outcome = outcome?.value, peerConnection = peerConnection?.value, previouslyConnectedTimestamp = null, diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt index bd82586114..8f5294403a 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporter.kt @@ -21,6 +21,7 @@ import io.getstream.android.video.generated.apis.ProductvideoApi import io.getstream.android.video.generated.models.ClientEvent import io.getstream.log.taggedLogger import io.getstream.video.android.core.BuildConfig +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.JoinReason import io.getstream.video.android.core.analytics.reporting.dispatcher.EventDispatcher import io.getstream.video.android.core.analytics.reporting.dispatcher.ImmediateEventDispatcher @@ -284,7 +285,7 @@ internal class ClientEventReporter( sfuId: String, joinReason: JoinReason, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState?, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState?, ) { when (peerConnectionState) { @@ -302,38 +303,37 @@ internal class ClientEventReporter( peerConnectionState, ) } + PeerConnection.PeerConnectionState.CONNECTED -> { - iceState?.let { - handleOnPeerConnectionConnectedState( - peerConnectionHashCode, - callId, - callType, - joinStageAttemptId, - callSessionId, - sfuId, - joinReason, - role, - iceState, - peerConnectionState, - ) - } + handleOnPeerConnectionConnectedState( + peerConnectionHashCode, + callId, + callType, + joinStageAttemptId, + callSessionId, + sfuId, + joinReason, + role, + iceState, + peerConnectionState, + ) } + PeerConnection.PeerConnectionState.FAILED -> { - iceState?.let { - handleOnPeerConnectionFailedState( - peerConnectionHashCode, - callId, - callType, - joinStageAttemptId, - callSessionId, - sfuId, - joinReason, - role, - iceState, - peerConnectionState, - ) - } + handleOnPeerConnectionFailedState( + peerConnectionHashCode, + callId, + callType, + joinStageAttemptId, + callSessionId, + sfuId, + joinReason, + role, + iceState, + peerConnectionState, + ) } + else -> {} } } @@ -347,7 +347,7 @@ internal class ClientEventReporter( sfuId: String, joinReason: JoinReason, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState?, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState, ) { val wasPrevConnected = pcEverConnected[role] != null @@ -397,7 +397,7 @@ internal class ClientEventReporter( sfuId: String, joinReason: JoinReason, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState, ) { @@ -428,7 +428,7 @@ internal class ClientEventReporter( sfuId: String, joinReason: JoinReason, role: PeerConnectionRole, - iceState: PeerConnection.IceConnectionState, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState, ) { val pcState = pcEventReporterStateHolder.map.remove(peerConnectionHashCode) ?: return @@ -458,7 +458,7 @@ internal class ClientEventReporter( callSessionId: String, joinReason: JoinReason, success: Boolean, - iceState: PeerConnection.IceConnectionState, + iceState: VideoAnalyticsIceState, peerConnectionState: PeerConnection.PeerConnectionState?, failureReason: String? = null, failureCode: String? = null, @@ -573,8 +573,8 @@ internal class ClientEventReporter( } internal fun abortAllPostCallInFlight( - publisherIceState: PeerConnection.IceConnectionState?, - subscriberIceState: PeerConnection.IceConnectionState?, + publisherIceState: VideoAnalyticsIceState, + subscriberIceState: VideoAnalyticsIceState, failCode: String, failMessage: String, ) { diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/CallAnalyticsTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/CallAnalyticsTest.kt index 8c2eb1fb51..0adf12fdb4 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/CallAnalyticsTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/CallAnalyticsTest.kt @@ -21,6 +21,7 @@ import io.getstream.video.android.core.CallLeaveReason import io.getstream.video.android.core.ParticipantState import io.getstream.video.android.core.RealtimeConnection import io.getstream.video.android.core.UserActionCause +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.Stage import io.getstream.video.android.core.analytics.reporting.ClientEventReporter import io.getstream.video.android.core.analytics.reporting.model.AnalyticsCallAbortReason @@ -170,8 +171,8 @@ class CallAnalyticsTest { verify(exactly = 1) { reporter.abortAllPostCallInFlight( - PeerConnection.IceConnectionState.CONNECTED, - PeerConnection.IceConnectionState.DISCONNECTED, + VideoAnalyticsIceState.CONNECTED, + VideoAnalyticsIceState.NOT_CONNECTED, AnalyticsCallAbortReason.CLIENT_ABORTED.name, any(), ) diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt index 60a78e406b..4f74c1e8d2 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/call/observer/PeerConnectionAnalyticsTest.kt @@ -88,7 +88,7 @@ class PeerConnectionAnalyticsTest { analytics(CoroutineScope(Dispatchers.Unconfined)).onPeerConnectionStateChanged( peerConnectionHashCode = 42, role = PeerConnectionRole.SUBSCRIBE, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, peerConnectionState = PeerConnection.PeerConnectionState.CONNECTING, ) @@ -102,17 +102,17 @@ class PeerConnectionAnalyticsTest { sfuId = "sfu-7", joinReason = JoinReason.ReJoin, role = PeerConnectionRole.SUBSCRIBE, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, peerConnectionState = PeerConnection.PeerConnectionState.CONNECTING, ) } } @Test - fun `a connecting publisher marks the stage in progress and notifies the reporter`() = runTest { + fun `a connecting publisher reports its current ice state immediately and marks the stage in progress`() = runTest { val session = mockSession( publisherState = PeerConnection.PeerConnectionState.CONNECTING, - publisherIceState = PeerConnection.IceConnectionState.CHECKING, + publisherIceState = PeerConnection.IceConnectionState.CONNECTED, ) val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) @@ -130,13 +130,167 @@ class PeerConnectionAnalyticsTest { sfuId = any(), joinReason = any(), role = PeerConnectionRole.PUBLISH, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.CONNECTED, + peerConnectionState = PeerConnection.PeerConnectionState.CONNECTING, + ) + } + scope.cancel() + } + + @Test + fun `a connected publisher whose ice is not connected reports not connected immediately`() = runTest { + val session = mockSession( + publisherState = PeerConnection.PeerConnectionState.CONNECTED, + // ICE has not reached CONNECTED, so the reported ICE is the current (non-connected) one. + publisherIceState = PeerConnection.IceConnectionState.DISCONNECTED, + ) + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + + analytics(scope).observePeerConnections(MutableStateFlow(session)) + runCurrent() + + assertEquals(Stage.COMPLETED, stateHolder.state.value.publisherStage) + verify(exactly = 1) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = "call-1", + callType = "default", + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, + peerConnectionState = PeerConnection.PeerConnectionState.CONNECTED, + ) + } + scope.cancel() + } + + @Test + fun `a connecting publisher with a new ice state reports not connected`() = runTest { + val session = mockSession( + publisherState = PeerConnection.PeerConnectionState.CONNECTING, + // NEW is allowed and maps to NOT_CONNECTED. + publisherIceState = PeerConnection.IceConnectionState.NEW, + ) + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + + analytics(scope).observePeerConnections(MutableStateFlow(session)) + runCurrent() + + assertEquals(Stage.IN_PROGRESS, stateHolder.state.value.publisherStage) + verify(exactly = 1) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = "call-1", + callType = "default", + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, peerConnectionState = PeerConnection.PeerConnectionState.CONNECTING, ) } scope.cancel() } + @Test + fun `a failed ice state reports failed`() = runTest { + val session = mockSession( + publisherState = PeerConnection.PeerConnectionState.FAILED, + publisherIceState = PeerConnection.IceConnectionState.FAILED, + ) + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + + analytics(scope).observePeerConnections(MutableStateFlow(session)) + runCurrent() + + assertEquals(Stage.COMPLETED, stateHolder.state.value.publisherStage) + verify(exactly = 1) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = "call-1", + callType = "default", + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = VideoAnalyticsIceState.FAILED, + peerConnectionState = PeerConnection.PeerConnectionState.FAILED, + ) + } + scope.cancel() + } + + @Test + fun `toVideoAnalyticsIceState maps webrtc ice states to the analytics enum`() { + assertEquals( + VideoAnalyticsIceState.CONNECTED, + PeerConnection.IceConnectionState.CONNECTED.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.CONNECTED, + PeerConnection.IceConnectionState.COMPLETED.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.FAILED, + PeerConnection.IceConnectionState.FAILED.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + PeerConnection.IceConnectionState.NEW.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + PeerConnection.IceConnectionState.CHECKING.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + PeerConnection.IceConnectionState.DISCONNECTED.toVideoAnalyticsIceState(), + ) + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + PeerConnection.IceConnectionState.CLOSED.toVideoAnalyticsIceState(), + ) + val nullIceState: PeerConnection.IceConnectionState? = null + assertEquals( + VideoAnalyticsIceState.NOT_CONNECTED, + nullIceState.toVideoAnalyticsIceState(), + ) + } + + @Test + fun `a connected publisher with connected ice reports connected`() = runTest { + val session = mockSession( + publisherState = PeerConnection.PeerConnectionState.CONNECTED, + publisherIceState = PeerConnection.IceConnectionState.CONNECTED, + ) + val scope = CoroutineScope(UnconfinedTestDispatcher(testScheduler)) + + analytics(scope).observePeerConnections(MutableStateFlow(session)) + runCurrent() + + verify(exactly = 1) { + reporter.onPeerConnectionStateChanged( + peerConnectionHashCode = any(), + callId = "call-1", + callType = "default", + joinStageAttemptId = any(), + callSessionId = any(), + sfuId = any(), + joinReason = any(), + role = PeerConnectionRole.PUBLISH, + iceState = VideoAnalyticsIceState.CONNECTED, + peerConnectionState = PeerConnection.PeerConnectionState.CONNECTED, + ) + } + scope.cancel() + } + @Test fun `a connected publisher resets the stage back to completed`() = runTest { val session = mockSession( diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporterTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporterTest.kt index c9802ebc62..411c7eec2e 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporterTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventReporterTest.kt @@ -17,6 +17,7 @@ package io.getstream.video.android.core.analytics.reporting import io.getstream.android.video.generated.models.ClientEvent +import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState import io.getstream.video.android.core.analytics.call.observer.model.JoinReason import io.getstream.video.android.core.analytics.reporting.dispatcher.EventDispatcher import io.getstream.video.android.core.analytics.reporting.model.AnalyticsCallAbortReason @@ -79,7 +80,7 @@ class ClientEventReporterTest { private fun pcStateChanged( pcState: PeerConnection.PeerConnectionState?, - iceState: PeerConnection.IceConnectionState? = null, + iceState: VideoAnalyticsIceState, role: PeerConnectionRole = PeerConnectionRole.PUBLISH, pcHashCode: Int = 100, ) = reporter.onPeerConnectionStateChanged( @@ -243,7 +244,7 @@ class ClientEventReporterTest { fun `CONNECTING opens a session and CONNECTED completes it as success`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, ) assertEquals(1, dispatcher.sent.size) @@ -255,7 +256,7 @@ class ClientEventReporterTest { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTED, - iceState = PeerConnection.IceConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.CONNECTED, ) assertEquals(2, dispatcher.sent.size) @@ -269,12 +270,12 @@ class ClientEventReporterTest { fun `FAILED completes the session as a connectivity failure`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.FAILED, - iceState = PeerConnection.IceConnectionState.FAILED, + iceState = VideoAnalyticsIceState.FAILED, ) val completed = dispatcher.sent.last() @@ -284,33 +285,41 @@ class ClientEventReporterTest { } @Test - fun `CONNECTED with a null ice state is ignored`() { + fun `CONNECTED completes the session as success even when ice is not connected`() { + // NOTE: the reporter routes purely on the peer-connection state, so a CONNECTED peer + // connection is reported as a success regardless of its ICE state (including NOT_CONNECTED). pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, ) - pcStateChanged(pcState = PeerConnection.PeerConnectionState.CONNECTED, iceState = null) + pcStateChanged( + pcState = PeerConnection.PeerConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, + ) - assertEquals(1, dispatcher.sent.size) + assertEquals(2, dispatcher.sent.size) + val completed = dispatcher.sent.last() + assertEquals(EventType.COMPLETED.value, completed.eventType) + assertEquals(EventOutcome.SUCCESS.value, completed.outcome) } @Test fun `a reconnect after CONNECTED is flagged as previously connected`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, pcHashCode = 1, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTED, - iceState = PeerConnection.IceConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.CONNECTED, pcHashCode = 1, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, pcHashCode = 2, ) @@ -323,7 +332,7 @@ class ClientEventReporterTest { fun `CONNECTED without an open session sends nothing`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTED, - iceState = PeerConnection.IceConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.CONNECTED, ) assertTrue(dispatcher.sent.isEmpty()) @@ -333,19 +342,19 @@ class ClientEventReporterTest { fun `sessions are completed per peer connection instance`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, pcHashCode = 1, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, pcHashCode = 2, ) val firstStageId = dispatcher.sent.first().stageId pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTED, - iceState = PeerConnection.IceConnectionState.CONNECTED, + iceState = VideoAnalyticsIceState.CONNECTED, pcHashCode = 1, ) @@ -359,13 +368,13 @@ class ClientEventReporterTest { fun `publisher and subscriber sessions are tracked independently`() { pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, role = PeerConnectionRole.PUBLISH, pcHashCode = 1, ) pcStateChanged( pcState = PeerConnection.PeerConnectionState.CONNECTING, - iceState = PeerConnection.IceConnectionState.CHECKING, + iceState = VideoAnalyticsIceState.NOT_CONNECTED, role = PeerConnectionRole.SUBSCRIBE, pcHashCode = 2, ) @@ -384,8 +393,8 @@ class ClientEventReporterTest { dispatcher.sent.clear() reporter.abortAllPostCallInFlight( - publisherIceState = null, - subscriberIceState = null, + publisherIceState = VideoAnalyticsIceState.NOT_CONNECTED, + subscriberIceState = VideoAnalyticsIceState.NOT_CONNECTED, AnalyticsCallAbortReason.CLIENT_ABORTED.name, "user left the call", )