Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0a28d7f
fix: harden streaming function against client disconnects
vrtornisiello May 14, 2026
2c79de2
test: add test cases for streaming function
vrtornisiello May 14, 2026
07fd377
feat: replace DISCONNECTED with MODEL_CALL_LIMIT status
vrtornisiello May 15, 2026
9d1bdff
refactor: move streaming helpers to agent_runner module
vrtornisiello May 15, 2026
932d0b5
feat: add run_agent producer that owns persistence
vrtornisiello May 15, 2026
edc22df
refactor: replace stream_response with sse_forwarder
vrtornisiello May 15, 2026
58caf8b
feat: spawn run_agent as background task, track on app.state
vrtornisiello May 15, 2026
dd059d2
feat: track and drain in-flight agent runs on shutdown
vrtornisiello May 15, 2026
5aa9f18
test: integration test for disconnect resilience
vrtornisiello May 15, 2026
dec50e0
fix: guarantee complete event when DB write fails, correct route resp…
vrtornisiello May 15, 2026
67c6611
fix: signal persistence failure via complete.error_details, not a fak…
vrtornisiello May 15, 2026
7a11b41
refactor: inject running_runs via dependency instead of Request
vrtornisiello May 15, 2026
ca728af
refactor: type run_id as uuid.UUID consistently
vrtornisiello May 15, 2026
721cb3a
refactor: type run_id as str consistently
vrtornisiello May 15, 2026
f772dde
refactor: decouple agent execution from event streaming
vrtornisiello May 15, 2026
2d28ce5
test: cover decoupled producer/consumer streaming
vrtornisiello May 15, 2026
a1a2489
test: simplify disconnect test to query messages via HTTP
vrtornisiello May 15, 2026
59e8ed9
chore: exclude _cleanup callback from coverage
vrtornisiello May 15, 2026
70505f4
feat: enable LOG_ENQUEUE by default for non-blocking logging
vrtornisiello May 15, 2026
8e8bffe
fix: stop leaking exception strings via error_details
vrtornisiello May 15, 2026
d8e258c
chore: add `.agents` folder to gitignore
vrtornisiello May 15, 2026
77fe6fe
feat: disable LOG_ENQUEUE by default
vrtornisiello May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Claude
# Agents
.agents/
.claude/

# Python-generated files
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Add MODEL_CALL_LIMIT message status.

Revision ID: 19e2c92563e2
Revises: 1c6556bb74f2
Create Date: 2026-05-14 16:02:45.217643
"""

from typing import Sequence, Union

from alembic import op

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID; `down_revision` is the ID of the
# migration it is applied on top of (matches the "Revises" line in the module
# docstring).
revision: str = "19e2c92563e2"
down_revision: Union[str, Sequence[str], None] = "1c6556bb74f2"
# No branch labels or cross-branch dependencies are declared for this revision.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Extend the ``messagestatus`` enum with ``MODEL_CALL_LIMIT``.

    The statement is issued from Alembic's autocommit block because
    `ALTER TYPE ... ADD VALUE` cannot run inside a transaction block on
    PostgreSQL releases before v12. ``IF NOT EXISTS`` means the statement
    does not fail when the label is already present.
    """
    statement = "ALTER TYPE messagestatus ADD VALUE IF NOT EXISTS 'MODEL_CALL_LIMIT'"
    migration_context = op.get_context()
    with migration_context.autocommit_block():
        op.execute(statement)


def downgrade() -> None:
    """Refuse to downgrade; this migration is deliberately one-way.

    Postgres cannot drop a single enum value. Rebuilding the type would
    mean remapping every existing MODEL_CALL_LIMIT row to some other
    status, which would discard the diagnostic signal this migration was
    added to capture.

    Raises:
        NotImplementedError: always.
    """
    reason = (
        "Downgrade not supported: removing MODEL_CALL_LIMIT would silently rewrite rows."
    )
    raise NotImplementedError(reason)
4 changes: 3 additions & 1 deletion app/api/dependencies/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .agent import Agent, get_agent
from .agent import Agent, RunningRuns, get_agent, get_running_runs
from .auth import UserID, get_user_id
from .db import AsyncDB, get_database, get_session
from .feedback import FeedbackSender, get_feedback_sender
Expand All @@ -7,10 +7,12 @@
"Agent",
"AsyncDB",
"FeedbackSender",
"RunningRuns",
"UserID",
"get_agent",
"get_database",
"get_feedback_sender",
"get_running_runs",
"get_session",
"get_user_id",
]
6 changes: 6 additions & 0 deletions app/api/dependencies/agent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Annotated

