Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ var (
dockerEnforceNetworkValidationBool bool
pingInterval time.Duration
pingTimeout time.Duration
udpProxyIdleTimeout time.Duration
publicKey wgtypes.Key
pingStopChan chan struct{}
stopFunc func()
Expand Down Expand Up @@ -261,6 +262,7 @@ func runNewtMain(ctx context.Context) {
dockerSocket = os.Getenv("DOCKER_SOCKET")
pingIntervalStr := os.Getenv("PING_INTERVAL")
pingTimeoutStr := os.Getenv("PING_TIMEOUT")
udpProxyIdleTimeoutStr := os.Getenv("NEWT_UDP_PROXY_IDLE_TIMEOUT")
dockerEnforceNetworkValidation = os.Getenv("DOCKER_ENFORCE_NETWORK_VALIDATION")
healthFile = os.Getenv("HEALTH_FILE")
// authorizedKeysFile = os.Getenv("AUTHORIZED_KEYS_FILE")
Expand Down Expand Up @@ -337,6 +339,9 @@ func runNewtMain(ctx context.Context) {
if pingTimeoutStr == "" {
flag.StringVar(&pingTimeoutStr, "ping-timeout", "7s", " Timeout for each ping (default 7s)")
}
if udpProxyIdleTimeoutStr == "" {
flag.StringVar(&udpProxyIdleTimeoutStr, "udp-proxy-idle-timeout", "90s", "Idle timeout for UDP proxied client flows before cleanup")
}
// load the prefer endpoint just as a flag
flag.StringVar(&preferEndpoint, "prefer-endpoint", "", "Prefer this endpoint for the connection (if set, will override the endpoint from the server)")
if provisioningKey == "" {
Expand Down Expand Up @@ -386,6 +391,16 @@ func runNewtMain(ctx context.Context) {
pingTimeout = 7 * time.Second
}

if udpProxyIdleTimeoutStr != "" {
udpProxyIdleTimeout, err = time.ParseDuration(udpProxyIdleTimeoutStr)
if err != nil || udpProxyIdleTimeout <= 0 {
fmt.Printf("Invalid NEWT_UDP_PROXY_IDLE_TIMEOUT/--udp-proxy-idle-timeout value: %s, using default 90 seconds\n", udpProxyIdleTimeoutStr)
udpProxyIdleTimeout = 90 * time.Second
}
} else {
udpProxyIdleTimeout = 90 * time.Second
}

if dockerEnforceNetworkValidation == "" {
flag.StringVar(&dockerEnforceNetworkValidation, "docker-enforce-network-validation", "false", "Enforce validation of container on newt network (true or false)")
}
Expand Down Expand Up @@ -896,6 +911,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey(
// Create proxy manager
pm = proxy.NewProxyManager(tnet)
pm.SetAsyncBytes(metricsAsyncBytes)
pm.SetUDPIdleTimeout(udpProxyIdleTimeout)
// Set tunnel_id for metrics (WireGuard peer public key)
pm.SetTunnelID(wgData.PublicKey)

Expand Down
24 changes: 24 additions & 0 deletions proxy/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
const (
errUnsupportedProtoFmt = "unsupported protocol: %s"
maxUDPPacketSize = 65507 // Maximum UDP packet size
defaultUDPIdleTimeout = 90 * time.Second
)

// udpBufferPool provides reusable buffers for UDP packet handling.
Expand Down Expand Up @@ -68,6 +69,7 @@ type ProxyManager struct {
tunnels map[string]*tunnelEntry
asyncBytes bool
flushStop chan struct{}
udpIdleTimeout time.Duration
}

// tunnelEntry holds per-tunnel attributes and (optional) async counters.
Expand Down Expand Up @@ -153,6 +155,7 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager {
listeners: make([]*gonet.TCPListener, 0),
udpConns: make([]*gonet.UDPConn, 0),
tunnels: make(map[string]*tunnelEntry),
udpIdleTimeout: defaultUDPIdleTimeout,
}
}

Expand Down Expand Up @@ -230,6 +233,7 @@ func NewProxyManagerWithoutTNet() *ProxyManager {
udpTargets: make(map[string]map[int]string),
listeners: make([]*gonet.TCPListener, 0),
udpConns: make([]*gonet.UDPConn, 0),
udpIdleTimeout: defaultUDPIdleTimeout,
}
}

Expand Down Expand Up @@ -366,6 +370,17 @@ func (pm *ProxyManager) SetAsyncBytes(b bool) {
go pm.flushLoop()
}
}

// SetUDPIdleTimeout configures when idle UDP client flows are reclaimed.
func (pm *ProxyManager) SetUDPIdleTimeout(d time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if d <= 0 {
pm.udpIdleTimeout = defaultUDPIdleTimeout
return
}
pm.udpIdleTimeout = d
}
func (pm *ProxyManager) flushLoop() {
flushInterval := 2 * time.Second
if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" {
Expand Down Expand Up @@ -646,6 +661,9 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", classifyProxyError(err))
continue
}
// Prevent idle UDP client goroutines from living forever and
// retaining large per-connection buffers.
_ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout))
tunnelID := pm.currentTunnelID
telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "")
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionOpened)
Expand Down Expand Up @@ -683,6 +701,10 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
for {
n, _, err := targetConn.ReadFromUDP(buffer)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return
}
// Connection closed is normal during cleanup
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
return // defer will handle cleanup, result stays "success"
Expand Down Expand Up @@ -725,6 +747,8 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
delete(clientConns, clientKey)
clientsMutex.Unlock()
} else if pm.currentTunnelID != "" && written > 0 {
// Extend idle timeout whenever client traffic is observed.
_ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout))
if pm.asyncBytes {
if e := pm.getEntry(pm.currentTunnelID); e != nil {
e.bytesInUDP.Add(uint64(written))
Expand Down