Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions dtlstransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,36 +530,42 @@
func (t *DTLSTransport) streamsForSSRC(
ssrc SSRC,
streamInfo interceptor.StreamInfo,
) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {
) (*srtp.ReadStreamSRTP, interceptor.RTPReader, interceptor.RTPProcessor, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {
srtpSession, err := t.getSRTPSession()
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}

rtpReadStream, err := srtpSession.OpenReadStream(uint32(ssrc))
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err

Check warning on line 541 in dtlstransport.go

View check run for this annotation

Codecov / codecov/patch

dtlstransport.go#L541

Added line #L541 was not covered by tests
}

rtpInterceptor := t.api.interceptor.BindRemoteStream(
rtpProcessor := t.api.interceptor.BindRemoteStream(
&streamInfo,
interceptor.RTPReaderFunc(
func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = rtpReadStream.Read(in)

return n, a, err
interceptor.RTPProcessorFunc(
func(s int, in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
return s, a, nil
},
),
)

rtpReader := interceptor.RTPReaderFunc(
func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = rtpReadStream.Read(in)

return n, a, err
},
)

srtcpSession, err := t.getSRTCPSession()
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err

Check warning on line 563 in dtlstransport.go

View check run for this annotation

Codecov / codecov/patch

dtlstransport.go#L563

Added line #L563 was not covered by tests
}

rtcpReadStream, err := srtcpSession.OpenReadStream(uint32(ssrc))
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err

Check warning on line 568 in dtlstransport.go

View check run for this annotation

Codecov / codecov/patch

dtlstransport.go#L568

Added line #L568 was not covered by tests
}

rtcpInterceptor := t.api.interceptor.BindRTCPReader(interceptor.RTCPReaderFunc(
Expand All @@ -570,5 +576,5 @@
}),
)

return rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, nil
return rtpReadStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ require (
golang.org/x/sys v0.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pion/interceptor v0.1.37 => github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1 h1:5p3Tm/VZUdN8aqLJp1noK/fAqggXJBHSsWXQJbksmw0=
github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY=
github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb h1:qu70eQhcmCvNkrzYeVTDXS1RGmt14Qu5vo+sQH+q16w=
github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
26 changes: 22 additions & 4 deletions interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ func TestPeerConnection_Interceptor(t *testing.T) {
},
)
},
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor {
return interceptor.RTPProcessorFunc(func(i int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
if a == nil {
a = interceptor.Attributes{}
}

a.Set("attribute", "value")

return reader.Read(b, a)
return reader.Process(i, b, a)
})
},
}, nil
Expand Down Expand Up @@ -146,7 +146,7 @@ func Test_Interceptor_BindUnbind(t *testing.T) { //nolint:cyclop
UnbindLocalStreamFn: func(*interceptor.StreamInfo) {
atomic.AddUint32(&cntUnbindLocalStream, 1)
},
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor {
atomic.AddUint32(&cntBindRemoteStream, 1)

return reader
Expand Down Expand Up @@ -413,6 +413,24 @@ func testInterceptorNack(t *testing.T, requestNack bool) { //nolint:cyclop
close(done)
})

pcOfferConnected := make(chan struct{})
pcAnswerConnected := make(chan struct{})

pc1.OnConnectionStateChange(func(state PeerConnectionState) {
if state == PeerConnectionStateConnected {
close(pcOfferConnected)
}
})

pc2.OnConnectionStateChange(func(state PeerConnectionState) {
if state == PeerConnectionStateConnected {
close(pcAnswerConnected)
}
})

<-pcOfferConnected
<-pcAnswerConnected

go func() {
for i := 0; i < numPackets; i++ {
time.Sleep(20 * time.Millisecond)
Expand Down
9 changes: 5 additions & 4 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1732,7 +1732,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
params.Codecs[0].RTPCodecCapability,
params.HeaderExtensions,
)
readStream, interceptor, rtcpReadStream, rtcpInterceptor, err := pc.dtlsTransport.streamsForSSRC(ssrc, *streamInfo)
readStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, err := pc.dtlsTransport.streamsForSSRC(ssrc, *streamInfo)
if err != nil {
return err
}
Expand All @@ -1746,7 +1746,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
readCount--
}

i, _, err := interceptor.Read(b, nil)
i, _, err := rtpReader.Read(b, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1775,15 +1775,16 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
receiver.mu.Lock()
defer receiver.mu.Unlock()

return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor)
return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor)
}

