Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions zenoh-ext/src/advanced_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,26 @@ impl Default for RepliesConfig {
impl QoSBuilderTrait for RepliesConfig {
#[allow(unused_mut)]
#[zenoh_macros::unstable]
/// Changes the [`CongestionControl`] to apply when routing the data.
fn congestion_control(mut self, congestion_control: CongestionControl) -> Self {
self.congestion_control = congestion_control;
self
}

#[allow(unused_mut)]
#[zenoh_macros::unstable]
/// Changes the [`Priority`] to apply when routing the data.
fn priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self
}

#[allow(unused_mut)]
#[zenoh_macros::unstable]
/// Changes the Express policy to apply when routing the data.
///
/// When express is set to `true`, then the message will not be batched.
/// This usually has a positive impact on latency but a negative impact on throughput.
fn express(mut self, is_express: bool) -> Self {
self.is_express = is_express;
self
Expand Down
12 changes: 10 additions & 2 deletions zenoh-ext/src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl EncodingBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
/// Set the [`Encoding`]
#[zenoh_macros::unstable]
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
Expand All @@ -232,7 +233,7 @@ impl EncodingBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl QoSBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
/// Changes the [`zenoh::qos::CongestionControl`] to apply when routing the data.
/// Changes the [`CongestionControl`] to apply when routing the data.
#[inline]
#[zenoh_macros::unstable]
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
Expand All @@ -242,7 +243,7 @@ impl QoSBuilderTrait for AdvancedPublisherBuilder<'_, '_, '_> {
}
}

/// Changes the [`zenoh::qos::Priority`] of the written data.
/// Changes the [`Priority`] to apply when routing the data.
#[inline]
#[zenoh_macros::unstable]
fn priority(self, priority: Priority) -> Self {
Expand Down Expand Up @@ -686,6 +687,7 @@ pub struct AdvancedPublicationBuilder<'a, P> {
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl EncodingBuilderTrait for AdvancedPublicationBuilder<'_, PublicationBuilderPut> {
/// Set the [`Encoding`]
#[zenoh_macros::unstable]
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
Expand All @@ -699,13 +701,18 @@ impl EncodingBuilderTrait for AdvancedPublicationBuilder<'_, PublicationBuilderP
#[zenoh_macros::unstable]
impl<P> SampleBuilderTrait for AdvancedPublicationBuilder<'_, P> {
#[zenoh_macros::unstable]
/// Sets an optional [`SourceInfo`](zenoh::sample::SourceInfo) to be sent along with the publication.
fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
Self {
builder: self.builder.source_info(source_info),
..self
}
}
#[zenoh_macros::unstable]
/// Sets an optional attachment to be sent along with the publication.
///
/// The argument is converted via [`OptionZBytes`], which supports both `T: Into<ZBytes>`
/// and `Option<T>` where `T: Into<ZBytes>`.
fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
Expand All @@ -718,6 +725,7 @@ impl<P> SampleBuilderTrait for AdvancedPublicationBuilder<'_, P> {
#[zenoh_macros::internal_trait]
#[zenoh_macros::unstable]
impl<P> TimestampBuilderTrait for AdvancedPublicationBuilder<'_, P> {
/// Sets an optional timestamp to be sent along with the publication.
#[zenoh_macros::unstable]
fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl<T> QoSBuilderTrait for PublicationBuilder<PublisherBuilder<'_, '_>, T> {
}
}

