diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..c9c77d7 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +.git +.github +.pytest_cache +.runtime +.venv +__pycache__ +*.py[cod] +.env diff --git a/Dockerfile b/Dockerfile index 02cd0f1..8a86d7d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.12-slim +FROM python:3.12-slim-bookworm ENV PYTHONUNBUFFERED=1 \ PIP_NO_CACHE_DIR=1 \ @@ -12,8 +12,15 @@ RUN apt-get update \ COPY requirements.txt ./ RUN python -m pip install --upgrade pip \ - && python -m pip install -r requirements.txt + && python -m pip install -r requirements.txt \ + && apt-get purge -y git \ + && apt-get autoremove -y --purge \ + && rm -rf /var/lib/apt/lists/* COPY . . +RUN useradd --create-home --uid 1000 appuser \ + && chown -R appuser:appuser /app +USER appuser + CMD ["gunicorn", "--bind", ":8080", "--workers", "1", "--threads", "1", "--timeout", "300", "main:app"] diff --git a/application/execution_service.py b/application/execution_service.py index 6915384..4c35b17 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -8,7 +8,7 @@ import time from collections.abc import Mapping from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import Any @@ -949,7 +949,7 @@ def _build_execution_lock_payload( "snapshot_date": snapshot_date, "mode": execution_mode, "target_hash": target_hash, - "created_at": datetime.utcnow().isoformat(timespec="seconds") + "Z", + "created_at": datetime.now(timezone.utc).isoformat(timespec="seconds") + "Z", } diff --git a/application/monitor_dispatcher.py b/application/monitor_dispatcher.py index e9e9625..fe5cc25 100644 --- a/application/monitor_dispatcher.py +++ b/application/monitor_dispatcher.py @@ -1,300 +1,52 @@ -"""Dispatch shared monitor windows to configured Cloud Run targets.""" - +"""IBKR monitor dispatcher — wraps platform_runner with IBKR-specific names & env helpers.""" from __future__ import annotations -import datetime as dt import json import os -from concurrent.futures import ThreadPoolExecutor, as_completed -from collections.abc import Callable, Mapping -from dataclasses import dataclass from typing import Any -from zoneinfo import ZoneInfo - -import requests -from google.auth.transport.requests import Request as GoogleAuthRequest -from google.oauth2 import id_token - - -MONITOR_TARGETS_ENV = "IBKR_MONITOR_DISPATCH_TARGETS_JSON" -DEFAULT_LOOKBACK_MINUTES = 4 -DEFAULT_TIMEOUT_SECONDS = 120 -DEFAULT_MAX_WORKERS = 4 - -@dataclass(frozen=True) -class MonitorWindow: - name: str - path: str - scheduler_key: str - - -MONITOR_WINDOWS = ( - MonitorWindow("probe", "/probe", "probe_time"), - MonitorWindow("precheck", "/dry-run", "precheck_time"), +from quant_platform_kit.common.platform_runner.monitor import ( + dispatch_due_monitors as _dispatch_due_monitors, + load_monitor_targets as _load_targets_from_env, ) -def load_monitor_targets(raw_json: str | None = None) -> list[dict[str, Any]]: - text = str(raw_json if raw_json is not None else os.environ.get(MONITOR_TARGETS_ENV) or "").strip() - if not text: - return [] - payload = json.loads(text) - targets = payload.get("targets") if isinstance(payload, Mapping) else payload - if not isinstance(targets, list): - raise ValueError(f"{MONITOR_TARGETS_ENV} must be a JSON array or object with targets") - return [target for target in targets if isinstance(target, dict)] - - -def due_monitor_dispatches( - targets: list[dict[str, Any]], - *, - now: dt.datetime | None = None, - lookback_minutes: int = DEFAULT_LOOKBACK_MINUTES, -) -> list[dict[str, Any]]: - now = _normalize_now(now) - since = now - dt.timedelta(minutes=max(0, int(lookback_minutes))) - dispatches: list[dict[str, Any]] = [] - for target in targets: - if not _target_enabled(target): - continue - service_url = str(target.get("service_url") or "").strip().rstrip("/") - service_name = str(target.get("service_name") or target.get("service") or "").strip() - if not service_url: - continue - scheduler = target.get("scheduler") if isinstance(target.get("scheduler"), Mapping) else {} - timezone = _target_timezone(scheduler) - for window in MONITOR_WINDOWS: - schedule = str(scheduler.get(window.scheduler_key) or "").strip() - if not schedule: - continue - if _schedule_due_between(schedule, timezone=timezone, since=since, now=now): - dispatches.append( - { - "window": window.name, - "path": window.path, - "service_name": service_name, - "service_url": service_url, - "url": f"{service_url}{window.path}", - "audience": service_url, - "schedule": schedule, - "timezone": getattr(timezone, "key", str(timezone)), - "strategy_profile": str(target.get("strategy_profile") or "").strip(), - } - ) - return dispatches - - -def dispatch_due_monitor_targets( - targets: list[dict[str, Any]], - *, - now: dt.datetime | None = None, - lookback_minutes: int = DEFAULT_LOOKBACK_MINUTES, - timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, - max_workers: int = DEFAULT_MAX_WORKERS, - request_fn: Callable[..., Any] | None = None, - token_fetcher: Callable[[str], str] | None = None, -) -> dict[str, Any]: - dispatches = due_monitor_dispatches(targets, now=now, lookback_minutes=lookback_minutes) - request_fn = request_fn or requests.post - token_fetcher = token_fetcher or fetch_identity_token - token_cache: dict[str, str] = {} - results = [] - if not dispatches: - return { - "dispatches_due": 0, - "dispatches_sent": 0, - "results": [], - } - for dispatch in dispatches: - audience = str(dispatch["audience"]) - token = token_cache.get(audience) - if token is None: - token = token_fetcher(audience) - token_cache[audience] = token - - def send(dispatch: Mapping[str, Any]) -> dict[str, Any]: - token = token_cache[str(dispatch["audience"])] - result = { - **{key: dispatch[key] for key in ("window", "service_name", "url", "schedule", "timezone", "strategy_profile")}, - } - try: - response = request_fn( - dispatch["url"], - headers={ - "Authorization": f"Bearer {token}", - "User-Agent": "ibkr-monitor-dispatcher", - }, - timeout=timeout_seconds, - ) - except Exception as exc: # noqa: BLE001 - return { - **result, - "status_code": 0, - "ok": False, - "error_type": type(exc).__name__, - "error_message": str(exc)[:300], - } - status_code = int(getattr(response, "status_code", 0) or 0) - body = str(getattr(response, "text", "") or "") - return { - **result, - "status_code": status_code, - "ok": 200 <= status_code < 300, - "body_preview": body[:200], - } - - worker_count = max(1, min(int(max_workers), len(dispatches))) - with ThreadPoolExecutor(max_workers=worker_count) as executor: - futures = [executor.submit(send, dispatch) for dispatch in dispatches] - for future in as_completed(futures): - results.append(future.result()) - return { - "dispatches_due": len(dispatches), - "dispatches_sent": len(results), - "results": results, - } - - -def fetch_identity_token(audience: str) -> str: - return id_token.fetch_id_token(GoogleAuthRequest(), audience) - +# ── IBKR env var helpers (were part of the original IBKR monitor_dispatcher) ── def lookback_minutes_from_env() -> int: - raw_value = os.environ.get("IBKR_MONITOR_DISPATCH_LOOKBACK_MINUTES", str(DEFAULT_LOOKBACK_MINUTES)) - try: - return max(0, int(raw_value)) - except (TypeError, ValueError): - return DEFAULT_LOOKBACK_MINUTES - + return int(os.environ.get("IBKR_MONITOR_DISPATCH_LOOKBACK_MINUTES", "4")) def timeout_seconds_from_env() -> int: - raw_value = os.environ.get("IBKR_MONITOR_DISPATCH_TIMEOUT_SECONDS", str(DEFAULT_TIMEOUT_SECONDS)) - try: - return max(1, int(raw_value)) - except (TypeError, ValueError): - return DEFAULT_TIMEOUT_SECONDS - + return int(os.environ.get("IBKR_MONITOR_DISPATCH_TIMEOUT_SECONDS", "120")) def max_workers_from_env() -> int: - raw_value = os.environ.get("IBKR_MONITOR_DISPATCH_MAX_WORKERS", str(DEFAULT_MAX_WORKERS)) - try: - return max(1, int(raw_value)) - except (TypeError, ValueError): - return DEFAULT_MAX_WORKERS - - -def _target_enabled(target: Mapping[str, Any]) -> bool: - value = target.get("runtime_target_enabled") - if value is None: - return True - return str(value).strip().lower() not in {"0", "false", "no", "off", "disabled"} - - -def _normalize_now(now: dt.datetime | None) -> dt.datetime: - value = now or dt.datetime.now(dt.timezone.utc) - if value.tzinfo is None: - value = value.replace(tzinfo=dt.timezone.utc) - return value.astimezone(dt.timezone.utc) - + return int(os.environ.get("IBKR_MONITOR_DISPATCH_MAX_WORKERS", "4")) -def _target_timezone(scheduler: Mapping[str, Any]) -> ZoneInfo | dt.tzinfo: - try: - return ZoneInfo(str(scheduler.get("timezone") or "UTC")) - except Exception: # noqa: BLE001 - return dt.timezone.utc +# ── IBKR-compatible function names ── -def _cron_token_value(token: str, *, names: dict[str, int] | None = None) -> int: - normalized = token.strip().lower() - if names and normalized in names: - return names[normalized] - return int(normalized) - - -def _cron_field_values( - field: str, - *, - minimum: int, - maximum: int, - names: dict[str, int] | None = None, -) -> set[int] | None: - text = str(field or "").strip().lower() - if text in {"", "*"}: - return None - values: set[int] = set() - for raw_part in text.split(","): - part = raw_part.strip() - if not part: - continue - base, raw_step = part, "1" - if "/" in part: - base, raw_step = part.split("/", 1) - step = max(1, int(raw_step)) - if base == "*": - start, end = minimum, maximum - elif "-" in base: - raw_start, raw_end = base.split("-", 1) - start = _cron_token_value(raw_start, names=names) - end = _cron_token_value(raw_end, names=names) - else: - start = end = _cron_token_value(base, names=names) - for value in range(start, end + 1, step): - if minimum <= value <= maximum: - values.add(value) - elif maximum == 6 and value == 7: - values.add(0) - return values - - -def _cron_matches(schedule: str, value: dt.datetime) -> bool: - fields = str(schedule or "").split() - if len(fields) != 5: - return False - minute, hour, day_of_month, month, day_of_week = fields - dow_names = { - "sun": 0, - "mon": 1, - "tue": 2, - "wed": 3, - "thu": 4, - "fri": 5, - "sat": 6, - } - minute_values = _cron_field_values(minute, minimum=0, maximum=59) - hour_values = _cron_field_values(hour, minimum=0, maximum=23) - dom_values = _cron_field_values(day_of_month, minimum=1, maximum=31) - month_values = _cron_field_values(month, minimum=1, maximum=12) - dow_values = _cron_field_values(day_of_week, minimum=0, maximum=6, names=dow_names) - if minute_values is not None and value.minute not in minute_values: - return False - if hour_values is not None and value.hour not in hour_values: - return False - if month_values is not None and value.month not in month_values: - return False - - dom_matches = dom_values is None or value.day in dom_values - cron_weekday = value.isoweekday() % 7 - dow_matches = dow_values is None or cron_weekday in dow_values - if dom_values is not None and dow_values is not None: - return dom_matches or dow_matches - return dom_matches and dow_matches +def load_monitor_targets(raw_json: str | None = None) -> list[dict[str, Any]]: + """Read target config from env or from a direct JSON string.""" + if raw_json: + return json.loads(raw_json) + env = os.environ if "IBKR_MONITOR_DISPATCH_TARGETS_JSON" in os.environ else None + return _load_targets_from_env(env=env) -def _schedule_due_between( - schedule: str, +def dispatch_due_monitor_targets( + targets: list[dict[str, Any]], *, - timezone: dt.tzinfo, - since: dt.datetime, - now: dt.datetime, -) -> bool: - since_utc = since.astimezone(dt.timezone.utc) - now_utc = now.astimezone(dt.timezone.utc) - cursor = since_utc.replace(second=0, microsecond=0) - if cursor < since_utc: - cursor += dt.timedelta(minutes=1) - while cursor <= now_utc: - if _cron_matches(schedule, cursor.astimezone(timezone)): - return True - cursor += dt.timedelta(minutes=1) - return False + now: str | None = None, + lookback_minutes: int | None = None, + timeout_seconds: int | None = None, + max_workers: int | None = None, +) -> dict[str, Any]: + from datetime import datetime, timezone + + return _dispatch_due_monitors( + targets, + now=datetime.fromisoformat(now) if now else None, + lookback_minutes=lookback_minutes or lookback_minutes_from_env(), + timeout_seconds=timeout_seconds or timeout_seconds_from_env(), + max_workers=max_workers or max_workers_from_env(), + ) diff --git a/main.py b/main.py index a91e94b..06c3bc4 100644 --- a/main.py +++ b/main.py @@ -8,15 +8,14 @@ from datetime import datetime from zoneinfo import ZoneInfo -import google.auth import pandas as pd import requests from flask import Flask, request try: - from google.cloud import compute_v1 + from quant_platform_kit.cloud import get_compute_discovery except ImportError: - compute_v1 = None + get_compute_discovery = None from application.cycle_result import coerce_strategy_cycle_result from application.runtime_broker_adapters import build_runtime_broker_adapters @@ -89,8 +88,9 @@ def get_project_id(): try: - _, project_id = google.auth.default() - return project_id if project_id else os.getenv("GOOGLE_CLOUD_PROJECT") + from quant_platform_kit.cloud import get_deployment_context + + return get_deployment_context().project_id except Exception: return os.getenv("GOOGLE_CLOUD_PROJECT") @@ -106,32 +106,18 @@ def get_ib_gateway_ip_mode(): def resolve_gce_instance_ip(instance_name, zone): - if not compute_v1: - print(f"google-cloud-compute not installed, using {instance_name} as host directly", flush=True) + if not get_compute_discovery: + print(f"quant_platform_kit.cloud not installed, using {instance_name} as host directly", flush=True) return instance_name try: ip_mode = get_ib_gateway_ip_mode() project = get_project_id() - client = compute_v1.InstancesClient() - instance = client.get(project=project, zone=zone, instance=instance_name) - internal_ip = None - external_ip = None - for iface in instance.network_interfaces: - if iface.network_i_p: - internal_ip = iface.network_i_p - for ac in iface.access_configs: - if ac.nat_i_p: - external_ip = ac.nat_i_p - - candidates = ( - (("internal", internal_ip), ("external", external_ip)) - if ip_mode == "internal" - else (("external", external_ip), ("internal", internal_ip)) + return get_compute_discovery().resolve_instance_ip( + instance_name, + zone, + project_id=project, + prefer_internal=ip_mode == "internal", ) - for label, ip in candidates: - if ip: - print(f"Resolved {instance_name} → {ip} ({label}, mode={ip_mode})", flush=True) - return ip except Exception as exc: print(f"GCE resolve failed for {instance_name}: {exc}, using as hostname", flush=True) return instance_name diff --git a/requirements.txt b/requirements.txt index 4f93f8e..fa57cc9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ flask gunicorn -quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@dfdbef6b58ab46f357d67800510bb9e8c4a01182 +quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@e86554b us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@a9546362a27abdfc9cd5184b30ca8d26fd774187 hk-equity-strategies @ git+https://github.com/QuantStrategyLab/HkEquityStrategies.git@dbbefb688cd144837aa59581b1930a14c11411ad pandas diff --git a/runtime_config_support.py b/runtime_config_support.py index c8eca4e..46c7125 100644 --- a/runtime_config_support.py +++ b/runtime_config_support.py @@ -7,6 +7,7 @@ from pathlib import Path from typing import Any, Callable +from quant_platform_kit.cloud import get_secret_store from quant_platform_kit.common.runtime_config import ( first_non_empty, resolve_bool_value, @@ -781,18 +782,7 @@ def load_secret_payload( *, secret_client_factory: Callable[[], Any] | None = None, ) -> str: - if secret_client_factory is None: - try: - import google.cloud.secretmanager_v1 as secret_manager - except ImportError: - from google.cloud import secret_manager - - secret_client_factory = secret_manager.SecretManagerServiceClient - - client = secret_client_factory() - resource_name = f"projects/{project_id}/secrets/{secret_name}/versions/latest" - response = client.access_secret_version(request={"name": resource_name}) - return response.payload.data.decode("UTF-8") + return get_secret_store().get_secret(secret_name, project_id=project_id) def parse_account_ids(raw_value: Any) -> tuple[str, ...]: