Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Pion Interceptor
<br>
</h1>
<h4 align="center">RTCP and RTCP processors for building real time communications</h4>
<h4 align="center">RTP and RTCP processors for building real time communications</h4>
<p align="center">
<a href="https://pion.ly"><img src="https://img.shields.io/badge/pion-interceptor-gray.svg?longCache=true&colorB=brightgreen" alt="Pion Interceptor"></a>
<a href="https://discord.gg/PngbdqpFbt"><img src="https://img.shields.io/badge/join-us%20on%20discord-gray.svg?longCache=true&logo=discord&colorB=brightblue" alt="join us on Discord"></a> <a href="https://bsky.app/profile/pion.ly"><img src="https://img.shields.io/badge/follow-us%20on%20bluesky-gray.svg?longCache=true&logo=bluesky&colorB=brightblue" alt="Follow us on Bluesky"></a>
Expand Down Expand Up @@ -36,12 +36,12 @@ by anyone. With the following tenets in mind.
* [Google Congestion Control](https://github.com/pion/interceptor/tree/master/pkg/gcc)
* [Stats](https://github.com/pion/interceptor/tree/master/pkg/stats) A [webrtc-stats](https://www.w3.org/TR/webrtc-stats/) compliant statistics generation
* [Interval PLI](https://github.com/pion/interceptor/tree/master/pkg/intervalpli) Generate PLI on a interval. Useful when no decoder is available.
* [FlexFec](https://github.com/pion/interceptor/tree/master/pkg/flexfec) – [FlexFEC-03](https://datatracker.ietf.org/doc/html/draft-ietf-payload-flexible-fec-scheme-03) encoder implementation

### Planned Interceptors
* Bandwidth Estimation
- [NADA](https://tools.ietf.org/html/rfc8698)
* JitterBuffer, re-order packets and wait for arrival
* [FlexFec](https://tools.ietf.org/html/draft-ietf-payload-flexible-fec-scheme-20)
* [RTCP Feedback for Congestion Control](https://datatracker.ietf.org/doc/html/rfc8888) the standardized alternative to TWCC.

### Interceptor Public API
Expand Down
104 changes: 74 additions & 30 deletions pkg/flexfec/encoder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,30 @@
package flexfec

import (
"errors"
"sync"

"github.com/pion/interceptor"
"github.com/pion/rtp"
)

// streamState holds the state for a single stream.
type streamState struct {
mu sync.Mutex
flexFecEncoder FlexEncoder
packetBuffer []rtp.Packet
}

// FecInterceptor implements FlexFec.
type FecInterceptor struct {
interceptor.NoOp
flexFecEncoder FlexEncoder
packetBuffer []rtp.Packet
minNumMediaPackets uint32
mu sync.Mutex
streams map[uint32]*streamState
numMediaPackets uint32
numFecPackets uint32
encoderFactory EncoderFactory
}

// FecOption can be used to set initial options on Fec encoder interceptors.
type FecOption func(d *FecInterceptor) error

// FecInterceptorFactory creates new FecInterceptors.
type FecInterceptorFactory struct {
opts []FecOption
Expand All @@ -31,54 +40,89 @@

// NewInterceptor constructs a new FecInterceptor.
func (r *FecInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
// Hardcoded for now:
// Min num media packets to encode FEC -> 5
// Min num fec packets -> 1

interceptor := &FecInterceptor{
packetBuffer: make([]rtp.Packet, 0),
minNumMediaPackets: 5,
streams: make(map[uint32]*streamState),
numMediaPackets: 5,
numFecPackets: 2,
encoderFactory: FlexEncoder03Factory{},
}

for _, opt := range r.opts {
if err := opt(interceptor); err != nil {
return nil, err
}

Check warning on line 53 in pkg/flexfec/encoder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/flexfec/encoder_interceptor.go#L52-L53

Added lines #L52 - L53 were not covered by tests
}

return interceptor, nil
}

// UnbindLocalStream removes the stream state for a specific SSRC.
func (r *FecInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
r.mu.Lock()
defer r.mu.Unlock()

delete(r.streams, info.SSRC)

Check warning on line 64 in pkg/flexfec/encoder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/flexfec/encoder_interceptor.go#L60-L64

Added lines #L60 - L64 were not covered by tests
}

// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (r *FecInterceptor) BindLocalStream(
info *interceptor.StreamInfo, writer interceptor.RTPWriter,
) interceptor.RTPWriter {
// Chromium supports version flexfec-03 of existing draft, this is the one we will configure by default
// although we should support configuring the latest (flexfec-20) as well.
r.flexFecEncoder = NewFlexEncoder03(info.PayloadType, info.SSRC)
if info.PayloadTypeForwardErrorCorrection == 0 || info.SSRCForwardErrorCorrection == 0 {
return writer
}

mediaSSRC := info.SSRC

r.mu.Lock()
stream := &streamState{
// Chromium supports version flexfec-03 of existing draft, this is the one we will configure by default
// although we should support configuring the latest (flexfec-20) as well.
flexFecEncoder: r.encoderFactory.NewEncoder(info.PayloadTypeForwardErrorCorrection, info.SSRCForwardErrorCorrection),
packetBuffer: make([]rtp.Packet, 0),
}
r.streams[mediaSSRC] = stream
r.mu.Unlock()

return interceptor.RTPWriterFunc(
func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
r.packetBuffer = append(r.packetBuffer, rtp.Packet{
// Ignore non-media packets
if header.SSRC != mediaSSRC {
return writer.Write(header, payload, attributes)
}

var fecPackets []rtp.Packet
stream.mu.Lock()
stream.packetBuffer = append(stream.packetBuffer, rtp.Packet{
Header: *header,
Payload: payload,
})

// Send the media RTP packet
result, err := writer.Write(header, payload, attributes)
// Check if we have enough packets to generate FEC
if len(stream.packetBuffer) == int(r.numMediaPackets) {
fecPackets = stream.flexFecEncoder.EncodeFec(stream.packetBuffer, r.numFecPackets)
// Reset the packet buffer now that we've sent the corresponding FEC packets.
stream.packetBuffer = nil
}
stream.mu.Unlock()

// Send the FEC packets
var fecPackets []rtp.Packet
if len(r.packetBuffer) == int(r.minNumMediaPackets) {
fecPackets = r.flexFecEncoder.EncodeFec(r.packetBuffer, 2)
var errs []error
result, err := writer.Write(header, payload, attributes)
if err != nil {
errs = append(errs, err)
}

Check warning on line 114 in pkg/flexfec/encoder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/flexfec/encoder_interceptor.go#L113-L114

Added lines #L113 - L114 were not covered by tests

for i := range fecPackets {
fecResult, fecErr := writer.Write(&(fecPackets[i].Header), fecPackets[i].Payload, attributes)
for _, packet := range fecPackets {
header := packet.Header

if fecErr != nil && fecResult == 0 {
break
}
_, err = writer.Write(&header, packet.Payload, attributes)
if err != nil {
errs = append(errs, err)

Check warning on line 121 in pkg/flexfec/encoder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/flexfec/encoder_interceptor.go#L121

Added line #L121 was not covered by tests
}
// Reset the packet buffer now that we've sent the corresponding FEC packets.
r.packetBuffer = nil
}

return result, err
return result, errors.Join(errs...)
},
)
}
Loading
Loading