def _safe_run(exp: dict[str, str | int | list[str]]) -> None:
    """Run one experiment's container benchmark without letting a failure escape.

    Meant to be the callable handed to the thread pool: an exception raised
    for one experiment is logged and swallowed here so that it does not
    propagate out of the pool and abort the remaining experiments.

    Args:
        exp: Experiment description; its ``"id"`` entry is used in the log
            message.

    Returns:
        None. Success or failure is only observable through the logs.
    """
    try:
        _run_container_benchmark(
            experiment=exp, run_id=run_id, benchmark_run=benchmark_run
        )
    except Exception as exc:
        # Deliberately broad: this is the per-worker boundary, and one bad
        # experiment must not take the others down. ``logger.exception``
        # logs at ERROR level like before but also records the traceback,
        # which a bare ``repr(exc)`` loses.
        logger.exception(
            f"Experiment '{exp['id']}' failed at orchestrator level; "
            f"continuing with remaining experiments. Error: {exc!r}"
        )
) + ingress_sum: int = 0 + egress_sum: int = 0 + start_time = datetime.datetime.now(datetime.UTC) + failure: Exception | None = None + failure_iteration: int | None = None + failure_started_at: datetime.datetime | None = None + failure_ended_at: datetime.datetime | None = None + failure_partial_sample: dict | None = None + if skip_warmup: logger.info( f"Executing {benchmark_iteration.value} benchmark run(s) (no warmup)." @@ -53,78 +62,159 @@ def wrapper(*args, **kwargs): f"Executing {Config.BENCHMARK_WARMUP_ITERATIONS} warmup runs." ) for _ in range(Config.BENCHMARK_WARMUP_ITERATIONS): - func(*args, **kwargs) - logger.info( - f"Warmup runs completed. Starting {benchmark_iteration.value} benchmark runs." - ) - - ingress_sum: int = 0 - egress_sum: int = 0 - start_time = datetime.datetime.now(datetime.UTC) - - for i in range(benchmark_iteration.value): - iteration = i + 1 - - started_at = datetime.datetime.now(datetime.UTC) - ( - result, - wall_elapsed_time, - net_bytes_sent, - net_bytes_received, - cpu_time_user_seconds, - cpu_time_system_seconds, - ) = _measure_io(func, *args, **kwargs) - ended_at = datetime.datetime.now(datetime.UTC) - - executor_input_bytes_read = None - executor_run_time_ms = None - shuffle_read_bytes = None - shuffle_write_bytes = None - driver_collection_time_ms = None - stage_durations_ms = None - - if elapsed_from_result: - if isinstance(result, DatabricksRunResult): - elapsed_time = result.execution_duration_s - result_cardinality = result.cardinality - executor_input_bytes_read = result.executor_input_bytes_read - executor_run_time_ms = result.executor_run_time_ms - shuffle_read_bytes = result.shuffle_read_bytes - shuffle_write_bytes = result.shuffle_write_bytes - driver_collection_time_ms = result.driver_collection_time_ms - stage_durations_ms = result.stage_durations_ms + warmup_started_at = datetime.datetime.now(datetime.UTC) + ( + _, + w_elapsed, + w_sent, + w_recv, + w_cpu_u, + w_cpu_s, + warmup_exc, + ) = _measure_io(func, *args, 
**kwargs) + warmup_ended_at = datetime.datetime.now(datetime.UTC) + if warmup_exc is not None: + failure = warmup_exc + failure_iteration = 0 + failure_started_at = warmup_started_at + failure_ended_at = warmup_ended_at + failure_partial_sample = { + "network_bytes_sent": w_sent, + "network_bytes_received": w_recv, + "cpu_time_user_seconds": w_cpu_u, + "cpu_time_system_seconds": w_cpu_s, + "wall_elapsed_time": w_elapsed, + } + logger.error( + f"Warmup raised for query '{query_id}': {warmup_exc!r}. " + f"Skipping timed iterations." + ) + break + if failure is None: + logger.info( + f"Warmup runs completed. Starting {benchmark_iteration.value} benchmark runs." + ) + + if failure is None: + for i in range(benchmark_iteration.value): + iteration = i + 1 + + started_at = datetime.datetime.now(datetime.UTC) + ( + result, + wall_elapsed_time, + net_bytes_sent, + net_bytes_received, + cpu_time_user_seconds, + cpu_time_system_seconds, + iter_exc, + ) = _measure_io(func, *args, **kwargs) + ended_at = datetime.datetime.now(datetime.UTC) + + if iter_exc is not None: + failure = iter_exc + failure_iteration = iteration + failure_started_at = started_at + failure_ended_at = ended_at + failure_partial_sample = { + "network_bytes_sent": net_bytes_sent, + "network_bytes_received": net_bytes_received, + "cpu_time_user_seconds": cpu_time_user_seconds, + "cpu_time_system_seconds": cpu_time_system_seconds, + "wall_elapsed_time": wall_elapsed_time, + } + ingress_sum += net_bytes_received + egress_sum += net_bytes_sent + logger.error( + f"Iteration {iteration} raised for query '{query_id}': " + f"{iter_exc!r}. Failing fast; skipping remaining iterations." 
+ ) + break + + executor_input_bytes_read = None + executor_run_time_ms = None + shuffle_read_bytes = None + shuffle_write_bytes = None + driver_collection_time_ms = None + stage_durations_ms = None + + if elapsed_from_result: + if isinstance(result, DatabricksRunResult): + elapsed_time = result.execution_duration_s + result_cardinality = result.cardinality + executor_input_bytes_read = result.executor_input_bytes_read + executor_run_time_ms = result.executor_run_time_ms + shuffle_read_bytes = result.shuffle_read_bytes + shuffle_write_bytes = result.shuffle_write_bytes + driver_collection_time_ms = result.driver_collection_time_ms + stage_durations_ms = result.stage_durations_ms + else: + elapsed_time, result_cardinality = result else: - elapsed_time, result_cardinality = result - else: - elapsed_time = wall_elapsed_time - result_cardinality = len(result) if result is not None else -1 - - ingress_sum += net_bytes_received - egress_sum += net_bytes_sent - + elapsed_time = wall_elapsed_time + result_cardinality = len(result) if result is not None else -1 + + ingress_sum += net_bytes_received + egress_sum += net_bytes_sent + + _save_run( + run_id=run_id, + benchmark_run=benchmark_run, + query_id=query_id, + iteration=iteration, + total_iterations=benchmark_iteration.value, + samples=[ + { + "status": "success", + "failure_reason": None, + "elapsed_time": elapsed_time, + "network_bytes_sent": net_bytes_sent, + "network_bytes_received": net_bytes_received, + "started_at": started_at.isoformat(), + "ended_at": ended_at.isoformat(), + "cpu_time_user_seconds": cpu_time_user_seconds, + "cpu_time_system_seconds": cpu_time_system_seconds, + "result_cardinality": result_cardinality, + "executor_input_bytes_read": executor_input_bytes_read, + "executor_run_time_ms": executor_run_time_ms, + "shuffle_read_bytes": shuffle_read_bytes, + "shuffle_write_bytes": shuffle_write_bytes, + "driver_collection_time_ms": driver_collection_time_ms, + "stage_durations_ms": stage_durations_ms, + 
"schema_version": SchemaVersion.V4.value, + } + ], + ) + + if failure is not None: + assert failure_started_at is not None + assert failure_ended_at is not None + assert failure_partial_sample is not None _save_run( run_id=run_id, benchmark_run=benchmark_run, query_id=query_id, - iteration=iteration, + iteration=failure_iteration or 1, total_iterations=benchmark_iteration.value, samples=[ { - "elapsed_time": elapsed_time, - "network_bytes_sent": net_bytes_sent, - "network_bytes_received": net_bytes_received, - "started_at": started_at.isoformat(), - "ended_at": ended_at.isoformat(), - "cpu_time_user_seconds": cpu_time_user_seconds, - "cpu_time_system_seconds": cpu_time_system_seconds, - "result_cardinality": result_cardinality, - "executor_input_bytes_read": executor_input_bytes_read, - "executor_run_time_ms": executor_run_time_ms, - "shuffle_read_bytes": shuffle_read_bytes, - "shuffle_write_bytes": shuffle_write_bytes, - "driver_collection_time_ms": driver_collection_time_ms, - "stage_durations_ms": stage_durations_ms, - "schema_version": SchemaVersion.V3.value, + "status": "failed", + "failure_reason": str(failure), + "elapsed_time": None, + "network_bytes_sent": failure_partial_sample["network_bytes_sent"], + "network_bytes_received": failure_partial_sample["network_bytes_received"], + "started_at": failure_started_at.isoformat(), + "ended_at": failure_ended_at.isoformat(), + "cpu_time_user_seconds": failure_partial_sample["cpu_time_user_seconds"], + "cpu_time_system_seconds": failure_partial_sample["cpu_time_system_seconds"], + "result_cardinality": None, + "executor_input_bytes_read": None, + "executor_run_time_ms": None, + "shuffle_read_bytes": None, + "shuffle_write_bytes": None, + "driver_collection_time_ms": None, + "stage_durations_ms": None, + "schema_version": SchemaVersion.V4.value, } ], ) @@ -146,6 +236,13 @@ def wrapper(*args, **kwargs): operation_type=BlobOperationType.READ, ) + if failure is not None: + logger.warning( + f"Benchmark run 
{benchmark_run} for query '{query_id}' recorded as failed; " + f"reason: {failure!r}" + ) + return None + logger.info(f"Benchmark run {benchmark_run} completed.") return result diff --git a/src/application/common/monitor_utils.py b/src/application/common/monitor_utils.py index 76d1c009..15da4d46 100644 --- a/src/application/common/monitor_utils.py +++ b/src/application/common/monitor_utils.py @@ -165,13 +165,20 @@ def _create_global_iteration( return iteration + total_iterations * (benchmark_run - 1) -def _measure_io(func, *args, **kwargs) -> tuple[Any, float, int, int, float, float]: +def _measure_io( + func, *args, **kwargs +) -> tuple[Any, float, int, int, float, float, Exception | None]: process = psutil.Process() net_before = psutil.net_io_counters() cpu_before = process.cpu_times() start_time = time.perf_counter() - result = func(*args, **kwargs) + result: Any = None + exception: Exception | None = None + try: + result = func(*args, **kwargs) + except Exception as exc: + exception = exc end_time = time.perf_counter() cpu_after = process.cpu_times() @@ -190,4 +197,5 @@ def _measure_io(func, *args, **kwargs) -> tuple[Any, float, int, int, float, flo network_bytes_received, cpu_time_user_seconds, cpu_time_system_seconds, + exception, ) diff --git a/src/domain/enums/schema_version.py b/src/domain/enums/schema_version.py index c7e97558..bb9f8701 100644 --- a/src/domain/enums/schema_version.py +++ b/src/domain/enums/schema_version.py @@ -4,3 +4,4 @@ class SchemaVersion(Enum): V2 = "v2" V3 = "v3" + V4 = "v4" diff --git a/src/infra/infrastructure/services/databricks_service.py b/src/infra/infrastructure/services/databricks_service.py index 202dbe88..fb67dd99 100644 --- a/src/infra/infrastructure/services/databricks_service.py +++ b/src/infra/infrastructure/services/databricks_service.py @@ -429,9 +429,16 @@ def _wait_for_run(self, run_id: str) -> str: if life_cycle_state in DatabricksRunLifecycleState.terminal_values(): if result_state != 
def _fetch_run_error(self, task_run_id: str) -> str:
    """Best-effort fetch of error/error_trace from runs/get-output for a failed task run.

    Returns an empty string if the endpoint is unreachable, answers with an
    HTTP error, returns a body that is not a JSON object, or returns no error
    fields; callers should treat this as supplementary diagnostics, not
    authoritative.

    Args:
        task_run_id: Run id of the task whose output should be queried.

    Returns:
        The ``error`` and ``error_trace`` fields of the response, stripped
        and joined by a newline (empty fields are omitted), or ``""``.
    """
    try:
        response = requests.get(
            f"{self._host}/api/2.1/jobs/runs/get-output",
            headers=self._headers,
            params={"run_id": task_run_id},
            timeout=Config.DATABRICKS_HTTP_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        # Decode inside the guarded section: this helper runs while the
        # caller is already building an error for a failed run, so a
        # malformed body must not replace that error with a decode failure.
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decode errors on requests versions where
        # the decode error is not a RequestException subclass.
        logger.warning(
            f"Could not fetch run output for failed task run {task_run_id}: {exc}"
        )
        return ""
    if not isinstance(payload, dict):
        # Valid JSON but not an object (e.g. a list or null): nothing to extract.
        return ""
    error = str(payload.get("error") or "").strip()
    error_trace = str(payload.get("error_trace") or "").strip()
    parts = [p for p in (error, error_trace) if p]
    return "\n".join(parts)