/// Changes the [`Priority`](crate::qos::Priority) of the written data.
/// Changes the [`Priority`](crate::qos::Priority) when routing the data.
#[inline]
fn priority(self, priority: Priority) -> Self {
Self {
Expand Down
13 changes: 11 additions & 2 deletions zenoh/src/api/builders/querier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,21 @@ pub struct QuerierBuilder<'a, 'b> {

#[zenoh_macros::internal_trait]
impl QoSBuilderTrait for QuerierBuilder<'_, '_> {
/// Changes the [`CongestionControl`](crate::qos::CongestionControl) to apply when routing the request.
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
let qos = self.qos.congestion_control(congestion_control);
Self { qos, ..self }
}

/// Changes the [`Priority`](crate::qos::Priority) of the request.
fn priority(self, priority: Priority) -> Self {
let qos = self.qos.priority(priority);
Self { qos, ..self }
}

/// Changes the Express policy to apply when routing the request.
///
/// When express is set to `true`, then the message will not be batched.
/// This usually has a positive impact on latency but a negative impact on throughput.
fn express(self, is_express: bool) -> Self {
let qos = self.qos.express(is_express);
Self { qos, ..self }
Expand Down Expand Up @@ -287,14 +292,17 @@ impl<Handler> CancellationTokenBuilderTrait for QuerierGetBuilder<'_, '_, Handle

#[zenoh_macros::internal_trait]
impl<Handler> SampleBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
/// Sets an optional [`SourceInfo`](crate::sample::SourceInfo) to be sent along with the query request.
#[zenoh_macros::unstable]
fn source_info<T: Into<Option<SourceInfo>>>(self, source_info: T) -> Self {
Self {
source_info: source_info.into(),
..self
}
}

/// Sets an optional attachment to be sent along with the query request.
/// The method accepts both values convertible to [`ZBytes`](crate::bytes::ZBytes)
/// and optional values of such types (`Option<T>` where `T: Into<ZBytes>`).
fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
Expand All @@ -306,6 +314,7 @@ impl<Handler> SampleBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {

#[zenoh_macros::internal_trait]
impl<Handler> EncodingBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
/// Set the [`Encoding`]
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
let mut value = self.value.unwrap_or_default();
value.1 = encoding.into();
Expand Down
10 changes: 10 additions & 0 deletions zenoh/src/api/builders/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,16 @@ pub struct SessionGetBuilder<'a, 'b, Handler> {
#[zenoh_macros::internal_trait]
impl<Handler> SampleBuilderTrait for SessionGetBuilder<'_, '_, Handler> {
#[zenoh_macros::unstable]
/// Sets an optional [`SourceInfo`](crate::sample::SourceInfo) to be sent along with the request/query.
fn source_info<T: Into<Option<SourceInfo>>>(self, source_info: T) -> Self {
Self {
source_info: source_info.into(),
..self
}
}

/// Sets an optional attachment to be sent along with the request/query.
/// The method accepts both `T` where `T: Into<ZBytes>` and `Option<T>` where `T: Into<ZBytes>` (see [`OptionZBytes`](crate::bytes::OptionZBytes)).
fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
Expand All @@ -103,16 +106,22 @@ impl<Handler> SampleBuilderTrait for SessionGetBuilder<'_, '_, Handler> {

#[zenoh_macros::internal_trait]
impl<Handler> QoSBuilderTrait for SessionGetBuilder<'_, '_, Handler> {
/// Changes the [`CongestionControl`](crate::qos::CongestionControl) to apply when routing the request.
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
let qos = self.qos.congestion_control(congestion_control);
Self { qos, ..self }
}

/// Changes the [`Priority`](crate::qos::Priority) of the request.
fn priority(self, priority: Priority) -> Self {
let qos = self.qos.priority(priority);
Self { qos, ..self }
}

/// Changes the Express policy to apply when routing the request.
///
/// When express is set to `true`, then the message will not be batched.
/// This usually has a positive impact on latency but a negative impact on throughput.
fn express(self, is_express: bool) -> Self {
let qos = self.qos.express(is_express);
Self { qos, ..self }
Expand All @@ -121,6 +130,7 @@ impl<Handler> QoSBuilderTrait for SessionGetBuilder<'_, '_, Handler> {

#[zenoh_macros::internal_trait]
impl<Handler> EncodingBuilderTrait for SessionGetBuilder<'_, '_, Handler> {
/// Set the [`Encoding`]
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
let mut value = self.value.unwrap_or_default();
value.1 = encoding.into();
Expand Down
10 changes: 10 additions & 0 deletions zenoh/src/api/builders/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl<'a, 'b> ReplyBuilder<'a, 'b, ReplyBuilderDelete> {

#[zenoh_macros::internal_trait]
impl<T> TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {
/// Sets an optional timestamp to be sent along with the reply/response.
fn timestamp<U: Into<Option<Timestamp>>>(self, timestamp: U) -> Self {
Self {
timestamp: timestamp.into(),
Expand All @@ -121,6 +122,8 @@ impl<T> TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {

#[zenoh_macros::internal_trait]
impl<T> SampleBuilderTrait for ReplyBuilder<'_, '_, T> {
/// Sets an optional attachment to be sent along with the reply/response.
/// The method accepts any `T` where `T: Into<ZBytes>` or `Option<T>`.
fn attachment<U: Into<OptionZBytes>>(self, attachment: U) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
Expand All @@ -130,6 +133,7 @@ impl<T> SampleBuilderTrait for ReplyBuilder<'_, '_, T> {
}

#[cfg(feature = "unstable")]
/// Sets an optional [`SourceInfo`](crate::sample::SourceInfo) to be sent along with the reply/response.
fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
Self {
source_info: source_info.into(),
Expand All @@ -150,6 +154,10 @@ impl<T> QoSBuilderTrait for ReplyBuilder<'_, '_, T> {
self
}

/// Changes the Express policy to apply when routing the reply.
///
/// When express is set to `true`, then the message will not be batched.
/// This usually has a positive impact on latency but a negative impact on throughput.
fn express(self, is_express: bool) -> Self {
let qos = self.qos.express(is_express);
Self { qos, ..self }
Expand All @@ -158,6 +166,7 @@ impl<T> QoSBuilderTrait for ReplyBuilder<'_, '_, T> {

#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> {
/// Set the [`Encoding`]
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
kind: ReplyBuilderPut {
Expand Down Expand Up @@ -243,6 +252,7 @@ impl<'a> ReplyErrBuilder<'a> {

#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for ReplyErrBuilder<'_> {
/// Set the [`Encoding`]
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
encoding: encoding.into(),
Expand Down
21 changes: 17 additions & 4 deletions zenoh/src/api/builders/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ use crate::pubsub::{
#[cfg(feature = "unstable")]
use crate::sample::SourceInfo;
pub trait QoSBuilderTrait {
/// Change the `congestion_control` to apply when routing the data.
/// Changes the [`CongestionControl`](crate::qos::CongestionControl) to apply when routing the data.
fn congestion_control(self, congestion_control: CongestionControl) -> Self;
/// Change the priority of the written data.
/// Changes the [`Priority`](crate::qos::Priority) when routing the data.
fn priority(self, priority: Priority) -> Self;
/// Change the `express` policy to apply when routing the data.
/// When express is set to `true`, then the message will not be batched.
Expand All @@ -49,10 +49,12 @@ pub trait TimestampBuilderTrait {
}

pub trait SampleBuilderTrait {
/// Attach source information
/// Sets an optional [`SourceInfo`](crate::sample::SourceInfo) to be sent along with the publication.
#[zenoh_macros::unstable]
fn source_info<T: Into<Option<SourceInfo>>>(self, source_info: T) -> Self;
/// Attach user-provided data in key-value format
/// Sets an optional attachment to be sent along with the publication.
/// The method accepts any `T` where `T: Into<ZBytes>` or `Option<T>` where `T: Into<ZBytes>`.
/// See [`OptionZBytes`](crate::api::bytes::OptionZBytes) for the exact accepted forms.
fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self;
}

Expand Down Expand Up @@ -184,6 +186,7 @@ impl<T> SampleBuilder<T> {

#[zenoh_macros::internal_trait]
impl<T> TimestampBuilderTrait for SampleBuilder<T> {
/// Sets an optional timestamp to be sent along with the publication.
fn timestamp<U: Into<Option<Timestamp>>>(self, timestamp: U) -> Self {
Self {
sample: Sample {
Expand All @@ -198,6 +201,7 @@ impl<T> TimestampBuilderTrait for SampleBuilder<T> {
#[zenoh_macros::internal_trait]
impl<T> SampleBuilderTrait for SampleBuilder<T> {
#[zenoh_macros::unstable]
/// Sets an optional [`SourceInfo`](crate::sample::SourceInfo) to be sent along with the publication.
fn source_info<S: Into<Option<SourceInfo>>>(self, source_info: S) -> Self {
Self {
sample: Sample {
Expand All @@ -208,6 +212,8 @@ impl<T> SampleBuilderTrait for SampleBuilder<T> {
}
}

/// Sets an optional attachment to be sent along with the publication.
/// The method accepts both `Into<ZBytes>` and `Option<Into<ZBytes>>`.
fn attachment<U: Into<OptionZBytes>>(self, attachment: U) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
Expand All @@ -222,6 +228,7 @@ impl<T> SampleBuilderTrait for SampleBuilder<T> {

#[zenoh_macros::internal_trait]
impl<T> QoSBuilderTrait for SampleBuilder<T> {
/// Changes the [`CongestionControl`](crate::qos::CongestionControl) to apply when routing the data/publication/sample.
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
let qos: QoSBuilder = self.sample.qos.into();
let qos = qos.congestion_control(congestion_control).into();
Expand All @@ -230,6 +237,7 @@ impl<T> QoSBuilderTrait for SampleBuilder<T> {
_t: PhantomData::<T>,
}
}
/// Changes the [`Priority`](crate::qos::Priority) to apply when routing the data.
fn priority(self, priority: Priority) -> Self {
let qos: QoSBuilder = self.sample.qos.into();
let qos = qos.priority(priority).into();
Expand All @@ -238,6 +246,10 @@ impl<T> QoSBuilderTrait for SampleBuilder<T> {
_t: PhantomData::<T>,
}
}
/// Changes the Express policy to apply when routing the data.
///
/// When express is set to `true`, then the message will not be batched.
/// This usually has a positive impact on latency but a negative impact on throughput.
fn express(self, is_express: bool) -> Self {
let qos: QoSBuilder = self.sample.qos.into();
let qos = qos.express(is_express).into();
Expand All @@ -250,6 +262,7 @@ impl<T> QoSBuilderTrait for SampleBuilder<T> {

#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for SampleBuilder<SampleBuilderPut> {
/// Set the [`Encoding`]
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
sample: Sample {
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ impl fmt::Debug for PublisherState {
}

/// A publisher that allows sending data through a stream.
/// A Publisher is declared by a [`Session`](crate::api::session::Session) for a given key expression
/// with method [`Session::declare_publisher`](crate::api::session::Session::declare_publisher).
///
/// Publishers are automatically undeclared when dropped.
///
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/api/querier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ impl<'a> Querier<'a> {
&self.key_expr
}

/// Get the `congestion_control` applied when routing the data.
/// Get the [`CongestionControl`] applied when routing the data.
#[inline]
pub fn congestion_control(&self) -> CongestionControl {
self.qos.congestion_control()
}

/// Get the priority of the written data.
/// Get the [`Priority`] of the querier requests.
#[inline]
pub fn priority(&self) -> Priority {
self.qos.priority()
Expand Down
Loading
Loading