From 58f18bae7eb90d84aea8151856ddf18ecfcefb69 Mon Sep 17 00:00:00 2001 From: David von Wrangel Date: Thu, 25 Dec 2025 22:50:06 +0100 Subject: [PATCH] Add continual gathering policy to SettingEngine --- icegatherer.go | 10 ++ icegatherer_test.go | 265 ++++++++++++++++++++++++++++++++++++++++++ settingengine.go | 12 ++ settingengine_test.go | 13 +++ 4 files changed, 300 insertions(+) diff --git a/icegatherer.go b/icegatherer.go index 0fe7e876d41..58b31cee250 100644 --- a/icegatherer.go +++ b/icegatherer.go @@ -157,6 +157,7 @@ func (g *ICEGatherer) buildAgentOptions() ([]ice.AgentOption, error) { options = append(options, g.timeoutOptions()...) options = append(options, g.miscOptions()...) options = append(options, g.renominationOptions()...) + options = append(options, g.continualGatheringOptions()...) requestedNetworkTypes := g.api.settingEngine.candidates.ICENetworkTypes if len(requestedNetworkTypes) == 0 { @@ -334,6 +335,15 @@ func (g *ICEGatherer) renominationOptions() []ice.AgentOption { return opts } +func (g *ICEGatherer) continualGatheringOptions() []ice.AgentOption { + policy := g.api.settingEngine.iceContinualGatheringPolicy + if policy == 0 { + return nil + } + + return []ice.AgentOption{ice.WithContinualGatheringPolicy(policy)} +} + func legacyNAT1To1AddressRewriteRules(ips []string, candidateType ice.CandidateType) []ice.AddressRewriteRule { catchAll := make([]string, 0, len(ips)) rules := make([]ice.AddressRewriteRule, 0, len(ips)+1) diff --git a/icegatherer_test.go b/icegatherer_test.go index e50f79e3a36..8f9021c091f 100644 --- a/icegatherer_test.go +++ b/icegatherer_test.go @@ -2109,6 +2109,271 @@ func TestICEGatherer_RenominationOptions(t *testing.T) { assert.NotNil(t, se.renomination.generator) } +func TestICEGatherer_ContinualGatheringOptions(t *testing.T) { + se := SettingEngine{} + assert.Equal(t, ice.ContinualGatheringPolicy(0), se.iceContinualGatheringPolicy) + + se.SetICEContinualGatheringPolicy(ice.GatherContinually) + assert.Equal(t, ice.GatherContinually, se.iceContinualGatheringPolicy) + + se2 := SettingEngine{} + se2.SetICEContinualGatheringPolicy(ice.GatherOnce) + assert.Equal(t, ice.GatherOnce, se2.iceContinualGatheringPolicy) +} + +func TestICEGatherer_ContinualGatheringNeverCompletes(t *testing.T) { //nolint:cyclop + // With GatherContinually, the ICE agent keeps monitoring for network changes. + // The gathering state never transitions to "complete" and OnICECandidate(nil) is never called. + + lim := test.TimeOut(time.Second * 30) + defer lim.Stop() + + report := test.CheckRoutines(t) + defer report() + + offerPC, answerPC, _, _, cleanup := buildContinualGatheringVNetPair(t) + defer cleanup() + + offerGatheringStates := make(chan ICEGatheringState, 10) + offerPC.OnICEGatheringStateChange(func(state ICEGatheringState) { + offerGatheringStates <- state + }) + + // A nil candidate signals gathering complete, which should never happen with continual gathering. + offerNilCandidate := make(chan struct{}, 1) + answerNilCandidate := make(chan struct{}, 1) + + // Setup trickle ICE. + offerPC.OnICECandidate(func(c *ICECandidate) { + if c == nil { + select { + case offerNilCandidate <- struct{}{}: + default: + } + + return + } + if answerPC.RemoteDescription() != nil { + assert.NoError(t, answerPC.AddICECandidate(c.ToJSON())) + } + }) + + answerPC.OnICECandidate(func(c *ICECandidate) { + if c == nil { + select { + case answerNilCandidate <- struct{}{}: + default: + } + + return + } + if offerPC.RemoteDescription() != nil { + assert.NoError(t, offerPC.AddICECandidate(c.ToJSON())) + } + }) + + connected := make(chan struct{}) + var connectedOnce sync.Once + offerPC.OnICEConnectionStateChange(func(state ICEConnectionState) { + if state == ICEConnectionStateConnected { + connectedOnce.Do(func() { + close(connected) + }) + } + }) + + _, err := offerPC.CreateDataChannel("test", nil) + assert.NoError(t, err) + + offer, err := offerPC.CreateOffer(nil) + assert.NoError(t, err) + assert.NoError(t, offerPC.SetLocalDescription(offer)) + assert.NoError(t, answerPC.SetRemoteDescription(offer)) + + answer, err := answerPC.CreateAnswer(nil) + assert.NoError(t, err) + assert.NoError(t, answerPC.SetLocalDescription(answer)) + assert.NoError(t, offerPC.SetRemoteDescription(*answerPC.LocalDescription())) + + select { + case <-connected: + case <-time.After(10 * time.Second): + assert.Fail(t, "timed out waiting for ICE to connect") + } + + // Wait to give gathering a chance to incorrectly complete. + time.Sleep(2 * time.Second) + + gatheringState := offerPC.ICEGatheringState() + assert.Equal(t, ICEGatheringStateGathering, gatheringState, + "with continual gathering, state should remain 'gathering', got '%s'", gatheringState) + + select { + case <-offerNilCandidate: + assert.Fail(t, "OnICECandidate(nil) should never be called with continual gathering") + default: + } + + select { + case <-answerNilCandidate: + assert.Fail(t, "OnICECandidate(nil) should never be called with continual gathering on answer side") + default: + } + + // Verify state transitions: should have seen "gathering" but never "complete". + seenGathering := false + seenComplete := false +drainLoop: + for { + select { + case state := <-offerGatheringStates: + if state == ICEGatheringStateGathering { + seenGathering = true + } + if state == ICEGatheringStateComplete { + seenComplete = true + } + default: + break drainLoop + } + } + + assert.True(t, seenGathering, "should have seen gathering state") + assert.False(t, seenComplete, "should NOT have seen complete state with continual gathering") +} + +func TestICEGatherer_ContinualGatheringNetworkChange(t *testing.T) { + // Continual gathering monitors for network changes and gathers new candidates + // when interfaces are added. This test verifies initial candidate gathering works. + // The network change portion is commented out until vnet supports AddIPToNIC. + + lim := test.TimeOut(time.Second * 30) + defer lim.Stop() + + report := test.CheckRoutines(t) + defer report() + + offerPC, answerPC, _, offerNet, cleanup := buildContinualGatheringVNetPair(t) + defer cleanup() + + candidatesMu := sync.Mutex{} + candidateAddresses := make(map[string]struct{}) + + // Setup trickle ICE. + offerPC.OnICECandidate(func(c *ICECandidate) { + if c == nil { + return + } + candidatesMu.Lock() + candidateAddresses[c.Address] = struct{}{} + candidatesMu.Unlock() + + if answerPC.RemoteDescription() != nil { + assert.NoError(t, answerPC.AddICECandidate(c.ToJSON())) + } + }) + + answerPC.OnICECandidate(func(c *ICECandidate) { + if c != nil && offerPC.RemoteDescription() != nil { + assert.NoError(t, offerPC.AddICECandidate(c.ToJSON())) + } + }) + + connected := make(chan struct{}) + var connectedOnce sync.Once + offerPC.OnICEConnectionStateChange(func(state ICEConnectionState) { + if state == ICEConnectionStateConnected { + connectedOnce.Do(func() { + close(connected) + }) + } + }) + + _, err := offerPC.CreateDataChannel("test", nil) + assert.NoError(t, err) + + offer, err := offerPC.CreateOffer(nil) + assert.NoError(t, err) + assert.NoError(t, offerPC.SetLocalDescription(offer)) + assert.NoError(t, answerPC.SetRemoteDescription(offer)) + + answer, err := answerPC.CreateAnswer(nil) + assert.NoError(t, err) + assert.NoError(t, answerPC.SetLocalDescription(answer)) + assert.NoError(t, offerPC.SetRemoteDescription(*answerPC.LocalDescription())) + + select { + case <-connected: + case <-time.After(10 * time.Second): + assert.Fail(t, "timed out waiting for ICE to connect") + } + + candidatesMu.Lock() + _, hasInitialIP := candidateAddresses["1.2.3.4"] + candidatesMu.Unlock() + assert.True(t, hasInitialIP, "should have gathered candidate for initial IP 1.2.3.4") + + newIP := net.ParseIP("1.2.3.10") + assert.NoError(t, offerNet.AddAddress("eth0", &net.IPNet{ + IP: newIP, + Mask: net.CIDRMask(24, 32), + })) + + // The network monitor polls every 2 seconds by default. + assert.Eventually(t, func() bool { + candidatesMu.Lock() + _, hasNewIP := candidateAddresses["1.2.3.10"] + candidatesMu.Unlock() + + return hasNewIP + }, 5*time.Second, 100*time.Millisecond, "should gather new candidate for added IP 1.2.3.10") +} + +//nolint:unparam +func buildContinualGatheringVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Router, *vnet.Net, func()) { + t.Helper() + + router, err := vnet.NewRouter(&vnet.RouterConfig{ + CIDR: "1.2.3.0/24", + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + assert.NoError(t, err) + + offerNet, err := vnet.NewNet(&vnet.NetConfig{ + StaticIPs: []string{"1.2.3.4"}, + }) + assert.NoError(t, err) + assert.NoError(t, router.AddNet(offerNet)) + + answerNet, err := vnet.NewNet(&vnet.NetConfig{ + StaticIPs: []string{"1.2.3.5"}, + }) + assert.NoError(t, err) + assert.NoError(t, router.AddNet(answerNet)) + + assert.NoError(t, router.Start()) + + se := SettingEngine{} + se.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled) + se.SetNetworkTypes([]NetworkType{NetworkTypeUDP4}) + se.SetICEContinualGatheringPolicy(ice.GatherContinually) + + se.SetNet(offerNet) + offerPC, err := NewAPI(WithSettingEngine(se)).NewPeerConnection(Configuration{}) + assert.NoError(t, err) + + se.SetNet(answerNet) + answerPC, err := NewAPI(WithSettingEngine(se)).NewPeerConnection(Configuration{}) + assert.NoError(t, err) + + cleanup := func() { + closePairNow(t, offerPC, answerPC) + assert.NoError(t, router.Stop()) + } + + return offerPC, answerPC, router, offerNet, cleanup +} + func TestICEGatherer_RenominationOptionsDisabled(t *testing.T) { lim := test.TimeOut(time.Second * 10) defer lim.Stop() diff --git a/settingengine.go b/settingengine.go index ea665cdfbf1..c378d15317a 100644 --- a/settingengine.go +++ b/settingengine.go @@ -105,6 +105,7 @@ type SettingEngine struct { iceProxyDialer proxy.Dialer iceDisableActiveTCP bool iceBindingRequestHandler func(m *stun.Message, local, remote ice.Candidate, pair *ice.CandidatePair) bool //nolint:lll + iceContinualGatheringPolicy ice.ContinualGatheringPolicy disableMediaEngineCopy bool disableMediaEngineMultipleCodecs bool srtpProtectionProfiles []dtls.SRTPProtectionProfile @@ -662,6 +663,17 @@ func (e *SettingEngine) SetICEBindingRequestHandler( e.iceBindingRequestHandler = bindingRequestHandler } +// SetICEContinualGatheringPolicy sets the policy for gathering ICE candidates. +// When set to GatherContinually, the ICE agent monitors for network changes and +// gathers new candidates when interfaces are added or removed. This changes the +// observable ICE behavior: +// - ICEGatheringState remains in "gathering" and never transitions to "complete". +// - OnICECandidate(nil) is never called, as gathering never finishes. +// Applications should use trickle ICE and not wait for gathering to complete. +func (e *SettingEngine) SetICEContinualGatheringPolicy(policy ice.ContinualGatheringPolicy) { + e.iceContinualGatheringPolicy = policy +} + // SetFireOnTrackBeforeFirstRTP sets if firing the OnTrack event should happen // before any RTP packets are received. Setting this to true will // have the Track's Codec and PayloadTypes be initially set to their diff --git a/settingengine_test.go b/settingengine_test.go index 7df0fd3f90a..f5f21f516f2 100644 --- a/settingengine_test.go +++ b/settingengine_test.go @@ -106,6 +106,19 @@ func TestICERenomination(t *testing.T) { }) } +func TestICEContinualGatheringPolicy(t *testing.T) { + t.Run("DefaultIsZero", func(t *testing.T) { + s := SettingEngine{} + assert.Equal(t, ice.ContinualGatheringPolicy(0), s.iceContinualGatheringPolicy) + }) + + t.Run("SetPolicy", func(t *testing.T) { + s := SettingEngine{} + s.SetICEContinualGatheringPolicy(ice.GatherContinually) + assert.Equal(t, ice.GatherContinually, s.iceContinualGatheringPolicy) + }) +} + func TestDetachDataChannels(t *testing.T) { s := SettingEngine{} assert.False(t, s.detach.DataChannels)