Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,19 @@ def _run_benchmarks(

experiments_to_run.append(related_experiment)

with ThreadPoolExecutor(max_workers=10) as pool:
list(
pool.map(
lambda exp: _run_container_benchmark(
experiment=exp, run_id=run_id, benchmark_run=benchmark_run
),
experiments_to_run,
def _safe_run(exp: dict[str, str | int | list[str]]) -> None:
try:
_run_container_benchmark(
experiment=exp, run_id=run_id, benchmark_run=benchmark_run
)
)
except Exception as exc:
logger.error(
f"Experiment '{exp['id']}' failed at orchestrator level; "
f"continuing with remaining experiments. Error: {exc!r}"
)

with ThreadPoolExecutor(max_workers=10) as pool:
list(pool.map(_safe_run, experiments_to_run))

for exp in experiments_to_run:
completed_experiments.append(str(exp["id"]))
Expand Down
225 changes: 161 additions & 64 deletions src/application/common/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ def wrapper(*args, **kwargs):
f"Starting benchmark for query '{query_id}' with run ID '{run_id}'."
)

ingress_sum: int = 0
egress_sum: int = 0
start_time = datetime.datetime.now(datetime.UTC)
failure: Exception | None = None
Comment on lines +47 to +50
failure_iteration: int | None = None
failure_started_at: datetime.datetime | None = None
failure_ended_at: datetime.datetime | None = None
failure_partial_sample: dict | None = None

if skip_warmup:
logger.info(
f"Executing {benchmark_iteration.value} benchmark run(s) (no warmup)."
Expand All @@ -53,78 +62,159 @@ def wrapper(*args, **kwargs):
f"Executing {Config.BENCHMARK_WARMUP_ITERATIONS} warmup runs."
)
for _ in range(Config.BENCHMARK_WARMUP_ITERATIONS):
func(*args, **kwargs)
logger.info(
f"Warmup runs completed. Starting {benchmark_iteration.value} benchmark runs."
)

ingress_sum: int = 0
egress_sum: int = 0
start_time = datetime.datetime.now(datetime.UTC)

for i in range(benchmark_iteration.value):
iteration = i + 1

started_at = datetime.datetime.now(datetime.UTC)
(
result,
wall_elapsed_time,
net_bytes_sent,
net_bytes_received,
cpu_time_user_seconds,
cpu_time_system_seconds,
) = _measure_io(func, *args, **kwargs)
ended_at = datetime.datetime.now(datetime.UTC)

executor_input_bytes_read = None
executor_run_time_ms = None
shuffle_read_bytes = None
shuffle_write_bytes = None
driver_collection_time_ms = None
stage_durations_ms = None

if elapsed_from_result:
if isinstance(result, DatabricksRunResult):
elapsed_time = result.execution_duration_s
result_cardinality = result.cardinality
executor_input_bytes_read = result.executor_input_bytes_read
executor_run_time_ms = result.executor_run_time_ms
shuffle_read_bytes = result.shuffle_read_bytes
shuffle_write_bytes = result.shuffle_write_bytes
driver_collection_time_ms = result.driver_collection_time_ms
stage_durations_ms = result.stage_durations_ms
warmup_started_at = datetime.datetime.now(datetime.UTC)
(
_,
w_elapsed,
w_sent,
w_recv,
w_cpu_u,
w_cpu_s,
warmup_exc,
) = _measure_io(func, *args, **kwargs)
warmup_ended_at = datetime.datetime.now(datetime.UTC)
if warmup_exc is not None:
failure = warmup_exc
failure_iteration = 0
failure_started_at = warmup_started_at
failure_ended_at = warmup_ended_at
failure_partial_sample = {
"network_bytes_sent": w_sent,
"network_bytes_received": w_recv,
"cpu_time_user_seconds": w_cpu_u,
"cpu_time_system_seconds": w_cpu_s,
"wall_elapsed_time": w_elapsed,
}
logger.error(
f"Warmup raised for query '{query_id}': {warmup_exc!r}. "
f"Skipping timed iterations."
)
break
if failure is None:
logger.info(
f"Warmup runs completed. Starting {benchmark_iteration.value} benchmark runs."
)

if failure is None:
for i in range(benchmark_iteration.value):
iteration = i + 1

started_at = datetime.datetime.now(datetime.UTC)
(
result,
wall_elapsed_time,
net_bytes_sent,
net_bytes_received,
cpu_time_user_seconds,
cpu_time_system_seconds,
iter_exc,
) = _measure_io(func, *args, **kwargs)
ended_at = datetime.datetime.now(datetime.UTC)

if iter_exc is not None:
failure = iter_exc
failure_iteration = iteration
failure_started_at = started_at
failure_ended_at = ended_at
failure_partial_sample = {
"network_bytes_sent": net_bytes_sent,
"network_bytes_received": net_bytes_received,
"cpu_time_user_seconds": cpu_time_user_seconds,
"cpu_time_system_seconds": cpu_time_system_seconds,
"wall_elapsed_time": wall_elapsed_time,
}
ingress_sum += net_bytes_received
egress_sum += net_bytes_sent
logger.error(
f"Iteration {iteration} raised for query '{query_id}': "
f"{iter_exc!r}. Failing fast; skipping remaining iterations."
)
break

executor_input_bytes_read = None
executor_run_time_ms = None
shuffle_read_bytes = None
shuffle_write_bytes = None
driver_collection_time_ms = None
stage_durations_ms = None

if elapsed_from_result:
if isinstance(result, DatabricksRunResult):
elapsed_time = result.execution_duration_s
result_cardinality = result.cardinality
executor_input_bytes_read = result.executor_input_bytes_read
executor_run_time_ms = result.executor_run_time_ms
shuffle_read_bytes = result.shuffle_read_bytes
shuffle_write_bytes = result.shuffle_write_bytes
driver_collection_time_ms = result.driver_collection_time_ms
stage_durations_ms = result.stage_durations_ms
else:
elapsed_time, result_cardinality = result
else:
elapsed_time, result_cardinality = result
else:
elapsed_time = wall_elapsed_time
result_cardinality = len(result) if result is not None else -1

ingress_sum += net_bytes_received
egress_sum += net_bytes_sent

elapsed_time = wall_elapsed_time
result_cardinality = len(result) if result is not None else -1

ingress_sum += net_bytes_received
egress_sum += net_bytes_sent

_save_run(
run_id=run_id,
benchmark_run=benchmark_run,
query_id=query_id,
iteration=iteration,
total_iterations=benchmark_iteration.value,
samples=[
{
"status": "success",
"failure_reason": None,
"elapsed_time": elapsed_time,
"network_bytes_sent": net_bytes_sent,
"network_bytes_received": net_bytes_received,
"started_at": started_at.isoformat(),
"ended_at": ended_at.isoformat(),
"cpu_time_user_seconds": cpu_time_user_seconds,
"cpu_time_system_seconds": cpu_time_system_seconds,
"result_cardinality": result_cardinality,
"executor_input_bytes_read": executor_input_bytes_read,
"executor_run_time_ms": executor_run_time_ms,
"shuffle_read_bytes": shuffle_read_bytes,
"shuffle_write_bytes": shuffle_write_bytes,
"driver_collection_time_ms": driver_collection_time_ms,
"stage_durations_ms": stage_durations_ms,
"schema_version": SchemaVersion.V4.value,
}
],
)

if failure is not None:
assert failure_started_at is not None
assert failure_ended_at is not None
assert failure_partial_sample is not None
Comment on lines +190 to +192
_save_run(
run_id=run_id,
benchmark_run=benchmark_run,
query_id=query_id,
iteration=iteration,
iteration=failure_iteration or 1,
total_iterations=benchmark_iteration.value,
samples=[
{
"elapsed_time": elapsed_time,
"network_bytes_sent": net_bytes_sent,
"network_bytes_received": net_bytes_received,
"started_at": started_at.isoformat(),
"ended_at": ended_at.isoformat(),
"cpu_time_user_seconds": cpu_time_user_seconds,
"cpu_time_system_seconds": cpu_time_system_seconds,
"result_cardinality": result_cardinality,
"executor_input_bytes_read": executor_input_bytes_read,
"executor_run_time_ms": executor_run_time_ms,
"shuffle_read_bytes": shuffle_read_bytes,
"shuffle_write_bytes": shuffle_write_bytes,
"driver_collection_time_ms": driver_collection_time_ms,
"stage_durations_ms": stage_durations_ms,
"schema_version": SchemaVersion.V3.value,
"status": "failed",
"failure_reason": str(failure),
"elapsed_time": None,
"network_bytes_sent": failure_partial_sample["network_bytes_sent"],
"network_bytes_received": failure_partial_sample["network_bytes_received"],
"started_at": failure_started_at.isoformat(),
"ended_at": failure_ended_at.isoformat(),
"cpu_time_user_seconds": failure_partial_sample["cpu_time_user_seconds"],
"cpu_time_system_seconds": failure_partial_sample["cpu_time_system_seconds"],
"result_cardinality": None,
"executor_input_bytes_read": None,
"executor_run_time_ms": None,
"shuffle_read_bytes": None,
"shuffle_write_bytes": None,
"driver_collection_time_ms": None,
"stage_durations_ms": None,
"schema_version": SchemaVersion.V4.value,
}
],
)
Expand All @@ -146,6 +236,13 @@ def wrapper(*args, **kwargs):
operation_type=BlobOperationType.READ,
)

