Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import io.getstream.video.android.core.analytics.call.observer.PeerConnectionAna
import io.getstream.video.android.core.analytics.call.observer.SfuAnalytics
import io.getstream.video.android.core.analytics.call.observer.SfuAnalyticsStateHolder
import io.getstream.video.android.core.analytics.call.observer.VideoAnalytics
import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState
import io.getstream.video.android.core.analytics.call.observer.model.Stage
import io.getstream.video.android.core.analytics.call.observer.toVideoAnalyticsIceState
import io.getstream.video.android.core.analytics.reporting.ClientEventReporter
import io.getstream.video.android.core.analytics.reporting.model.AnalyticsCallAbortReason
import io.getstream.video.android.core.call.RtcSession
Expand Down Expand Up @@ -130,8 +132,12 @@ internal class CallAnalytics(
Pair(AnalyticsCallAbortReason.CUSTOM, callLeaveReason.message)
}
}
val publisherIceState = session.value?.publisher?.value?.iceState?.value
val subscriberIceState = session.value?.subscriber?.value?.iceState?.value
val publisherIceState =
session.value?.publisher?.value?.iceState?.value?.toVideoAnalyticsIceState()
?: VideoAnalyticsIceState.NOT_CONNECTED
val subscriberIceState =
session.value?.subscriber?.value?.iceState?.value?.toVideoAnalyticsIceState()
?: VideoAnalyticsIceState.NOT_CONNECTED
eventReporter.abortAllPostCallInFlight(
publisherIceState,
subscriberIceState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import io.getstream.video.android.core.analytics.call.observer.model.Stage
import io.getstream.video.android.core.analytics.reporting.ClientEventReporter
import io.getstream.video.android.core.analytics.reporting.model.PeerConnectionRole
import io.getstream.video.android.core.call.RtcSession
import io.getstream.video.android.core.call.connection.StreamPeerConnection
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import org.webrtc.PeerConnection

Expand All @@ -39,7 +43,7 @@ internal class PeerConnectionAnalytics(
val stateHolder: PeerConnectionAnalyticsStateHolder = PeerConnectionAnalyticsStateHolder(),
) {

val allowedStates = listOf(
val allowedPcStates = listOf(
PeerConnection.PeerConnectionState.CONNECTING,
PeerConnection.PeerConnectionState.FAILED,
PeerConnection.PeerConnectionState.CONNECTED,
Expand All @@ -49,71 +53,91 @@ internal class PeerConnectionAnalytics(
stateHolder.state.value.peerConnectionObserverJob?.cancel()
val peerConnectionObserverJob = observerScope.launch {
stateHolder.state.value.publisherJob?.cancel()
val publisherJob = launch {
session.filterNotNull()
.flatMapLatest { it.publisher.filterNotNull() }
.flatMapLatest {
it.state.filter { it ->
allowedStates.contains(
it,
)
}.filterNotNull()
}.filter {
val existingStage = stateHolder.state.value.publisherStage
val newStage = getStage(it)
val isExistingStageAndNewStageAreCompleted = (existingStage == Stage.COMPLETED && newStage == Stage.COMPLETED)
!isExistingStageAndNewStageAreCompleted
}
.collect { state ->
val publisherStage = getStage(state)
publisherStage?.let {
stateHolder.updatePublisherStage(publisherStage)
observerScope.launch {
session.value?.publisher?.value?.let { publisher ->
onPeerConnectionStateChanged(
publisher.hashCode(),
role = PeerConnectionRole.PUBLISH,
iceState = publisher.iceState.value,
peerConnectionState = state,
)
}
}
}
}
}
val publisherJob = observeConnection(
session = session,
role = PeerConnectionRole.PUBLISH,
connectionOf = { it.publisher },
currentStage = { stateHolder.state.value.publisherStage },
updateStage = stateHolder::updatePublisherStage,
)
stateHolder.updatePublisherJob(publisherJob)

stateHolder.state.value.subscriberJob?.cancel()
val subscriberJob = launch {
session.filterNotNull()
.flatMapLatest { it.subscriber.filterNotNull() }
.flatMapLatest {
it.state.filter { it -> allowedStates.contains(it) }.filterNotNull()
}.filter {
val existingStage = stateHolder.state.value.subscriberStage
val subscriberJob = observeConnection(
session = session,
role = PeerConnectionRole.SUBSCRIBE,
connectionOf = { it.subscriber },
currentStage = { stateHolder.state.value.subscriberStage },
updateStage = stateHolder::updateSubscriberStage,
)
stateHolder.updateSubscriberJob(subscriberJob)
}
stateHolder.updatePeerConnectionObserverJob(peerConnectionObserverJob)
}

/**
* Observes a single peer connection ([connectionOf]) for analytics.
*
* Each allowed peer-connection state ([allowedPcStates]) is reported together with the ICE
* state as it stands at that moment ([StreamPeerConnection.iceState] value, mapped via
* [toVideoAnalyticsIceState]). So a CONNECTED peer connection reports CONNECTED only when its
* ICE is already connected, otherwise NOT_CONNECTED. The upstream filter drops a transition
* that would merely repeat an already-completed stage (e.g. CONNECTED after FAILED), and
* [distinctUntilChanged] avoids emitting the same combination twice.
*
* The stage is updated in lockstep with the reported event (in the terminal collector), not
* when the raw peer-connection state changes. It therefore stays IN_PROGRESS while the
* peer-connection session is open and only flips to COMPLETED once the completed event is
* actually emitted, which keeps the call-leave flow's "is a stage still in progress?" check
* correct.
*/
private fun CoroutineScope.observeConnection(
session: StateFlow<RtcSession?>,
role: PeerConnectionRole,
connectionOf: (RtcSession) -> StateFlow<StreamPeerConnection?>,
currentStage: () -> Stage,
updateStage: (Stage) -> Unit,
): Job = launch {
session.filterNotNull()
.flatMapLatest { connectionOf(it).filterNotNull() }
.flatMapLatest { connection ->
connection.state
.filter { allowedPcStates.contains(it) }
.filterNotNull()
.filter {
// Skip a peer-connection state that would only repeat an already-completed
// stage (e.g. a CONNECTED after a FAILED). Gating here — before the ICE read
// and the collector — means we don't do any work for such transitions.
val existingStage = currentStage()
val newStage = getStage(it)
val isExistingStageAndNewStageAreCompleted = (existingStage == Stage.COMPLETED && newStage == Stage.COMPLETED)
!isExistingStageAndNewStageAreCompleted
}
.collect { state ->
val stage = getStage(state)
stage?.let {
stateHolder.updateSubscriberStage(stage)
observerScope.launch {
session.value?.subscriber?.value?.let { subscriber ->
onPeerConnectionStateChanged(
subscriber.hashCode(),
role = PeerConnectionRole.SUBSCRIBE,
iceState = subscriber.iceState.value,
peerConnectionState = state,
)
}
}
}
.map { pcState ->
PeerConnectionSnapshot(
connection.hashCode(),
pcState,
connection.iceState.value.toVideoAnalyticsIceState(),
)
}
}
stateHolder.updateSubscriberJob(subscriberJob)
}
stateHolder.updatePeerConnectionObserverJob(peerConnectionObserverJob)
.distinctUntilChanged()
.collect { snapshot ->
// Update the stage in lockstep with the reported event (not when the raw
// peer-connection state changed), so it only flips to COMPLETED once the completed
// event is actually emitted. The COMPLETED -> COMPLETED case is already dropped by
// the upstream filter, so no extra guard is needed here.
val pcAnalyticsStage = getStage(snapshot.peerConnectionState)
pcAnalyticsStage?.let {
updateStage(pcAnalyticsStage)
onPeerConnectionStateChanged(
peerConnectionHashCode = snapshot.peerConnectionHashCode,
role = role,
iceState = snapshot.iceState,
peerConnectionState = snapshot.peerConnectionState,
)
}
}
}

private fun getStage(peerConnectionState: PeerConnection.PeerConnectionState): Stage? {
Expand All @@ -135,7 +159,7 @@ internal class PeerConnectionAnalytics(
internal fun onPeerConnectionStateChanged(
peerConnectionHashCode: Int,
role: PeerConnectionRole,
iceState: PeerConnection.IceConnectionState?,
iceState: VideoAnalyticsIceState,
peerConnectionState: PeerConnection.PeerConnectionState?,
) {
reporter.onPeerConnectionStateChanged(
Expand All @@ -162,3 +186,9 @@ internal class PeerConnectionAnalytics(
observePeerConnections(session)
}
}

private data class PeerConnectionSnapshot(
val peerConnectionHashCode: Int,
val peerConnectionState: PeerConnection.PeerConnectionState,
val iceState: VideoAnalyticsIceState,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-video-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.video.android.core.analytics.call.observer

import org.webrtc.PeerConnection

internal enum class VideoAnalyticsIceState(val text: String) {
CONNECTED("CONNECTED"), FAILED("FAILED"), NOT_CONNECTED("NOT_CONNECTED")
}

internal fun PeerConnection.IceConnectionState?.toVideoAnalyticsIceState(): VideoAnalyticsIceState {
return when (this) {
PeerConnection.IceConnectionState.CONNECTED,
PeerConnection.IceConnectionState.COMPLETED,
-> VideoAnalyticsIceState.CONNECTED

PeerConnection.IceConnectionState.FAILED -> VideoAnalyticsIceState.FAILED

else -> VideoAnalyticsIceState.NOT_CONNECTED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.getstream.android.video.generated.models.ClientEvent
import io.getstream.video.android.core.StreamVideo
import io.getstream.video.android.core.analytics.call.observer.VideoAnalyticsIceState
import io.getstream.video.android.core.analytics.call.observer.model.JoinReason
import io.getstream.video.android.core.analytics.reporting.model.EventOutcome
import io.getstream.video.android.core.analytics.reporting.model.EventStage
Expand All @@ -29,7 +30,7 @@

internal class ClientEventFactory(val sdkVersion: String, val userAgent: () -> String, val getCoordinatorId: () -> String) {

fun buildRequest(

Check warning on line 33 in stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/analytics/reporting/ClientEventFactory.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 23 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-video-android&issues=AZ7Uw5BJib7_uCoEXOGm&open=AZ7Uw5BJib7_uCoEXOGm&pullRequest=1724
callId: String? = null,
callType: String? = null,
stage: EventStage,
Expand All @@ -46,7 +47,7 @@
sfuId: String? = null,
peerConnection: PeerConnectionRole? = null,
wasPreviouslyConnected: Boolean? = null,
iceState: PeerConnection.IceConnectionState? = null,
iceState: VideoAnalyticsIceState? = null,
peerConnectionState: PeerConnection.PeerConnectionState? = null,
userSessionId: String? = null,
screenShareAllowed: Boolean? = null,
Expand All @@ -66,7 +67,7 @@
userId = StreamVideo.Companion.instanceOrNull()?.userId,
callSessionId = callSessionId,
elapsedTime = elapsedTime?.toInt(),
iceState = iceState?.name,
iceState = iceState?.text,
outcome = outcome?.value,
peerConnection = peerConnection?.value,
previouslyConnectedTimestamp = null,
Expand Down
Loading
Loading