Skip to content

Commit 55ae0ba

Browse files
authored
feat(spanv2): Implement basic forwarding of span v2 to Kafka and upstream (#5039)
Implements the `Forward` trait for the span v2 processing pipeline. We will in a follow-up have to improve how we forward spans upstream, ideally we get rid of this additional json roundtrip, but currently that is the easiest way forward.
1 parent d978358 commit 55ae0ba

File tree

4 files changed

+112
-27
lines changed

4 files changed

+112
-27
lines changed

relay-server/src/managed/counted.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use relay_event_schema::protocol::OurLog;
1+
use relay_event_schema::protocol::{OurLog, Span, SpanV2};
2+
use relay_protocol::Annotated;
23
use relay_quotas::DataCategory;
34
use smallvec::SmallVec;
45

@@ -87,6 +88,18 @@ impl Counted for WithHeader<OurLog> {
8788
}
8889
}
8990

91+
impl Counted for WithHeader<SpanV2> {
92+
fn quantities(&self) -> Quantities {
93+
smallvec::smallvec![(DataCategory::Span, 1), (DataCategory::SpanIndexed, 1)]
94+
}
95+
}
96+
97+
impl Counted for Annotated<Span> {
98+
fn quantities(&self) -> Quantities {
99+
smallvec::smallvec![(DataCategory::Span, 1), (DataCategory::SpanIndexed, 1)]
100+
}
101+
}
102+
90103
impl<T> Counted for &T
91104
where
92105
T: Counted,

relay-server/src/managed/managed.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,20 @@ impl RecordKeeper<'_> {
647647
}
648648
err
649649
}
650+
651+
/// Rejects an item with an internal error.
652+
///
653+
/// See also: [`Managed::internal_error`].
654+
#[track_caller]
655+
pub fn internal_error<E, Q>(&mut self, error: E, q: Q)
656+
where
657+
E: std::error::Error + 'static,
658+
Q: Counted,
659+
{
660+
relay_log::error!(error = &error as &dyn std::error::Error, "internal error");
661+
debug_assert!(false, "internal error: {error}");
662+
self.reject_err((Outcome::Invalid(DiscardReason::Internal), ()), q);
663+
}
650664
}
651665

652666
/// Iterator returned by [`Managed::split`].

relay-server/src/processing/spans/mod.rs

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use relay_event_schema::protocol::SpanV2;
44
use relay_quotas::{DataCategory, RateLimits};
55

66
use crate::Envelope;
7-
use crate::envelope::{ContainerItems, EnvelopeHeaders, Item, ItemType};
7+
use crate::envelope::{
8+
ContainerItems, ContainerWriteError, EnvelopeHeaders, Item, ItemContainer, ItemType, Items,
9+
};
810
use crate::managed::{
911
Counted, Managed, ManagedEnvelope, ManagedResult, OutcomeError, Quantities, Rejected,
1012
};
@@ -72,14 +74,14 @@ impl processing::Processor for SpansProcessor {
7274
&self,
7375
envelope: &mut ManagedEnvelope,
7476
) -> Option<Managed<Self::UnitOfWork>> {
75-
let _headers = envelope.envelope().headers().clone();
77+
let headers = envelope.envelope().headers().clone();
7678

7779
let spans = envelope
7880
.envelope_mut()
7981
.take_items_by(|item| matches!(*item.ty(), ItemType::Span))
8082
.into_vec();
8183

82-
let work = SerializedSpans { _headers, spans };
84+
let work = SerializedSpans { headers, spans };
8385
Some(Managed::from_envelope(envelope, work))
8486
}
8587