if failure is not None:
logger.warning(
f"Benchmark run {benchmark_run} for query '{query_id}' recorded as failed; "
f"reason: {failure!r}"
)
return None

logger.info(f"Benchmark run {benchmark_run} completed.")
return result

Expand Down
12 changes: 10 additions & 2 deletions src/application/common/monitor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,20 @@ def _create_global_iteration(
return iteration + total_iterations * (benchmark_run - 1)


def _measure_io(func, *args, **kwargs) -> tuple[Any, float, int, int, float, float]:
def _measure_io(
func, *args, **kwargs
) -> tuple[Any, float, int, int, float, float, Exception | None]:
process = psutil.Process()
net_before = psutil.net_io_counters()
cpu_before = process.cpu_times()
start_time = time.perf_counter()

result = func(*args, **kwargs)
result: Any = None
exception: Exception | None = None
try:
result = func(*args, **kwargs)
except Exception as exc:
exception = exc

end_time = time.perf_counter()
cpu_after = process.cpu_times()
Expand All @@ -190,4 +197,5 @@ def _measure_io(func, *args, **kwargs) -> tuple[Any, float, int, int, float, flo
network_bytes_received,
cpu_time_user_seconds,
cpu_time_system_seconds,
exception,
)
1 change: 1 addition & 0 deletions src/domain/enums/schema_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
class SchemaVersion(Enum):
    """Version tags for the layout of persisted benchmark samples.

    The member's ``.value`` is what gets written into a sample's
    ``"schema_version"`` field.
    """

    V2 = "v2"
    V3 = "v3"
    # NOTE(review): V4 appears to be the version stamped on samples that carry
    # the "status" / "failure_reason" fields — confirm against the monitor code.
    V4 = "v4"
32 changes: 32 additions & 0 deletions src/infra/infrastructure/services/databricks_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,16 @@ def _wait_for_run(self, run_id: str) -> str:

if life_cycle_state in DatabricksRunLifecycleState.terminal_values():
if result_state != DatabricksRunResultState.SUCCESS.value:
error_details = ""
tasks = data.get("tasks") or []
if tasks:
failed_task_run_id = str(tasks[0].get("run_id"))
error_details = self._fetch_run_error(failed_task_run_id)
suffix = f"\nNotebook error:\n{error_details}" if error_details else ""
raise RuntimeError(
f"Databricks run {run_id} finished with result_state='{result_state}'. "
f"State message: {state.get('state_message', '')}"
f"{suffix}"
)
execution_duration_ms = data.get("execution_duration", 0)
setup_duration_ms = data.get("setup_duration", 0)
Expand All @@ -456,6 +463,31 @@ def _wait_for_run(self, run_id: str) -> str:

time.sleep(Config.DATABRICKS_POLL_INTERVAL_SECONDS)

def _fetch_run_error(self, task_run_id: str) -> str:
    """Best-effort fetch of error/error_trace from runs/get-output for a failed task run.

    Args:
        task_run_id: Run ID of the task whose output should be inspected.

    Returns:
        The ``error`` and ``error_trace`` fields joined by a newline (whichever
        are present and non-blank). An empty string is returned if the endpoint
        is unreachable, answers with a non-2xx status, sends a body that is not
        a JSON object, or returns no error fields; callers should treat this as
        supplementary diagnostics, not authoritative.

    This method never raises for transport or decoding problems, so a failure
    here cannot replace the error the caller is in the middle of reporting.
    """
    try:
        response = requests.get(
            f"{self._host}/api/2.1/jobs/runs/get-output",
            headers=self._headers,
            params={"run_id": task_run_id},
            timeout=Config.DATABRICKS_HTTP_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        # Decode inside the try: a 2xx response with a non-JSON body (e.g. an
        # HTML page from a proxy) must not escape this best-effort helper.
        # ValueError covers the JSON decode errors raised by every requests
        # version; newer versions also subclass RequestException.
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        logger.warning(
            f"Could not fetch run output for failed task run {task_run_id}: {exc}"
        )
        return ""

    # Valid JSON that is not an object (list, string, null) has no error fields.
    if not isinstance(payload, dict):
        logger.warning(
            f"Unexpected run output payload type for failed task run {task_run_id}: "
            f"{type(payload).__name__}"
        )
        return ""

    error = str(payload.get("error") or "").strip()
    error_trace = str(payload.get("error_trace") or "").strip()
    parts = [p for p in (error, error_trace) if p]
    return "\n".join(parts)
Comment on lines +485 to +489

def _fetch_notebook_output(self, run_id: str) -> DatabricksRunResult:
"""Fetch the notebook's dbutils.notebook.exit JSON payload and return a DatabricksRunResult.

Expand Down
Loading