From 8fec929d7badb2cb882a0b358b30118bb8f93660 Mon Sep 17 00:00:00 2001 From: Baily Date: Thu, 16 Apr 2026 00:21:38 -0400 Subject: [PATCH 1/5] Add upstream MPTCP socket support --- pingora-core/src/connectors/l4.rs | 56 +++++++------- pingora-core/src/protocols/l4/ext.rs | 108 +++++++++++++++++++++++---- pingora-core/src/upstreams/peer.rs | 42 +++++++++++ 3 files changed, 166 insertions(+), 40 deletions(-) diff --git a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs index d3baaa63..02bbc735 100644 --- a/pingora-core/src/connectors/l4.rs +++ b/pingora-core/src/connectors/l4.rs @@ -105,33 +105,39 @@ where } else { match peer_addr { SocketAddr::Inet(addr) => { - let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| { - #[cfg(unix)] - let raw = socket.as_raw_fd(); - #[cfg(windows)] - let raw = socket.as_raw_socket(); - - if peer.tcp_fast_open() { - set_tcp_fastopen_connect(raw)?; - } - if let Some(recv_buf) = peer.tcp_recv_buf() { - debug!("Setting recv buf size"); - set_recv_buf(raw, recv_buf)?; - } - if let Some(dscp) = peer.dscp() { - debug!("Setting dscp"); - set_dscp(raw, dscp)?; - } + let connect_future = tcp_connect( + addr, + bind_to.as_ref(), + peer.tcp_mptcp(), + peer.tcp_mptcp_fallback(), + |socket| { + #[cfg(unix)] + let raw = socket.as_raw_fd(); + #[cfg(windows)] + let raw = socket.as_raw_socket(); + + if peer.tcp_fast_open() { + set_tcp_fastopen_connect(raw)?; + } + if let Some(recv_buf) = peer.tcp_recv_buf() { + debug!("Setting recv buf size"); + set_recv_buf(raw, recv_buf)?; + } + if let Some(dscp) = peer.dscp() { + debug!("Setting dscp"); + set_dscp(raw, dscp)?; + } - if let Some(tweak_hook) = peer - .get_peer_options() - .and_then(|o| o.upstream_tcp_sock_tweak_hook.clone()) - { - tweak_hook(socket)?; - } + if let Some(tweak_hook) = peer + .get_peer_options() + .and_then(|o| o.upstream_tcp_sock_tweak_hook.clone()) + { + tweak_hook(socket)?; + } - Ok(()) - }); + Ok(()) + }, + ); let conn_res = match peer.connection_timeout() { Some(t) => pingora_timeout::timeout(t, connect_future) .await diff --git a/pingora-core/src/protocols/l4/ext.rs b/pingora-core/src/protocols/l4/ext.rs index dec725ac..4cdb6212 100644 --- a/pingora-core/src/protocols/l4/ext.rs +++ b/pingora-core/src/protocols/l4/ext.rs @@ -20,7 +20,10 @@ use libc::socklen_t; #[cfg(target_os = "linux")] use libc::{c_int, c_ulonglong, c_void}; +use log::debug; use pingora_error::{Error, ErrorType::*, OrErr, Result}; +#[cfg(target_os = "linux")] +use socket2::{Domain, Protocol, Socket, Type}; use std::io::{self, ErrorKind}; use std::mem; use std::net::SocketAddr; @@ -485,38 +488,67 @@ pub fn get_original_dest(_sock: RawSocket) -> Result> { pub(crate) async fn connect_with Result<()> + Clone>( addr: &SocketAddr, bind_to: Option<&BindTo>, + tcp_mptcp: bool, + tcp_mptcp_fallback: bool, + set_socket: F, +) -> Result { + let connect_result = + connect_with_bind_retry(addr, bind_to, tcp_mptcp, set_socket.clone()).await; + + if tcp_mptcp && tcp_mptcp_fallback { + match connect_result { + Ok(stream) => return Ok(stream), + Err(e) => { + debug!("MPTCP connect to {addr} failed, retrying over TCP: {e}"); + return connect_with_bind_retry(addr, bind_to, false, set_socket) + .await + .or_else(|mut fallback_error| { + fallback_error.set_cause(e); + Err(fallback_error) + }); + } + } + } + + connect_result +} + +async fn connect_with_bind_retry Result<()> + Clone>( + addr: &SocketAddr, + bind_to: Option<&BindTo>, + tcp_mptcp: bool, set_socket: F, ) -> Result { if bind_to.as_ref().is_some_and(|b| b.will_fallback()) { // if we see an EADDRNOTAVAIL error clear the port range and try again - let connect_result = inner_connect_with(addr, bind_to, set_socket.clone()).await; + let connect_result = inner_connect_with(addr, bind_to, tcp_mptcp, set_socket.clone()).await; if let Err(e) = connect_result.as_ref() { if matches!(e.etype(), BindError) { let mut new_bind_to = BindTo::default(); new_bind_to.addr = bind_to.as_ref().and_then(|b| b.addr); // reset the port range new_bind_to.set_port_range(None).unwrap(); - return inner_connect_with(addr, Some(&new_bind_to), set_socket).await; + return inner_connect_with(addr, Some(&new_bind_to), tcp_mptcp, set_socket).await; } } connect_result } else { // not retryable - inner_connect_with(addr, bind_to, set_socket).await + inner_connect_with(addr, bind_to, tcp_mptcp, set_socket).await } } async fn inner_connect_with Result<()>>( addr: &SocketAddr, bind_to: Option<&BindTo>, + tcp_mptcp: bool, set_socket: F, ) -> Result { - let socket = if addr.is_ipv4() { - TcpSocket::new_v4() + let socket = if tcp_mptcp { + new_mptcp_socket(addr) } else { - TcpSocket::new_v6() - } - .or_err(SocketError, "failed to create socket")?; + new_tcp_socket(addr) + }?; #[cfg(unix)] { @@ -562,7 +594,45 @@ async fn inner_connect_with Result<()>>( /// `IP_BIND_ADDRESS_NO_PORT` is used /// `IP_LOCAL_PORT_RANGE` is used if a port range is set on [`BindTo`]. pub async fn connect(addr: &SocketAddr, bind_to: Option<&BindTo>) -> Result { - connect_with(addr, bind_to, |_| Ok(())).await + connect_with(addr, bind_to, false, false, |_| Ok(())).await +} + +fn new_tcp_socket(addr: &SocketAddr) -> Result { + if addr.is_ipv4() { + TcpSocket::new_v4() + } else { + TcpSocket::new_v6() + } + .or_err(SocketError, "failed to create socket") +} + +#[cfg(target_os = "linux")] +fn new_mptcp_socket(addr: &SocketAddr) -> Result { + let domain = if addr.is_ipv4() { + Domain::IPV4 + } else { + Domain::IPV6 + }; + + let socket = Socket::new( + domain, + Type::STREAM, + Some(Protocol::from(libc::IPPROTO_MPTCP)), + ) + .or_err(SocketError, "failed to create MPTCP socket")?; + socket + .set_nonblocking(true) + .or_err(SocketError, "failed to set MPTCP socket nonblocking")?; + + Ok(TcpSocket::from_std_stream(socket.into())) +} + +#[cfg(not(target_os = "linux"))] +fn new_mptcp_socket(_addr: &SocketAddr) -> Result { + Error::e_explain( + SocketError, + "MPTCP upstream sockets are only supported on Linux", + ) } /// connect() to the given Unix domain socket @@ -660,16 +730,24 @@ mod test { use std::time::Instant; // connect once to make sure their is a SYN cookie to use for TFO - connect_with(&"1.1.1.1:80".parse().unwrap(), None, |socket| { - set_tcp_fastopen_connect(socket.as_raw_fd()) - }) + connect_with( + &"1.1.1.1:80".parse().unwrap(), + None, + false, + false, + |socket| set_tcp_fastopen_connect(socket.as_raw_fd()), + ) .await .unwrap(); let start = Instant::now(); - connect_with(&"1.1.1.1:80".parse().unwrap(), None, |socket| { - set_tcp_fastopen_connect(socket.as_raw_fd()) - }) + connect_with( + &"1.1.1.1:80".parse().unwrap(), + None, + false, + false, + |socket| set_tcp_fastopen_connect(socket.as_raw_fd()), + ) .await .unwrap(); let connection_time = start.elapsed(); diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs index c9ae0a66..a5a68c82 100644 --- a/pingora-core/src/upstreams/peer.rs +++ b/pingora-core/src/upstreams/peer.rs @@ -255,6 +255,20 @@ pub trait Peer: Display + Clone { .unwrap_or_default() } + /// Whether upstream sockets should be opened with MPTCP when supported. + fn tcp_mptcp(&self) -> bool { + self.get_peer_options() + .map(|o| o.tcp_mptcp) + .unwrap_or_default() + } + + /// Whether an MPTCP socket creation or connect failure should retry over plain TCP. + fn tcp_mptcp_fallback(&self) -> bool { + self.get_peer_options() + .map(|o| o.tcp_mptcp_fallback) + .unwrap_or(true) + } + #[cfg(unix)] fn matches_fd(&self, fd: V) -> bool { self.address().check_fd_match(fd) @@ -448,6 +462,14 @@ pub struct PeerOptions { pub second_keyshare: bool, // whether to enable TCP fast open pub tcp_fast_open: bool, + /// Whether to create upstream sockets with `IPPROTO_MPTCP` when supported. + /// + /// MPTCP sockets are currently only supported on Linux. When enabled on other platforms, the + /// connection will fail unless `tcp_mptcp_fallback` is also enabled. + pub tcp_mptcp: bool, + /// Whether to retry the upstream connection with a plain TCP socket when the initial MPTCP + /// attempt fails. + pub tcp_mptcp_fallback: bool, // use Arc because Clone is required but not allowed in trait object pub tracer: Option, // A custom L4 connector to use to establish new L4 connections @@ -499,6 +521,8 @@ impl PeerOptions { curves: None, second_keyshare: true, // default true and noop when not using PQ curves tcp_fast_open: false, + tcp_mptcp: false, + tcp_mptcp_fallback: true, tracer: None, custom_l4: None, upstream_tcp_sock_tweak_hook: None, @@ -538,6 +562,12 @@ impl Display for PeerOptions { if let Some(cn) = &self.alternative_cn { write!(f, "alt_cn: {},", cn)?; } + if self.tcp_mptcp { + write!(f, "tcp_mptcp: true,")?; + if self.tcp_mptcp_fallback { + write!(f, "tcp_mptcp_fallback: true,")?; + } + } write!(f, "alpn: {},", self.alpn)?; if let Some(cas) = &self.ca { for ca in cas.iter() { @@ -783,3 +813,15 @@ impl Display for Proxy { ) } } + +#[cfg(test)] +mod tests { + use super::PeerOptions; + + #[test] + fn peer_options_disable_mptcp_by_default() { + let options = PeerOptions::new(); + assert!(!options.tcp_mptcp); + assert!(options.tcp_mptcp_fallback); + } +} From 5860540f4dd6d95678470f8df87dff77b6bf7c24 Mon Sep 17 00:00:00 2001 From: Baily Date: Thu, 16 Apr 2026 00:22:25 -0400 Subject: [PATCH 2/5] Document MPTCP usage in proxy example --- pingora/examples/service/proxy.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pingora/examples/service/proxy.rs b/pingora/examples/service/proxy.rs index 1c6a1df9..f2a1ee1b 100644 --- a/pingora/examples/service/proxy.rs +++ b/pingora/examples/service/proxy.rs @@ -19,6 +19,8 @@ use pingora_core::upstreams::peer::BasicPeer; pub fn proxy_service(addr: &str, proxy_addr: &str) -> Service { let proxy_to = BasicPeer::new(proxy_addr); + // On Linux, prefer MPTCP for upstream connects with: + // proxy_to.options.tcp_mptcp = true; Service::with_listeners( "Proxy Service".to_string(), @@ -37,6 +39,8 @@ pub fn proxy_service_tls( let mut proxy_to = BasicPeer::new(proxy_addr); // set SNI to enable TLS proxy_to.sni = proxy_sni.into(); + // On Linux, prefer MPTCP for upstream connects with: + // proxy_to.options.tcp_mptcp = true; Service::with_listeners( "Proxy Service TLS".to_string(), Listeners::tls(addr, cert_path, key_path).unwrap(), From b8aac360fcbee6504c338b92fd3fc795e7f20fba Mon Sep 17 00:00:00 2001 From: Baily Date: Thu, 16 Apr 2026 00:34:37 -0400 Subject: [PATCH 3/5] Fix clippy lint in MPTCP fallback path --- pingora-core/src/protocols/l4/ext.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pingora-core/src/protocols/l4/ext.rs b/pingora-core/src/protocols/l4/ext.rs index 4cdb6212..c47cf23d 100644 --- a/pingora-core/src/protocols/l4/ext.rs +++ b/pingora-core/src/protocols/l4/ext.rs @@ -502,9 +502,9 @@ pub(crate) async fn connect_with Result<()> + Clone>( debug!("MPTCP connect to {addr} failed, retrying over TCP: {e}"); return connect_with_bind_retry(addr, bind_to, false, set_socket) .await - .or_else(|mut fallback_error| { + .map_err(|mut fallback_error| { fallback_error.set_cause(e); - Err(fallback_error) + fallback_error }); } } From a94b5c62de38014b54e81f8582eba4b41498483e Mon Sep 17 00:00:00 2001 From: Baily Date: Thu, 16 Apr 2026 10:31:37 -0400 Subject: [PATCH 4/5] Update reqwest and adjust cargo-audit CI --- .github/workflows/audit.yml | 8 +++++--- .github/workflows/build.yml | 4 +++- pingora-core/Cargo.toml | 2 +- pingora-proxy/Cargo.toml | 2 +- pingora/Cargo.toml | 2 +- 5 files changed, 11 insertions(+), 7 deletions(-) diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml index 6fe67dea..cac847e5 100644 --- a/.github/workflows/audit.yml +++ b/.github/workflows/audit.yml @@ -26,8 +26,10 @@ jobs: # https://github.com/rustsec/audit-check/issues/27 run: cargo generate-lockfile --ignore-rust-version - - name: Audit Check - # https://github.com/rustsec/audit-check/issues/2 - uses: rustsec/audit-check@master + - name: Audit dependencies + # RUSTSEC-2026-0009 requires upgrading time to >=0.3.47, which raises the MSRV above + # the 1.84.0 toolchain currently exercised by this repository. + uses: actions-rust-lang/audit@v1 with: token: ${{ secrets.GITHUB_TOKEN }} + ignore: RUSTSEC-2026-0009 diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index adf2b014..d2a18d1f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -57,7 +57,9 @@ jobs: - name: Run cargo audit run: | - [[ ${{ matrix.toolchain }} != 1.91.1 ]] || (cargo install --locked cargo-audit && cargo generate-lockfile --ignore-rust-version && cargo audit) + # RUSTSEC-2026-0009 requires upgrading time to >=0.3.47, which raises the MSRV above + # the 1.84.0 toolchain currently exercised by this repository. + [[ ${{ matrix.toolchain }} != 1.91.1 ]] || (cargo install --locked cargo-audit && cargo generate-lockfile --ignore-rust-version && cargo audit --ignore RUSTSEC-2026-0009) - name: Run cargo machete run: | diff --git a/pingora-core/Cargo.toml b/pingora-core/Cargo.toml index e2854966..2939e613 100644 --- a/pingora-core/Cargo.toml +++ b/pingora-core/Cargo.toml @@ -86,7 +86,7 @@ windows-sys = { version = "0.59.0", features = ["Win32_Networking_WinSock"] } h2 = { workspace = true, features = ["unstable"] } tokio-stream = { version = "0.1", features = ["full"] } env_logger = "0.11" -reqwest = { version = "0.11", features = [ +reqwest = { version = "0.12", features = [ "rustls-tls", ], default-features = false } hyper = "0.14" diff --git a/pingora-proxy/Cargo.toml b/pingora-proxy/Cargo.toml index d4df1378..fed62f37 100644 --- a/pingora-proxy/Cargo.toml +++ b/pingora-proxy/Cargo.toml @@ -36,7 +36,7 @@ regex = "1" rand = "0.8" [dev-dependencies] -reqwest = { version = "0.11", features = [ +reqwest = { version = "0.12", features = [ "gzip", "rustls-tls", ], default-features = false } diff --git a/pingora/Cargo.toml b/pingora/Cargo.toml index d9fb57c2..8c11fb18 100644 --- a/pingora/Cargo.toml +++ b/pingora/Cargo.toml @@ -37,7 +37,7 @@ document-features = { version = "0.2.10", optional = true } clap = { version = "4.5", features = ["derive"] } tokio = { workspace = true, features = ["rt-multi-thread", "signal"] } env_logger = "0.11" -reqwest = { version = "0.11", features = ["rustls"], default-features = false } +reqwest = { version = "0.12", features = ["rustls-tls"], default-features = false } hyper = "0.14" async-trait = { workspace = true } http = { workspace = true } From 18e0a84e43c0fd5b5bfec7ef9d1c380859619d14 Mon Sep 17 00:00:00 2001 From: Baily Date: Thu, 16 Apr 2026 10:55:55 -0400 Subject: [PATCH 5/5] Fix pingora-proxy tests after reqwest upgrade --- pingora-proxy/Cargo.toml | 2 +- pingora-proxy/tests/test_basic.rs | 12 ++++++------ pingora-proxy/tests/test_upstream.rs | 14 ++++++++------ 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/pingora-proxy/Cargo.toml b/pingora-proxy/Cargo.toml index fed62f37..7ae14d83 100644 --- a/pingora-proxy/Cargo.toml +++ b/pingora-proxy/Cargo.toml @@ -43,7 +43,7 @@ reqwest = { version = "0.12", features = [ httparse = { workspace = true } tokio-test = "0.4" env_logger = "0.11" -hyper = "0.14" +hyper = { version = "0.14", features = ["full"] } tokio-tungstenite = "0.20.1" pingora-limits = { version = "0.8.0", path = "../pingora-limits" } pingora-load-balancing = { version = "0.8.0", path = "../pingora-load-balancing", default-features=false } diff --git a/pingora-proxy/tests/test_basic.rs b/pingora-proxy/tests/test_basic.rs index 77303fc3..2b935b6e 100644 --- a/pingora-proxy/tests/test_basic.rs +++ b/pingora-proxy/tests/test_basic.rs @@ -172,8 +172,8 @@ async fn test_h2c_to_h2c() { req.headers_mut() .insert("x-h2", HeaderValue::from_bytes(b"true").unwrap()); let res = client.request(req).await.unwrap(); - assert_eq!(res.status(), reqwest::StatusCode::OK); - assert_eq!(res.version(), reqwest::Version::HTTP_2); + assert_eq!(res.status(), hyper::StatusCode::OK); + assert_eq!(res.version(), hyper::Version::HTTP_2); let body = res.into_body().data().await.unwrap().unwrap(); assert_eq!(body.as_ref(), b"Hello World!\n"); @@ -194,8 +194,8 @@ async fn test_h1_on_h2c_port() { req.headers_mut() .insert("x-h2", HeaderValue::from_bytes(b"true").unwrap()); let res = client.request(req).await.unwrap(); - assert_eq!(res.status(), reqwest::StatusCode::OK); - assert_eq!(res.version(), reqwest::Version::HTTP_11); + assert_eq!(res.status(), hyper::StatusCode::OK); + assert_eq!(res.version(), hyper::Version::HTTP_11); let body = res.into_body().data().await.unwrap().unwrap(); assert_eq!(body.as_ref(), b"Hello World!\n"); @@ -307,11 +307,11 @@ async fn test_simple_proxy_uds() { let res = client.get(url).await.unwrap(); - assert_eq!(res.status(), reqwest::StatusCode::OK); + assert_eq!(res.status(), hyper::StatusCode::OK); let (resp, body) = res.into_parts(); let headers = &resp.headers; - assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers[hyper::header::CONTENT_LENGTH], "13"); assert_eq!(headers["x-server-addr"], "/tmp/pingora_proxy.sock"); assert_eq!(headers["x-client-addr"], "unset"); // unnamed UDS diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index 9ae4511e..31f1f6a2 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -25,7 +25,9 @@ use std::time::{Duration, Instant}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time::timeout; -use tokio_tungstenite::tungstenite::{client::IntoClientRequest, Message}; +use tokio_tungstenite::tungstenite::{ + client::IntoClientRequest, http::HeaderValue as WsHeaderValue, Message, +}; #[tokio::test] async fn test_ip_binding() { @@ -144,7 +146,7 @@ async fn test_ws_server_ends_conn() { let mut req = "ws://127.0.0.1:6147".into_client_request().unwrap(); req.headers_mut() - .insert("x-port", HeaderValue::from_static("9283")); + .insert("x-port", WsHeaderValue::from_static("9283")); let (mut ws_stream, _) = tokio_tungstenite::connect_async(req).await.unwrap(); // gracefully close connection @@ -161,7 +163,7 @@ async fn test_ws_server_ends_conn() { let mut req = "ws://127.0.0.1:6147".into_client_request().unwrap(); req.headers_mut() - .insert("x-port", HeaderValue::from_static("9283")); + .insert("x-port", WsHeaderValue::from_static("9283")); let (mut ws_stream, _) = tokio_tungstenite::connect_async(req).await.unwrap(); // abrupt close connection @@ -173,7 +175,7 @@ async fn test_ws_server_ends_conn() { let mut req = "ws://127.0.0.1:6147".into_client_request().unwrap(); req.headers_mut() - .insert("x-port", HeaderValue::from_static("9283")); + .insert("x-port", WsHeaderValue::from_static("9283")); let (mut ws_stream, _) = tokio_tungstenite::connect_async(req).await.unwrap(); ws_stream.send("test".into()).await.unwrap(); @@ -374,7 +376,7 @@ async fn test_download_timeout() { .body(hyper::Body::empty()) .unwrap(); let mut res = client.request(req).await.unwrap(); - assert_eq!(res.status(), StatusCode::OK); + assert_eq!(res.status(), hyper::StatusCode::OK); let mut err = false; sleep(Duration::from_secs(2)).await; @@ -401,7 +403,7 @@ async fn test_download_timeout_min_rate() { .body(hyper::Body::empty()) .unwrap(); let mut res = client.request(req).await.unwrap(); - assert_eq!(res.status(), StatusCode::OK); + assert_eq!(res.status(), hyper::StatusCode::OK); let mut err = false; sleep(Duration::from_secs(2)).await;