@@ -127,39 +129,74 @@ pub enum SpanOutput {
127129

128130
impl Forward for SpanOutput {
129131
fn serialize_envelope(self) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
130-
debug_assert!(false, "Not Implemented Yet");
131-
Err(match self {
132-
Self::NotProcessed(spans) => {
133-
spans.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
134-
}
135-
Self::Processed(spans) => {
136-
spans.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
137-
}
138-
})
132+
let spans = match self {
133+
Self::NotProcessed(spans) => spans,
134+
Self::Processed(spans) => spans.try_map(|spans, _| {
135+
spans
136+
.serialize()
137+
.map_err(drop)
138+
.with_outcome(Outcome::Invalid(DiscardReason::Internal))
139+
})?,
140+
};
141+
142+
Ok(spans.map(|spans, _| spans.serialize_envelope()))
139143
}
140144

141145
#[cfg(feature = "processing")]
142146
fn forward_store(
143147
self,
144-
_s: &relay_system::Addr<crate::services::store::Store>,
148+
s: &relay_system::Addr<crate::services::store::Store>,
145149
) -> Result<(), Rejected<()>> {
146-
debug_assert!(false, "Not Implemented Yet");
147-
Err(match self {
148-
Self::NotProcessed(spans) => {
149-
spans.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
150+
use crate::envelope::ContentType;
151+
use crate::services::store::StoreEnvelope;
152+
153+
let spans = match self {
154+
SpanOutput::NotProcessed(spans) => {
155+
return Err(spans.internal_error(
156+
"spans must be processed before they can be forwarded to the store",
157+
));
150158
}
151-
Self::Processed(spans) => {
152-
spans.reject_err((Outcome::Invalid(DiscardReason::Internal), ()))
159+
SpanOutput::Processed(spans) => spans,
160+
};
161+
162+
// Converts all SpanV2 spans into their SpanV1 counterparts and packages them into an
163+
// envelope to forward them.
164+
//
165+
// This is temporary until we have proper mapping code from SpanV2 -> SpanKafka,
166+
// similar to what we do for logs.
167+
let envelope = spans.map(|spans, records| {
168+
let mut items = Items::with_capacity(spans.spans.len());
169+
for span in spans.spans {
170+
let span = span.value.map_value(relay_spans::span_v2_to_span_v1);
171+
172+
let mut item = Item::new(ItemType::Span);
173+
let payload = match span.to_json() {
174+
Ok(payload) => payload,
175+
Err(error) => {
176+
records.internal_error(error, span);
177+
continue;
178+
}
179+
};
180+
item.set_payload(ContentType::Json, payload);
181+
items.push(item);
153182
}
154-
})
183+
184+
Envelope::from_parts(spans.headers, items)
185+
});
186+
187+
s.send(StoreEnvelope {
188+
envelope: ManagedEnvelope::from(envelope).into_processed(),
189+
});
190+
191+
Ok(())
155192
}
156193
}
157194

158195
/// Spans in their serialized state, as transported in an envelope.
159196
#[derive(Debug)]
160197
pub struct SerializedSpans {
161198
/// Original envelope headers.
162-
_headers: EnvelopeHeaders,
199+
headers: EnvelopeHeaders,
163200

164201
/// A list of spans waiting to be processed.
165202
///
@@ -176,6 +213,10 @@ impl SerializedSpans {
176213
let c: u32 = self.spans.iter().filter_map(|item| item.item_count()).sum();
177214
c as usize
178215
}
216+
217+
fn serialize_envelope(self) -> Box<Envelope> {
218+
Envelope::from_parts(self.headers, Items::from_vec(self.spans))
219+
}
179220
}
180221

181222
impl Counted for SerializedSpans {
@@ -196,12 +237,31 @@ impl CountRateLimited for Managed<SerializedSpans> {
196237
#[derive(Debug)]
197238
pub struct ExpandedSpans {
198239
/// Original envelope headers.
199-
_headers: EnvelopeHeaders,
240+
headers: EnvelopeHeaders,
200241

201242
/// Expanded and parsed spans.
202243
spans: ContainerItems<SpanV2>,
203244
}
204245

246+
impl ExpandedSpans {
247+
fn serialize(self) -> Result<SerializedSpans, ContainerWriteError> {
248+
let mut spans = Vec::new();
249+
250+
if !self.spans.is_empty() {
251+
let mut item = Item::new(ItemType::Span);
252+
ItemContainer::from(self.spans)
253+
.write_to(&mut item)
254+
.inspect_err(|err| relay_log::error!("failed to serialize spans: {err}"))?;
255+
spans.push(item);
256+
}
257+
258+
Ok(SerializedSpans {
259+
headers: self.headers,
260+
spans,
261+
})
262+
}
263+
}
264+
205265
impl Counted for ExpandedSpans {
206266
fn quantities(&self) -> Quantities {
207267
let quantity = self.spans.len();

relay-server/src/processing/spans/process.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@ use relay_event_schema::protocol::SpanV2;
22

33
use crate::envelope::{ContainerItems, Item, ItemContainer};
44
use crate::managed::Managed;
5-
use crate::processing::spans::{Error, ExpandedSpans, Result};
5+
use crate::processing::spans::{Error, ExpandedSpans, Result, SerializedSpans};
66
use crate::services::outcome::DiscardReason;
77

8-
use super::SerializedSpans;
9-
108
/// Parses all serialized spans.
119
///
1210
/// Individual, invalid spans are discarded.
@@ -21,7 +19,7 @@ pub fn expand(spans: Managed<SerializedSpans>) -> Managed<ExpandedSpans> {
2119
}
2220

2321
ExpandedSpans {
24-
_headers: spans._headers,
22+
headers: spans.headers,
2523
spans: all_spans,
2624
}
2725
})

0 commit comments

Comments
 (0)