track, err := receiver.receiveForRid(
rid,
params,
streamInfo,
readStream,
interceptor,
rtpReader,
rtpProcessor,
rtcpReadStream,
rtcpInterceptor,
)
Expand Down
10 changes: 8 additions & 2 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ func TestPeerConnection_Media_Sample(t *testing.T) {

go func() {
for {
time.Sleep(time.Millisecond * 100)
if pcOffer.ICEConnectionState() != ICEConnectionStateConnected {
time.Sleep(time.Millisecond * 100)
continue
}

if routineErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil {
fmt.Println(routineErr)
}
Expand All @@ -169,6 +170,12 @@ func TestPeerConnection_Media_Sample(t *testing.T) {
}()

go func() {
for {
if pcOffer.ICEConnectionState() == ICEConnectionStateConnected {
break
}
time.Sleep(time.Millisecond * 100)
}
parameters := sender.GetParameters()

for {
Expand All @@ -190,7 +197,6 @@ func TestPeerConnection_Media_Sample(t *testing.T) {
}
}
}()

go func() {
if _, _, routineErr := sender.Read(make([]byte, 1400)); routineErr == nil {
close(awaitRTCPSenderRecv)
Expand Down
50 changes: 36 additions & 14 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ type trackStreams struct {

streamInfo, repairStreamInfo *interceptor.StreamInfo

rtpReadStream *srtp.ReadStreamSRTP
rtpInterceptor interceptor.RTPReader
rtpReadStream *srtp.ReadStreamSRTP
rtpReader interceptor.RTPReader
rtpProcessor interceptor.RTPProcessor

rtcpReadStream *srtp.ReadStreamSRTCP
rtcpInterceptor interceptor.RTCPReader

repairReadStream *srtp.ReadStreamSRTP
repairInterceptor interceptor.RTPReader
repairReader interceptor.RTPReader
repairProcessor interceptor.RTPProcessor
repairStreamChannel chan rtxPacketWithAttributes

repairRtcpReadStream *srtp.ReadStreamSRTCP
Expand Down Expand Up @@ -228,13 +230,13 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no
var err error

//nolint:lll // # TODO refactor
if streams.rtpReadStream, streams.rtpInterceptor, streams.rtcpReadStream, streams.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *streams.streamInfo); err != nil {
if streams.rtpReadStream, streams.rtpReader, streams.rtpProcessor, streams.rtcpReadStream, streams.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *streams.streamInfo); err != nil {
return err
}

if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
streamInfo := createStreamInfo("", rtxSsrc, 0, 0, 0, 0, 0, codec, globalParams.HeaderExtensions)
rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(
rtpReadStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(
rtxSsrc,
*streamInfo,
)
Expand All @@ -247,7 +249,8 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error { //no
"",
streamInfo,
rtpReadStream,
rtpInterceptor,
rtpReader,
rtpProcessor,
rtcpReadStream,
rtcpInterceptor,
); err != nil {
Expand Down Expand Up @@ -412,7 +415,11 @@ func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a intercept
}

if t := r.streamsForTrack(reader); t != nil {
return t.rtpInterceptor.Read(b, a)
i, attr, err := t.rtpReader.Read(b, a)
if err != nil {
return 0, nil, err
}
return t.rtpProcessor.Process(i, b, attr)
}

return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
Expand All @@ -425,7 +432,8 @@ func (r *RTPReceiver) receiveForRid(
params RTPParameters,
streamInfo *interceptor.StreamInfo,
rtpReadStream *srtp.ReadStreamSRTP,
rtpInterceptor interceptor.RTPReader,
rtpReader interceptor.RTPReader,
rtpProcessor interceptor.RTPProcessor,
rtcpReadStream *srtp.ReadStreamSRTCP,
rtcpInterceptor interceptor.RTCPReader,
) (*TrackRemote, error) {
Expand All @@ -443,7 +451,8 @@ func (r *RTPReceiver) receiveForRid(

r.tracks[i].streamInfo = streamInfo
r.tracks[i].rtpReadStream = rtpReadStream
r.tracks[i].rtpInterceptor = rtpInterceptor
r.tracks[i].rtpReader = rtpReader
r.tracks[i].rtpProcessor = rtpProcessor
r.tracks[i].rtcpReadStream = rtcpReadStream
r.tracks[i].rtcpInterceptor = rtcpInterceptor

Expand All @@ -462,7 +471,8 @@ func (r *RTPReceiver) receiveForRtx(
rsid string,
streamInfo *interceptor.StreamInfo,
rtpReadStream *srtp.ReadStreamSRTP,
rtpInterceptor interceptor.RTPReader,
rtpReader interceptor.RTPReader,
rtpProcessor interceptor.RTPProcessor,
rtcpReadStream *srtp.ReadStreamSRTCP,
rtcpInterceptor interceptor.RTCPReader,
) error {
Expand All @@ -488,15 +498,21 @@ func (r *RTPReceiver) receiveForRtx(

track.repairStreamInfo = streamInfo
track.repairReadStream = rtpReadStream
track.repairInterceptor = rtpInterceptor
track.repairReader = rtpReader
track.repairProcessor = rtpProcessor
track.repairRtcpReadStream = rtcpReadStream
track.repairRtcpInterceptor = rtcpInterceptor
track.repairStreamChannel = make(chan rtxPacketWithAttributes, 50)

go func() {
for {
b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
i, attributes, err := track.repairInterceptor.Read(b, nil)
i, attributes, err := track.repairReader.Read(b, nil)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck
return
}
i, attributes, err = track.repairProcessor.Process(i, b, attributes)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck

Expand Down Expand Up @@ -590,7 +606,7 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote
}

// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil.
func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
func (r *RTPReceiver) readRTX(b []byte, reader *TrackRemote) *rtxPacketWithAttributes {
if !reader.HasRTX() {
return nil
}
Expand All @@ -604,7 +620,13 @@ func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
if t := r.streamsForTrack(reader); t != nil {
select {
case rtxPacketReceived := <-t.repairStreamChannel:
return &rtxPacketReceived
{
n := copy(b, rtxPacketReceived.pkt)
_, _, err := t.rtpProcessor.Process(n, b, nil)
if err == nil {
return &rtxPacketReceived
}
}
default:
}
}
Expand Down
2 changes: 1 addition & 1 deletion track_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
}

// If there's a separate RTX track and an RTX packet is available, return that
if rtxPacketReceived := receiver.readRTX(t); rtxPacketReceived != nil {
if rtxPacketReceived := receiver.readRTX(b, t); rtxPacketReceived != nil {
n = copy(b, rtxPacketReceived.pkt)
attributes = rtxPacketReceived.attributes
rtxPacketReceived.release()
Expand Down