Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .github/workflows/audit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ jobs:
# https://github.com/rustsec/audit-check/issues/27
run: cargo generate-lockfile --ignore-rust-version

- name: Audit Check
# https://github.com/rustsec/audit-check/issues/2
uses: rustsec/audit-check@master
- name: Audit dependencies
# RUSTSEC-2026-0009 requires upgrading the `time` crate to >=0.3.47, which raises the MSRV
# above the 1.84.0 toolchain currently exercised by this repository.
uses: actions-rust-lang/audit@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
ignore: RUSTSEC-2026-0009
4 changes: 3 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ jobs:

- name: Run cargo audit
run: |
[[ ${{ matrix.toolchain }} != 1.91.1 ]] || (cargo install --locked cargo-audit && cargo generate-lockfile --ignore-rust-version && cargo audit)
# RUSTSEC-2026-0009 requires upgrading the `time` crate to >=0.3.47, which raises the MSRV
# above the 1.84.0 toolchain currently exercised by this repository.
[[ ${{ matrix.toolchain }} != 1.91.1 ]] || (cargo install --locked cargo-audit && cargo generate-lockfile --ignore-rust-version && cargo audit --ignore RUSTSEC-2026-0009)

- name: Run cargo machete
run: |
Expand Down
2 changes: 1 addition & 1 deletion pingora-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ windows-sys = { version = "0.59.0", features = ["Win32_Networking_WinSock"] }
h2 = { workspace = true, features = ["unstable"] }
tokio-stream = { version = "0.1", features = ["full"] }
env_logger = "0.11"
reqwest = { version = "0.11", features = [
reqwest = { version = "0.12", features = [
"rustls-tls",
], default-features = false }
hyper = "0.14"
Expand Down
56 changes: 31 additions & 25 deletions pingora-core/src/connectors/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,33 +105,39 @@ where
} else {
match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
#[cfg(unix)]
let raw = socket.as_raw_fd();
#[cfg(windows)]
let raw = socket.as_raw_socket();

if peer.tcp_fast_open() {
set_tcp_fastopen_connect(raw)?;
}
if let Some(recv_buf) = peer.tcp_recv_buf() {
debug!("Setting recv buf size");
set_recv_buf(raw, recv_buf)?;
}
if let Some(dscp) = peer.dscp() {
debug!("Setting dscp");
set_dscp(raw, dscp)?;
}
let connect_future = tcp_connect(
addr,
bind_to.as_ref(),
peer.tcp_mptcp(),
peer.tcp_mptcp_fallback(),
|socket| {
#[cfg(unix)]
let raw = socket.as_raw_fd();
#[cfg(windows)]
let raw = socket.as_raw_socket();

if peer.tcp_fast_open() {
set_tcp_fastopen_connect(raw)?;
}
if let Some(recv_buf) = peer.tcp_recv_buf() {
debug!("Setting recv buf size");
set_recv_buf(raw, recv_buf)?;
}
if let Some(dscp) = peer.dscp() {
debug!("Setting dscp");
set_dscp(raw, dscp)?;
}

if let Some(tweak_hook) = peer
.get_peer_options()
.and_then(|o| o.upstream_tcp_sock_tweak_hook.clone())
{
tweak_hook(socket)?;
}
if let Some(tweak_hook) = peer
.get_peer_options()
.and_then(|o| o.upstream_tcp_sock_tweak_hook.clone())
{
tweak_hook(socket)?;
}

Ok(())
});
Ok(())
},
);
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
Expand Down
108 changes: 93 additions & 15 deletions pingora-core/src/protocols/l4/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
use libc::socklen_t;
#[cfg(target_os = "linux")]
use libc::{c_int, c_ulonglong, c_void};
use log::debug;
use pingora_error::{Error, ErrorType::*, OrErr, Result};
#[cfg(target_os = "linux")]
use socket2::{Domain, Protocol, Socket, Type};
use std::io::{self, ErrorKind};
use std::mem;
use std::net::SocketAddr;
Expand Down Expand Up @@ -485,38 +488,67 @@ pub fn get_original_dest(_sock: RawSocket) -> Result<Option<SocketAddr>> {
pub(crate) async fn connect_with<F: FnOnce(&TcpSocket) -> Result<()> + Clone>(
    addr: &SocketAddr,
    bind_to: Option<&BindTo>,
    tcp_mptcp: bool,
    tcp_mptcp_fallback: bool,
    set_socket: F,
) -> Result<TcpStream> {
    // The first attempt honours the caller's MPTCP setting. `set_socket` is cloned because
    // the fallback attempt below may need to run it a second time.
    let first_attempt =
        connect_with_bind_retry(addr, bind_to, tcp_mptcp, set_socket.clone()).await;

    // Only a failed attempt with both MPTCP and its fallback enabled is retried; every other
    // outcome (success, or failure without fallback) is returned as is.
    let mptcp_error = match first_attempt {
        Err(err) if tcp_mptcp && tcp_mptcp_fallback => err,
        outcome => return outcome,
    };

    debug!("MPTCP connect to {addr} failed, retrying over TCP: {e}", e = mptcp_error);

    // Retry once with MPTCP turned off. If that fails too, attach the MPTCP error as the
    // cause of the error that is returned.
    match connect_with_bind_retry(addr, bind_to, false, set_socket).await {
        Ok(stream) => Ok(stream),
        Err(mut fallback_error) => {
            fallback_error.set_cause(mptcp_error);
            Err(fallback_error)
        }
    }
}

async fn connect_with_bind_retry<F: FnOnce(&TcpSocket) -> Result<()> + Clone>(
addr: &SocketAddr,
bind_to: Option<&BindTo>,
tcp_mptcp: bool,
set_socket: F,
) -> Result<TcpStream> {
if bind_to.as_ref().is_some_and(|b| b.will_fallback()) {
// if we see an EADDRNOTAVAIL error clear the port range and try again
let connect_result = inner_connect_with(addr, bind_to, set_socket.clone()).await;
let connect_result = inner_connect_with(addr, bind_to, tcp_mptcp, set_socket.clone()).await;
if let Err(e) = connect_result.as_ref() {
if matches!(e.etype(), BindError) {
let mut new_bind_to = BindTo::default();
new_bind_to.addr = bind_to.as_ref().and_then(|b| b.addr);
// reset the port range
new_bind_to.set_port_range(None).unwrap();
return inner_connect_with(addr, Some(&new_bind_to), set_socket).await;
return inner_connect_with(addr, Some(&new_bind_to), tcp_mptcp, set_socket).await;
}
}
connect_result
} else {
// not retryable
inner_connect_with(addr, bind_to, set_socket).await
inner_connect_with(addr, bind_to, tcp_mptcp, set_socket).await
}
}

async fn inner_connect_with<F: FnOnce(&TcpSocket) -> Result<()>>(
addr: &SocketAddr,
bind_to: Option<&BindTo>,
tcp_mptcp: bool,
set_socket: F,
) -> Result<TcpStream> {
let socket = if addr.is_ipv4() {
TcpSocket::new_v4()
let socket = if tcp_mptcp {
new_mptcp_socket(addr)
} else {
TcpSocket::new_v6()
}
.or_err(SocketError, "failed to create socket")?;
new_tcp_socket(addr)
}?;

#[cfg(unix)]
{
Expand Down Expand Up @@ -562,7 +594,45 @@ async fn inner_connect_with<F: FnOnce(&TcpSocket) -> Result<()>>(
/// `IP_BIND_ADDRESS_NO_PORT` is used
/// `IP_LOCAL_PORT_RANGE` is used if a port range is set on [`BindTo`].
pub async fn connect(addr: &SocketAddr, bind_to: Option<&BindTo>) -> Result<TcpStream> {
connect_with(addr, bind_to, |_| Ok(())).await
connect_with(addr, bind_to, false, false, |_| Ok(())).await
}

/// Create a plain TCP socket whose address family (IPv4 or IPv6) matches `addr`.
///
/// A creation failure is reported as a `SocketError`.
fn new_tcp_socket(addr: &SocketAddr) -> Result<TcpSocket> {
    let created = match addr {
        SocketAddr::V4(_) => TcpSocket::new_v4(),
        SocketAddr::V6(_) => TcpSocket::new_v6(),
    };
    created.or_err(SocketError, "failed to create socket")
}

/// Create a non-blocking `IPPROTO_MPTCP` stream socket whose address family matches `addr`.
///
/// The socket is built with `socket2` and then wrapped in a [`TcpSocket`]. Failures to create
/// the socket or to switch it to non-blocking mode are reported as `SocketError`.
#[cfg(target_os = "linux")]
fn new_mptcp_socket(addr: &SocketAddr) -> Result<TcpSocket> {
    let family = match addr {
        SocketAddr::V4(_) => Domain::IPV4,
        SocketAddr::V6(_) => Domain::IPV6,
    };
    let mptcp = Protocol::from(libc::IPPROTO_MPTCP);

    let raw = Socket::new(family, Type::STREAM, Some(mptcp))
        .or_err(SocketError, "failed to create MPTCP socket")?;
    // The socket is switched to non-blocking mode before it is handed over to tokio.
    raw.set_nonblocking(true)
        .or_err(SocketError, "failed to set MPTCP socket nonblocking")?;

    let std_stream: std::net::TcpStream = raw.into();
    Ok(TcpSocket::from_std_stream(std_stream))
}

/// Stand-in used on non-Linux targets: no socket is created and the call always fails with a
/// `SocketError`.
#[cfg(not(target_os = "linux"))]
fn new_mptcp_socket(_addr: &SocketAddr) -> Result<TcpSocket> {
    Err(Error::explain(
        SocketError,
        "MPTCP upstream sockets are only supported on Linux",
    ))
}

/// connect() to the given Unix domain socket
Expand Down Expand Up @@ -660,16 +730,24 @@ mod test {
use std::time::Instant;

// connect once to make sure there is a SYN cookie to use for TFO
connect_with(&"1.1.1.1:80".parse().unwrap(), None, |socket| {
set_tcp_fastopen_connect(socket.as_raw_fd())
})
connect_with(
&"1.1.1.1:80".parse().unwrap(),
None,
false,
false,
|socket| set_tcp_fastopen_connect(socket.as_raw_fd()),
)
.await
.unwrap();

let start = Instant::now();
connect_with(&"1.1.1.1:80".parse().unwrap(), None, |socket| {
set_tcp_fastopen_connect(socket.as_raw_fd())
})
connect_with(
&"1.1.1.1:80".parse().unwrap(),
None,
false,
false,
|socket| set_tcp_fastopen_connect(socket.as_raw_fd()),
)
.await
.unwrap();
let connection_time = start.elapsed();
Expand Down
42 changes: 42 additions & 0 deletions pingora-core/src/upstreams/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,20 @@ pub trait Peer: Display + Clone {
.unwrap_or_default()
}

/// Whether upstream sockets should be opened with MPTCP when supported.
///
/// Reads `tcp_mptcp` from the peer options; `false` when the peer has no options.
fn tcp_mptcp(&self) -> bool {
    self.get_peer_options().is_some_and(|opts| opts.tcp_mptcp)
}

/// Whether an MPTCP socket creation or connect failure should retry over plain TCP.
///
/// Reads `tcp_mptcp_fallback` from the peer options; `true` when the peer has no options.
fn tcp_mptcp_fallback(&self) -> bool {
    match self.get_peer_options() {
        Some(opts) => opts.tcp_mptcp_fallback,
        None => true,
    }
}

#[cfg(unix)]
fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
self.address().check_fd_match(fd)
Expand Down Expand Up @@ -448,6 +462,14 @@ pub struct PeerOptions {
pub second_keyshare: bool,
// whether to enable TCP fast open
pub tcp_fast_open: bool,
/// Whether to create upstream sockets with `IPPROTO_MPTCP` when supported.
///
/// MPTCP sockets are currently only supported on Linux. When enabled on other platforms, the
/// connection will fail unless `tcp_mptcp_fallback` is also enabled.
pub tcp_mptcp: bool,
/// Whether to retry the upstream connection with a plain TCP socket when the initial MPTCP
/// attempt fails.
pub tcp_mptcp_fallback: bool,
// use Arc because Clone is required but not allowed in trait object
pub tracer: Option<Tracer>,
// A custom L4 connector to use to establish new L4 connections
Expand Down Expand Up @@ -499,6 +521,8 @@ impl PeerOptions {
curves: None,
second_keyshare: true, // default true and noop when not using PQ curves
tcp_fast_open: false,
tcp_mptcp: false,
tcp_mptcp_fallback: true,
tracer: None,
custom_l4: None,
upstream_tcp_sock_tweak_hook: None,
Expand Down Expand Up @@ -538,6 +562,12 @@ impl Display for PeerOptions {
if let Some(cn) = &self.alternative_cn {
write!(f, "alt_cn: {},", cn)?;
}
if self.tcp_mptcp {
write!(f, "tcp_mptcp: true,")?;
if self.tcp_mptcp_fallback {
write!(f, "tcp_mptcp_fallback: true,")?;
}
}
write!(f, "alpn: {},", self.alpn)?;
if let Some(cas) = &self.ca {
for ca in cas.iter() {
Expand Down Expand Up @@ -783,3 +813,15 @@ impl Display for Proxy {
)
}
}

#[cfg(test)]
mod tests {
    use super::PeerOptions;

    // Freshly constructed options must leave MPTCP off while keeping the TCP fallback on.
    #[test]
    fn peer_options_disable_mptcp_by_default() {
        let defaults = PeerOptions::new();
        let (mptcp, fallback) = (defaults.tcp_mptcp, defaults.tcp_mptcp_fallback);

        assert!(!mptcp);
        assert!(fallback);
    }
}
4 changes: 2 additions & 2 deletions pingora-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ regex = "1"
rand = "0.8"

[dev-dependencies]
reqwest = { version = "0.11", features = [
reqwest = { version = "0.12", features = [
"gzip",
"rustls-tls",
], default-features = false }
httparse = { workspace = true }
tokio-test = "0.4"
env_logger = "0.11"
hyper = "0.14"
hyper = { version = "0.14", features = ["full"] }
tokio-tungstenite = "0.20.1"
pingora-limits = { version = "0.8.0", path = "../pingora-limits" }
pingora-load-balancing = { version = "0.8.0", path = "../pingora-load-balancing", default-features=false }
Expand Down
12 changes: 6 additions & 6 deletions pingora-proxy/tests/test_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ async fn test_h2c_to_h2c() {
req.headers_mut()
.insert("x-h2", HeaderValue::from_bytes(b"true").unwrap());
let res = client.request(req).await.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_2);
assert_eq!(res.status(), hyper::StatusCode::OK);
assert_eq!(res.version(), hyper::Version::HTTP_2);

let body = res.into_body().data().await.unwrap().unwrap();
assert_eq!(body.as_ref(), b"Hello World!\n");
Expand All @@ -194,8 +194,8 @@ async fn test_h1_on_h2c_port() {
req.headers_mut()
.insert("x-h2", HeaderValue::from_bytes(b"true").unwrap());
let res = client.request(req).await.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_11);
assert_eq!(res.status(), hyper::StatusCode::OK);
assert_eq!(res.version(), hyper::Version::HTTP_11);

let body = res.into_body().data().await.unwrap().unwrap();
assert_eq!(body.as_ref(), b"Hello World!\n");
Expand Down Expand Up @@ -307,11 +307,11 @@ async fn test_simple_proxy_uds() {

let res = client.get(url).await.unwrap();

assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.status(), hyper::StatusCode::OK);
let (resp, body) = res.into_parts();

let headers = &resp.headers;
assert_eq!(headers[header::CONTENT_LENGTH], "13");
assert_eq!(headers[hyper::header::CONTENT_LENGTH], "13");
assert_eq!(headers["x-server-addr"], "/tmp/pingora_proxy.sock");
assert_eq!(headers["x-client-addr"], "unset"); // unnamed UDS

Expand Down
Loading
Loading