from fastapi import Depends, Request
Expand All @@ -8,4 +9,9 @@ def get_agent(request: Request) -> CompiledStateGraph:
return request.app.state.agent


def get_running_runs(request: Request) -> dict[str, asyncio.Task]:
    """Return the ``running_runs`` mapping stored on the application state.

    Args:
        request: Incoming request; only ``request.app.state`` is read.

    Returns:
        The object held at ``app.state.running_runs`` (the same instance,
        not a copy), annotated as a mapping of string keys to asyncio tasks.
    """
    app_state = request.app.state
    return app_state.running_runs


# Dependency alias: a parameter annotated `Agent` is resolved through
# `get_agent` and typed as a CompiledStateGraph.
Agent = Annotated[CompiledStateGraph, Depends(get_agent)]
# Dependency alias: a parameter annotated `RunningRuns` is resolved through
# `get_running_runs` and typed as a dict of str -> asyncio.Task.
RunningRuns = Annotated[dict[str, asyncio.Task], Depends(get_running_runs)]
46 changes: 36 additions & 10 deletions app/api/routers/chatbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

from fastapi import APIRouter, BackgroundTasks, HTTPException, status
from fastapi.responses import StreamingResponse
from loguru import logger

from app.api.dependencies import Agent, AsyncDB, FeedbackSender, UserID
from app.api.dependencies import Agent, AsyncDB, FeedbackSender, RunningRuns, UserID
from app.api.schemas import ConfigDict, UserMessage
from app.api.streaming import stream_response
from app.api.streaming import run_agent, stream_events
from app.api.streaming.schemas import StreamEvent
from app.db.models import (
FeedbackCreate,
FeedbackPayload,
Expand Down Expand Up @@ -96,14 +98,19 @@ async def list_messages(
return await database.get_messages(thread.id, order_by)


@router.post("/threads/{thread_id}/messages")
@router.post(
"/threads/{thread_id}/messages",
response_class=StreamingResponse,
status_code=status.HTTP_201_CREATED,
)
async def send_message(
thread_id: str,
user_message: UserMessage,
agent: Agent,
database: AsyncDB,
agent: Agent,
running_runs: RunningRuns,
user_id: UserID,
) -> Message:
) -> StreamingResponse:
run_id = str(uuid.uuid4())

config = ConfigDict(
Expand All @@ -120,18 +127,37 @@ async def send_message(

message = await database.create_message(message_create)

return StreamingResponse(
stream_response(
database=database,
queue: asyncio.Queue[StreamEvent] = asyncio.Queue()

task = asyncio.create_task(
run_agent(
agent=agent,
user_message=message,
config=config,
thread_id=thread_id,
user_message=message,
model_uri=settings.MODEL_URI,
queue=queue,
),
status_code=status.HTTP_201_CREATED,
name=f"run_agent:{run_id}",
)

running_runs[run_id] = task

def _cleanup(task: asyncio.Task) -> None:  # pragma: no cover
    """Untrack the finished run and log it if it ended abnormally.

    Installed as the done callback of the ``run_agent`` task.

    Args:
        task: The finished ``run_agent`` task handed in by asyncio.
    """
    # pop() rather than del: if the entry has already been removed, del
    # would raise KeyError here and skip the logging below.
    running_runs.pop(run_id, None)
    # Check cancellation first: Task.exception() raises CancelledError when
    # called on a cancelled task.
    if task.cancelled():
        logger.warning(f"run_agent task {run_id} was cancelled before persisting")
        return
    e = task.exception()
    if e is not None:
        logger.opt(exception=e).error(
            f"run_agent task {run_id} crashed without persisting"
        )

task.add_done_callback(_cleanup)

return StreamingResponse(stream_events(queue), status_code=status.HTTP_201_CREATED)


@router.put("/messages/{message_id}/feedback", response_model=FeedbackPublic)
async def upsert_feedback(
Expand Down
5 changes: 3 additions & 2 deletions app/api/streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from app.api.streaming.stream import stream_response
from app.api.streaming.agent_runner import run_agent
from app.api.streaming.stream import stream_events

__all__ = ["stream_response"]
__all__ = ["run_agent", "stream_events"]
Loading