Skip to content

Commit 60fb984

Browse files
authored
NACK responder: bypass auxiliary SSRCs (#310)
1 parent c06f448 commit 60fb984

File tree

2 files changed

+80
-3
lines changed

2 files changed

+80
-3
lines changed

pkg/nack/responder_interceptor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
113113
n.streamsMu.Unlock()
114114

115115
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
116+
// If this packet doesn't belong to the main SSRC, do not add it to rtpBuffer
117+
if header.SSRC != info.SSRC {
118+
return writer.Write(header, payload, attributes)
119+
}
120+
116121
pkt, err := n.packetFactory.NewPacket(header, payload, info.SSRCRetransmission, info.PayloadTypeRetransmission)
117122
if err != nil {
118123
return 0, err

pkg/nack/responder_interceptor_test.go

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func TestResponderInterceptor(t *testing.T) {
5757
}()
5858

5959
for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
60-
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
60+
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}}))
6161

6262
select {
6363
case p := <-stream.WrittenRTP():
@@ -252,7 +252,7 @@ func TestResponderInterceptor_RFC4588(t *testing.T) {
252252
}()
253253

254254
for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
255-
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
255+
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}}))
256256

257257
select {
258258
case p := <-stream.WrittenRTP():
@@ -272,7 +272,7 @@ func TestResponderInterceptor_RFC4588(t *testing.T) {
272272
},
273273
})
274274

275-
// seq number 13 was never sent, so it can't be resent
275+
// seq number 13 was never sent, so it can't be present
276276
for _, seqNum := range []uint16{11, 12, 15} {
277277
select {
278278
case p := <-stream.WrittenRTP():
@@ -290,3 +290,75 @@ func TestResponderInterceptor_RFC4588(t *testing.T) {
290290
case <-time.After(10 * time.Millisecond):
291291
}
292292
}
293+
294+
func TestResponderInterceptor_BypassUnknownSSRCs(t *testing.T) {
295+
f, err := NewResponderInterceptor(
296+
ResponderSize(8),
297+
ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
298+
)
299+
require.NoError(t, err)
300+
301+
i, err := f.NewInterceptor("")
302+
require.NoError(t, err)
303+
304+
stream := test.NewMockStream(&interceptor.StreamInfo{
305+
SSRC: 1,
306+
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
307+
}, i)
308+
defer func() {
309+
require.NoError(t, stream.Close())
310+
}()
311+
312+
// Send some packets with both SSRCs to check that only SSRC=1 added to the buffer
313+
for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
314+
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}}))
315+
// This packet should be bypassed and not added to the buffer.
316+
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 2}}))
317+
318+
select {
319+
case p := <-stream.WrittenRTP():
320+
require.Equal(t, seqNum, p.SequenceNumber)
321+
require.Equal(t, uint32(1), p.SSRC)
322+
case <-time.After(10 * time.Millisecond):
323+
t.Fatal("written rtp packet not found")
324+
}
325+
326+
select {
327+
case p := <-stream.WrittenRTP():
328+
require.Equal(t, seqNum, p.SequenceNumber)
329+
require.Equal(t, uint32(2), p.SSRC)
330+
case <-time.After(10 * time.Millisecond):
331+
t.Fatal("written rtp packet not found")
332+
}
333+
}
334+
335+
// This packet should be bypassed and not added to the buffer.
336+
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: 13, SSRC: 2}}))
337+
select {
338+
case p := <-stream.WrittenRTP():
339+
require.Equal(t, uint16(13), p.SequenceNumber)
340+
case <-time.After(10 * time.Millisecond):
341+
t.Fatal("written rtp packet not found")
342+
}
343+
344+
stream.ReceiveRTCP([]rtcp.Packet{
345+
&rtcp.TransportLayerNack{
346+
MediaSSRC: 1,
347+
SenderSSRC: 1,
348+
Nacks: []rtcp.NackPair{
349+
{PacketID: 11, LostPackets: 0b1011}, // sequence numbers: 11, 12, 13, 15
350+
},
351+
},
352+
})
353+
354+
// seq number 13 was sent with different ssrc, it should not be present
355+
for _, seqNum := range []uint16{11, 12, 15} {
356+
select {
357+
case p := <-stream.WrittenRTP():
358+
require.Equal(t, uint32(1), p.SSRC)
359+
require.Equal(t, seqNum, p.SequenceNumber)
360+
case <-time.After(10 * time.Millisecond):
361+
t.Fatal("written rtp packet not found")
362+
}
363+
}
364+
}

0 commit comments

Comments
 (0)