Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions src/quant_strategy_plugins/ai_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,98 @@ def _complete_with_endpoint(
timeout_seconds: float,
) -> str:
endpoint = endpoint.normalized()

# Route through AiGateway when CODEX_AUDIT_SERVICE_URL is configured.
# API keys live on the VPS — no keys in plugin config needed.
gateway_url = os.environ.get("CODEX_AUDIT_SERVICE_URL", "").strip()
if gateway_url:
prompt = "\n\n".join(
f"{str(m.get('role') or 'user').upper()}:\n{str(m.get('content') or '').strip()}"
for m in messages if str(m.get("content") or "").strip()
)
if endpoint.provider == PROVIDER_CODEX:
return _codex_via_gateway(prompt, endpoint.model, timeout_seconds)
return _llm_via_gateway(prompt, endpoint.model, endpoint.provider, timeout_seconds)

# Fallback: direct API / subprocess calls
if endpoint.provider == PROVIDER_CODEX:
return _codex_exec_completion(endpoint, messages, timeout_seconds)
if endpoint.provider == PROVIDER_ANTHROPIC:
return _anthropic_messages_completion(endpoint, messages, timeout_seconds)
return _openai_compatible_chat_completion(endpoint, messages, timeout_seconds)


def _codex_via_gateway(prompt: str, model: str, timeout_seconds: float) -> str:
"""Execute via AiGateway service — delegates to CodexAdapter on VPS."""
try:
from ai_gateway_client import AiGatewayClient, GatewayConfig
config = GatewayConfig.from_env()
client = AiGatewayClient(config)
result = client.execute(prompt, mode="review_only", model=model, timeout=timeout_seconds)
if result.success:
return result.output
raise AiAuditError(result.error)
except ImportError:
return _codex_exec_direct(prompt, timeout_seconds)
except Exception as exc:
_logger.warning("ai_audit gateway codex call failed: %s; falling back to direct", exc)
return _codex_exec_direct(prompt, timeout_seconds)


def _llm_via_gateway(prompt: str, model: str, provider: str, timeout_seconds: float) -> str:
"""Analyze via AiGateway service — delegates to LlmAdapter on VPS."""
try:
from ai_gateway_client import AiGatewayClient, GatewayConfig
config = GatewayConfig.from_env()
client = AiGatewayClient(config)
result = client.analyze(prompt, model=model, timeout=timeout_seconds)
if result.success:
return result.output
raise AiAuditError(result.error)
except ImportError:
return _llm_direct(prompt, model, provider, timeout_seconds)
except Exception as exc:
_logger.warning("ai_audit gateway analyze call failed: %s; falling back to direct", exc)
return _llm_direct(prompt, model, provider, timeout_seconds)


def _llm_direct(prompt: str, model: str, provider: str, timeout_seconds: float) -> str:
"""Direct API call fallback when gateway is unavailable."""
endpoint = AiAuditEndpoint(
name="fallback", api_key="", provider=provider,
base_url="", model=model,
).normalized()
messages: tuple[Mapping[str, str], ...] = ({"role": "user", "content": prompt},)
if provider == PROVIDER_ANTHROPIC:
return _anthropic_messages_completion(endpoint, messages, timeout_seconds)
return _openai_compatible_chat_completion(endpoint, messages, timeout_seconds)


def _codex_exec_direct(prompt: str, timeout_seconds: float) -> str:
"""Direct codex exec fallback when gateway is unavailable."""
with tempfile.TemporaryDirectory(prefix="qsp-ai-audit-") as temp_dir:
output_path = Path(temp_dir) / "codex-final-message.md"
command = ["codex", "exec", "--cd", temp_dir, "--output-last-message", str(output_path), "-"]
try:
result = subprocess.run(
command, input=prompt, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
timeout=float(timeout_seconds), check=False, env=_scrubbed_codex_env(),
)
except FileNotFoundError as exc:
raise AiAuditError("codex command was not found") from exc
except subprocess.TimeoutExpired as exc:
raise AiAuditError(f"codex command timed out after {timeout_seconds:g}s") from exc
if result.returncode != 0:
detail = _bounded_text(result.stdout or "", limit=300)
raise AiAuditError(f"codex command failed with exit code {result.returncode}: {detail}")
text = output_path.read_text(encoding="utf-8").strip() if output_path.exists() else ""
if not text:
text = str(result.stdout or "").strip()
if not text:
raise AiAuditError("codex command returned empty output")
return text


def _scrubbed_codex_env() -> dict[str, str]:
secret_markers = ("TOKEN", "SECRET", "PASSWORD", "PRIVATE_KEY", "CREDENTIAL", "API_KEY")
return {
Expand Down Expand Up @@ -673,6 +758,62 @@ def _failure_text(exc: BaseException) -> str:
return _bounded_text(f"{type(exc).__name__}: {exc}", limit=300)


def _report_shadow_disagreement(
*,
audit_kind: str,
ai_verdict: str,
ai_confidence: float,
deterministic_route: str,
) -> None:
"""Fire-and-forget report of AI vs deterministic disagreement to AiGateway.

When AI shadow audit disagrees with the deterministic route, report it
so the gateway can track cumulative disagreements and auto-escalate.
Only sends if CODEX_AUDIT_SERVICE_URL is configured.
"""
import urllib.request as _ur
service_url = os.environ.get("CODEX_AUDIT_SERVICE_URL", "").strip()
if not service_url:
return
# Only report if AI disagrees (verdict is not "agree")
if ai_verdict == "agree":
return
try:
# Map audit kind to plugin name
plugin_map = {
"crisis_response_shadow": "crisis_response",
"taco_rebound_shadow": "taco_rebound",
}
plugin = plugin_map.get(audit_kind, audit_kind)
token = _env(
"ACTIONS_ID_TOKEN_REQUEST_TOKEN",
"CODEX_AUDIT_SERVICE_TOKEN",
) or ""
if not token:
token = os.environ.get("CODEX_AUDIT_SERVICE_TOKEN", "")
if not token:
return # No auth available, skip silently
payload = json.dumps({
"plugin": plugin,
"ai_verdict": ai_verdict,
"ai_confidence": ai_confidence,
"deterministic_route": deterministic_route,
"source_repository": os.environ.get("AI_GATEWAY_SOURCE_REPO", ""),
}).encode("utf-8")
req = _ur.Request(
f"{service_url.rstrip('/')}/v1/ai/feedback/shadow",
data=payload, method="POST",
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"User-Agent": "quant-strategy-plugins",
},
)
_ur.urlopen(req, timeout=5)
except Exception:
pass # Fire-and-forget — never block the main audit flow


def build_disabled_ai_audit(*, audit_kind: str = "strategy_plugin") -> dict[str, Any]:
return {
"schema_version": AI_AUDIT_SCHEMA_VERSION,
Expand Down Expand Up @@ -754,6 +895,16 @@ def _run_ai_audit(
raw_response = client(endpoint, messages, float(timeout_seconds))
audit_response = _normalize_ai_audit_response(_extract_json_object(raw_response))
attempts.append({**endpoint.report(), "status": "ok"})

# Phase 3: report AI vs deterministic disagreement to AiGateway
_report_shadow_disagreement(
audit_kind=audit_kind,
ai_verdict=audit_response.get("verdict", ""),
ai_confidence=audit_response.get("confidence") or 0.0,
deterministic_route=str(deterministic_payload.get("canonical_route") or
deterministic_payload.get("suggested_action") or ""),
)

return {
**base_payload,
"status": "ok",
Expand Down
Loading