From 12b02be4bc11c99aa0b6a4259682e01f36c796b0 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Thu, 2 Jul 2026 00:23:18 +0800 Subject: [PATCH] feat: add strategy lifecycle CLI - Add quant-lifecycle console script backed by QuantPlatformKit lifecycle modules - Keep the CLI dependency-free by using argparse - Cover command delegation and compatibility wrapper behavior Co-Authored-By: Codex --- README.md | 13 + README.zh-CN.md | 13 + pyproject.toml | 3 + .../strategy_lifecycle/cli.py | 224 ++++++++++++++++++ tests/test_lifecycle_cli.py | 160 +++++++++++++ 5 files changed, 413 insertions(+) create mode 100644 src/quant_platform_kit/strategy_lifecycle/cli.py create mode 100644 tests/test_lifecycle_cli.py diff --git a/README.md b/README.md index ce9ff76..03a881e 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,19 @@ python -m pip install -e . python -m pytest -q ``` +## Strategy lifecycle CLI + +`quant-lifecycle` provides the shared lifecycle entrypoint formerly wrapped by `QuantStrategyLifecycle`. +Production schedules should live in domain repositories and call this CLI or the underlying +`quant_platform_kit.strategy_lifecycle` modules directly. + +```bash +quant-lifecycle monitor --domain us_equity +quant-lifecycle drift --domain us_equity +quant-lifecycle autopilot --domain us_equity --dry-run +quant-lifecycle dashboard --format all +``` + ## Cloud provider abstraction QuantPlatformKit includes a cloud provider abstraction layer at `quant_platform_kit.cloud`. It defines protocol interfaces for common cloud services — secret management, object storage, document databases, compute discovery, and deployment context — so that platform code can be written without hard-wiring to a specific cloud provider. diff --git a/README.zh-CN.md b/README.zh-CN.md index 5e1f012..01e610f 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -31,6 +31,19 @@ python -m pip install -e . python -m pytest -q ``` +## 策略生命周期 CLI + +`quant-lifecycle` 是原 `QuantStrategyLifecycle` wrapper 职责迁入后的共享入口。 +生产定时任务应放在各 domain 仓库,并调用这个 CLI 或底层 +`quant_platform_kit.strategy_lifecycle` 模块。 + +```bash +quant-lifecycle monitor --domain us_equity +quant-lifecycle drift --domain us_equity +quant-lifecycle autopilot --domain us_equity --dry-run +quant-lifecycle dashboard --format all +``` + ## 云服务抽象层 `quant_platform_kit.cloud` 包为常用云服务定义了协议接口——密钥管理、对象存储、文档数据库、计算发现和部署上下文。平台代码可以通过这些接口编写,无需硬编码到特定云厂商。 diff --git a/pyproject.toml b/pyproject.toml index 5fe70d4..eaaf2f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,9 @@ authors = [ ] dependencies = [] +[project.scripts] +quant-lifecycle = "quant_platform_kit.strategy_lifecycle.cli:main" + [tool.setuptools] package-dir = { "" = "src" } diff --git a/src/quant_platform_kit/strategy_lifecycle/cli.py b/src/quant_platform_kit/strategy_lifecycle/cli.py new file mode 100644 index 0000000..dfbd087 --- /dev/null +++ b/src/quant_platform_kit/strategy_lifecycle/cli.py @@ -0,0 +1,224 @@ +"""Command-line entrypoint for strategy lifecycle operations.""" + +from __future__ import annotations + +import argparse +import importlib +import sys +from collections.abc import Callable, Sequence +from typing import Any + + +def _load_callable(module_name: str, function_name: str) -> Callable[..., Any]: + module = importlib.import_module(module_name) + return getattr(module, function_name) + + +def _print(message: str) -> None: + print(message) + + +def _run_monitor(args: argparse.Namespace) -> int: + _print(f"[monitor] Running performance monitor for domain={args.domain}") + run_monitor = _load_callable( + "quant_platform_kit.strategy_lifecycle.performance_monitor", + "run_monitor", + ) + snapshots = run_monitor( + domain=args.domain, + strategy_profile=args.strategy, + output_dir=args.output_dir, + ) + _print(f"[monitor] Generated {len(snapshots)} performance snapshots") + return 0 + + +def _run_drift(args: argparse.Namespace) -> int: + _print(f"[drift] Running drift detection for domain={args.domain}") + run_drift_detection = _load_callable( + "quant_platform_kit.strategy_lifecycle.drift_detector", + "run_drift_detection", + ) + results = run_drift_detection(domain=args.domain, strategy_profile=args.strategy) + critical_count = sum(1 for item in results if getattr(getattr(item, "status", None), "value", None) == "critical") + review_count = sum(1 for item in results if getattr(getattr(item, "status", None), "value", None) == "review") + _print(f"[drift] {len(results)} strategies checked, {critical_count} critical, {review_count} review") + if not getattr(args, "no_alerts", False): + build_drift_alert = _load_callable( + "quant_platform_kit.strategy_lifecycle.drift_alerts", + "build_drift_alert", + ) + publish_drift_alerts = _load_callable( + "quant_platform_kit.strategy_lifecycle.drift_alerts", + "publish_drift_alerts", + ) + events = [event for event in (build_drift_alert(result) for result in results) if event is not None] + counts = publish_drift_alerts(events, dry_run=getattr(args, "dry_run_alerts", False)) + _print(f"[drift] Alerts published: {sum(counts.values())}") + return 0 + + +def _run_optimize(args: argparse.Namespace) -> int: + _print(f"[optimize] Optimizing {args.strategy} with method={args.method}") + run_optimization = _load_callable( + "quant_platform_kit.strategy_lifecycle.param_optimizer", + "run_optimization", + ) + proposal = run_optimization(strategy_profile=args.strategy, method=args.method) + _print(f"[optimize] Recommendation: {getattr(proposal, 'recommendation', '')}") + improvement_score = getattr(proposal, "improvement_score", None) + if improvement_score is not None: + _print(f"[optimize] Improvement score: {improvement_score:.3f}") + return 0 + + +def _run_update(args: argparse.Namespace) -> int: + _print(f"[update] Processing proposal: {args.proposal}") + process_update = _load_callable( + "quant_platform_kit.strategy_lifecycle.update_orchestrator", + "process_update", + ) + result = process_update(proposal_path=args.proposal, auto_approve=args.auto_approve) + _print(f"[update] Result: stage={result.get('stage')}, reason={result.get('reason', '')}") + return 1 if result.get("stage") == "error" else 0 + + +def _run_dashboard(args: argparse.Namespace) -> int: + _print(f"[dashboard] Building health dashboard (format={args.output_format})") + build_dashboard = _load_callable( + "quant_platform_kit.strategy_lifecycle.health_dashboard", + "build_dashboard", + ) + result = build_dashboard(output_dir=args.output_dir, output_format=args.output_format) + _print(f"[dashboard] Dashboard built with {result.get('strategy_count', 0)} strategies") + return 0 + + +def _run_autopilot(args: argparse.Namespace) -> int: + _print(f"[autopilot] Running auto-pilot cycle for domain={args.domain} (dry_run={args.dry_run})") + run_auto_pilot_cycle = _load_callable( + "quant_platform_kit.strategy_lifecycle.codex_integration", + "run_auto_pilot_cycle", + ) + summary = run_auto_pilot_cycle( + args.domain, + dry_run=args.dry_run, + create_issues=not args.no_issues, + trigger_optimization=True, + ) + _print(f"[autopilot] Snapshots: {summary.get('snapshots_count', 0)}") + _print(f"[autopilot] Drifts checked: {summary.get('drifts_checked', 0)}") + _print(f"[autopilot] Drifts alerting: {summary.get('drifts_alerting', 0)}") + _print(f"[autopilot] Issues created: {summary.get('issues_created', 0)}") + _print(f"[autopilot] Actions: {len(summary.get('actions', []))}") + for action in summary.get("actions", []): + decision = action.get("ai_decision", {}) + _print( + f" - {action['strategy']}: drift={action['drift_status']}, " + f"optimize={decision.get('optimization_needed')}, " + f"method={decision.get('recommended_method', 'none')}" + ) + return 0 + + +def _run_lifecycle(args: argparse.Namespace) -> int: + _print(f"[lifecycle] Running full lifecycle for domain={args.domain}") + _print("[lifecycle] Step: monitor") + monitor_status = _run_monitor(argparse.Namespace(domain=args.domain, strategy=None, output_dir=None)) + if monitor_status != 0: + return monitor_status + + _print("[lifecycle] Step: drift") + drift_status = _run_drift( + argparse.Namespace( + domain=args.domain, + strategy=None, + no_alerts=args.no_alerts, + dry_run_alerts=args.dry_run_alerts, + ) + ) + if drift_status != 0: + return drift_status + + if not args.skip_optimization: + if args.strategy: + _print("[lifecycle] Step: optimize") + optimize_status = _run_optimize(argparse.Namespace(strategy=args.strategy, method=args.method)) + if optimize_status != 0: + return optimize_status + else: + _print("[lifecycle] Step: optimize skipped (no --strategy provided)") + + _print("[lifecycle] Step: dashboard") + dashboard_status = _run_dashboard(argparse.Namespace(output_dir=None, output_format=args.output_format)) + if dashboard_status != 0: + return dashboard_status + + _print("[lifecycle] Full lifecycle complete") + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="quant-lifecycle", description="Quant strategy lifecycle CLI.") + parser.add_argument("--version", action="version", version="quant-lifecycle 0.10.0") + subparsers = parser.add_subparsers(dest="command", required=True) + + monitor = subparsers.add_parser("monitor", help="Run the continuous performance monitor for one domain.") + monitor.add_argument("--domain", default="us_equity") + monitor.add_argument("--strategy", default=None) + monitor.add_argument("--output-dir", default=None) + monitor.set_defaults(func=_run_monitor) + + drift = subparsers.add_parser("drift", help="Run drift detection and publish drift alerts.") + drift.add_argument("--domain", default="us_equity") + drift.add_argument("--strategy", default=None) + drift.add_argument("--no-alerts", action="store_true") + drift.add_argument("--dry-run-alerts", action="store_true") + drift.set_defaults(func=_run_drift) + + optimize = subparsers.add_parser("optimize", help="Run parameter optimization for one strategy.") + optimize.add_argument("--strategy", required=True) + optimize.add_argument("--method", default="grid_search") + optimize.set_defaults(func=_run_optimize) + + update = subparsers.add_parser("update", help="Process a parameter update proposal.") + update.add_argument("--proposal", required=True) + update.add_argument("--auto-approve", action="store_true") + update.set_defaults(func=_run_update) + + dashboard = subparsers.add_parser("dashboard", help="Build the unified strategy health dashboard.") + dashboard.add_argument("--output-dir", default=None) + dashboard.add_argument("--format", dest="output_format", default="all") + dashboard.set_defaults(func=_run_dashboard) + + autopilot = subparsers.add_parser("autopilot", help="Run a full auto-pilot cycle.") + autopilot.add_argument("--domain", default="us_equity") + autopilot.add_argument("--dry-run", action="store_true") + autopilot.add_argument("--no-issues", action="store_true") + autopilot.set_defaults(func=_run_autopilot) + + lifecycle = subparsers.add_parser("lifecycle", help="Run the full lifecycle pipeline.") + lifecycle.add_argument("--domain", default="us_equity") + lifecycle.add_argument("--strategy", default=None) + lifecycle.add_argument("--method", default="grid_search") + lifecycle.add_argument("--format", dest="output_format", default="all") + lifecycle.add_argument("--skip-optimization", action="store_true") + lifecycle.add_argument("--no-alerts", action="store_true") + lifecycle.add_argument("--dry-run-alerts", action="store_true") + lifecycle.set_defaults(func=_run_lifecycle) + + return parser + + +def main(argv: Sequence[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + try: + return int(args.func(args)) + except Exception as exc: # noqa: BLE001 + print(f"[{args.command}] Error: {exc}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_lifecycle_cli.py b/tests/test_lifecycle_cli.py new file mode 100644 index 0000000..e6a8f06 --- /dev/null +++ b/tests/test_lifecycle_cli.py @@ -0,0 +1,160 @@ +"""Tests for strategy_lifecycle.cli.""" + +from __future__ import annotations + +import unittest +from types import SimpleNamespace +from unittest.mock import patch + +from quant_platform_kit.strategy_lifecycle import cli + + +class LifecycleCliTests(unittest.TestCase): + + def test_autopilot_command_delegates_to_qpk_lifecycle(self) -> None: + calls = [] + + def fake_load_callable(module_name: str, function_name: str): + calls.append((module_name, function_name)) + + def fake_run_auto_pilot_cycle(domain: str, **kwargs): + return { + "domain": domain, + "snapshots_count": 1, + "drifts_checked": 2, + "drifts_alerting": 0, + "issues_created": 0, + "actions": [], + "kwargs": kwargs, + } + + return fake_run_auto_pilot_cycle + + with patch.object(cli, "_load_callable", fake_load_callable): + result = cli.main(["autopilot", "--domain", "us_equity", "--dry-run", "--no-issues"]) + + self.assertEqual(result, 0) + self.assertEqual( + calls, + [ + ( + "quant_platform_kit.strategy_lifecycle.codex_integration", + "run_auto_pilot_cycle", + ) + ], + ) + + def test_monitor_command_passes_optional_filters(self) -> None: + observed = {} + + def fake_load_callable(_module_name: str, _function_name: str): + def fake_run_monitor(**kwargs): + observed.update(kwargs) + return [object()] + + return fake_run_monitor + + with patch.object(cli, "_load_callable", fake_load_callable): + result = cli.main([ + "monitor", + "--domain", + "hk_equity", + "--strategy", + "hk_combo", + "--output-dir", + "out", + ]) + + self.assertEqual(result, 0) + self.assertEqual( + observed, + { + "domain": "hk_equity", + "strategy_profile": "hk_combo", + "output_dir": "out", + }, + ) + + def test_drift_command_counts_status_values(self) -> None: + statuses = [ + SimpleNamespace(status=SimpleNamespace(value="critical")), + SimpleNamespace(status=SimpleNamespace(value="review")), + SimpleNamespace(status=SimpleNamespace(value="healthy")), + ] + published = {} + + def fake_load_callable(_module_name: str, function_name: str): + if function_name == "run_drift_detection": + return lambda **_kwargs: statuses + if function_name == "build_drift_alert": + return lambda result: result if result.status.value != "healthy" else None + if function_name == "publish_drift_alerts": + def fake_publish(events, **kwargs): + published["events"] = list(events) + published["kwargs"] = kwargs + return {"telegram": len(published["events"])} + + return fake_publish + raise AssertionError(function_name) + + with patch.object(cli, "_load_callable", fake_load_callable): + result = cli.main(["drift", "--domain", "crypto"]) + + self.assertEqual(result, 0) + self.assertEqual(len(published["events"]), 2) + + def test_update_returns_non_zero_for_error_stage(self) -> None: + def fake_load_callable(_module_name: str, _function_name: str): + return lambda **_kwargs: {"stage": "error", "reason": "missing proposal"} + + with patch.object(cli, "_load_callable", fake_load_callable): + result = cli.main(["update", "--proposal", "gs://missing/proposal.json"]) + + self.assertEqual(result, 1) + + def test_lifecycle_command_runs_real_steps(self) -> None: + calls = [] + + def fake_load_callable(_module_name: str, function_name: str): + if function_name == "run_monitor": + def fake_monitor(**_kwargs): + calls.append("monitor") + return [] + + return fake_monitor + if function_name == "run_drift_detection": + def fake_drift(**_kwargs): + calls.append("drift") + return [] + + return fake_drift + if function_name == "publish_drift_alerts": + return lambda events, **_kwargs: {} + if function_name == "build_drift_alert": + return lambda _result: None + if function_name == "build_dashboard": + def fake_dashboard(**_kwargs): + calls.append("dashboard") + return {"strategy_count": 0} + + return fake_dashboard + raise AssertionError(function_name) + + with patch.object(cli, "_load_callable", fake_load_callable): + result = cli.main(["lifecycle", "--domain", "cn_equity", "--skip-optimization"]) + + self.assertEqual(result, 0) + self.assertEqual(calls, ["monitor", "drift", "dashboard"]) + + def test_error_returns_non_zero(self) -> None: + def fake_load_callable(_module_name: str, _function_name: str): + raise RuntimeError("boom") + + with patch.object(cli, "_load_callable", fake_load_callable): + result = cli.main(["dashboard"]) + + self.assertEqual(result, 1) + + +if __name__ == "__main__": + unittest.main()