Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions polymarket/high-throughput-paired-basis-maker/scripts/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@
"and liquidity can vanish. Backtests are hypothetical and do not guarantee future "
"performance. Use dry-run first and only trade with risk capital."
)
SEREN_POLYMARKET_PUBLISHER_PREFIX = "https://api.serendb.com/publishers/polymarket-data/"
SEREN_POLYMARKET_PUBLISHER_PREFIX = "https://api.serendb.com/publishers/"
SEREN_POLYMARKET_DATA_PUBLISHER_PREFIX = f"{SEREN_POLYMARKET_PUBLISHER_PREFIX}polymarket-data/"
SEREN_POLYMARKET_TRADING_PUBLISHER_PREFIX = f"{SEREN_POLYMARKET_PUBLISHER_PREFIX}polymarket-trading-serenai/"
SEREN_ALLOWED_POLYMARKET_PUBLISHER_PREFIXES = (
SEREN_POLYMARKET_DATA_PUBLISHER_PREFIX,
SEREN_POLYMARKET_TRADING_PUBLISHER_PREFIX,
)
MISSING_RUNTIME_AUTH_ERROR = (
"missing_runtime_auth: set API_KEY (Seren Desktop runtime) or SEREN_API_KEY; "
"missing_seren_api_key: set SEREN_API_KEY"
)


@dataclass(frozen=True)
Expand Down Expand Up @@ -348,14 +358,14 @@ def _parse_iso_ts(value: Any) -> int | None:


