Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions agentix/agents/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ async def run(agent_input: AgentInput) -> AgentOutput

from __future__ import annotations

from typing import Any, Callable, Coroutine, TypedDict
from collections.abc import Callable, Coroutine
from typing import Any, TypedDict

from agentix.trajectory import Trajectory


class Step(TypedDict, total=False):
Expand All @@ -29,7 +32,8 @@ class AgentOutput(TypedDict, total=False):
exit_code: int # required
stdout: str # raw output
stderr: str # raw errors
trajectory: list[Step] # structured steps (optional)
trajectory: list[Step] # structured steps (optional, lightweight)
atif_trajectory: Trajectory | None # full ATIF trajectory for training (optional)


RunFn = Callable[[AgentInput], Coroutine[Any, Any, AgentOutput]]
17 changes: 14 additions & 3 deletions agentix/deployment/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,17 @@ async def create(self, config: SandboxConfig) -> SandboxInfo:
"docker", "run", "-d",
"--name", sandbox_id,
"-v", "/nix/store:/nix/store:ro",
"-e", f"PATH={config.agent_closure}/bin:{config.runtime_closure}/bin:/usr/local/bin:/usr/bin:/bin",
"-e", (
f"PATH={config.agent_closure}/bin:"
f"{config.runtime_closure}/bin:"
"/usr/local/bin:/usr/bin:/bin"
),
]
if config.dataset_closure:
cmd.extend(["-e", f"PYTHONPATH={config.dataset_closure}/lib/python3.12/site-packages"])
cmd.extend([
"-e",
f"PYTHONPATH={config.dataset_closure}/lib/python3.12/site-packages",
])
cmd.extend([
"-p", f"{port}:8000",
config.task_image,
Expand Down Expand Up @@ -117,7 +124,11 @@ async def update(self, sandbox_id: str, config: SandboxConfig,
if agent_changed:
# In-place: update PATH to point to new agent closure, restart server
logger.info("In-place agent update for sandbox %s", sandbox_id)
new_path = f"{config.agent_closure}/bin:{config.runtime_closure}/bin:/usr/local/bin:/usr/bin:/bin"
new_path = (
f"{config.agent_closure}/bin:"
f"{config.runtime_closure}/bin:"
"/usr/local/bin:/usr/bin:/bin"
)
await self._exec_in_container(sandbox_id, f"export PATH={new_path}")
# Restart agentix-server to pick up new PATH
await self._exec_in_container(sandbox_id, "pkill -f agentix-server || true")
Expand Down
5 changes: 3 additions & 2 deletions agentix/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from pydantic import BaseModel, Field


# ── Runtime server API ────────────────────────────────────────────


Expand Down Expand Up @@ -39,7 +38,9 @@ class SandboxConfig(BaseModel):
task_image: str = Field(description="Docker image for the task environment")
runtime_closure: str = Field(description="Nix store path for agentix runtime")
agent_closure: str = Field(description="Nix store path for agent binary")
dataset_closure: str | None = Field(default=None, description="Nix store path for dataset eval code")
dataset_closure: str | None = Field(
default=None, description="Nix store path for dataset eval code",
)


class SandboxInfo(BaseModel):
Expand Down
3 changes: 3 additions & 0 deletions agentix/orchestrator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from agentix.orchestrator.orchestrator import Orchestrator, RunConfig, RunRecord

__all__ = ["Orchestrator", "RunConfig", "RunRecord"]
108 changes: 108 additions & 0 deletions agentix/orchestrator/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""CLI entry point for agentix orchestrator.

Usage:
agentix-run run --task-image IMG --runtime-closure PATH --agent-closure PATH \
--agent-input '{"instruction": "fix the bug"}' \
[--output-dir results] [--concurrency 4] [--timeout 600]

agentix-run run --batch batch.jsonl \
[--output-dir results] [--concurrency 4]

agentix-run summary [--output-dir results]
"""

from __future__ import annotations

import argparse
import asyncio
import json
import sys

from agentix.deployment.docker import DockerDeployment
from agentix.orchestrator.orchestrator import Orchestrator, RunConfig


def _build_parser() -> argparse.ArgumentParser:
    """Build the ``agentix-run`` parser with its ``run`` and ``summary`` subcommands."""
    parser = argparse.ArgumentParser(
        prog="agentix-run",
        description="Run agents in sandboxes and collect trajectories",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    # ── run: single or batch ──────────────────────────────────
    run_p = sub.add_parser("run", help="Execute agent run(s)")
    run_p.add_argument("--task-image", help="Docker image for task env")
    run_p.add_argument("--runtime-closure", help="Nix store path for runtime")
    run_p.add_argument("--agent-closure", help="Nix store path for agent")
    run_p.add_argument("--dataset-closure", default=None, help="Nix store path for dataset")
    run_p.add_argument("--agent-input", help="JSON string for agent_input")
    run_p.add_argument("--batch", help="Path to JSONL file with RunConfig per line")
    run_p.add_argument("--output-dir", default="results", help="Output directory")
    run_p.add_argument("--concurrency", type=int, default=4)
    run_p.add_argument("--timeout", type=float, default=600)

    # ── summary ───────────────────────────────────────────────
    sum_p = sub.add_parser("summary", help="Print run summary")
    sum_p.add_argument("--output-dir", default="results")

    return parser


def main(argv: list[str] | None = None) -> None:
    """Entry point for the ``agentix-run`` command line.

    ``summary`` prints the orchestrator's summary for ``--output-dir`` as
    indented JSON. ``run`` builds a list of ``RunConfig`` objects, either
    from ``--batch`` (which takes precedence) or from the single-run flags,
    executes them through ``Orchestrator.run_batch`` and prints a
    success count followed by one ``FAIL`` line per unsuccessful record.

    Args:
        argv: Argument list to parse, without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``, so the
            installed console script keeps working unchanged.

    Usage errors (no usable mode selected, missing closure paths in
    single-run mode, malformed ``--agent-input`` JSON) are reported through
    ``parser.error``, which prints the usage text and exits with status 2.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    if args.command == "summary":
        deployment = DockerDeployment()
        orch = Orchestrator(deployment, output_dir=args.output_dir)
        summary = orch.summary()
        print(json.dumps(summary, indent=2))
        return

    # command == "run"
    if args.batch:
        configs = _load_batch(args.batch)
    elif args.task_image and args.agent_input:
        # Fail early with a usage message instead of handing None closure
        # paths to RunConfig / make_run_id.
        required = (
            ("--runtime-closure", args.runtime_closure),
            ("--agent-closure", args.agent_closure),
        )
        missing = [flag for flag, value in required if not value]
        if missing:
            parser.error(f"single-run mode also requires {', '.join(missing)}")
            return

        try:
            agent_input = json.loads(args.agent_input)
        except json.JSONDecodeError as exc:
            # A typo in the inline JSON is a usage error, not a crash.
            parser.error(f"--agent-input is not valid JSON: {exc}")
            return

        run_id = RunConfig.make_run_id(args.task_image, args.agent_closure, agent_input)
        configs = [RunConfig(
            run_id=run_id,
            task_image=args.task_image,
            runtime_closure=args.runtime_closure,
            agent_closure=args.agent_closure,
            dataset_closure=args.dataset_closure,
            agent_input=agent_input,
            timeout=args.timeout,
        )]
    else:
        parser.error("Provide --batch or (--task-image + --agent-input)")
        return

    deployment = DockerDeployment()
    orch = Orchestrator(
        deployment,
        output_dir=args.output_dir,
        concurrency=args.concurrency,
    )
    records = asyncio.run(orch.run_batch(configs))

    # Print summary
    success = sum(1 for r in records if r.status == "success")
    print(f"\nDone: {success}/{len(records)} succeeded")
    for r in records:
        if r.status != "success":
            print(f"  FAIL [{r.run_id}]: {r.error or r.stderr[:100]}")


def _load_batch(path: str) -> list[RunConfig]:
"""Load RunConfig list from a JSONL file."""
configs = []
with open(path) as f:
for i, line in enumerate(f, 1):
line = line.strip()
if not line or line.startswith("#"):
continue
try:
configs.append(RunConfig.model_validate_json(line))
except Exception as e:
print(f"Warning: skipping line {i}: {e}", file=sys.stderr)
return configs


if __name__ == "__main__":
main()
Loading