Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/lib/stream/rtsp/rtsp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,55 +163,61 @@ impl RTSPServer {
};

let rtp_caps = rtp_caps.to_string();
// Sanitize path for use as GStreamer element name (remove leading slash, replace special chars)
let path_slug = path.trim_start_matches('/').replace(['/', '.', ' '], "-");
let description = match encode.as_str() {
"H264" => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true is-live=false",
" ! queue leaky=downstream flush-on-eos=true silent=true max-size-buffers=0",
"shmsrc name=shmsrc-srv-{path_slug} socket-path={socket_path} do-timestamp=true is-live=false",
" ! queue name=q-srv-{path_slug} leaky=downstream flush-on-eos=true silent=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtph264depay",
" ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=-1 pt=96",
),
path_slug = path_slug,
socket_path = socket_path,
rtp_caps = rtp_caps,
)
}
"H265" => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true is-live=false",
" ! queue leaky=downstream flush-on-eos=true silent=true max-size-buffers=0",
"shmsrc name=shmsrc-srv-{path_slug} socket-path={socket_path} do-timestamp=true is-live=false",
" ! queue name=q-srv-{path_slug} leaky=downstream flush-on-eos=true silent=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtph265depay",
" ! rtph265pay name=pay0 aggregate-mode=zero-latency config-interval=-1 pt=96",
),
path_slug = path_slug,
socket_path = socket_path,
rtp_caps = rtp_caps,
)
}
"RAW" => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true is-live=false",
" ! queue leaky=downstream flush-on-eos=true silent=true max-size-buffers=0",
"shmsrc name=shmsrc-srv-{path_slug} socket-path={socket_path} do-timestamp=true is-live=false",
" ! queue name=q-srv-{path_slug} leaky=downstream flush-on-eos=true silent=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtpvrawdepay",
" ! rtpvrawpay name=pay0 pt=96",
),
path_slug = path_slug,
socket_path = socket_path,
rtp_caps = rtp_caps,
)
}
"JPEG" => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true is-live=false",
" ! queue leaky=downstream flush-on-eos=true silent=true max-size-buffers=10",
"shmsrc name=shmsrc-srv-{path_slug} socket-path={socket_path} do-timestamp=true is-live=false",
" ! queue name=q-srv-{path_slug} leaky=downstream flush-on-eos=true silent=true max-size-buffers=10",
" ! capsfilter caps={rtp_caps:?}",
" ! rtpjpegdepay",
" ! rtpjpegpay name=pay0 pt=96",
),
path_slug = path_slug,
socket_path = socket_path,
rtp_caps = rtp_caps,
)
Expand Down
7 changes: 6 additions & 1 deletion src/lib/stream/sink/image_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl ImageSink {
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<Self> {
let queue = gst::ElementFactory::make("queue")
.name(format!("q-img-{sink_id}"))
.property_from_str("leaky", "downstream") // Throw away any data
.property("silent", true)
.property("flush-on-eos", true)
Expand All @@ -175,8 +176,11 @@ impl ImageSink {

// Create a pair of proxies. The proxysink will be used in the source's pipeline,
// while the proxysrc will be used in this sink's pipeline
let proxysink = gst::ElementFactory::make("proxysink").build()?;
let proxysink = gst::ElementFactory::make("proxysink")
.name(format!("psink-img-{sink_id}"))
.build()?;
let _proxysrc = gst::ElementFactory::make("proxysrc")
.name(format!("psrc-img-{sink_id}"))
.property("proxysink", &proxysink)
.build()?;

Expand All @@ -189,6 +193,7 @@ impl ImageSink {
.find(|element| element.name().starts_with("queue"))
{
Some(element) => {
element.set_property("name", format!("qi-img-{sink_id}").as_str());
element.set_property_from_str("leaky", "downstream"); // Throw away any data
element.set_property("silent", true);
element.set_property("flush-on-eos", true);
Expand Down
2 changes: 2 additions & 0 deletions src/lib/stream/sink/rtsp_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl RtspSink {
#[instrument(level = "debug", skip_all)]
pub fn try_new(id: Arc<uuid::Uuid>, addresses: Vec<url::Url>) -> Result<Self> {
let queue = gst::ElementFactory::make("queue")
.name(format!("q-rtsp-{id}"))
.property_from_str("leaky", "downstream") // Throw away any data
.property("silent", true)
.property("flush-on-eos", true)
Expand All @@ -113,6 +114,7 @@ impl RtspSink {
let socket_path = temp_file.path().to_string_lossy().to_string();

let sink = gst::ElementFactory::make("shmsink")
.name(format!("shmsink-rtsp-{id}"))
.property_from_str("socket-path", &socket_path)
.property("sync", false)
.property("wait-for-connection", false)
Expand Down
7 changes: 6 additions & 1 deletion src/lib/stream/sink/udp_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl UdpSink {
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<Self> {
let queue = gst::ElementFactory::make("queue")
.name(format!("q-udp-{sink_id}"))
.property_from_str("leaky", "downstream") // Throw away any data
.property("silent", true)
.property("flush-on-eos", true)
Expand All @@ -155,8 +156,11 @@ impl UdpSink {

// Create a pair of proxies. The proxysink will be used in the source's pipeline,
// while the proxysrc will be used in this sink's pipeline
let proxysink = gst::ElementFactory::make("proxysink").build()?;
let proxysink = gst::ElementFactory::make("proxysink")
.name(format!("psink-udp-{sink_id}"))
.build()?;
let _proxysrc = gst::ElementFactory::make("proxysrc")
.name(format!("psrc-udp-{sink_id}"))
.property("proxysink", &proxysink)
.build()?;

Expand All @@ -169,6 +173,7 @@ impl UdpSink {
.find(|element| element.name().starts_with("queue"))
{
Some(element) => {
element.set_property("name", format!("qi-udp-{sink_id}").as_str());
element.set_property_from_str("leaky", "downstream"); // Throw away any data
element.set_property("flush-on-eos", true);
element.set_property("max-size-buffers", 0u32); // Disable buffers
Expand Down
1 change: 1 addition & 0 deletions src/lib/stream/sink/webrtc_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ impl WebRTCSink {
sender: mpsc::UnboundedSender<Result<Message>>,
) -> Result<Self> {
let queue = gst::ElementFactory::make("queue")
.name(format!("q-wrtc-{}", bind.session_id))
.property_from_str("leaky", "downstream") // Throw away any data
.property("silent", true)
.property("flush-on-eos", true)
Expand Down
6 changes: 5 additions & 1 deletion src/lib/stream/sink/zenoh_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,11 @@ impl ZenohSink {

// Create a pair of proxies. The proxysink will be used in the source's pipeline,
// while the proxysrc will be used in this sink's pipeline
let proxysink = gst::ElementFactory::make("proxysink").build()?;
let proxysink = gst::ElementFactory::make("proxysink")
.name(format!("psink-zenoh-{sink_id}"))
.build()?;
let _proxysrc = gst::ElementFactory::make("proxysrc")
.name(format!("psrc-zenoh-{sink_id}"))
.property("proxysink", &proxysink)
.build()?;

Expand All @@ -152,6 +155,7 @@ impl ZenohSink {
.find(|element| element.name().starts_with("queue"))
{
Some(element) => {
element.set_property("name", format!("qi-zenoh-{sink_id}").as_str());
element.set_property_from_str("leaky", "downstream"); // Throw away any data
element.set_property("silent", true);
element.set_property("flush-on-eos", true);
Expand Down
Loading