def _http_get_json(url: str, timeout: int = 30) -> dict[str, Any] | list[Any]:
if not url.startswith(SEREN_POLYMARKET_PUBLISHER_PREFIX):
if not any(url.startswith(prefix) for prefix in SEREN_ALLOWED_POLYMARKET_PUBLISHER_PREFIXES):
raise ValueError(
"policy_violation: backtest data source must use Seren Polymarket publisher "
f"({SEREN_POLYMARKET_PUBLISHER_PREFIX}); got {url}"
f"({', '.join(SEREN_ALLOWED_POLYMARKET_PUBLISHER_PREFIXES)}); got {url}"
)
api_key = os.getenv("SEREN_API_KEY", "").strip()
api_key = os.getenv("API_KEY", "").strip() or os.getenv("SEREN_API_KEY", "").strip()
if not api_key:
raise ValueError("missing_seren_api_key: set SEREN_API_KEY to query Seren publishers.")
raise ValueError(MISSING_RUNTIME_AUTH_ERROR)
req = Request(
url,
headers={
Expand Down
4 changes: 2 additions & 2 deletions polymarket/liquidity-paired-basis-maker/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Live execution requires both:

- `scripts/agent.py` - basis backtest + paired trade-intent runtime
- `config.example.json` - strategy parameters, live backtest defaults, and trade-mode sample markets
- `.env.example` - optional fallback auth/env template (`SEREN_API_KEY` only if MCP is unavailable)
- `.env.example` - optional auth/env template (`API_KEY` in Seren Desktop runtime or `SEREN_API_KEY` standalone)

## Quick Start

Expand All @@ -51,7 +51,7 @@ cp config.example.json config.json
python3 scripts/agent.py --config config.json
```

If you are logged into Seren Desktop, the runtime uses local `seren-mcp` auth automatically.
If you are logged into Seren Desktop, the runtime can use injected `API_KEY` auth automatically.

## Run Trade Mode (Backtest-First)

Expand Down
215 changes: 14 additions & 201 deletions polymarket/liquidity-paired-basis-maker/scripts/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
import json
import math
import os
import select
import shlex
import subprocess
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
Expand All @@ -36,7 +33,10 @@
{SEREN_POLYMARKET_DATA_PUBLISHER, SEREN_POLYMARKET_TRADING_PUBLISHER}
)
POLICY_VIOLATION_BACKTEST_SOURCE = "policy_violation: backtest data source must use Seren Polymarket publisher"
MISSING_SEREN_API_KEY_ERROR = "missing_seren_api_key: set SEREN_API_KEY"
MISSING_RUNTIME_AUTH_ERROR = (
"missing_runtime_auth: set API_KEY (Seren Desktop runtime) or SEREN_API_KEY; "
"missing_seren_api_key: set SEREN_API_KEY"
)


DISCLAIMER = (
Expand Down Expand Up @@ -382,12 +382,6 @@ def _parse_iso_ts(value: Any) -> int | None:
return None


def _is_truthy(value: str | None) -> bool:
if value is None:
return False
return value.strip().lower() in {"1", "true", "yes", "y", "on"}


def _seren_publisher_target(url: str) -> tuple[str, str]:
parsed = urlparse(url)
if parsed.scheme != "https" or parsed.netloc != SEREN_POLYMARKET_PUBLISHER_HOST:
Expand Down Expand Up @@ -416,172 +410,12 @@ def _seren_publisher_target(url: str) -> tuple[str, str]:
return publisher_slug, publisher_path


def _read_mcp_exact(fd: int, size: int, timeout_seconds: float) -> bytes:
buf = bytearray()
while len(buf) < size:
ready, _, _ = select.select([fd], [], [], timeout_seconds)
if not ready:
raise TimeoutError("Timed out waiting for response from seren-mcp.")
chunk = os.read(fd, size - len(buf))
if not chunk:
raise RuntimeError("seren-mcp closed stdout before completing a response.")
buf.extend(chunk)
return bytes(buf)


def _read_mcp_message(proc: subprocess.Popen[bytes], timeout_seconds: float) -> dict[str, Any]:
if proc.stdout is None:
raise RuntimeError("seren-mcp stdout is not available.")
fd = proc.stdout.fileno()
header_buf = bytearray()
while b"\r\n\r\n" not in header_buf:
header_buf.extend(_read_mcp_exact(fd, 1, timeout_seconds))
if len(header_buf) > 16384:
raise RuntimeError("Invalid MCP header: too large.")
header_raw, _ = header_buf.split(b"\r\n\r\n", 1)
headers: dict[str, str] = {}
for line in header_raw.decode("ascii", errors="ignore").split("\r\n"):
if ":" not in line:
continue
k, v = line.split(":", 1)
headers[k.strip().lower()] = v.strip()
content_length = _safe_int(headers.get("content-length"), -1)
if content_length < 0:
raise RuntimeError("Invalid MCP header: missing content-length.")
body = _read_mcp_exact(fd, content_length, timeout_seconds)
parsed = json.loads(body.decode("utf-8"))
if not isinstance(parsed, dict):
raise RuntimeError("Invalid MCP response payload.")
return parsed


def _write_mcp_message(proc: subprocess.Popen[bytes], payload: dict[str, Any]) -> None:
if proc.stdin is None:
raise RuntimeError("seren-mcp stdin is not available.")
body = json.dumps(payload, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
proc.stdin.write(header)
proc.stdin.write(body)
proc.stdin.flush()


def _mcp_request(
proc: subprocess.Popen[bytes],
request_id: int,
method: str,
params: dict[str, Any] | None,
timeout_seconds: float,
) -> dict[str, Any]:
request: dict[str, Any] = {"jsonrpc": "2.0", "id": request_id, "method": method}
if params is not None:
request["params"] = params
_write_mcp_message(proc, request)
while True:
message = _read_mcp_message(proc, timeout_seconds)
if message.get("id") != request_id:
continue
error = message.get("error")
if isinstance(error, dict):
raise RuntimeError(_safe_str(error.get("message"), "MCP request failed."))
result = message.get("result")
if isinstance(result, dict):
return result
return {"value": result}


def _extract_call_publisher_body(result: dict[str, Any]) -> dict[str, Any] | list[Any]:
structured = result.get("structuredContent")
if isinstance(structured, dict):
body = structured.get("body")
if isinstance(body, dict | list):
return body
if isinstance(structured, dict | list):
return structured

content = result.get("content")
if isinstance(content, list):
for item in content:
if not isinstance(item, dict):
continue
if _safe_str(item.get("type"), "") != "text":
continue
text = _safe_str(item.get("text"), "")
if not text:
continue
try:
parsed = json.loads(text)
except json.JSONDecodeError:
continue
if isinstance(parsed, dict):
body = parsed.get("body")
if isinstance(body, dict | list):
return body
return parsed
if isinstance(parsed, list):
return parsed
if isinstance(result.get("body"), dict | list):
return result["body"]
raise RuntimeError("Unable to parse call_publisher MCP response payload.")


def _http_get_json_via_mcp(url: str, timeout: int = 30) -> dict[str, Any] | list[Any]:
publisher_slug, publisher_path = _seren_publisher_target(url)
command_raw = _safe_str(os.getenv("SEREN_MCP_COMMAND"), "seren-mcp").strip() or "seren-mcp"
command = shlex.split(command_raw)
if not command:
raise RuntimeError("SEREN_MCP_COMMAND is empty.")

timeout_seconds = max(1.0, float(timeout))
proc = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
_mcp_request(
proc=proc,
request_id=1,
method="initialize",
params={
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "liquidity-paired-basis-maker", "version": "1.1"},
},
timeout_seconds=timeout_seconds,
)
_write_mcp_message(
proc,
{
"jsonrpc": "2.0",
"method": "notifications/initialized",
"params": {},
},
)
result = _mcp_request(
proc=proc,
request_id=2,
method="tools/call",
params={
"name": "call_publisher",
"arguments": {
"publisher": publisher_slug,
"method": "GET",
"path": publisher_path,
"response_format": "json",
},
},
timeout_seconds=timeout_seconds,
)
return _extract_call_publisher_body(result)
finally:
if proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout=1)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait(timeout=1)
def _runtime_api_key() -> str:
for env_name in ("API_KEY", "SEREN_API_KEY"):
token = _safe_str(os.getenv(env_name), "").strip()
if token:
return token
return ""


def _http_get_json_via_api_key(url: str, api_key: str, timeout: int = 30) -> dict[str, Any] | list[Any]:
Expand All @@ -600,31 +434,10 @@ def _http_get_json_via_api_key(url: str, api_key: str, timeout: int = 30) -> dic
def _http_get_json(url: str, timeout: int = 30) -> dict[str, Any] | list[Any]:
_seren_publisher_target(url)

api_key = _safe_str(os.getenv("SEREN_API_KEY"), "").strip()
prefer_mcp = _is_truthy(os.getenv("SEREN_USE_MCP")) or not api_key
mcp_error: Exception | None = None

if prefer_mcp:
try:
return _http_get_json_via_mcp(url, timeout=timeout)
except Exception as exc:
mcp_error = exc
if not api_key:
raise RuntimeError(
"Failed to fetch Polymarket data from Seren MCP. "
"Ensure Seren Desktop is logged in (or set SEREN_MCP_COMMAND), "
"or provide SEREN_API_KEY for direct gateway auth "
f"({MISSING_SEREN_API_KEY_ERROR})."
) from exc

try:
return _http_get_json_via_api_key(url, api_key=api_key, timeout=timeout)
except Exception as exc:
if mcp_error is not None:
raise RuntimeError(
f"Failed via MCP ({mcp_error}) and API key fallback ({exc})."
) from exc
raise
api_key = _runtime_api_key()
if not api_key:
raise RuntimeError(MISSING_RUNTIME_AUTH_ERROR)
return _http_get_json_via_api_key(url, api_key=api_key, timeout=timeout)


def _align_histories(primary: list[tuple[int, float]], secondary: list[tuple[int, float]]) -> tuple[list[tuple[int, float]], list[tuple[int, float]]]:
Expand Down
Loading