Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ prost-dto = { version = "0.0.4" }
prost-types = { version = "0.14.1" }
quote = "1"
rand = "0.9.3"
rcgen = "0.13"
regex = { version = "1.12" }
reqwest = { version = "0.12", default-features = false, features = [
"json",
Expand All @@ -231,6 +232,7 @@ rocksdb = { version = "0.46.1", package = "rust-rocksdb", features = [
], git = "https://github.com/restatedev/rust-rocksdb", rev = "dcfba7946697d740e60f0b1060b6624dc1c7e94a" }
rstest = "0.26.1"
rustls = { version = "0.23.35", default-features = false, features = ["ring"] }
rustls-pemfile = "2"
schemars = { version = "1.2", features = ["bytes1"] }
semver = { version = "1.0", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down Expand Up @@ -258,6 +260,7 @@ tokio = { version = "1.48.0", default-features = false, features = [
"macros",
"parking_lot",
] }
tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.17" }
toml = { version = "0.9" }
Expand All @@ -284,6 +287,7 @@ utoipa = { version = "5.4" }
utoipa-axum = "0.2"
uuid = { version = "1.19.0", features = ["v7", "serde"] }
vergen = { version = "8.0.0", default-features = false }
x509-parser = "0.16"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
zstd = { version = "0.13" }

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ where
TaskCenter::with_current(|tc| opts.advertised_address(tc.address_book()))
);

net_util::run_hyper_server(self.listeners, service, || ())
net_util::run_hyper_server(self.listeners, service, || (), None)
.await
.map_err(Into::into)
}
Expand Down
6 changes: 6 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,13 @@ static_assertions = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["tracing"] }
tokio-rustls = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
tokio-util = { workspace = true, features = ["net"] }
tonic = { workspace = true, features = ["transport", "codegen", "gzip", "zstd", "router"] }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
x509-parser = { workspace = true }
tonic-prost = { workspace = true }
tonic-reflection = { workspace = true }
tower = { workspace = true }
Expand All @@ -81,6 +85,8 @@ restate-metadata-store = { workspace = true, features = ["test-util"] }
restate-test-util = { workspace = true }

googletest = { workspace = true }
rcgen = { workspace = true }
tempfile = { workspace = true }
test-log = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }
Expand Down
39 changes: 35 additions & 4 deletions crates/core/src/network/grpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
use futures::Stream;
use http::Uri;
use hyper_util::rt::TokioIo;
use rustls::pki_types::ServerName;
use tokio::io;
use tokio::net::UnixStream;
use tokio_rustls::TlsConnector;
use tokio_stream::StreamExt;
use tonic::codec::CompressionEncoding;
use tonic::transport::Endpoint;
Expand All @@ -26,13 +28,20 @@ use restate_types::net::connect_opts::GrpcConnectionOptions;
use crate::network::grpc::DEFAULT_GRPC_COMPRESSION;
use crate::network::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient;
use crate::network::protobuf::network::Message;
use crate::network::tls::TlsCertResolver;
use crate::network::transport_connector::find_node;
use crate::network::{ConnectError, Destination, Swimlane, TransportConnect};
use crate::{Metadata, TaskCenter, TaskKind};

#[derive(Clone, Default)]
pub struct GrpcConnector {
_private: (),
tls: Option<TlsCertResolver>,
}

impl GrpcConnector {
pub fn new(tls: Option<TlsCertResolver>) -> Self {
Self { tls }
}
}

impl TransportConnect for GrpcConnector {
Expand All @@ -53,7 +62,7 @@ impl TransportConnect for GrpcConnector {

debug!("Connecting to {} at {}", destination, address);
let networking = &Configuration::pinned().networking;
let channel = create_channel(address, swimlane, networking);
let channel = create_channel(address, swimlane, networking, &self.tls);

// Establish the connection
let client = CoreNodeSvcClient::new(channel)
Expand Down Expand Up @@ -85,8 +94,11 @@ fn create_channel<P: ListenerPort + GrpcPort>(
address: AdvertisedAddress<P>,
_swimlane: Swimlane,
options: &NetworkingOptions,
tls: &Option<TlsCertResolver>,
) -> Channel {
let address = address.into_address().expect("valid address");
let use_tls = address.is_tls() && tls.is_some();

let endpoint = match &address {
PeerNetAddress::Uds(_) => {
// dummy endpoint required to specify an uds connector, it is not used anywhere
Expand All @@ -108,7 +120,6 @@ fn create_channel<P: ListenerPort + GrpcPort>(
.initial_stream_window_size(options.stream_window_size())
.initial_connection_window_size(options.connection_window_size())
.keep_alive_while_idle(true)
// this true by default, but this is to guard against any change in defaults
.tcp_nodelay(true);

match address {
Expand All @@ -120,7 +131,27 @@ fn create_channel<P: ListenerPort + GrpcPort>(
}
}))
}
PeerNetAddress::Http(_) => endpoint.connect_lazy()
PeerNetAddress::Http(uri) if use_tls => {
let resolver = tls.as_ref().unwrap().clone();
endpoint.connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
let resolver = resolver.clone();
let host = uri.host().unwrap_or("localhost").to_owned();
let port = uri.port_u16().unwrap_or(5122);
async move {
let addr = format!("{host}:{port}");
let tcp_stream = tokio::net::TcpStream::connect(&addr).await?;
tcp_stream.set_nodelay(true)?;

let client_config = resolver.client_config();
let connector = TlsConnector::from(client_config);
let server_name = ServerName::try_from(host)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let tls_stream = connector.connect(server_name, tcp_stream).await?;
Ok::<_, io::Error>(TokioIo::new(tls_stream))
}
}))
}
PeerNetAddress::Http(_) => endpoint.connect_lazy(),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod network_sender;
mod networking;
pub mod protobuf;
mod server_builder;
pub mod tls;
pub mod tonic_service_filter;
mod tracking;
pub mod transport_connector;
Expand Down
Loading
Loading