Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Sources/Subprocess/API.swift
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ public func run<
return ExecutionRecord(
processIdentifier: result.value.processIdentifier,
terminationStatus: result.terminationStatus,
resourceUsage: result.resourceUsage,
standardOutput: result.value.standardOutput,
standardError: result.value.standardError
)
Expand Down Expand Up @@ -567,6 +568,7 @@ public func run<
return ExecutionRecord(
processIdentifier: result.value.processIdentifier,
terminationStatus: result.terminationStatus,
resourceUsage: result.resourceUsage,
standardOutput: result.value.standardOutput,
standardError: result.value.standardError
)
Expand Down
3 changes: 2 additions & 1 deletion Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,13 @@ public struct Configuration: Sendable {
// even if `body` throws, and we are not leaving zombie processes in the
// process table which will cause the process termination monitoring thread
// to effectively hang due to the pid never being awaited
let terminationStatus = try await monitorProcessTermination(
let (terminationStatus, resourceUsage) = try await monitorProcessTermination(
for: execution.processIdentifier
)

return ExecutionOutcome(
terminationStatus: terminationStatus,
resourceUsage: resourceUsage,
value: try result.get()
)
} onCleanup: {
Expand Down
15 changes: 7 additions & 8 deletions Sources/Subprocess/Platforms/Subprocess+BSD.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ internal import Dispatch
@Sendable
internal func monitorProcessTermination(
for processIdentifier: ProcessIdentifier
) async throws(SubprocessError) -> TerminationStatus {
switch Result(catching: { () throws(Errno) -> TerminationStatus? in try processIdentifier.reap() }) {
case let .success(status?):
return status
) async throws(SubprocessError) -> (TerminationStatus, ResourceUsage) {
switch Result(catching: { () throws(Errno) -> (TerminationStatus, ResourceUsage)? in try processIdentifier.reap() }) {
case let .success(result?):
return result
case .success(nil):
break
case let .failure(error):
Expand All @@ -50,10 +50,9 @@ internal func monitorProcessTermination(

do throws(Errno) {
// NOTE_EXIT may be delivered slightly before the process becomes reapable,
// so we must call waitid without WNOHANG to avoid a narrow possibility of a race condition.
// If waitid does block, it won't do so for very long at all.
let status = try processIdentifier.blockingReap()
continuation.resume(returning: status)
// so we must call wait4 without WNOHANG to avoid a narrow possibility of a race condition.
// If wait4 does block, it won't do so for very long at all.
continuation.resume(returning: try processIdentifier.blockingReap())
} catch {
let subprocessError: SubprocessError = .failedToMonitor(withUnderlyingError: error)
continuation.resume(throwing: subprocessError)
Expand Down
51 changes: 30 additions & 21 deletions Sources/Subprocess/Platforms/Subprocess+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ extension Int32 {
@Sendable
internal func monitorProcessTermination(
for processIdentifier: ProcessIdentifier
) async throws(SubprocessError) -> TerminationStatus {
) async throws(SubprocessError) -> (TerminationStatus, ResourceUsage) {
return try await _castError {
return try await withCheckedThrowingContinuation { continuation in
let status = _processMonitorState.withLock { state -> Result<TerminationStatus, SubprocessError>? in
let status = _processMonitorState.withLock { state -> Result<(TerminationStatus, ResourceUsage), SubprocessError>? in
switch state {
case .notStarted:
let error: SubprocessError = .failedToMonitor(withUnderlyingError: nil)
Expand Down Expand Up @@ -123,9 +123,9 @@ internal func monitorProcessTermination(
// Since Linux coalesce signals, it's possible by the time we request
// monitoring the process has already exited. Check to make sure that
// is not the case and only save continuation then.
switch Result(catching: { () throws(Errno) -> TerminationStatus? in try processIdentifier.reap() }) {
case let .success(status?):
return .success(status)
switch Result(catching: { () throws(Errno) -> (TerminationStatus, ResourceUsage)? in try processIdentifier.reap() }) {
case let .success(result?):
return .success(result)
case .success(nil):
// Save this continuation to be called by signal handler
var newState = storage
Expand Down Expand Up @@ -158,7 +158,7 @@ private enum ProcessMonitorState {
let epollFileDescriptor: CInt
let shutdownFileDescriptor: CInt
let monitorThread: pthread_t
var continuations: [PlatformFileDescriptor: CheckedContinuation<TerminationStatus, any Error>]
var continuations: [PlatformFileDescriptor: CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>]
}

case notStarted
Expand Down Expand Up @@ -243,8 +243,8 @@ private func monitorThreadFunc(context: MonitorThreadContext) {
let error: SubprocessError = .failedToMonitor(
withUnderlyingError: Errno(rawValue: pwaitErrno)
)
let continuations = _processMonitorState.withLock { state -> [CheckedContinuation<TerminationStatus, any Error>] in
let result: [CheckedContinuation<TerminationStatus, any Error>]
let continuations = _processMonitorState.withLock { state -> [CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>] in
let result: [CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>]
if case .started(let storage) = state {
result = Array(storage.continuations.values)
} else {
Expand Down Expand Up @@ -407,8 +407,17 @@ internal func _setupMonitorSignalHandler() {
}

private func _blockAndWaitForProcessDescriptor(_ pidfd: CInt, context: MonitorThreadContext) {
var terminationStatus = Result(catching: { () throws(Errno) in
try TerminationStatus(_waitid(idtype: idtype_t(UInt32(P_PIDFD)), id: id_t(pidfd), flags: WEXITED))
var terminationResult = Result(catching: { () throws(Errno) -> (TerminationStatus, ResourceUsage) in
while true {
var siginfo = siginfo_t()
var usage = rusage()
let rc = linux_waitid(idtype_t(UInt32(P_PIDFD)), id_t(pidfd), &siginfo, WEXITED, &usage)
if rc != -1 {
return (TerminationStatus(siginfo), ResourceUsage(usage))
} else if errno != EINTR {
throw Errno(rawValue: errno)
}
}
}).mapError { underlyingError in
return SubprocessError.failedToMonitor(withUnderlyingError: underlyingError)
}
Expand All @@ -422,14 +431,14 @@ private func _blockAndWaitForProcessDescriptor(_ pidfd: CInt, context: MonitorTh
)
if rc != 0 {
let epollErrno = errno
terminationStatus = .failure(
terminationResult = .failure(
SubprocessError.failedToMonitor(
withUnderlyingError: Errno(rawValue: epollErrno)
)
)
}
// Notify the continuation
let continuation = _processMonitorState.withLock { state -> CheckedContinuation<TerminationStatus, any Error>? in
let continuation = _processMonitorState.withLock { state -> CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>? in
guard case .started(let storage) = state,
let continuation = storage.continuations[pidfd]
else {
Expand All @@ -441,13 +450,13 @@ private func _blockAndWaitForProcessDescriptor(_ pidfd: CInt, context: MonitorTh
state = .started(newStorage)
return continuation
}
continuation?.resume(with: terminationStatus)
continuation?.resume(with: terminationResult)
}

// On older kernels, fall back to using signal handlers
private typealias ResultContinuation = (
result: Result<TerminationStatus, SubprocessError>,
continuation: CheckedContinuation<TerminationStatus, any Error>
result: Result<(TerminationStatus, ResourceUsage), SubprocessError>,
continuation: CheckedContinuation<(TerminationStatus, ResourceUsage), any Error>
)
private func _reapAllKnownChildProcesses(_ signalFd: CInt, context: MonitorThreadContext) {
guard signalFd == _signalPipe.readEnd else {
Expand All @@ -467,19 +476,19 @@ private func _reapAllKnownChildProcesses(_ signalFd: CInt, context: MonitorThrea
// Since Linux coalesce signals, we need to loop through all known child process
// to check if they exited.
loop: for (knownChildPID, continuation) in storage.continuations {
let terminationStatus: Result<TerminationStatus, SubprocessError>
switch Result(catching: { () throws(Errno) -> TerminationStatus? in try _reap(pid: knownChildPID) }) {
case let .success(status?):
terminationStatus = .success(status)
let terminationResult: Result<(TerminationStatus, ResourceUsage), SubprocessError>
switch Result(catching: { () throws(Errno) -> (TerminationStatus, ResourceUsage)? in try _reap(pid: knownChildPID) }) {
case let .success(result?):
terminationResult = .success(result)
case .success(nil):
// Move on to the next child
continue loop
case let .failure(error):
terminationStatus = .failure(
terminationResult = .failure(
SubprocessError.failedToMonitor(withUnderlyingError: error)
)
}
results.append((result: terminationStatus, continuation: continuation))
results.append((result: terminationResult, continuation: continuation))
// Now we have the exit code, remove saved continuations
updatedContinuations.removeValue(forKey: knownChildPID)
}
Expand Down
74 changes: 57 additions & 17 deletions Sources/Subprocess/Platforms/Subprocess+Unix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -676,37 +676,63 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
extension ProcessIdentifier {
/// Reaps the zombie for the exited process. This function may block.
@available(*, noasync)
internal func blockingReap() throws(Errno) -> TerminationStatus {
internal func blockingReap() throws(Errno) -> (TerminationStatus, ResourceUsage) {
try _blockingReap(pid: value)
}

/// Reaps the zombie for the exited process, or returns `nil` if the process is still running. This function will not block.
internal func reap() throws(Errno) -> TerminationStatus? {
internal func reap() throws(Errno) -> (TerminationStatus, ResourceUsage)? {
try _reap(pid: value)
}
}

@available(*, noasync)
internal func _blockingReap(pid: pid_t) throws(Errno) -> TerminationStatus {
let siginfo = try _waitid(idtype: P_PID, id: id_t(pid), flags: WEXITED)
return TerminationStatus(siginfo)
}

internal func _reap(pid: pid_t) throws(Errno) -> TerminationStatus? {
let siginfo = try _waitid(idtype: P_PID, id: id_t(pid), flags: WEXITED | WNOHANG)
// If si_pid and si_signo are both 0, the child is still running since we used WNOHANG
if siginfo.si_pid == 0 && siginfo.si_signo == 0 {
return nil
internal func _blockingReap(pid: pid_t) throws(Errno) -> (TerminationStatus, ResourceUsage) {
while true {
var usage = rusage()
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
var status: CInt = 0
let rc = wait4(pid, &status, 0, &usage)
#elseif os(Linux) || os(Android)
var siginfo = siginfo_t()
let rc = linux_waitid(P_PID, id_t(pid), &siginfo, WEXITED, &usage)
#endif
if rc >= 0 {
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
return (TerminationStatus(waitStatus: status), ResourceUsage(usage))
#elseif os(Linux) || os(Android)
return (TerminationStatus(siginfo), ResourceUsage(usage))
#endif
} else if errno != EINTR {
throw Errno(rawValue: errno)
}
}
return TerminationStatus(siginfo)
}

internal func _waitid(idtype: idtype_t, id: id_t, flags: Int32) throws(Errno) -> siginfo_t {
internal func _reap(pid: pid_t) throws(Errno) -> (TerminationStatus, ResourceUsage)? {
while true {
var usage = rusage()
#if os(macOS) || os(FreeBSD) || os(OpenBSD)
var status: CInt = 0
let rc = wait4(pid, &status, WNOHANG, &usage)
if rc > 0 {
return (TerminationStatus(waitStatus: status), ResourceUsage(usage))
} else if rc == 0 {
return nil // Child still running
}
#elseif os(Linux) || os(Android)
var siginfo = siginfo_t()
if waitid(idtype, id, &siginfo, flags) != -1 {
return siginfo
} else if errno != EINTR {
let rc = linux_waitid(P_PID, id_t(pid), &siginfo, WEXITED | WNOHANG, &usage)
if rc != -1 {
// If si_pid and si_signo are both 0, the child is still running since we used WNOHANG
if siginfo.si_pid == 0 && siginfo.si_signo == 0 {
return nil
}
return (TerminationStatus(siginfo), ResourceUsage(usage))
}
#endif
// rc == -1: either EINTR (retry) or a real error
if errno != EINTR {
throw Errno(rawValue: errno)
}
}
Expand All @@ -725,6 +751,20 @@ internal extension TerminationStatus {
}
}

#if os(macOS) || os(FreeBSD) || os(OpenBSD)
internal extension TerminationStatus {
init(waitStatus: CInt) {
if _was_process_exited(waitStatus) != 0 {
self = .exited(CInt(_get_exit_code(waitStatus)))
} else if _was_process_signaled(waitStatus) != 0 {
self = .signaled(CInt(_get_signal_code(waitStatus)))
} else {
fatalError("Unexpected wait status: \(waitStatus)")
}
}
}
#endif

#if os(OpenBSD) || os(Linux) || os(Android)
internal extension siginfo_t {
var si_status: Int32 {
Expand Down
7 changes: 5 additions & 2 deletions Sources/Subprocess/Platforms/Subprocess+Windows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible
@Sendable
internal func monitorProcessTermination(
for processIdentifier: ProcessIdentifier
) async throws(SubprocessError) -> TerminationStatus {
) async throws(SubprocessError) -> (TerminationStatus, ResourceUsage) {
// Once the continuation resumes, it will need to unregister the wait, so
// yield the wait handle back to the calling scope.
var waitHandle: HANDLE?
Expand Down Expand Up @@ -655,14 +655,17 @@ internal func monitorProcessTermination(
}
}

// Collect resource usage while the process handle is still valid
let resourceUsage = ResourceUsage(processHandle: processIdentifier.processDescriptor)

var status: DWORD = 0
guard GetExitCodeProcess(processIdentifier.processDescriptor, &status) else {
throw SubprocessError.failedToMonitor(
withUnderlyingError: .init(rawValue: GetLastError())
)
}

return .exited(status)
return (.exited(status), resourceUsage)
}

// MARK: - Subprocess Control
Expand Down
Loading
Loading