Skip to content

Commit 8bb6f23

Browse files
committed
Introduce balanced channel in ginepro.
1 parent cf9f0e9 commit 8bb6f23

File tree

11 files changed

+344
-115
lines changed

11 files changed

+344
-115
lines changed

Cargo.lock

Lines changed: 266 additions & 85 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-scheduler/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ bytes = { version = "1.10.1", default-features = false }
2020
futures = { version = "0.3.31", default-features = false }
2121
lru = { version = "0.13.0", default-features = false }
2222
mock_instant = { version = "0.5.3", default-features = false }
23-
opentelemetry = { version = "0.29.1", default-features = false }
24-
opentelemetry-semantic-conventions = { version = "0.29.0", default-features = false, features = [
23+
opentelemetry = { version = "0.30.0", default-features = false }
24+
opentelemetry-semantic-conventions = { version = "0.30.0", default-features = false, features = [
2525
"default",
2626
"semconv_experimental",
2727
] }

nativelink-service/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ bytes = { version = "1.10.1", default-features = false }
2020
futures = { version = "0.3.31", default-features = false }
2121
http-body-util = { version = "0.1.3", default-features = false }
2222
hyper = { version = "1.6.0", default-features = false }
23-
opentelemetry = { version = "0.29.1", default-features = false }
24-
opentelemetry-semantic-conventions = { version = "0.29.0", default-features = false, features = [
23+
opentelemetry = { version = "0.30.0", default-features = false }
24+
opentelemetry-semantic-conventions = { version = "0.30.0", default-features = false, features = [
2525
"default",
2626
"semconv_experimental",
2727
] }

nativelink-store/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ mongodb = { version = "3", features = [
7575
"compat-3-0-0",
7676
"rustls-tls",
7777
], default-features = false }
78-
opentelemetry = { version = "0.29.1", default-features = false }
78+
opentelemetry = { version = "0.30.0", default-features = false }
7979
parking_lot = { version = "0.12.3", features = [
8080
"arc_lock",
8181
"send_guard",

nativelink-util/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ rust_library(
5656
"@crates//:blake3",
5757
"@crates//:bytes",
5858
"@crates//:futures",
59+
"@crates//:ginepro",
5960
"@crates//:hex",
6061
"@crates//:hyper-1.7.0",
6162
"@crates//:hyper-util",
@@ -84,6 +85,7 @@ rust_library(
8485
"@crates//:tracing",
8586
"@crates//:tracing-opentelemetry",
8687
"@crates//:tracing-subscriber",
88+
"@crates//:url",
8789
"@crates//:uuid",
8890
"@crates//:walkdir",
8991
],

nativelink-util/Cargo.toml

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,21 @@ hyper = { version = "1.7.0", default-features = false }
2424
hyper-util = { version = "0.1.11", default-features = false }
2525
lru = { version = "0.13.0", default-features = false }
2626
mock_instant = { version = "0.5.3", default-features = false }
27-
opentelemetry = { version = "0.29.0", default-features = false }
28-
opentelemetry-appender-tracing = { version = "0.29.1", default-features = false }
29-
opentelemetry-http = { version = "0.29.0", default-features = false }
30-
opentelemetry-otlp = { version = "0.29.0", default-features = false, features = [
27+
opentelemetry = { version = "0.30.0", default-features = false }
28+
opentelemetry-appender-tracing = { version = "0.30.0", default-features = false }
29+
opentelemetry-http = { version = "0.30.0", default-features = false }
30+
opentelemetry-otlp = { version = "0.30.0", default-features = false, features = [
3131
"grpc-tonic",
3232
"logs",
3333
"metrics",
3434
"trace",
3535
"zstd-tonic",
3636
] }
37-
opentelemetry-semantic-conventions = { version = "0.29.0", default-features = false, features = [
37+
opentelemetry-semantic-conventions = { version = "0.30.0", default-features = false, features = [
3838
"default",
3939
"semconv_experimental",
4040
] }
41-
opentelemetry_sdk = { version = "0.29.0", default-features = false }
41+
opentelemetry_sdk = { version = "0.30.0", default-features = false }
4242
parking_lot = { version = "0.12.3", features = [
4343
"arc_lock",
4444
"send_guard",
@@ -73,7 +73,7 @@ tonic = { version = "0.13.0", features = [
7373
], default-features = false }
7474
tower = { version = "0.5.2", default-features = false }
7575
tracing = { version = "0.1.41", default-features = false }
76-
tracing-opentelemetry = { version = "0.30.0", default-features = false, features = [
76+
tracing-opentelemetry = { version = "0.31.0", default-features = false, features = [
7777
"metrics",
7878
] }
7979
tracing-subscriber = { version = "0.3.19", features = [
@@ -87,6 +87,8 @@ uuid = { version = "1.16.0", default-features = false, features = [
8787
"v6",
8888
] }
8989
walkdir = { version = "2.5.0", default-features = false }
90+
ginepro = "0.9.0"
91+
url = "2.5.7"
9092

9193
[dev-dependencies]
9294
nativelink-macro = { path = "../nativelink-macro" }

nativelink-util/src/telemetry.rs

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use core::default::Default;
16-
use std::env;
17-
use std::sync::OnceLock;
18-
1915
use base64::Engine;
2016
use base64::prelude::BASE64_STANDARD_NO_PAD;
17+
use core::default::Default;
18+
use ginepro::LoadBalancedChannel;
2119
use hyper::http::Response;
2220
use nativelink_error::{Code, ResultExt, make_err};
2321
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
@@ -26,15 +24,19 @@ use opentelemetry::trace::{TraceContextExt, Tracer, TracerProvider};
2624
use opentelemetry::{KeyValue, global};
2725
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
2826
use opentelemetry_http::HeaderExtractor;
29-
use opentelemetry_otlp::{LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig};
27+
use opentelemetry_otlp::{
28+
LogExporter, MetricExporter, Protocol, SpanExporter, WithExportConfig, WithTonicConfig,
29+
};
3030
use opentelemetry_sdk::Resource;
3131
use opentelemetry_sdk::logs::SdkLoggerProvider;
3232
use opentelemetry_sdk::metrics::SdkMeterProvider;
3333
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
3434
use opentelemetry_sdk::trace::SdkTracerProvider;
3535
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
3636
use prost::Message;
37-
use tracing::debug;
37+
use std::env;
38+
use std::sync::{OnceLock};
39+
use tracing::{debug, info};
3840
use tracing::metadata::LevelFilter;
3941
use tracing_opentelemetry::{MetricsLayer, layer};
4042
use tracing_subscriber::filter::Directive;
@@ -104,7 +106,7 @@ fn tracing_stdout_layer() -> impl Layer<Registry> {
104106
///
105107
/// Returns `Err` if logging was already initialized or if the exporters can't
106108
/// be initialized.
107-
pub fn init_tracing() -> Result<(), nativelink_error::Error> {
109+
pub async fn init_tracing() -> Result<(), nativelink_error::Error> {
108110
static INITIALIZED: OnceLock<()> = OnceLock::new();
109111

110112
if INITIALIZED.get().is_some() {
@@ -129,13 +131,18 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
129131
]);
130132
global::set_text_map_propagator(propagator);
131133

134+
let maybe_channel = maybe_load_balanced_channel().await;
135+
132136
// Logs
137+
let mut log_exporter_builder = LogExporter::builder().with_tonic();
138+
if let Some(channel) = maybe_channel.clone() {
139+
log_exporter_builder = log_exporter_builder.with_channel(channel.into());
140+
}
133141
let otlp_log_layer = OpenTelemetryTracingBridge::new(
134142
&SdkLoggerProvider::builder()
135143
.with_resource(resource.clone())
136144
.with_batch_exporter(
137-
LogExporter::builder()
138-
.with_tonic()
145+
log_exporter_builder
139146
.with_protocol(Protocol::Grpc)
140147
.build()
141148
.map_err(|e| make_err!(Code::Internal, "{e}"))
@@ -146,13 +153,16 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
146153
.with_filter(otlp_filter());
147154

148155
// Traces
156+
let mut span_exporter_builder = SpanExporter::builder().with_tonic();
157+
if let Some(channel) = maybe_channel.clone() {
158+
span_exporter_builder = span_exporter_builder.with_channel(channel.into());
159+
}
149160
let otlp_trace_layer = layer()
150161
.with_tracer(
151162
SdkTracerProvider::builder()
152163
.with_resource(resource.clone())
153164
.with_batch_exporter(
154-
SpanExporter::builder()
155-
.with_tonic()
165+
span_exporter_builder
156166
.with_protocol(Protocol::Grpc)
157167
.build()
158168
.map_err(|e| make_err!(Code::Internal, "{e}"))
@@ -164,11 +174,14 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
164174
.with_filter(otlp_filter());
165175

166176
// Metrics
177+
let mut metric_exporter_builder = MetricExporter::builder().with_tonic();
178+
if let Some(channel) = maybe_channel {
179+
metric_exporter_builder = metric_exporter_builder.with_channel(channel.into());
180+
}
167181
let meter_provider = SdkMeterProvider::builder()
168182
.with_resource(resource)
169183
.with_periodic_exporter(
170-
MetricExporter::builder()
171-
.with_tonic()
184+
metric_exporter_builder
172185
.with_protocol(Protocol::Grpc)
173186
.build()
174187
.map_err(|e| make_err!(Code::Internal, "{e}"))
@@ -192,6 +205,36 @@ pub fn init_tracing() -> Result<(), nativelink_error::Error> {
192205
Ok(())
193206
}
194207

208+
const NL_OTEL_ENDPOINT: &str = "NL_OTEL_ENDPOINT";
209+
210+
async fn maybe_load_balanced_channel() -> Option<LoadBalancedChannel> {
211+
match env::var(NL_OTEL_ENDPOINT) {
212+
Ok(endpoint) => {
213+
let url = Url::parse(endpoint.as_str()).map_err(|e| {
214+
make_err!(Code::Internal, "Unable to parse endpoint {endpoint}: {e:?}")
215+
}).unwrap();
216+
217+
let host = url
218+
.host()
219+
.err_tip(|| format!("Unable to get host from endpoint {endpoint}"))
220+
.unwrap();
221+
let port = url
222+
.port()
223+
.err_tip(|| format!("Unable to get port from endpoint {endpoint}"))
224+
.unwrap();
225+
226+
Some(
227+
LoadBalancedChannel::builder((host.to_string(), port))
228+
.channel()
229+
.await
230+
.map_err(|e| make_err!(Code::Internal, "Invalid hostname '{endpoint}': {e}"))
231+
.unwrap(),
232+
)
233+
}
234+
Err(_) => None,
235+
}
236+
}
237+
195238
/// Custom metadata key field for Bazel metadata.
196239
const BAZEL_METADATA_KEY: &str = "bazel.metadata";
197240

@@ -202,6 +245,7 @@ const BAZEL_REQUESTMETADATA_HEADER: &str = "build.bazel.remote.execution.v2.requ
202245

203246
use opentelemetry::baggage::BaggageExt;
204247
use opentelemetry::context::FutureExt;
248+
use url::Url;
205249

206250
#[derive(Debug, Clone)]
207251
pub struct OtlpMiddleware<S> {

nativelink-util/tests/metrics_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
use nativelink_util::action_messages::{ActionResult, ActionStage};
1616
use nativelink_util::metrics::{
17-
CACHE_METRICS, CacheMetricAttrs, EXECUTION_METRICS, ExecutionMetricAttrs, ExecutionStage, WORKER_METRICS,
18-
make_execution_attributes,
17+
CACHE_METRICS, CacheMetricAttrs, EXECUTION_METRICS, ExecutionMetricAttrs, ExecutionStage,
18+
WORKER_METRICS, make_execution_attributes,
1919
};
2020
use opentelemetry::KeyValue;
2121

nativelink-worker/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ bytes = { version = "1.10.1", default-features = false }
2222
filetime = { version = "0.2.25", default-features = false }
2323
formatx = { version = "0.2.3", default-features = false }
2424
futures = { version = "0.3.31", default-features = false }
25-
opentelemetry = { version = "0.29.1", default-features = false }
25+
opentelemetry = { version = "0.30.0", default-features = false }
2626
parking_lot = { version = "0.12.3", default-features = false }
2727
prost = { version = "0.13.5", default-features = false }
2828
relative-path = { version = "2.0.0", default-features = false, features = [

src/bin/nativelink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ fn main() -> Result<(), Box<dyn core::error::Error>> {
718718
// The OTLP exporters need to run in a Tokio context
719719
// Do this first so all the other logging works
720720
#[expect(clippy::disallowed_methods, reason = "tracing init on main runtime")]
721-
runtime.block_on(async { tokio::spawn(async { init_tracing() }).await? })?;
721+
runtime.block_on(async { tokio::spawn(async { init_tracing().await }).await? })?;
722722

723723
let mut cfg = get_config()?;
724724

0 commit comments

Comments
 (0)