diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 2cf74173..3f338e13 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -41,7 +41,7 @@ To resolve this, manually link the upstream and pull all tags:: git remote add upstream https://github.com/con/duct git fetch upstream - + Testing ------- diff --git a/README.md b/README.md index 1da438e0..b3d33490 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,9 @@ usage: con-duct run [-h] [-l {NONE,CRITICAL,ERROR,WARNING,INFO,DEBUG}] [-q] [-o {all,none,stdout,stderr}] [-t {all,system-summary,processes-samples}] [-m MESSAGE] [--mode {new-session,current-session}] + [--instruments INSTRUMENTS] + [--gpu-sample-interval GPU_SAMPLE_INTERVAL] + [--gpu-timeout GPU_TIMEOUT] command [command_args ...] ... duct is a lightweight wrapper that collects execution data for an arbitrary @@ -105,6 +108,9 @@ environment variables: DUCT_REPORT_INTERVAL: see --report-interval DUCT_CAPTURE_OUTPUTS: see --capture-outputs DUCT_MESSAGE: see --message + DUCT_INSTRUMENTS: see --instruments (e.g., "cpu,mem,gpu" or "all") + DUCT_GPU_SAMPLE_INTERVAL: see --gpu-sample-interval + DUCT_GPU_TIMEOUT: see --gpu-timeout DUCT_CONFIG_PATHS: paths to .env files separated by platform path separator (':' on Unix) (see below) @@ -235,6 +241,21 @@ options: current session instead of starting a new one. Useful for tracking slurm jobs or other commands that should run in the current session. (default: new-session) + --instruments INSTRUMENTS + Comma-separated list of instruments to enable: cpu, + mem, gpu, or 'all'. You can also provide value via + DUCT_INSTRUMENTS env variable. (default: cpu,mem) + (default: {, }) + --gpu-sample-interval GPU_SAMPLE_INTERVAL + Interval in seconds between GPU status checks. If not + specified or 0, uses --sample-interval. Useful when + nvidia-smi calls are slow. You can also provide value + via DUCT_GPU_SAMPLE_INTERVAL env variable. (default: + 0.0) + --gpu-timeout GPU_TIMEOUT + Timeout in seconds for nvidia-smi calls. 
(default: + 5.0) (default: 5.0) ``` diff --git a/src/con_duct/cli.py b/src/con_duct/cli.py index 0db11781..aa537024 100644 --- a/src/con_duct/cli.py +++ b/src/con_duct/cli.py @@ -10,11 +10,13 @@ from con_duct.duct_main import ( DUCT_OUTPUT_PREFIX, EXECUTION_SUMMARY_FORMAT, + GPU_SAMPLE_TIMEOUT, Outputs, RecordTypes, SessionMode, ) from con_duct.duct_main import execute as duct_execute +from con_duct.duct_main import instruments_from_str from con_duct.ls import LS_FIELD_CHOICES, ls from con_duct.plot import matplotlib_plot from con_duct.pprint_json import pprint_json @@ -146,6 +148,9 @@ def _replay_early_logs(log_buffer: List[tuple[str, str]]) -> None: DUCT_REPORT_INTERVAL: see --report-interval DUCT_CAPTURE_OUTPUTS: see --capture-outputs DUCT_MESSAGE: see --message + DUCT_INSTRUMENTS: see --instruments (e.g., "cpu,mem,gpu" or "all") + DUCT_GPU_SAMPLE_INTERVAL: see --gpu-sample-interval + DUCT_GPU_TIMEOUT: see --gpu-timeout DUCT_CONFIG_PATHS: paths to .env files separated by platform path separator (':' on Unix) (see below) @@ -366,6 +371,30 @@ def _create_run_parser() -> argparse.ArgumentParser: "'current-session' tracks the current session instead of starting a new one. " "Useful for tracking slurm jobs or other commands that should run in the current session.", ) + parser.add_argument( + "--instruments", + type=instruments_from_str, + default=instruments_from_str(os.getenv("DUCT_INSTRUMENTS", "cpu,mem")), + help="Comma-separated list of instruments to enable: cpu, mem, gpu, or 'all'. " + "You can also provide value via DUCT_INSTRUMENTS env variable. " + "(default: cpu,mem)", + ) + parser.add_argument( + "--gpu-sample-interval", + type=float, + default=float(os.getenv("DUCT_GPU_SAMPLE_INTERVAL", "0")), + help="Interval in seconds between GPU status checks. " + "If not specified or 0, uses --sample-interval. " + "Useful when nvidia-smi calls are slow. 
" + "You can also provide value via DUCT_GPU_SAMPLE_INTERVAL env variable.", + ) + parser.add_argument( + "--gpu-timeout", + type=float, + default=float(os.getenv("DUCT_GPU_TIMEOUT", str(GPU_SAMPLE_TIMEOUT))), + help="Timeout in seconds for nvidia-smi calls. " + f"(default: {GPU_SAMPLE_TIMEOUT})", + ) return parser diff --git a/src/con_duct/duct_main.py b/src/con_duct/duct_main.py index 591fd469..7c89a937 100755 --- a/src/con_duct/duct_main.py +++ b/src/con_duct/duct_main.py @@ -25,7 +25,7 @@ import warnings __version__ = version("con-duct") -__schema_version__ = "0.2.2" +__schema_version__ = "0.3.0" _true_set = {"yes", "true", "t", "y", "1"} _false_set = {"no", "false", "f", "n", "0"} @@ -131,6 +131,56 @@ def __str__(self) -> str: return self.value +class Instruments(str, Enum): + """Instruments to enable for monitoring.""" + + CPU = "cpu" + MEM = "mem" + GPU = "gpu" + + def __str__(self) -> str: + return self.value + + @classmethod + def parse_list(cls, value: str) -> set[Instruments]: + """Parse comma-separated instrument list. + + Args: + value: Comma-separated string like "cpu,mem,gpu" or "all" + + Returns: + Set of Instruments enum values + + Raises: + ValueError: If invalid instrument specified + """ + if value.lower() == "all": + return {cls.CPU, cls.MEM, cls.GPU} + + instruments: set[Instruments] = set() + for item in value.lower().split(","): + item = item.strip() + if not item: + continue + try: + instruments.add(cls(item)) + except ValueError: + valid = ", ".join(i.value for i in cls) + raise ValueError( + f"Invalid instrument '{item}'. 
Valid options: {valid}, all" + ) + return instruments + + +def instruments_from_str(value: str) -> set[Instruments]: + """Type converter for argparse.""" + return Instruments.parse_list(value) + + +# Default timeout for nvidia-smi calls +GPU_SAMPLE_TIMEOUT = 5.0 + + @dataclass class SystemInfo: cpu_total: int @@ -289,6 +339,167 @@ def from_sample(cls, sample: Sample) -> Averages: ) +@dataclass +class GpuStats: + """Statistics for a single GPU device.""" + + index: int + utilization_gpu: float # GPU utilization percentage (0-100) + utilization_memory: float # Memory utilization percentage (0-100) + memory_used: int # Memory used in bytes + memory_total: int # Total memory in bytes + timestamp: str + + def aggregate(self, other: GpuStats) -> GpuStats: + """Aggregate with another sample, keeping peak values.""" + return GpuStats( + index=self.index, + utilization_gpu=max(self.utilization_gpu, other.utilization_gpu), + utilization_memory=max(self.utilization_memory, other.utilization_memory), + memory_used=max(self.memory_used, other.memory_used), + memory_total=max(self.memory_total, other.memory_total), + timestamp=max(self.timestamp, other.timestamp), + ) + + def for_json(self) -> dict[str, Any]: + return asdict(self) + + def __post_init__(self) -> None: + assert_num( + self.utilization_gpu, + self.utilization_memory, + self.memory_used, + self.memory_total, + ) + + +@dataclass +class GpuAverages: + """Running averages for GPU metrics.""" + + utilization_gpu: Optional[float] = None + utilization_memory: Optional[float] = None + memory_used: Optional[float] = None + num_samples: int = 0 + + def update(self, sample: GpuSample) -> None: + """Update running averages with new sample.""" + if not self.num_samples: + self.num_samples = 1 + self.utilization_gpu = sample.total_utilization_gpu + self.utilization_memory = sample.total_utilization_memory + self.memory_used = float(sample.total_memory_used or 0) + else: + self.num_samples += 1 + n = self.num_samples + if ( + 
sample.total_utilization_gpu is not None + and self.utilization_gpu is not None + ): + self.utilization_gpu += ( + sample.total_utilization_gpu - self.utilization_gpu + ) / n + if ( + sample.total_utilization_memory is not None + and self.utilization_memory is not None + ): + self.utilization_memory += ( + sample.total_utilization_memory - self.utilization_memory + ) / n + if sample.total_memory_used is not None and self.memory_used is not None: + self.memory_used += (sample.total_memory_used - self.memory_used) / n + + +@dataclass +class GpuSample: + """Sample of GPU statistics across all GPUs.""" + + stats: dict[int, GpuStats] = field(default_factory=dict) + averages: GpuAverages = field(default_factory=GpuAverages) + total_utilization_gpu: Optional[float] = None # Sum across all GPUs + total_utilization_memory: Optional[float] = None + total_memory_used: Optional[int] = None + total_memory_total: Optional[int] = None + timestamp: str = "" + + def add_gpu(self, gpu_index: int, stats: GpuStats) -> None: + """Add stats for a GPU device.""" + assert gpu_index not in self.stats + self.total_utilization_gpu = ( + self.total_utilization_gpu or 0.0 + ) + stats.utilization_gpu + self.total_utilization_memory = ( + self.total_utilization_memory or 0.0 + ) + stats.utilization_memory + self.total_memory_used = (self.total_memory_used or 0) + stats.memory_used + self.total_memory_total = (self.total_memory_total or 0) + stats.memory_total + + self.stats[gpu_index] = stats + self.timestamp = max(self.timestamp, stats.timestamp) + + def aggregate(self, other: GpuSample) -> GpuSample: + """Aggregate with another sample, keeping peak values.""" + output = GpuSample() + for gpu_idx in self.stats.keys() | other.stats.keys(): + if (mine := self.stats.get(gpu_idx)) is not None: + if (theirs := other.stats.get(gpu_idx)) is not None: + output.add_gpu(gpu_idx, mine.aggregate(theirs)) + else: + output.add_gpu(gpu_idx, mine) + else: + output.add_gpu(gpu_idx, other.stats[gpu_idx]) + + 
output.total_utilization_gpu = max( + self.total_utilization_gpu or 0.0, + other.total_utilization_gpu or 0.0, + ) + output.total_utilization_memory = max( + self.total_utilization_memory or 0.0, + other.total_utilization_memory or 0.0, + ) + output.total_memory_used = max( + self.total_memory_used or 0, + other.total_memory_used or 0, + ) + output.total_memory_total = max( + self.total_memory_total or 0, + other.total_memory_total or 0, + ) + output.averages = self.averages + output.averages.update(other) + return output + + @classmethod + def from_stats(cls, stats: dict[int, GpuStats]) -> GpuSample: + """Create a GpuSample from a dict of GpuStats.""" + sample = cls() + for gpu_idx, gpu_stats in stats.items(): + sample.add_gpu(gpu_idx, gpu_stats) + sample.averages = GpuAverages( + utilization_gpu=sample.total_utilization_gpu, + utilization_memory=sample.total_utilization_memory, + memory_used=float(sample.total_memory_used or 0), + num_samples=1, + ) + return sample + + def for_json(self) -> dict[str, Any]: + return { + "timestamp": self.timestamp, + "num_samples": self.averages.num_samples, + "gpus": {str(idx): stats.for_json() for idx, stats in self.stats.items()}, + "totals": { + "utilization_gpu": self.total_utilization_gpu, + "utilization_memory": self.total_utilization_memory, + "memory_used": self.total_memory_used, + "memory_total": self.total_memory_total, + }, + "averages": ( + asdict(self.averages) if self.averages.num_samples >= 1 else {} + ), + } + + @dataclass class Sample: stats: dict[int, ProcessStats] = field(default_factory=dict) @@ -298,6 +509,7 @@ class Sample: total_pmem: Optional[float] = None total_pcpu: Optional[float] = None timestamp: str = "" # TS of last sample collected + gpu_sample: Optional[GpuSample] = None # Optional GPU metrics def add_pid(self, pid: int, stats: ProcessStats) -> None: # We do not calculate averages when we add a pid because we require all pids first @@ -331,10 +543,19 @@ def aggregate(self: Sample, other: Sample) 
def _parse_nvidia_smi_line(line: str, timestamp: str) -> Optional[GpuStats]:
    """Turn one CSV row of the nvidia-smi query into a GpuStats record.

    Args:
        line: One row with the five comma-separated fields index,
            utilization.gpu, utilization.memory, memory.used, memory.total
        timestamp: Timestamp string to store on the resulting record

    Returns:
        The parsed GpuStats, or None (after a debug log) when the row does
        not have exactly five fields or a field is not a valid number
    """
    parts = [p.strip() for p in line.split(",")]
    if len(parts) != 5:
        lgr.debug("Unexpected nvidia-smi output format: %s", line)
        return None

    index, util_gpu, util_mem, mem_used, mem_total = parts
    try:
        return GpuStats(
            index=int(index),
            utilization_gpu=float(util_gpu),
            utilization_memory=float(util_mem),
            memory_used=int(mem_used) * 1024 * 1024,  # MiB to bytes
            memory_total=int(mem_total) * 1024 * 1024,  # MiB to bytes
            timestamp=timestamp,
        )
    except (ValueError, IndexError) as exc:
        lgr.debug("Error parsing nvidia-smi line '%s': %s", line, exc)
        return None


def _get_gpu_sample(timeout: float = GPU_SAMPLE_TIMEOUT) -> Optional[GpuSample]:
    """Collect GPU utilization metrics via nvidia-smi.

    Every failure mode is reported through the logger and mapped to a None
    return value, so a caller never has to handle an exception from the
    nvidia-smi invocation itself.

    Args:
        timeout: Maximum seconds to wait for nvidia-smi response

    Returns:
        GpuSample with stats for all GPUs, or None if unavailable/failed
        (executable missing or not runnable, timeout, non-zero exit status,
        or no row of the output could be parsed)
    """
    if shutil.which("nvidia-smi") is None:
        lgr.debug("nvidia-smi not found, skipping GPU sample")
        return None

    nvidia_smi_command = [
        "nvidia-smi",
        "--query-gpu=index,utilization.gpu,utilization.memory,memory.used,memory.total",
        "--format=csv,noheader,nounits",
    ]

    try:
        output = subprocess.check_output(
            nvidia_smi_command,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        lgr.warning("nvidia-smi timed out after %.1f seconds", timeout)
        return None
    except subprocess.CalledProcessError as exc:
        lgr.debug("nvidia-smi failed: %s", exc)
        return None
    except FileNotFoundError:
        # The executable can vanish between the which() check and the call.
        lgr.debug("nvidia-smi not found")
        return None
    except OSError as exc:
        # Any other launch failure (e.g. PermissionError) must not escape
        # from a sampling call either.
        lgr.debug("nvidia-smi could not be executed: %s", exc)
        return None

    # One timestamp, taken after nvidia-smi returned, is shared by all GPUs
    # reported in this call.
    timestamp = datetime.now().astimezone().isoformat()

    stats: dict[int, GpuStats] = {}
    for line in output.strip().splitlines():
        if not line.strip():
            continue
        gpu_stats = _parse_nvidia_smi_line(line, timestamp)
        if gpu_stats is not None:
            stats[gpu_stats.index] = gpu_stats

    if not stats:
        return None

    return GpuSample.from_stats(stats)
self.colors = colors self.message = message + self.instruments = instruments or {Instruments.CPU, Instruments.MEM} # Defaults to be set later self.start_time: float | None = None self.process = process @@ -507,6 +800,9 @@ def __init__( self.run_time_seconds: str | None = None self.usage_file: TextIO | None = None self.working_directory: str = working_directory + # GPU run stats (when GPU monitoring is enabled) + self.full_gpu_stats: Optional[GpuSample] = None + self.current_gpu_sample: Optional[GpuSample] = None def __del__(self) -> None: safe_close_files([self.usage_file]) @@ -597,6 +893,19 @@ def update_from_sample(self, sample: Sample) -> None: self.current_sample = self.current_sample.aggregate(sample) assert self.current_sample is not None + # Track GPU stats separately for execution summary + if sample.gpu_sample is not None: + if self.full_gpu_stats is None: + self.full_gpu_stats = sample.gpu_sample + else: + self.full_gpu_stats = self.full_gpu_stats.aggregate(sample.gpu_sample) + if self.current_gpu_sample is None: + self.current_gpu_sample = sample.gpu_sample + else: + self.current_gpu_sample = self.current_gpu_sample.aggregate( + sample.gpu_sample + ) + def write_subreport(self) -> None: assert self.current_sample is not None if self.usage_file is None: @@ -611,7 +920,7 @@ def execution_summary(self) -> dict[str, Any]: if self.process and self.process.returncode < 0: self.process.returncode = 128 + abs(self.process.returncode) # prepare the base, but enrich if we did get process running - return { + summary: dict[str, Any] = { "exit_code": self.process.returncode if self.process else None, "command": self.command, "logs_prefix": self.log_paths.prefix if self.log_paths else "", @@ -631,6 +940,27 @@ def execution_summary(self) -> dict[str, Any]: "working_directory": self.working_directory, } + # Add GPU summary if GPU monitoring was enabled and we have data + if Instruments.GPU in self.instruments and self.full_gpu_stats is not None: + summary.update( + { + 
"peak_gpu_utilization": self.full_gpu_stats.total_utilization_gpu, + "average_gpu_utilization": ( + self.full_gpu_stats.averages.utilization_gpu + if self.full_gpu_stats.averages + else None + ), + "peak_gpu_memory_used": self.full_gpu_stats.total_memory_used, + "average_gpu_memory_used": ( + self.full_gpu_stats.averages.memory_used + if self.full_gpu_stats.averages + else None + ), + } + ) + + return summary + def dump_json(self) -> str: return json.dumps( { @@ -817,6 +1147,9 @@ def monitor_process( report_interval: float, sample_interval: float, stop_event: threading.Event, + instruments: set[Instruments] | None = None, + gpu_sample_interval: float | None = None, + gpu_timeout: float = GPU_SAMPLE_TIMEOUT, ) -> None: lgr.debug( "Starting monitoring of the process %s on sample interval %f for report interval %f", @@ -824,13 +1157,42 @@ def monitor_process( sample_interval, report_interval, ) + + # Default instruments to CPU and MEM + if instruments is None: + instruments = {Instruments.CPU, Instruments.MEM} + + # GPU sampling state + gpu_enabled = Instruments.GPU in instruments + gpu_interval = gpu_sample_interval if gpu_sample_interval else sample_interval + last_gpu_sample_time = 0.0 + while True: if process.poll() is not None: lgr.debug( "Breaking out of the monitor since the passthrough command has finished" ) break - sample = report.collect_sample() + + # Collect CPU/mem sample if requested + sample: Sample | None = None + if Instruments.CPU in instruments or Instruments.MEM in instruments: + sample = report.collect_sample() + else: + # Create empty sample for GPU-only case + sample = Sample() + sample.timestamp = datetime.now().astimezone().isoformat() + + # Collect GPU sample at its own interval + if gpu_enabled: + current_time = time.time() + if current_time - last_gpu_sample_time >= gpu_interval: + gpu_sample = _get_gpu_sample(timeout=gpu_timeout) + if gpu_sample is not None: + + sample.gpu_sample = gpu_sample + last_gpu_sample_time = current_time + # 
Report averages should be updated prior to sample aggregation if ( sample is None @@ -843,6 +1205,14 @@ def monitor_process( break # process is still running, but we could not collect sample continue + + # Skip empty samples (no CPU/mem and no GPU data) + if not sample.stats and sample.gpu_sample is None: + if stop_event.wait(timeout=sample_interval): + lgr.debug("Breaking out because stop event was set") + break + continue + report.update_from_sample(sample) if ( report.start_time @@ -850,6 +1220,7 @@ def monitor_process( ): report.write_subreport() report.current_sample = None + report.current_gpu_sample = None report.number += 1 if stop_event.wait(timeout=sample_interval): lgr.debug("Breaking out because stop event was set") @@ -1001,11 +1372,25 @@ def execute( colors: bool, mode: SessionMode, message: str = "", + instruments: set[Instruments] | None = None, + gpu_sample_interval: float = 0.0, + gpu_timeout: float = GPU_SAMPLE_TIMEOUT, ) -> int: """A wrapper to execute a command, monitor and log the process details. Returns exit code of the executed process. """ + # Default instruments + if instruments is None: + instruments = {Instruments.CPU, Instruments.MEM} + + # Warn if GPU monitoring requested but nvidia-smi not available + if Instruments.GPU in instruments: + if shutil.which("nvidia-smi") is None: + lgr.warning( + "GPU monitoring requested but nvidia-smi not found. " + "GPU metrics will not be collected." + ) if report_interval < sample_interval: raise ValueError( "--report-interval must be greater than or equal to --sample-interval." 
@@ -1038,6 +1423,7 @@ def execute( colors, clobber, message=message, + instruments=instruments, ) files_to_close.append(report.usage_file) @@ -1077,15 +1463,20 @@ def execute( pass stop_event = threading.Event() if record_types.has_processes_samples(): - monitoring_args = [ - report, - process, - report_interval, - sample_interval, - stop_event, - ] + monitoring_kwargs = { + "report": report, + "process": process, + "report_interval": report_interval, + "sample_interval": sample_interval, + "stop_event": stop_event, + "instruments": instruments, + "gpu_sample_interval": ( + gpu_sample_interval if gpu_sample_interval > 0 else None + ), + "gpu_timeout": gpu_timeout, + } monitoring_thread = threading.Thread( - target=monitor_process, args=monitoring_args + target=monitor_process, kwargs=monitoring_kwargs ) monitoring_thread.start() else: diff --git a/src/con_duct/ls.py b/src/con_duct/ls.py index a0742e71..bfc28935 100644 --- a/src/con_duct/ls.py +++ b/src/con_duct/ls.py @@ -37,6 +37,11 @@ "peak_vsz": "{value!S}", "start_time": "{value:.2f!N}", "wall_clock_time": "{value:.3f} sec", + # GPU fields (added in schema 0.3.0) + "peak_gpu_utilization": "{value:.2f!N}%", + "average_gpu_utilization": "{value:.2f!N}%", + "peak_gpu_memory_used": "{value!S}", + "average_gpu_memory_used": "{value!S}", } NON_TRANSFORMED_FIELDS: List[str] = [ @@ -63,6 +68,13 @@ LS_FIELD_CHOICES: List[str] = ( list(VALUE_TRANSFORMATION_MAP.keys()) + NON_TRANSFORMED_FIELDS ) +# GPU fields are only present when GPU monitoring was enabled during execution +OPTIONAL_GPU_FIELDS: List[str] = [ + "peak_gpu_utilization", + "average_gpu_utilization", + "peak_gpu_memory_used", + "average_gpu_memory_used", +] MINIMUM_SCHEMA_VERSION: str = "0.2.0" @@ -116,6 +128,8 @@ def ensure_compliant_schema(info_dict: dict) -> None: # message field added in 0.2.2 if parse_version(info_dict["schema_version"]) < parse_version("0.2.2"): info_dict["message"] = "" + # GPU fields added in 0.3.0 (optional, only present if GPU 
monitoring was enabled) + # No default values needed - they simply won't be present in older schemas def process_run_data( diff --git a/test/duct_main/test_gpu.py b/test/duct_main/test_gpu.py new file mode 100644 index 00000000..6d868bbc --- /dev/null +++ b/test/duct_main/test_gpu.py @@ -0,0 +1,240 @@ +"""Tests for GPU monitoring functionality.""" + +from __future__ import annotations +from copy import deepcopy +import subprocess +from unittest import mock +import pytest +from con_duct.duct_main import ( + GPU_SAMPLE_TIMEOUT, + GpuAverages, + GpuSample, + GpuStats, + _get_gpu_sample, +) + +# Sample GPU stats for testing +gpu_stat0 = GpuStats( + index=0, + utilization_gpu=45.0, + utilization_memory=23.0, + memory_used=4096 * 1024 * 1024, # 4GB + memory_total=16384 * 1024 * 1024, # 16GB + timestamp="2024-06-11T10:09:37-04:00", +) + +gpu_stat1 = GpuStats( + index=0, + utilization_gpu=80.0, + utilization_memory=45.0, + memory_used=8192 * 1024 * 1024, # 8GB + memory_total=16384 * 1024 * 1024, + timestamp="2024-06-11T10:13:23-04:00", +) + + +class TestGpuStats: + @pytest.mark.ai_generated + def test_aggregate_keeps_peak_values(self) -> None: + result = gpu_stat0.aggregate(gpu_stat1) + assert result.utilization_gpu == 80.0 + assert result.utilization_memory == 45.0 + assert result.memory_used == 8192 * 1024 * 1024 + assert result.timestamp == "2024-06-11T10:13:23-04:00" + + @pytest.mark.ai_generated + def test_for_json(self) -> None: + json_dict = gpu_stat0.for_json() + assert json_dict["index"] == 0 + assert json_dict["utilization_gpu"] == 45.0 + assert json_dict["memory_used"] == 4096 * 1024 * 1024 + + +class TestGpuSample: + @pytest.mark.ai_generated + def test_add_gpu(self) -> None: + sample = GpuSample() + sample.add_gpu(0, deepcopy(gpu_stat0)) + assert 0 in sample.stats + assert sample.total_utilization_gpu == 45.0 + assert sample.total_memory_used == 4096 * 1024 * 1024 + + @pytest.mark.ai_generated + def test_multi_gpu_totals(self) -> None: + sample = GpuSample() + 
sample.add_gpu(0, deepcopy(gpu_stat0)) + gpu1 = deepcopy(gpu_stat0) + gpu1.index = 1 + sample.add_gpu(1, gpu1) + # Utilization is summed across GPUs + assert sample.total_utilization_gpu == 90.0 + # Memory is summed across GPUs + assert sample.total_memory_used == 2 * 4096 * 1024 * 1024 + + @pytest.mark.ai_generated + def test_aggregate(self) -> None: + sample1 = GpuSample() + sample1.add_gpu(0, deepcopy(gpu_stat0)) + sample1.averages = GpuAverages( + utilization_gpu=sample1.total_utilization_gpu, + utilization_memory=sample1.total_utilization_memory, + memory_used=float(sample1.total_memory_used or 0), + num_samples=1, + ) + + sample2 = GpuSample() + sample2.add_gpu(0, deepcopy(gpu_stat1)) + + result = sample1.aggregate(sample2) + # Peak values should be kept + assert result.total_utilization_gpu == 80.0 + assert result.total_memory_used == 8192 * 1024 * 1024 + # Averages should be updated + assert result.averages.num_samples == 2 + + @pytest.mark.ai_generated + def test_from_stats(self) -> None: + stats = {0: deepcopy(gpu_stat0)} + sample = GpuSample.from_stats(stats) + assert sample.total_utilization_gpu == 45.0 + assert sample.averages.num_samples == 1 + + @pytest.mark.ai_generated + def test_for_json(self) -> None: + sample = GpuSample.from_stats({0: deepcopy(gpu_stat0)}) + json_dict = sample.for_json() + assert "gpus" in json_dict + assert "0" in json_dict["gpus"] + assert "totals" in json_dict + assert json_dict["totals"]["utilization_gpu"] == 45.0 + + +class TestGpuAverages: + @pytest.mark.ai_generated + def test_update(self) -> None: + sample1 = GpuSample.from_stats({0: deepcopy(gpu_stat0)}) + averages = GpuAverages( + utilization_gpu=sample1.total_utilization_gpu, + utilization_memory=sample1.total_utilization_memory, + memory_used=float(sample1.total_memory_used or 0), + num_samples=1, + ) + + sample2 = GpuSample.from_stats({0: deepcopy(gpu_stat1)}) + averages.update(sample2) + + assert averages.num_samples == 2 + # Average of 45.0 and 80.0 + assert 
averages.utilization_gpu == pytest.approx(62.5) + + +class TestGetGpuSample: + @pytest.mark.ai_generated + @mock.patch("con_duct.duct_main.shutil.which") + def test_returns_none_when_nvidia_smi_not_found( + self, mock_which: mock.MagicMock + ) -> None: + mock_which.return_value = None + result = _get_gpu_sample() + assert result is None + + @pytest.mark.ai_generated + @mock.patch("con_duct.duct_main.shutil.which") + @mock.patch("con_duct.duct_main.subprocess.check_output") + def test_parses_nvidia_smi_output( + self, + mock_check_output: mock.MagicMock, + mock_which: mock.MagicMock, + ) -> None: + mock_which.return_value = "/usr/bin/nvidia-smi" + mock_check_output.return_value = "0, 45, 23, 4096, 16384\n" + + result = _get_gpu_sample() + + assert result is not None + assert 0 in result.stats + assert result.stats[0].utilization_gpu == 45.0 + assert result.stats[0].memory_used == 4096 * 1024 * 1024 + + @pytest.mark.ai_generated + @mock.patch("con_duct.duct_main.shutil.which") + @mock.patch("con_duct.duct_main.subprocess.check_output") + def test_handles_timeout( + self, + mock_check_output: mock.MagicMock, + mock_which: mock.MagicMock, + ) -> None: + mock_which.return_value = "/usr/bin/nvidia-smi" + mock_check_output.side_effect = subprocess.TimeoutExpired( + cmd="nvidia-smi", timeout=GPU_SAMPLE_TIMEOUT + ) + + result = _get_gpu_sample() + assert result is None + + @pytest.mark.ai_generated + @mock.patch("con_duct.duct_main.shutil.which") + @mock.patch("con_duct.duct_main.subprocess.check_output") + def test_handles_called_process_error( + self, + mock_check_output: mock.MagicMock, + mock_which: mock.MagicMock, + ) -> None: + mock_which.return_value = "/usr/bin/nvidia-smi" + mock_check_output.side_effect = subprocess.CalledProcessError( + returncode=1, cmd="nvidia-smi" + ) + + result = _get_gpu_sample() + assert result is None + + @pytest.mark.ai_generated + @mock.patch("con_duct.duct_main.shutil.which") + @mock.patch("con_duct.duct_main.subprocess.check_output") + 
def test_multi_gpu_parsing( + self, + mock_check_output: mock.MagicMock, + mock_which: mock.MagicMock, + ) -> None: + mock_which.return_value = "/usr/bin/nvidia-smi" + mock_check_output.return_value = ( + "0, 45, 23, 4096, 16384\n" "1, 30, 15, 2048, 16384\n" + ) + + result = _get_gpu_sample() + + assert result is not None + assert len(result.stats) == 2 + assert result.total_utilization_gpu == 75.0 # 45 + 30 + assert result.total_memory_used == (4096 + 2048) * 1024 * 1024 + + @pytest.mark.ai_generated + @mock.patch("con_duct.duct_main.shutil.which") + @mock.patch("con_duct.duct_main.subprocess.check_output") + def test_handles_malformed_output( + self, + mock_check_output: mock.MagicMock, + mock_which: mock.MagicMock, + ) -> None: + mock_which.return_value = "/usr/bin/nvidia-smi" + mock_check_output.return_value = "malformed output\n" + + result = _get_gpu_sample() + assert result is None + + @pytest.mark.ai_generated + @mock.patch("con_duct.duct_main.shutil.which") + @mock.patch("con_duct.duct_main.subprocess.check_output") + def test_custom_timeout( + self, + mock_check_output: mock.MagicMock, + mock_which: mock.MagicMock, + ) -> None: + mock_which.return_value = "/usr/bin/nvidia-smi" + mock_check_output.return_value = "0, 45, 23, 4096, 16384\n" + + _get_gpu_sample(timeout=10.0) + + mock_check_output.assert_called_once() + call_kwargs = mock_check_output.call_args[1] + assert call_kwargs["timeout"] == 10.0 diff --git a/test/test_instruments.py b/test/test_instruments.py new file mode 100644 index 00000000..322e4caf --- /dev/null +++ b/test/test_instruments.py @@ -0,0 +1,78 @@ +"""Tests for Instruments enum and parsing.""" + +from __future__ import annotations +import pytest +from con_duct.duct_main import Instruments, instruments_from_str + + +class TestInstruments: + @pytest.mark.ai_generated + def test_parse_single_cpu(self) -> None: + result = instruments_from_str("cpu") + assert result == {Instruments.CPU} + + @pytest.mark.ai_generated + def 
test_parse_single_mem(self) -> None: + result = instruments_from_str("mem") + assert result == {Instruments.MEM} + + @pytest.mark.ai_generated + def test_parse_single_gpu(self) -> None: + result = instruments_from_str("gpu") + assert result == {Instruments.GPU} + + @pytest.mark.ai_generated + def test_parse_multiple(self) -> None: + result = instruments_from_str("cpu,mem,gpu") + assert result == {Instruments.CPU, Instruments.MEM, Instruments.GPU} + + @pytest.mark.ai_generated + def test_parse_cpu_mem(self) -> None: + result = instruments_from_str("cpu,mem") + assert result == {Instruments.CPU, Instruments.MEM} + + @pytest.mark.ai_generated + def test_parse_all(self) -> None: + result = instruments_from_str("all") + assert result == {Instruments.CPU, Instruments.MEM, Instruments.GPU} + + @pytest.mark.ai_generated + def test_parse_all_uppercase(self) -> None: + result = instruments_from_str("ALL") + assert result == {Instruments.CPU, Instruments.MEM, Instruments.GPU} + + @pytest.mark.ai_generated + def test_parse_with_spaces(self) -> None: + result = instruments_from_str("cpu, mem, gpu") + assert result == {Instruments.CPU, Instruments.MEM, Instruments.GPU} + + @pytest.mark.ai_generated + def test_parse_case_insensitive(self) -> None: + result = instruments_from_str("CPU,MEM,GPU") + assert result == {Instruments.CPU, Instruments.MEM, Instruments.GPU} + + @pytest.mark.ai_generated + def test_parse_mixed_case(self) -> None: + result = instruments_from_str("Cpu,Mem,Gpu") + assert result == {Instruments.CPU, Instruments.MEM, Instruments.GPU} + + @pytest.mark.ai_generated + def test_parse_invalid_raises_error(self) -> None: + with pytest.raises(ValueError, match="Invalid instrument"): + instruments_from_str("invalid") + + @pytest.mark.ai_generated + def test_parse_partially_invalid_raises_error(self) -> None: + with pytest.raises(ValueError, match="Invalid instrument"): + instruments_from_str("cpu,invalid,mem") + + @pytest.mark.ai_generated + def 
test_parse_empty_items_ignored(self) -> None: + result = instruments_from_str("cpu,,mem") + assert result == {Instruments.CPU, Instruments.MEM} + + @pytest.mark.ai_generated + def test_str_method(self) -> None: + assert str(Instruments.CPU) == "cpu" + assert str(Instruments.MEM) == "mem" + assert str(Instruments.GPU) == "gpu" diff --git a/test/test_schema.py b/test/test_schema.py index a1160faa..bbed0b26 100644 --- a/test/test_schema.py +++ b/test/test_schema.py @@ -3,14 +3,15 @@ from pathlib import Path from utils import run_duct_command from con_duct.duct_main import SUFFIXES -from con_duct.ls import LS_FIELD_CHOICES, _flatten_dict +from con_duct.ls import LS_FIELD_CHOICES, OPTIONAL_GPU_FIELDS, _flatten_dict def test_info_fields(temp_output_dir: str) -> None: """ Generate the list of fields users can request when viewing info files. - Fails when schema changes-- commit the new version and bump schema version + Fails when schema changes -- commit the new version and bump schema version. + GPU fields are optional and only present when GPU monitoring is enabled. 
""" assert ( run_duct_command( @@ -26,7 +27,19 @@ def test_info_fields(temp_output_dir: str) -> None: os.remove(Path(temp_output_dir, SUFFIXES["usage"])) info_file = Path(temp_output_dir, SUFFIXES["info"]) - actual_info_schema = _flatten_dict(json.loads(info_file.read_text())).keys() + actual_info_schema = set(_flatten_dict(json.loads(info_file.read_text())).keys()) os.remove(info_file) - assert set(actual_info_schema) == set(LS_FIELD_CHOICES) + # GPU fields are optional - they only appear when GPU monitoring is enabled + expected_required_fields = set(LS_FIELD_CHOICES) - set(OPTIONAL_GPU_FIELDS) + expected_optional_fields = set(OPTIONAL_GPU_FIELDS) + + # All required fields must be present + assert ( + expected_required_fields <= actual_info_schema + ), f"Missing required fields: {expected_required_fields - actual_info_schema}" + # Any extra fields must be from the optional set + extra_fields = actual_info_schema - expected_required_fields + assert ( + extra_fields <= expected_optional_fields + ), f"Unexpected fields: {extra_fields - expected_optional_fields}" diff --git a/test/utils.py b/test/utils.py index 7dcd9577..3473074b 100644 --- a/test/utils.py +++ b/test/utils.py @@ -17,6 +17,8 @@ def run_duct_command(cli_args: list[str], **kwargs: Any) -> int: from con_duct.duct_main import ( DUCT_OUTPUT_PREFIX, EXECUTION_SUMMARY_FORMAT, + GPU_SAMPLE_TIMEOUT, + Instruments, Outputs, RecordTypes, SessionMode, @@ -39,6 +41,9 @@ def run_duct_command(cli_args: list[str], **kwargs: Any) -> int: "colors": False, "mode": SessionMode.NEW_SESSION, "message": "", + "instruments": {Instruments.CPU, Instruments.MEM}, + "gpu_sample_interval": 0.0, + "gpu_timeout": GPU_SAMPLE_TIMEOUT, } defaults.update(kwargs)