Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions icegatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (g *ICEGatherer) buildAgentOptions() ([]ice.AgentOption, error) {
options = append(options, g.timeoutOptions()...)
options = append(options, g.miscOptions()...)
options = append(options, g.renominationOptions()...)
options = append(options, g.continualGatheringOptions()...)

requestedNetworkTypes := g.api.settingEngine.candidates.ICENetworkTypes
if len(requestedNetworkTypes) == 0 {
Expand Down Expand Up @@ -334,6 +335,15 @@ func (g *ICEGatherer) renominationOptions() []ice.AgentOption {
return opts
}

func (g *ICEGatherer) continualGatheringOptions() []ice.AgentOption {
policy := g.api.settingEngine.iceContinualGatheringPolicy
if policy == 0 {
return nil
}

return []ice.AgentOption{ice.WithContinualGatheringPolicy(policy)}
}

func legacyNAT1To1AddressRewriteRules(ips []string, candidateType ice.CandidateType) []ice.AddressRewriteRule {
catchAll := make([]string, 0, len(ips))
rules := make([]ice.AddressRewriteRule, 0, len(ips)+1)
Expand Down
265 changes: 265 additions & 0 deletions icegatherer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2109,6 +2109,271 @@ func TestICEGatherer_RenominationOptions(t *testing.T) {
assert.NotNil(t, se.renomination.generator)
}

func TestICEGatherer_ContinualGatheringOptions(t *testing.T) {
se := SettingEngine{}
assert.Equal(t, ice.ContinualGatheringPolicy(0), se.iceContinualGatheringPolicy)

se.SetICEContinualGatheringPolicy(ice.GatherContinually)
assert.Equal(t, ice.GatherContinually, se.iceContinualGatheringPolicy)

se2 := SettingEngine{}
se2.SetICEContinualGatheringPolicy(ice.GatherOnce)
assert.Equal(t, ice.GatherOnce, se2.iceContinualGatheringPolicy)
}

func TestICEGatherer_ContinualGatheringNeverCompletes(t *testing.T) { //nolint:cyclop
// With GatherContinually, the ICE agent keeps monitoring for network changes.
// The gathering state never transitions to "complete" and OnICECandidate(nil) is never called.

lim := test.TimeOut(time.Second * 30)
defer lim.Stop()

report := test.CheckRoutines(t)
defer report()

offerPC, answerPC, _, _, cleanup := buildContinualGatheringVNetPair(t)
defer cleanup()

offerGatheringStates := make(chan ICEGatheringState, 10)
offerPC.OnICEGatheringStateChange(func(state ICEGatheringState) {
offerGatheringStates <- state
})

// A nil candidate signals gathering complete, which should never happen with continual gathering.
offerNilCandidate := make(chan struct{}, 1)
answerNilCandidate := make(chan struct{}, 1)

// Setup trickle ICE.
offerPC.OnICECandidate(func(c *ICECandidate) {
if c == nil {
select {
case offerNilCandidate <- struct{}{}:
default:
}

return
}
if answerPC.RemoteDescription() != nil {
assert.NoError(t, answerPC.AddICECandidate(c.ToJSON()))
}
})

answerPC.OnICECandidate(func(c *ICECandidate) {
if c == nil {
select {
case answerNilCandidate <- struct{}{}:
default:
}

return
}
if offerPC.RemoteDescription() != nil {
assert.NoError(t, offerPC.AddICECandidate(c.ToJSON()))
}
})

connected := make(chan struct{})
var connectedOnce sync.Once
offerPC.OnICEConnectionStateChange(func(state ICEConnectionState) {
if state == ICEConnectionStateConnected {
connectedOnce.Do(func() {
close(connected)
})
}
})

_, err := offerPC.CreateDataChannel("test", nil)
assert.NoError(t, err)

offer, err := offerPC.CreateOffer(nil)
assert.NoError(t, err)
assert.NoError(t, offerPC.SetLocalDescription(offer))
assert.NoError(t, answerPC.SetRemoteDescription(offer))

answer, err := answerPC.CreateAnswer(nil)
assert.NoError(t, err)
assert.NoError(t, answerPC.SetLocalDescription(answer))
assert.NoError(t, offerPC.SetRemoteDescription(*answerPC.LocalDescription()))

select {
case <-connected:
case <-time.After(10 * time.Second):
assert.Fail(t, "timed out waiting for ICE to connect")
}

// Wait to give gathering a chance to incorrectly complete.
time.Sleep(2 * time.Second)

gatheringState := offerPC.ICEGatheringState()
assert.Equal(t, ICEGatheringStateGathering, gatheringState,
"with continual gathering, state should remain 'gathering', got '%s'", gatheringState)

select {
case <-offerNilCandidate:
assert.Fail(t, "OnICECandidate(nil) should never be called with continual gathering")
default:
}

select {
case <-answerNilCandidate:
assert.Fail(t, "OnICECandidate(nil) should never be called with continual gathering on answer side")
default:
}

// Verify state transitions: should have seen "gathering" but never "complete".
seenGathering := false
seenComplete := false
drainLoop:
for {
select {
case state := <-offerGatheringStates:
if state == ICEGatheringStateGathering {
seenGathering = true
}
if state == ICEGatheringStateComplete {
seenComplete = true
}
default:
break drainLoop
}
}

assert.True(t, seenGathering, "should have seen gathering state")
assert.False(t, seenComplete, "should NOT have seen complete state with continual gathering")
}

func TestICEGatherer_ContinualGatheringNetworkChange(t *testing.T) {
// Continual gathering monitors for network changes and gathers new candidates
// when interfaces are added. This test verifies initial candidate gathering works.
// The network change portion is commented out until vnet supports AddIPToNIC.

lim := test.TimeOut(time.Second * 30)
defer lim.Stop()

report := test.CheckRoutines(t)
defer report()

offerPC, answerPC, _, offerNet, cleanup := buildContinualGatheringVNetPair(t)
defer cleanup()

candidatesMu := sync.Mutex{}
candidateAddresses := make(map[string]struct{})

// Setup trickle ICE.
offerPC.OnICECandidate(func(c *ICECandidate) {
if c == nil {
return
}
candidatesMu.Lock()
candidateAddresses[c.Address] = struct{}{}
candidatesMu.Unlock()

if answerPC.RemoteDescription() != nil {
assert.NoError(t, answerPC.AddICECandidate(c.ToJSON()))
}
})

answerPC.OnICECandidate(func(c *ICECandidate) {
if c != nil && offerPC.RemoteDescription() != nil {
assert.NoError(t, offerPC.AddICECandidate(c.ToJSON()))
}
})

connected := make(chan struct{})
var connectedOnce sync.Once
offerPC.OnICEConnectionStateChange(func(state ICEConnectionState) {
if state == ICEConnectionStateConnected {
connectedOnce.Do(func() {
close(connected)
})
}
})

_, err := offerPC.CreateDataChannel("test", nil)
assert.NoError(t, err)

offer, err := offerPC.CreateOffer(nil)
assert.NoError(t, err)
assert.NoError(t, offerPC.SetLocalDescription(offer))
assert.NoError(t, answerPC.SetRemoteDescription(offer))

answer, err := answerPC.CreateAnswer(nil)
assert.NoError(t, err)
assert.NoError(t, answerPC.SetLocalDescription(answer))
assert.NoError(t, offerPC.SetRemoteDescription(*answerPC.LocalDescription()))

select {
case <-connected:
case <-time.After(10 * time.Second):
assert.Fail(t, "timed out waiting for ICE to connect")
}

candidatesMu.Lock()
_, hasInitialIP := candidateAddresses["1.2.3.4"]
candidatesMu.Unlock()
assert.True(t, hasInitialIP, "should have gathered candidate for initial IP 1.2.3.4")

newIP := net.ParseIP("1.2.3.10")
assert.NoError(t, offerNet.AddAddress("eth0", &net.IPNet{
IP: newIP,
Mask: net.CIDRMask(24, 32),
}))

// The network monitor polls every 2 seconds by default.
assert.Eventually(t, func() bool {
candidatesMu.Lock()
_, hasNewIP := candidateAddresses["1.2.3.10"]
candidatesMu.Unlock()

return hasNewIP
}, 5*time.Second, 100*time.Millisecond, "should gather new candidate for added IP 1.2.3.10")
}

//nolint:unparam
func buildContinualGatheringVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Router, *vnet.Net, func()) {
t.Helper()

router, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "1.2.3.0/24",
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
assert.NoError(t, err)

offerNet, err := vnet.NewNet(&vnet.NetConfig{
StaticIPs: []string{"1.2.3.4"},
})
assert.NoError(t, err)
assert.NoError(t, router.AddNet(offerNet))

answerNet, err := vnet.NewNet(&vnet.NetConfig{
StaticIPs: []string{"1.2.3.5"},
})
assert.NoError(t, err)
assert.NoError(t, router.AddNet(answerNet))

assert.NoError(t, router.Start())

se := SettingEngine{}
se.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled)
se.SetNetworkTypes([]NetworkType{NetworkTypeUDP4})
se.SetICEContinualGatheringPolicy(ice.GatherContinually)

se.SetNet(offerNet)
offerPC, err := NewAPI(WithSettingEngine(se)).NewPeerConnection(Configuration{})
assert.NoError(t, err)

se.SetNet(answerNet)
answerPC, err := NewAPI(WithSettingEngine(se)).NewPeerConnection(Configuration{})
assert.NoError(t, err)

cleanup := func() {
closePairNow(t, offerPC, answerPC)
assert.NoError(t, router.Stop())
}

return offerPC, answerPC, router, offerNet, cleanup
}

func TestICEGatherer_RenominationOptionsDisabled(t *testing.T) {
lim := test.TimeOut(time.Second * 10)
defer lim.Stop()
Expand Down
12 changes: 12 additions & 0 deletions settingengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type SettingEngine struct {
iceProxyDialer proxy.Dialer
iceDisableActiveTCP bool
iceBindingRequestHandler func(m *stun.Message, local, remote ice.Candidate, pair *ice.CandidatePair) bool //nolint:lll
iceContinualGatheringPolicy ice.ContinualGatheringPolicy
disableMediaEngineCopy bool
disableMediaEngineMultipleCodecs bool
srtpProtectionProfiles []dtls.SRTPProtectionProfile
Expand Down Expand Up @@ -662,6 +663,17 @@ func (e *SettingEngine) SetICEBindingRequestHandler(
e.iceBindingRequestHandler = bindingRequestHandler
}

// SetICEContinualGatheringPolicy sets the policy for gathering ICE candidates.
// When set to GatherContinually, the ICE agent monitors for network changes and
// gathers new candidates when interfaces are added or removed. This changes the
// observable ICE behavior:
// - ICEGatheringState remains in "gathering" and never transitions to "complete".
// - OnICECandidate(nil) is never called, as gathering never finishes.
// Applications should use trickle ICE and not wait for gathering to complete.
func (e *SettingEngine) SetICEContinualGatheringPolicy(policy ice.ContinualGatheringPolicy) {
e.iceContinualGatheringPolicy = policy
}

// SetFireOnTrackBeforeFirstRTP sets if firing the OnTrack event should happen
// before any RTP packets are received. Setting this to true will
// have the Track's Codec and PayloadTypes be initially set to their
Expand Down
13 changes: 13 additions & 0 deletions settingengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@ func TestICERenomination(t *testing.T) {
})
}

func TestICEContinualGatheringPolicy(t *testing.T) {
t.Run("DefaultIsZero", func(t *testing.T) {
s := SettingEngine{}
assert.Equal(t, ice.ContinualGatheringPolicy(0), s.iceContinualGatheringPolicy)
})

t.Run("SetPolicy", func(t *testing.T) {
s := SettingEngine{}
s.SetICEContinualGatheringPolicy(ice.GatherContinually)
assert.Equal(t, ice.GatherContinually, s.iceContinualGatheringPolicy)
})
}

func TestDetachDataChannels(t *testing.T) {
s := SettingEngine{}
assert.False(t, s.detach.DataChannels)
Expand Down
Loading